You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

77 lines
2.2 KiB

2 years ago
import json
import os
from urllib.parse import urlencode
from kafka import KafkaProducer
import requests
# Web API that the data is fetched from
from Util import CommonUtil
# Endpoint queried for the paginated person list.
url = "http://10.10.14.199/dsideal_yy/admin/new_base/personInfo_GetPersonInfoList"

# Request header that must accompany every POST to the endpoint above.
headers = {
    'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
}

# Kafka broker address.
bootstrap_servers = '10.10.14.237:9092'

# Kafka topic the records are published to.
Topic = "SyncData"

# File holding the time of the last report; the name must not be reused
# for another log file.
logFile = "./Log/PJ.log"
# 获取指定页码的数据
def getData(pageNumber, timeout=30):
    """Fetch one page of person records and publish each record to Kafka.

    The last recorded report time is read from ``logFile`` and sent to the
    API as the ``update_ts`` form field. Every element of the response's
    ``list`` is serialised to JSON (non-ASCII characters kept as-is),
    UTF-8 encoded and sent to ``Topic`` through the module-level
    ``producer``.

    Args:
        pageNumber: Page number to request.
        timeout: Seconds to wait for the HTTP request before giving up.
            Without a timeout ``requests`` waits indefinitely, so a stalled
            server would hang the whole sync.

    Returns:
        The ``totalPage`` value reported by the API response.

    Raises:
        requests.RequestException: On connection failure, timeout, or a
            4xx/5xx HTTP status.
        KeyError: If the response JSON has no ``totalPage`` key.
    """
    # Time of the last report, as stored in the log file.
    update_ts = CommonUtil.read_txt(logFile)

    # Form fields of the query.
    data = {
        'bureau_id': 400195,
        'pageNumber': pageNumber,
        'pageSize': 20,
        'org_id': 400195,
        'person_name': '',
        'query_child': True,
        'query_bureau_child': False,
        'b_use': 1,
        'update_ts': update_ts,
    }

    response = requests.post(
        url, headers=headers, data=urlencode(data), timeout=timeout
    )
    # Fail loudly on an HTTP error instead of trying to parse an error page.
    response.raise_for_status()
    decodejson = json.loads(response.text)

    # Total number of pages; read before sending so a malformed payload
    # aborts before anything is published.
    totalPage = decodejson['totalPage']

    # Treat a missing or null list as "no rows on this page".
    records = decodejson.get('list') or []

    # Publish every record to Kafka.
    for record in records:
        message = json.dumps(record, ensure_ascii=False)
        producer.send(Topic, message.encode('utf-8'))

    # One page of data has been reported.
    print("%s 成功获取%d" % (CommonUtil.getCurrentTime(), len(records)))
    return totalPage
if __name__ == '__main__':
    # Create the report-time log file with an initial timestamp if it does
    # not exist yet.
    if not os.path.exists(logFile):
        CommonUtil.write_txt(logFile, "2000-01-01 00:00:00")

    # Capture the checkpoint BEFORE fetching. Writing the time at which the
    # run finished would make the next run skip records that were updated
    # on the server while this run was still in progress.
    syncStartedAt = CommonUtil.getCurrentTime()

    # Kafka producer; getData() uses it as a module-level name.
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    try:
        # The first call also tells us how many pages there are.
        totalPage = getData(1)
        for page in range(2, totalPage + 1):
            getData(page)
    finally:
        # Always release the Kafka connection, even when a page fails.
        producer.close()

    # Reached only when every page was fetched without an exception:
    # record the checkpoint for the next run.
    CommonUtil.write_txt(logFile, syncStartedAt)