import pymysql
import re
from config.db_config import *
from connectors.clickhouse_connector import ClickHouseConnector

# MySQL base type -> ClickHouse type. Anything not listed falls back to String.
_MYSQL_TO_CLICKHOUSE = {
    'tinyint': 'Int8',
    'smallint': 'Int16',
    'mediumint': 'Int32',
    'int': 'Int32',
    'bigint': 'Int64',
    'float': 'Float32',
    'double': 'Float64',
    'decimal': 'Decimal',
    'char': 'String',
    'varchar': 'String',
    'text': 'String',
    'longtext': 'String',
    'date': 'Date',
    'datetime': 'DateTime',
    'timestamp': 'DateTime',
    'enum': 'String',
}

# Integer base types that have an unsigned ClickHouse counterpart (UInt*).
_INTEGER_TYPES = frozenset({'tinyint', 'smallint', 'mediumint', 'int', 'bigint'})

# Base type name followed by an optional "(precision[,scale])" suffix.
_COLUMN_TYPE_RE = re.compile(r'(\w+)\s*(?:\(\s*(\d+)\s*(?:,\s*(\d+)\s*)?\))?')


def get_mysql_table_structure(config, table):
    """Fetch column, index and primary-key metadata for a MySQL table.

    Args:
        config: Mapping with the keys "host", "port", "user", "password"
            and "database" used to open the connection.
        table: Name of the table to inspect.

    Returns:
        A tuple ``(columns, indexes, primary_keys)`` where
        ``columns`` holds rows of (column_name, data_type, column_type,
        is_nullable, column_default) in table-definition order,
        ``indexes`` holds rows of (index_name, column_name, non_unique)
        ordered by index name and position within the index, and
        ``primary_keys`` is the list of primary-key column names in key order.
    """
    conn = pymysql.connect(
        host=config["host"],
        port=config["port"],
        user=config["user"],
        password=config["password"],
        database=config["database"],
    )
    # Values are passed as query parameters instead of being interpolated
    # into the SQL text, so quoting in names cannot break the statement.
    params = (config["database"], table)
    try:
        # Column definitions; ordered so the generated DDL keeps the
        # column order of the source table.
        with conn.cursor() as cursor:
            cursor.execute(
                """
                SELECT column_name, data_type, column_type, is_nullable, column_default
                FROM information_schema.columns
                WHERE table_schema = %s AND table_name = %s
                ORDER BY ordinal_position
                """,
                params,
            )
            columns = cursor.fetchall()

        # Index definitions; ordered so composite indexes keep their
        # column order.
        with conn.cursor() as cursor:
            cursor.execute(
                """
                SELECT index_name, column_name, non_unique
                FROM information_schema.statistics
                WHERE table_schema = %s AND table_name = %s
                ORDER BY index_name, seq_in_index
                """,
                params,
            )
            indexes = cursor.fetchall()

        # Primary-key columns, in key order.
        with conn.cursor() as cursor:
            cursor.execute(
                """
                SELECT column_name
                FROM information_schema.key_column_usage
                WHERE table_schema = %s AND table_name = %s
                  AND constraint_name = 'PRIMARY'
                ORDER BY ordinal_position
                """,
                params,
            )
            primary_keys = [row[0] for row in cursor.fetchall()]
    finally:
        # Release the connection even when one of the queries fails.
        conn.close()

    return columns, indexes, primary_keys


def convert_to_clickhouse_type(mysql_type):
    """Convert a MySQL column type string to a ClickHouse type name.

    Args:
        mysql_type: Column type text such as ``"int(11) unsigned"``,
            ``"decimal(10,2)"`` or ``"varchar(255)"``.

    Returns:
        The ClickHouse type name. Unsigned integer types map to the
        ``UInt*`` family so the full value range is preserved, precision
        and scale are kept only for ``Decimal``, and unknown types fall
        back to ``String``.
    """
    type_str = mysql_type.strip().lower()
    is_unsigned = re.search(r'\bunsigned\b', type_str) is not None

    match = _COLUMN_TYPE_RE.match(type_str)
    if not match:
        return 'String'
    base_type, precision, scale = match.groups()

    if base_type == 'decimal':
        # ClickHouse requires explicit precision/scale; MySQL's defaults
        # for a bare DECIMAL are (10, 0).
        return f'Decimal({precision or 10},{scale or 0})'

    ch_type = _MYSQL_TO_CLICKHOUSE.get(base_type, 'String')
    if is_unsigned and base_type in _INTEGER_TYPES:
        # e.g. Int32 -> UInt32; a signed type would only cover half the
        # unsigned value range.
        return 'U' + ch_type
    # Display widths and float precision (e.g. "int(11)", "float(10,2)")
    # have no ClickHouse equivalent and are dropped.
    return ch_type


