main
黄海 5 months ago
parent a43ae99313
commit 3dd8770e0b

@ -0,0 +1,19 @@
# 数据库配置
mysql_config = {
'host': '10.10.14.101',
'port': 3306,
'user': 'ylt',
'password': 'Ycharge666',
'database': 'yltcharge',
'charset': 'utf8mb4'
}
ch_config = {
'host': '10.10.14.250',
'port': 9000,
'settings': {
'max_insert_block_size': 100000,
'async_insert': 1,
'wait_for_async_insert': 0
}
}

@ -0,0 +1,130 @@
import pymysql
import re
from config.db_config import *
def get_mysql_table_structure(host, port, user, password, database, table):
"""获取MySQL表结构信息"""
conn = pymysql.connect(host=host, port=port, user=user, password=password, database=database)
# 获取字段信息
with conn.cursor() as cursor:
cursor.execute(f"""
SELECT column_name, data_type, column_type, is_nullable, column_default
FROM information_schema.columns
WHERE table_schema = '{database}' AND table_name = '{table}'
""")
columns = cursor.fetchall()
# 获取索引信息
with conn.cursor() as cursor:
cursor.execute(f"""
SELECT index_name, column_name, non_unique
FROM information_schema.statistics
WHERE table_schema = '{database}' AND table_name = '{table}'
""")
indexes = cursor.fetchall()
# 获取主键信息
with conn.cursor() as cursor:
cursor.execute(f"""
SELECT column_name
FROM information_schema.key_column_usage
WHERE table_schema = '{database}'
AND table_name = '{table}'
AND constraint_name = 'PRIMARY'
""")
primary_keys = [row[0] for row in cursor.fetchall()]
conn.close()
return columns, indexes, primary_keys
def convert_to_clickhouse_type(mysql_type):
"""将MySQL类型转换为ClickHouse类型并处理无符号类型"""
# 去除unsigned标记并转换为小写
type_str = re.sub(r'\sunsigned', '', mysql_type, flags=re.IGNORECASE).lower()
# 类型映射
type_map = {
'tinyint': 'Int8',
'smallint': 'Int16',
'mediumint': 'Int32',
'int': 'Int32',
'bigint': 'Int64',
'float': 'Float32',
'double': 'Float64',
'decimal': 'Decimal',
'char': 'String',
'varchar': 'String',
'text': 'String',
'longtext': 'String',
'date': 'Date',
'datetime': 'DateTime',
'timestamp': 'DateTime',
'enum': 'String'
}
# 处理特殊类型
if 'tinyint(1)' in type_str:
return 'UInt8' # 处理MySQL的布尔类型
# 匹配数字精度
match = re.match(r'(\w+)(\(\d+,\d+\))?', type_str)
if match:
base_type = match.group(1)
precision = match.group(2) or ''
return type_map.get(base_type, 'String') + precision
return type_map.get(type_str, 'String')
def generate_clickhouse_ddl(table_name, columns, indexes, primary_keys):
"""生成ClickHouse建表语句"""
# 处理字段定义
column_defs = []
for col in columns:
name, data_type, column_type, is_nullable, default = col
ch_type = convert_to_clickhouse_type(column_type)
nullable = 'Nullable(' + ch_type + ')' if is_nullable == 'YES' else ch_type
column_defs.append(f' {name} {nullable}')
# 处理主键
order_by = 'ORDER BY (' + ', '.join(primary_keys) + ')' if primary_keys else ''
# 处理索引ClickHouse的索引语法不同
index_defs = []
for idx in indexes:
index_name, column_name, non_unique = idx
if index_name == 'PRIMARY':
continue
index_type = 'INDEX' if non_unique else 'UNIQUE'
index_defs.append(f' {index_type} {index_name} {column_name} TYPE minmax GRANULARITY 3')
# 组合DDL语句
ddl = f'CREATE TABLE {table_name} (\n'
ddl += ',\n'.join(column_defs)
if index_defs:
ddl += ',\n' + ',\n'.join(index_defs)
ddl += '\n) ENGINE = MergeTree()\n'
ddl += order_by
ddl += '\nSETTINGS index_granularity = 8192;'
return ddl
# 使用示例
if __name__ == '__main__':
table = 't_station'
host = mysql_config['host']
port = mysql_config['port']
user = mysql_config['user']
password = mysql_config['password']
database = mysql_config['database']
# 获取表结构
columns, indexes, primary_keys = get_mysql_table_structure(host, port, user, password, database, table)
# 生成ClickHouse DDL
ddl = generate_clickhouse_ddl(table, columns, indexes, primary_keys)
print(ddl)

@ -3,31 +3,11 @@ from connectors.clickhouse_connector import ClickHouseConnector
from mappers.data_mapper import DataMapper
from services.sync_service import SyncService
from utils.logger import configure_logger
from config.db_config import *
logger = configure_logger()
def main():
# 数据库配置
mysql_config = {
'host': '10.10.14.101',
'port': 3306,
'user': 'ylt',
'password': 'Ycharge666',
'db': 'yltcharge',
'charset': 'utf8mb4'
}
ch_config = {
'host': '10.10.14.250',
'port': 9000,
'settings': {
'max_insert_block_size': 100000,
'async_insert': 1,
'wait_for_async_insert': 0
}
}
# 初始化组件
mysql_conn = MySQLConnector(mysql_config)
ch_conn = ClickHouseConnector(ch_config)

Loading…
Cancel
Save