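"""Generate ClickHouse DDL from MySQL table metadata and create the tables.

For each configured table the script reads column, index and primary-key
metadata from MySQL's information_schema, writes a ClickHouse CREATE TABLE
statement to schemas/<table>.sql, and then recreates the table in ClickHouse.
"""
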
import os
import re

import pymysql

from config.db_config import *
from connectors.clickhouse_connector import ClickHouseConnector


def get_mysql_table_structure(config, table):
    """Read column, index and primary-key metadata for one MySQL table.

    Args:
        config: Mapping with the connection settings ``host``, ``port``,
            ``user``, ``password`` and ``database``.
        table: Name of the table to inspect inside ``config["database"]``.

    Returns:
        A ``(columns, indexes, primary_keys)`` tuple where

        * ``columns`` holds ``(column_name, data_type, column_type,
          is_nullable, column_default)`` rows in table-definition order,
        * ``indexes`` holds ``(index_name, column_name, non_unique)`` rows
          ordered by index name and by position inside each index,
        * ``primary_keys`` is the list of primary-key column names in key
          order.
    """
    conn = pymysql.connect(host=config["host"],
                           port=config["port"],
                           user=config["user"],
                           password=config["password"],
                           database=config["database"])
    # Schema and table names are sent as bound parameters instead of being
    # interpolated into the SQL text, so quoting characters in either value
    # cannot alter the statement.
    params = (config["database"], table)
    try:
        with conn.cursor() as cursor:
            # Column definitions; explicit ordering keeps the generated DDL
            # in the same column order as the MySQL table.
            cursor.execute(
                """
                SELECT column_name, data_type, column_type, is_nullable, column_default
                FROM information_schema.columns
                WHERE table_schema = %s AND table_name = %s
                ORDER BY ordinal_position
                """,
                params,
            )
            columns = cursor.fetchall()

            # Index definitions; seq_in_index preserves the column order of
            # composite indexes.
            cursor.execute(
                """
                SELECT index_name, column_name, non_unique
                FROM information_schema.statistics
                WHERE table_schema = %s AND table_name = %s
                ORDER BY index_name, seq_in_index
                """,
                params,
            )
            indexes = cursor.fetchall()

            # Primary-key columns, in key order.
            cursor.execute(
                """
                SELECT column_name
                FROM information_schema.key_column_usage
                WHERE table_schema = %s
                AND table_name = %s
                AND constraint_name = 'PRIMARY'
                ORDER BY ordinal_position
                """,
                params,
            )
            primary_keys = [row[0] for row in cursor.fetchall()]
    finally:
        # Release the connection even when one of the queries fails.
        conn.close()
    return columns, indexes, primary_keys
def convert_to_clickhouse_type(mysql_type):
    """Map a MySQL column type string to a ClickHouse type name.

    Args:
        mysql_type: A MySQL ``column_type`` value such as ``"int(11) unsigned"``,
            ``"decimal(10,2)"`` or ``"varchar(255)"``. Matching is
            case-insensitive and ignores surrounding whitespace.

    Returns:
        The ClickHouse type name. Integer types carrying the ``unsigned``
        attribute map to the ``UInt*`` family so that values above the signed
        maximum do not overflow. Only ``decimal`` keeps its ``(precision,scale)``
        suffix; ClickHouse float and string types take no such arguments.
        Unknown types fall back to ``String``.
    """
    type_str = mysql_type.strip().lower()
    is_unsigned = re.search(r'\bunsigned\b', type_str) is not None

    # Base MySQL type -> ClickHouse type. tinyint(1), MySQL's boolean
    # convention, is covered by the plain 'tinyint' entry.
    type_map = {
        'tinyint': 'Int8',
        'smallint': 'Int16',
        'mediumint': 'Int32',
        'int': 'Int32',
        'bigint': 'Int64',
        'float': 'Float32',
        'double': 'Float64',
        'decimal': 'Decimal',
        'char': 'String',
        'varchar': 'String',
        'text': 'String',
        'longtext': 'String',
        'date': 'Date',
        'datetime': 'DateTime',
        'timestamp': 'DateTime',
        'enum': 'String',
    }

    # Split the leading type keyword from an optional "(M,D)" suffix.
    match = re.match(r'(\w+)(\(\d+,\d+\))?', type_str)
    if not match:
        return 'String'

    ch_type = type_map.get(match.group(1), 'String')
    if is_unsigned and ch_type.startswith('Int'):
        # Int8 -> UInt8, Int32 -> UInt32, ...
        return 'U' + ch_type
    if ch_type == 'Decimal':
        # Decimal is the only mapped type that accepts (precision,scale).
        return ch_type + (match.group(2) or '')
    return ch_type
def generate_clickhouse_ddl(table_name, columns, indexes, primary_keys):
    """Build a ClickHouse CREATE TABLE statement from MySQL metadata.

    Args:
        table_name: Name of the table to create.
        columns: Iterable of ``(name, data_type, column_type, is_nullable,
            default)`` rows; ``column_type`` is converted with
            ``convert_to_clickhouse_type`` and ``is_nullable == 'YES'`` wraps
            the type in ``Nullable(...)``.
        indexes: Iterable of ``(index_name, column_name, non_unique)`` rows.
            Rows sharing an index name are grouped into one data-skipping
            index; the ``PRIMARY`` index is skipped because it is expressed
            through ORDER BY.
        primary_keys: Primary-key column names, used as the sorting key.

    Returns:
        The DDL text. With a primary key the table uses ReplacingMergeTree
        ordered by that key. Without one it uses plain MergeTree with
        ``ORDER BY tuple()``: MergeTree-family engines require an ORDER BY
        clause, and ReplacingMergeTree with an empty sorting key would treat
        every row as a duplicate of every other.
    """
    # Column definitions.
    column_defs = []
    for name, _data_type, column_type, is_nullable, _default in columns:
        ch_type = convert_to_clickhouse_type(column_type)
        # ClickHouse float types accept no (M,D) arguments; drop any suffix.
        if ch_type.startswith(('Float32', 'Float64')) and '(' in ch_type:
            ch_type = ch_type.split('(', 1)[0]
        if is_nullable == 'YES':
            ch_type = 'Nullable(' + ch_type + ')'
        column_defs.append(f' {name} {ch_type}')

    # Engine and sorting key.
    if primary_keys:
        engine = 'ReplacingMergeTree()'
        order_by = 'ORDER BY (' + ', '.join(primary_keys) + ')'
    else:
        engine = 'MergeTree()'
        order_by = 'ORDER BY tuple()'

    # Group index rows by index name, keeping the incoming column order.
    index_columns = {}
    for index_name, column_name, _non_unique in indexes:
        if index_name == 'PRIMARY':
            continue
        index_columns.setdefault(index_name, []).append(column_name)

    # MergeTree has no UNIQUE constraint, so every MySQL index (unique or
    # not) becomes a minmax data-skipping index.
    index_type = 'minmax'
    index_defs = [
        f' INDEX {index_name} ({", ".join(cols)}) TYPE {index_type} GRANULARITY 3'
        for index_name, cols in index_columns.items()
    ]

    # Assemble the statement.
    ddl = f'CREATE TABLE {table_name} (\n'
    ddl += ',\n'.join(column_defs + index_defs)
    ddl += f'\n) ENGINE = {engine}\n'
    ddl += order_by
    ddl += '\nSETTINGS index_granularity = 8192;'
    return ddl
# Create the ClickHouse table
def create_ch_table(table):
    """Recreate a ClickHouse table from its saved DDL file.

    Reads the CREATE statement from ``schemas/<table>.sql``, drops the table
    if it already exists, and executes the statement.

    Args:
        table: Table name; also the stem of the DDL file under ``schemas/``.

    Raises:
        OSError: If the DDL file cannot be read.
    """
    ch_conn = ClickHouseConnector(CH_CONFIG)
    with open("schemas/" + table + ".sql", 'r', encoding='utf-8') as f:
        create_sql = f.read()
    client = ch_conn.connect()
    try:
        client.execute("DROP TABLE IF EXISTS " + table)  # drop the table if it exists
        client.execute(create_sql)  # create the new table
    finally:
        # Always release the connection, even when DROP or CREATE fails.
        ch_conn.disconnect()
if __name__ == '__main__':
    # Print the active configuration.
    # NOTE(review): print_config presumably comes from config.db_config — confirm.
    print_config()

    tables = ['t_station', 't_equipment', 't_equipment_charge_order']

    # The DDL files are written below; make sure the target directory exists
    # so the first open(..., 'w') does not fail on a fresh checkout.
    os.makedirs('schemas', exist_ok=True)

    for table in tables:
        # Read the MySQL table structure.
        columns, indexes, primary_keys = get_mysql_table_structure(MYSQL_CONFIG, table)
        # Generate the ClickHouse DDL.
        ddl = generate_clickhouse_ddl(table, columns, indexes, primary_keys)
        # Save it, e.g. to schemas/t_station.sql.
        with open(f'schemas/{table}.sql', 'w', encoding='utf-8') as f:
            f.write(ddl)
        # Recreate the ClickHouse table from the saved DDL.
        create_ch_table(table)
        print(f'✅ ClickHouse表{table}已创建')