You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

48 lines
1.7 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import sys
from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic
import json
# 配置信息
from Util import GlobalManager as gm, CommonUtil
if __name__ == '__main__':
# 检查参数输入
if len(sys.argv) == 1:
# print("请输入需要处理的配置json文件名称注意不用带目录只有文件名称即可")
# sys.exit()
# 目前测试阶段这里先这么写以后就要求必须输入指定的JSON文件不同的厂商不同的文件。不是按表来的是按业务厂商来的配置文件。
sys.argv.append("dsideal_ypt.json")
# 初始化Kafka全局配置
gm._init_()
bootstrap_servers = gm.get("bootstrap_servers")
c = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
# 扫描所有表创建Topic
# 读取Json文件配置获取到Mysql的连接办法
jo = []
with open("./json/" + sys.argv[1], 'r', encoding='utf-8') as load_f:
jo = json.load(load_f)
# Kafka的Topic
topic_prefix = jo["topic_prefix"]
# 要处理哪些表
tables = jo["tables"]
topic_list = []
# 遍历这些表
for table in tables:
# topic的全名
topic = topic_prefix + table["name"]
topic = CommonUtil.convert(topic, '_')
if not topic in c.list_topics():
topic_list.append(NewTopic(name=topic, num_partitions=1, replication_factor=1))
else:
print("发现Topic名称%s已存在,无需创建!" % topic)
if len(topic_list) > 0:
c.create_topics(new_topics=topic_list, validate_only=False)
# 提示
print("恭喜所有Topic创建成功个数%s" % len(topic_list))