import sys from kafka import KafkaProducer from kafka.admin import KafkaAdminClient, NewTopic import json # 配置信息 from Util import GlobalManager as gm, CommonUtil if __name__ == '__main__': # 检查参数输入 if len(sys.argv) == 1: # print("请输入需要处理的配置json文件名称,注意不用带目录,只有文件名称即可!") # sys.exit() # 目前测试阶段,这里先这么写,以后就要求必须输入指定的JSON文件,不同的厂商不同的文件。不是按表来的,是按业务厂商来的配置文件。 sys.argv.append("dsideal_ypt.json") # 初始化Kafka全局配置 gm._init_() bootstrap_servers = gm.get("bootstrap_servers") c = KafkaAdminClient(bootstrap_servers=bootstrap_servers) # 扫描所有表,创建Topic # 读取Json文件配置,获取到Mysql的连接办法 jo = [] with open("./json/" + sys.argv[1], 'r', encoding='utf-8') as load_f: jo = json.load(load_f) # Kafka的Topic topic_prefix = jo["topic_prefix"] # 要处理哪些表 tables = jo["tables"] topic_list = [] # 遍历这些表 for table in tables: # topic的全名 topic = topic_prefix + table["name"] topic = CommonUtil.convert(topic, '_') if not topic in c.list_topics(): topic_list.append(NewTopic(name=topic, num_partitions=1, replication_factor=1)) else: print("发现Topic名称:%s已存在,无需创建!" % topic) if len(topic_list) > 0: c.create_topics(new_topics=topic_list, validate_only=False) # 提示 print("恭喜,所有Topic创建成功,个数:%s" % len(topic_list))