def generate_clickhouse_ddl(table_name, columns, indexes, primary_keys):
    """Build a ClickHouse ``CREATE TABLE`` statement from MySQL metadata.

    Args:
        table_name: Name of the table to create.
        columns: Rows of (column_name, data_type, column_type,
            is_nullable, column_default).
        indexes: Rows of (index_name, column_name, non_unique); rows of
            the same index must appear in index-column order.
        primary_keys: Primary-key column names used for ``ORDER BY``.

    Returns:
        The DDL text for a ``ReplacingMergeTree`` table.
    """
    # Column definitions.
    column_defs = []
    for name, _data_type, column_type, is_nullable, _default in columns:
        ch_type = convert_to_clickhouse_type(column_type)
        if is_nullable == 'YES':
            ch_type = f'Nullable({ch_type})'
        column_defs.append(f' {name} {ch_type}')

    # MergeTree-family engines require a sorting key; fall back to an empty
    # tuple when the source table has no primary key.
    if primary_keys:
        order_by = 'ORDER BY (' + ', '.join(primary_keys) + ')'
    else:
        order_by = 'ORDER BY tuple()'

    # Group index columns by index name; the primary key is already covered
    # by ORDER BY.
    index_groups = {}
    for index_name, column_name, _non_unique in indexes:
        if index_name == 'PRIMARY':
            continue
        index_groups.setdefault(index_name, []).append(column_name)

    # Every secondary index becomes a minmax data-skipping index; uniqueness
    # is not carried over because the generated DDL has no UNIQUE syntax.
    index_type = 'minmax'
    index_defs = [
        f' INDEX {index_name} ({", ".join(index_columns)}) '
        f'TYPE {index_type} GRANULARITY 3'
        for index_name, index_columns in index_groups.items()
    ]

    # Assemble the statement.
    ddl = f'CREATE TABLE {table_name} (\n'
    ddl += ',\n'.join(column_defs)
    if index_defs:
        ddl += ',\n' + ',\n'.join(index_defs)
    ddl += '\n) ENGINE = ReplacingMergeTree()\n'
    ddl += order_by
    ddl += '\nSETTINGS index_granularity = 8192;'
    return ddl


def create_ch_table(table):
    """Drop and recreate a ClickHouse table from its saved DDL file.

    Reads ``schemas/<table>.sql``, drops the table if it exists and then
    executes the DDL.

    Args:
        table: Table name; also the stem of the DDL file under ``schemas/``.
    """
    ch_conn = ClickHouseConnector(CH_CONFIG)
    with open("schemas/" + table + ".sql", 'r', encoding='utf-8') as f:
        create_sql = f.read()

    client = ch_conn.connect()
    try:
        client.execute("DROP TABLE IF EXISTS " + table)  # drop the old table, if any
        client.execute(create_sql)  # create the new table
    finally:
        # Disconnect even when one of the statements fails.
        ch_conn.disconnect()


if __name__ == '__main__':
    # Print the active configuration.
    print_config()

    tables = ['t_station', 't_equipment', 't_equipment_charge_order']
    for table in tables:
        # Read the table structure from MySQL.
        columns, indexes, primary_keys = get_mysql_table_structure(MYSQL_CONFIG, table)

        # Generate the ClickHouse DDL.
        ddl = generate_clickhouse_ddl(table, columns, indexes, primary_keys)

        # Save it, e.g. to schemas/t_station.sql.
        with open(f'schemas/{table}.sql', 'w', encoding='utf-8') as f:
            f.write(ddl)

        # Create the ClickHouse table from the saved DDL.
        create_ch_table(table)
        print(f'✅ ClickHouse表{table}已创建')