You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

171 lines
5.6 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import pymysql
import re
from config.db_config import *
from connectors.clickhouse_connector import ClickHouseConnector
def get_mysql_table_structure(config, table):
    """Fetch column, index and primary-key metadata for a MySQL table.

    Args:
        config: Connection settings dict with ``host``, ``port``, ``user``,
            ``password`` and ``database`` keys. ``database`` is also used as
            the ``table_schema`` filter.
        table: Name of the table inside ``config["database"]``.

    Returns:
        A ``(columns, indexes, primary_keys)`` tuple:

        * ``columns`` — rows of ``(column_name, data_type, column_type,
          is_nullable, column_default)`` in table-definition order.
        * ``indexes`` — rows of ``(index_name, column_name, non_unique)``
          ordered by index name, then by column position within the index.
        * ``primary_keys`` — list of primary-key column names in key order.
    """
    schema = config["database"]
    params = (schema, table)
    conn = pymysql.connect(host=config["host"],
                           port=config["port"],
                           user=config["user"],
                           password=config["password"],
                           database=schema)
    try:
        with conn.cursor() as cursor:
            # Column definitions. information_schema gives no ordering
            # guarantee, so sort by ordinal_position to keep MySQL's order.
            # Values are bound by the driver instead of f-string interpolation.
            cursor.execute(
                """
                SELECT column_name, data_type, column_type, is_nullable, column_default
                FROM information_schema.columns
                WHERE table_schema = %s AND table_name = %s
                ORDER BY ordinal_position
                """,
                params,
            )
            columns = cursor.fetchall()

            # Index membership; seq_in_index preserves the column order of
            # composite indexes.
            cursor.execute(
                """
                SELECT index_name, column_name, non_unique
                FROM information_schema.statistics
                WHERE table_schema = %s AND table_name = %s
                ORDER BY index_name, seq_in_index
                """,
                params,
            )
            indexes = cursor.fetchall()

            # Primary-key columns in key order (matters for composite keys,
            # which become the ClickHouse sorting key).
            cursor.execute(
                """
                SELECT column_name
                FROM information_schema.key_column_usage
                WHERE table_schema = %s
                  AND table_name = %s
                  AND constraint_name = 'PRIMARY'
                ORDER BY ordinal_position
                """,
                params,
            )
            primary_keys = [row[0] for row in cursor.fetchall()]
    finally:
        # Close the connection even if one of the queries fails.
        conn.close()
    return columns, indexes, primary_keys
def convert_to_clickhouse_type(mysql_type):
    """Map a MySQL ``COLUMN_TYPE`` string to a ClickHouse column type.

    Args:
        mysql_type: Full MySQL column type, e.g. ``int(11)``,
            ``bigint(20) unsigned`` or ``decimal(10,2)``. Matching is
            case-insensitive.

    Returns:
        The ClickHouse type name.

        * Integer types flagged ``unsigned`` map to the ``UInt*`` family so
          their full value range fits; signed ones map to ``Int*``.
        * ``decimal`` keeps its precision/scale (``decimal(P)`` becomes
          ``Decimal(P,0)``).
        * Float types carry no parameters.
        * Types not listed fall back to ``String``.
    """
    type_str = mysql_type.lower()
    # UNSIGNED only changes the target for integer types; MySQL's deprecated
    # UNSIGNED on float/decimal columns is ignored.
    is_unsigned = re.search(r'\bunsigned\b', type_str) is not None

    # Base type name plus an optional "(P)" or "(P,S)" argument list.
    match = re.match(r'\s*(\w+)\s*(?:\(\s*(\d+)\s*(?:,\s*(\d+)\s*)?\))?', type_str)
    if not match:
        return 'String'
    base_type, precision, scale = match.groups()

    # (signed, unsigned) ClickHouse counterparts of MySQL integer types.
    int_types = {
        'tinyint': ('Int8', 'UInt8'),  # also covers tinyint(1), MySQL's BOOL
        'smallint': ('Int16', 'UInt16'),
        'mediumint': ('Int32', 'UInt32'),
        'int': ('Int32', 'UInt32'),
        'bigint': ('Int64', 'UInt64'),
    }
    if base_type in int_types:
        signed_type, unsigned_type = int_types[base_type]
        return unsigned_type if is_unsigned else signed_type

    if base_type == 'decimal':
        if precision is None:
            return 'Decimal'
        return f'Decimal({precision},{scale or 0})'

    type_map = {
        'float': 'Float32',
        'double': 'Float64',
        'char': 'String',
        'varchar': 'String',
        'text': 'String',
        'longtext': 'String',
        'date': 'Date',
        'datetime': 'DateTime',
        'timestamp': 'DateTime',
        'enum': 'String',
    }
    return type_map.get(base_type, 'String')
