|
|
import pymysql
|
|
|
import re
|
|
|
from config.db_config import *
|
|
|
from connectors.clickhouse_connector import ClickHouseConnector
|
|
|
|
|
|
|
|
|
def get_mysql_table_structure(config, table):
    """Fetch column, index and primary-key metadata for a MySQL table.

    Queries ``information_schema`` over a fresh pymysql connection, which is
    always closed before returning (including when a query fails).

    Args:
        config: Connection settings dict with ``host``, ``port``, ``user``,
            ``password`` and ``database`` keys. ``database`` is also the
            schema in which ``table`` is looked up.
        table: Name of the table to inspect.

    Returns:
        A tuple ``(columns, indexes, primary_keys)``:

        * ``columns`` -- rows of ``(column_name, data_type, column_type,
          is_nullable, column_default)`` in table-definition order.
        * ``indexes`` -- rows of ``(index_name, column_name, non_unique)``
          ordered by index name, then by column position inside the index.
        * ``primary_keys`` -- list of primary-key column names in key order.
    """
    conn = pymysql.connect(host=config["host"],
                           port=config["port"],
                           user=config["user"],
                           password=config["password"],
                           database=config["database"])
    # Schema/table names are bound as query parameters instead of being
    # interpolated, so quotes or unusual names cannot break (or inject into)
    # the SQL.
    params = (config["database"], table)
    try:
        # Column definitions; ORDER BY keeps the table's declared column order.
        with conn.cursor() as cursor:
            cursor.execute(
                """
                SELECT column_name, data_type, column_type, is_nullable, column_default
                FROM information_schema.columns
                WHERE table_schema = %s AND table_name = %s
                ORDER BY ordinal_position
                """,
                params,
            )
            columns = cursor.fetchall()

        # Index membership; seq_in_index preserves column order inside
        # multi-column indexes.
        with conn.cursor() as cursor:
            cursor.execute(
                """
                SELECT index_name, column_name, non_unique
                FROM information_schema.statistics
                WHERE table_schema = %s AND table_name = %s
                ORDER BY index_name, seq_in_index
                """,
                params,
            )
            indexes = cursor.fetchall()

        # Primary-key columns in key order (used later as the sorting key).
        with conn.cursor() as cursor:
            cursor.execute(
                """
                SELECT column_name
                FROM information_schema.key_column_usage
                WHERE table_schema = %s
                  AND table_name = %s
                  AND constraint_name = 'PRIMARY'
                ORDER BY ordinal_position
                """,
                params,
            )
            primary_keys = [row[0] for row in cursor.fetchall()]
    finally:
        conn.close()

    return columns, indexes, primary_keys
|
|
|
|
|
|
|
|
|
def convert_to_clickhouse_type(mysql_type):
    """Convert a MySQL column type string to a ClickHouse type name.

    ``mysql_type`` is a MySQL ``COLUMN_TYPE`` value such as
    ``"int(10) unsigned"``, ``"decimal(10,2)"`` or ``"varchar(255)"``
    (matched case-insensitively).

    * Unsigned integer types map to ClickHouse ``UInt*`` so that their full
      value range is preserved.
    * ``decimal(p,s)`` keeps its precision and scale.
    * Display widths and float precision are dropped, because ClickHouse
      ``Float32``/``Float64`` take no arguments.
    * MySQL's boolean (``tinyint(1)``) becomes ``Int8``, like any signed
      tinyint.
    * Types not in the mapping fall back to ``String``.

    Args:
        mysql_type: MySQL column type string.

    Returns:
        The ClickHouse type name, e.g. ``"UInt32"`` or ``"Decimal(10,2)"``.
    """
    type_str = mysql_type.lower()
    # "unsigned" changes the value range, so remember it before stripping it.
    is_unsigned = re.search(r'\sunsigned', type_str) is not None
    type_str = re.sub(r'\sunsigned', '', type_str)

    # Signed/default mapping from MySQL base type to ClickHouse type.
    type_map = {
        'tinyint': 'Int8',
        'smallint': 'Int16',
        'mediumint': 'Int32',
        'int': 'Int32',
        'bigint': 'Int64',
        'float': 'Float32',
        'double': 'Float64',
        'decimal': 'Decimal',
        'char': 'String',
        'varchar': 'String',
        'text': 'String',
        'longtext': 'String',
        'date': 'Date',
        'datetime': 'DateTime',
        'timestamp': 'DateTime',
        'enum': 'String',
    }
    # Unsigned integers need UInt* types: e.g. "int unsigned" goes up to
    # 4294967295, which would overflow Int32.
    unsigned_map = {
        'tinyint': 'UInt8',
        'smallint': 'UInt16',
        'mediumint': 'UInt32',
        'int': 'UInt32',
        'bigint': 'UInt64',
    }

    # Base type name, optionally followed by "(precision,scale)".
    match = re.match(r'(\w+)(\(\d+,\d+\))?', type_str)
    if not match:
        return type_map.get(type_str, 'String')

    base_type = match.group(1)
    if is_unsigned and base_type in unsigned_map:
        return unsigned_map[base_type]

    ch_type = type_map.get(base_type, 'String')
    # Only Decimal accepts (precision, scale) in ClickHouse.
    if ch_type == 'Decimal' and match.group(2):
        ch_type += match.group(2)
    return ch_type
