|
|
import json
|
|
|
import os
|
|
|
from urllib.parse import urlencode
|
|
|
from kafka import KafkaProducer
|
|
|
import requests
|
|
|
|
|
|
|
|
|
# 网站获取数据Api
|
|
|
from Util import CommonUtil
|
|
|
|
|
|
url = "http://10.10.14.199/dsideal_yy/admin/new_base/personInfo_GetPersonInfoList"
|
|
|
|
|
|
# POST构建请求
|
|
|
headers = {'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8'} # 一种请求头,需要携带
|
|
|
|
|
|
# Kafka
|
|
|
bootstrap_servers = '10.10.14.237:9092'
|
|
|
|
|
|
# Topic
|
|
|
Topic = "SyncData"
|
|
|
|
|
|
# 记录的最后上报时间(文件名称不能重复噢~~)
|
|
|
logFile = "./Log/PJ.log"
|
|
|
|
|
|
|
|
|
# 获取指定页码的数据
|
|
|
def getData(pageNumber):
|
|
|
# 最后一次更新时间
|
|
|
update_ts = CommonUtil.read_txt(logFile)
|
|
|
|
|
|
# 构造数据
|
|
|
data = {
|
|
|
'bureau_id': 400195,
|
|
|
'pageNumber': pageNumber,
|
|
|
'pageSize': 20,
|
|
|
'org_id': 400195,
|
|
|
'person_name': '',
|
|
|
'query_child': True,
|
|
|
'query_bureau_child': False,
|
|
|
'b_use': 1,
|
|
|
'update_ts': update_ts
|
|
|
}
|
|
|
# 构建POST请求
|
|
|
response = requests.request("POST", url, headers=headers, data=urlencode(data))
|
|
|
decodejson = json.loads(response.text)
|
|
|
|
|
|
# 总页数
|
|
|
totalPage = decodejson['totalPage']
|
|
|
# 遍历每一条,进行打入Kafka
|
|
|
for x in decodejson['list']:
|
|
|
# 向Kafka发送的内容
|
|
|
parmas_message = json.dumps(x, ensure_ascii=False)
|
|
|
producer.send(Topic, parmas_message.encode('utf-8'))
|
|
|
|
|
|
# 完成一个列表的数据上报
|
|
|
print("%s 成功获取%d条" % (CommonUtil.getCurrentTime(), len(decodejson['list'])))
|
|
|
return totalPage
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
# 判断上报日志文件是否存在,不存在则创建
|
|
|
if not os.path.exists(logFile):
|
|
|
CommonUtil.write_txt(logFile, "2000-01-01 00:00:00")
|
|
|
|
|
|
# KafkaClient生产者
|
|
|
global producer
|
|
|
producer = KafkaProducer(bootstrap_servers=bootstrap_servers) # 连接kafka
|
|
|
# 调用一次
|
|
|
totalPage = getData(1)
|
|
|
for i in range(2, totalPage + 1):
|
|
|
getData(i)
|
|
|
# 关闭KafkaClient
|
|
|
producer.close()
|
|
|
|
|
|
# 写入成功上报的时间戳
|
|
|
CommonUtil.write_txt(logFile, CommonUtil.getCurrentTime())
|