def generate_clickhouse_ddl(table_name, columns, indexes, primary_keys):
    """Build a ClickHouse ``CREATE TABLE`` statement from MySQL metadata.

    Args:
        table_name: Name of the ClickHouse table to create.
        columns: Iterable of ``(name, data_type, column_type, is_nullable,
            default)`` rows. ``column_type`` is converted with
            ``convert_to_clickhouse_type``, and ``is_nullable == 'YES'`` wraps
            the type in ``Nullable(...)``. ``data_type`` and ``default`` are
            not used.
        indexes: Iterable of ``(index_name, column_name, non_unique)`` rows.
            Rows of the same index are grouped in the order given, and each
            non-``PRIMARY`` index becomes a ``minmax`` data-skipping index.
        primary_keys: Column names used as the ``ORDER BY`` sorting key.

    Returns:
        The DDL string, terminated by ``;``. With primary keys the engine is
        ``ReplacingMergeTree()``. Without them it is ``MergeTree()`` ordered
        by ``tuple()``.
    """
    # Column definitions.
    column_defs = []
    for name, _data_type, column_type, is_nullable, _default in columns:
        ch_type = convert_to_clickhouse_type(column_type)
        # ClickHouse float types take no arguments; drop any "(M,D)" suffix
        # carried over from MySQL FLOAT(M,D) / DOUBLE(M,D).
        if ch_type.startswith('Float') and '(' in ch_type:
            ch_type = ch_type.split('(', 1)[0]
        if is_nullable == 'YES':
            ch_type = f'Nullable({ch_type})'
        column_defs.append(f' {name} {ch_type}')

    # Group index rows by index name.
    # PRIMARY is already expressed by ORDER BY. Functional key parts report
    # column_name as NULL and have no plain column to index.
    index_groups = {}
    for index_name, column_name, _non_unique in indexes:
        if index_name == 'PRIMARY' or column_name is None:
            continue
        index_groups.setdefault(index_name, []).append(column_name)

    # ClickHouse MergeTree has no UNIQUE constraints, so every MySQL index
    # (unique or not) becomes a plain minmax skipping index.
    index_defs = [
        f' INDEX {index_name} ({", ".join(index_columns)}) TYPE minmax GRANULARITY 3'
        for index_name, index_columns in index_groups.items()
    ]

    # MergeTree-family engines require ORDER BY.
    # Without a primary key there is no deduplication key, and
    # ReplacingMergeTree over an empty key would treat all rows as duplicates.
    # So fall back to a plain MergeTree ordered by tuple().
    if primary_keys:
        engine = 'ReplacingMergeTree()'
        order_by = 'ORDER BY (' + ', '.join(primary_keys) + ')'
    else:
        engine = 'MergeTree()'
        order_by = 'ORDER BY tuple()'

    # Assemble the DDL statement.
    ddl = f'CREATE TABLE {table_name} (\n'
    ddl += ',\n'.join(column_defs + index_defs)
    ddl += f'\n) ENGINE = {engine}\n'
    ddl += order_by
    ddl += '\nSETTINGS index_granularity = 8192;'
    return ddl
def create_ch_table(table):
    """Recreate a ClickHouse table from its saved DDL file.

    Reads ``schemas/<table>.sql`` and executes ``DROP TABLE IF EXISTS
    <table>``, then the DDL. Any existing ClickHouse table of that name,
    including its data, is dropped.

    Args:
        table: Table name; also the base name of the ``.sql`` file.
    """
    with open(f"schemas/{table}.sql", 'r', encoding='utf-8') as f:
        create_sql = f.read()
    ch_conn = ClickHouseConnector(CH_CONFIG)
    client = ch_conn.connect()
    try:
        client.execute("DROP TABLE IF EXISTS " + table)  # drop the table if it exists
        client.execute(create_sql)  # create the new table
    finally:
        # Release the connection even if DROP/CREATE raises.
        ch_conn.disconnect()
if __name__ == '__main__':
    import os

    # Print the active configuration.
    print_config()
    tables = ['t_station', 't_equipment', 't_equipment_charge_order']
    # DDL files are written to schemas/; create it on first run so the
    # open(..., 'w') below does not fail with FileNotFoundError.
    os.makedirs('schemas', exist_ok=True)
    for table in tables:
        # Read the MySQL table structure.
        columns, indexes, primary_keys = get_mysql_table_structure(MYSQL_CONFIG, table)
        # Generate the ClickHouse DDL and save it, e.g. schemas/t_station.sql.
        ddl = generate_clickhouse_ddl(table, columns, indexes, primary_keys)
        with open(f'schemas/{table}.sql', 'w', encoding='utf-8') as f:
            f.write(ddl)
        # Create the ClickHouse table from the saved file.
        create_ch_table(table)
        print(f'✅ ClickHouse表{table}已创建')