|
|
|
|
|
|
|
|
|
def _quote_identifier(name):
    """Return ``name`` as a backtick-quoted ClickHouse identifier."""
    # Backslash-escape so names containing backticks/backslashes stay one token.
    escaped = str(name).replace('\\', '\\\\').replace('`', '\\`')
    return f'`{escaped}`'


def generate_clickhouse_ddl(table_name, columns, indexes, primary_keys):
    """Build a ClickHouse ``CREATE TABLE`` statement from MySQL metadata.

    Identifiers are backtick-quoted so that names which are not plain
    ASCII identifiers (e.g. containing ``$`` or non-Latin characters) still
    produce valid DDL.

    Args:
        table_name: Name of the ClickHouse table to create.
        columns: Rows of ``(name, data_type, column_type, is_nullable,
            default)``. ``column_type`` is converted with
            ``convert_to_clickhouse_type``, and ``is_nullable == 'YES'`` wraps
            the type in ``Nullable(...)``.
        indexes: Rows of ``(index_name, column_name, non_unique)``. Rows are
            grouped by index name (first-seen order kept) and ``PRIMARY`` is
            skipped. Every other index becomes a ``minmax`` data-skipping
            index, because MergeTree tables have no UNIQUE constraint.
        primary_keys: Primary-key column names, used as the sorting key.

    Returns:
        The DDL string, terminated by ``;``. Tables with a primary key use
        ``ReplacingMergeTree`` ordered by that key. Tables without one use
        plain ``MergeTree`` with ``ORDER BY tuple()``.

    Raises:
        ValueError: If ``columns`` is empty (e.g. the MySQL table was not
            found), since a table without columns cannot be created.
    """
    if not columns:
        raise ValueError(f'no columns found for table {table_name!r}')

    column_defs = []
    for name, _data_type, column_type, is_nullable, _default in columns:
        ch_type = convert_to_clickhouse_type(column_type)
        # Float32/Float64 take no arguments; drop any MySQL-style precision
        # such as "Float64(10,2)".
        float_match = re.match(r'(Float(?:32|64))\(', ch_type)
        if float_match:
            ch_type = float_match.group(1)
        if is_nullable == 'YES':
            ch_type = f'Nullable({ch_type})'
        column_defs.append(f'    {_quote_identifier(name)} {ch_type}')

    # Group index rows by index name, preserving index and column order.
    index_columns = {}
    for index_name, column_name, _non_unique in indexes:
        if index_name == 'PRIMARY':
            continue
        index_columns.setdefault(index_name, []).append(column_name)

    index_defs = [
        f'    INDEX {_quote_identifier(index_name)} '
        f'({", ".join(_quote_identifier(c) for c in index_cols)}) '
        f'TYPE minmax GRANULARITY 3'
        for index_name, index_cols in index_columns.items()
    ]

    if primary_keys:
        engine = 'ReplacingMergeTree()'
        order_by = 'ORDER BY (' + ', '.join(_quote_identifier(k) for k in primary_keys) + ')'
    else:
        # MergeTree-family engines require ORDER BY. ReplacingMergeTree is
        # avoided here: it deduplicates rows with equal sorting keys, and with
        # an empty key every row would compare equal.
        engine = 'MergeTree()'
        order_by = 'ORDER BY tuple()'

    body = ',\n'.join(column_defs + index_defs)
    return (
        f'CREATE TABLE {_quote_identifier(table_name)} (\n'
        f'{body}\n'
        f') ENGINE = {engine}\n'
        f'{order_by}\n'
        'SETTINGS index_granularity = 8192;'
    )
|
|
|
|
|
|
|
|
|
def create_ch_table(table):
    """(Re)create a ClickHouse table from its generated DDL file.

    Reads ``schemas/<table>.sql``, drops ``table`` in ClickHouse if it
    exists, then executes the DDL. The ClickHouse connection is released
    even when one of the statements fails.

    Args:
        table: Table name; also the base name of the DDL file.

    Raises:
        OSError: If the DDL file cannot be read. This happens before any
            ClickHouse connection is opened.
    """
    # Read the DDL first so a missing file fails before touching ClickHouse.
    with open(f"schemas/{table}.sql", 'r', encoding='utf-8') as f:
        create_sql = f.read()

    ch_conn = ClickHouseConnector(CH_CONFIG)
    client = ch_conn.connect()
    try:
        # NOTE(review): the existing table is dropped before the new DDL runs,
        # so a failing CREATE leaves no table behind.
        client.execute("DROP TABLE IF EXISTS " + table)
        client.execute(create_sql)
    finally:
        ch_conn.disconnect()
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
    import os

    # Print the active configuration.
    print_config()

    tables = ['t_station', 't_equipment', 't_equipment_charge_order']

    # DDL files are written under schemas/; create the directory when missing
    # so a fresh checkout does not fail with FileNotFoundError.
    os.makedirs('schemas', exist_ok=True)

    for table in tables:
        # Read the table structure from MySQL.
        columns, indexes, primary_keys = get_mysql_table_structure(MYSQL_CONFIG, table)

        # Generate the ClickHouse DDL and save it, e.g. schemas/t_station.sql.
        ddl = generate_clickhouse_ddl(table, columns, indexes, primary_keys)
        with open(f'schemas/{table}.sql', 'w', encoding='utf-8') as f:
            f.write(ddl)

        # Create the ClickHouse table from the saved DDL file.
        create_ch_table(table)
        print(f'✅ ClickHouse表{table}已创建')
|