You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.
import sys
from kafka import KafkaProducer
from kafka . admin import KafkaAdminClient , NewTopic
import json
# 配置信息
from Util import GlobalManager as gm , CommonUtil
if __name__ == ' __main__ ' :
# 检查参数输入
if len ( sys . argv ) == 1 :
# print("请输入需要处理的配置json文件名称, 注意不用带目录, 只有文件名称即可! ")
# sys.exit()
# 目前测试阶段, 这里先这么写, 以后就要求必须输入指定的JSON文件, 不同的厂商不同的文件。不是按表来的, 是按业务厂商来的配置文件。
sys . argv . append ( " dsideal_ypt.json " )
# 初始化Kafka全局配置
gm . _init_ ( )
bootstrap_servers = gm . get ( " bootstrap_servers " )
c = KafkaAdminClient ( bootstrap_servers = bootstrap_servers )
# 扫描所有表, 创建Topic
# 读取Json文件配置, 获取到Mysql的连接办法
jo = [ ]
with open ( " ./json/ " + sys . argv [ 1 ] , ' r ' , encoding = ' utf-8 ' ) as load_f :
jo = json . load ( load_f )
# Kafka的Topic
topic_prefix = jo [ " topic_prefix " ]
# 要处理哪些表
tables = jo [ " tables " ]
topic_list = [ ]
# 遍历这些表
for table in tables :
# topic的全名
topic = topic_prefix + table [ " name " ]
topic = CommonUtil . convert ( topic , ' _ ' )
if not topic in c . list_topics ( ) :
topic_list . append ( NewTopic ( name = topic , num_partitions = 1 , replication_factor = 1 ) )
else :
print ( " 发现Topic名称: %s 已存在,无需创建! " % topic )
if len ( topic_list ) > 0 :
c . create_topics ( new_topics = topic_list , validate_only = False )
# 提示
print ( " 恭喜, 所有Topic创建成功, 个数: %s " % len ( topic_list ) )