"""Generate a ClickHouse ``CREATE TABLE`` statement from a MySQL table.

The module reads column, index and primary-key metadata from MySQL's
``information_schema`` and renders an equivalent MergeTree table definition.
"""
import re

# MySQL integer base type -> (signed ClickHouse type, unsigned ClickHouse type).
_INTEGER_TYPES = {
    'tinyint': ('Int8', 'UInt8'),
    'smallint': ('Int16', 'UInt16'),
    'mediumint': ('Int32', 'UInt32'),
    'int': ('Int32', 'UInt32'),
    'bigint': ('Int64', 'UInt64'),
}

# MySQL non-integer base type -> ClickHouse type. Anything missing is a String.
_SCALAR_TYPES = {
    'float': 'Float32',
    'double': 'Float64',
    'char': 'String',
    'varchar': 'String',
    'text': 'String',
    'longtext': 'String',
    'date': 'Date',
    'datetime': 'DateTime',
    'timestamp': 'DateTime',
    'enum': 'String',
}

# Base type name followed by an optional numeric "(p)" or "(p,s)" suffix.
_TYPE_PATTERN = re.compile(r'(\w+)\s*(?:\(\s*(\d+)\s*(?:,\s*(\d+)\s*)?\))?')
_UNSIGNED_PATTERN = re.compile(r'\bunsigned\b')


def get_mysql_table_structure(host, port, user, password, database, table):
    """Fetch column, index and primary-key metadata for one MySQL table.

    Args:
        host, port, user, password: MySQL connection parameters.
        database: Schema that contains the table (also used as the
            connection's default database).
        table: Name of the table to inspect.

    Returns:
        A ``(columns, indexes, primary_keys)`` tuple where

        * ``columns`` holds ``(column_name, data_type, column_type,
          is_nullable, column_default)`` rows in table order,
        * ``indexes`` holds ``(index_name, column_name, non_unique)`` rows
          ordered by index and position inside the index,
        * ``primary_keys`` is the list of primary-key column names in key
          order.
    """
    # Imported lazily so the pure conversion helpers in this module can be
    # used without the MySQL driver being installed.
    import pymysql

    conn = pymysql.connect(host=host, port=port, user=user, password=password, database=database)
    # The schema/table names are passed as query parameters rather than
    # interpolated into the SQL text, so quotes in a name cannot break or
    # alter the statement.
    params = (database, table)
    try:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                SELECT column_name, data_type, column_type, is_nullable, column_default
                FROM information_schema.columns
                WHERE table_schema = %s AND table_name = %s
                ORDER BY ordinal_position
                """,
                params,
            )
            columns = cursor.fetchall()

            cursor.execute(
                """
                SELECT index_name, column_name, non_unique
                FROM information_schema.statistics
                WHERE table_schema = %s AND table_name = %s
                ORDER BY index_name, seq_in_index
                """,
                params,
            )
            indexes = cursor.fetchall()

            cursor.execute(
                """
                SELECT column_name
                FROM information_schema.key_column_usage
                WHERE table_schema = %s
                  AND table_name = %s
                  AND constraint_name = 'PRIMARY'
                ORDER BY ordinal_position
                """,
                params,
            )
            primary_keys = [row[0] for row in cursor.fetchall()]
    finally:
        # Always release the connection, even when a query fails.
        conn.close()

    return columns, indexes, primary_keys


def convert_to_clickhouse_type(mysql_type):
    """Translate a MySQL ``column_type`` string into a ClickHouse type name.

    Args:
        mysql_type: Full MySQL column type, e.g. ``'int(10) unsigned'``,
            ``'decimal(10,2)'`` or ``'varchar(255)'``. Case-insensitive.

    Returns:
        The ClickHouse type name. Unsigned integers map to the ``UInt*``
        family, ``decimal`` keeps its precision and scale, ``datetime(n)`` /
        ``timestamp(n)`` with ``n > 0`` become ``DateTime64(n)``, and any
        type that is not recognised falls back to ``'String'``.
    """
    type_str = mysql_type.strip().lower()

    # MySQL's conventional boolean representation.
    if 'tinyint(1)' in type_str:
        return 'UInt8'

    match = _TYPE_PATTERN.match(type_str)
    if match is None:
        return 'String'
    base_type, precision, scale = match.groups()

    if base_type in _INTEGER_TYPES:
        signed, unsigned = _INTEGER_TYPES[base_type]
        return unsigned if _UNSIGNED_PATTERN.search(type_str) else signed

    if base_type == 'decimal':
        # MySQL's defaults are precision 10 and scale 0 when omitted.
        return f'Decimal({precision or 10},{scale or 0})'

    if base_type in ('datetime', 'timestamp') and precision and int(precision) > 0:
        # Keep fractional seconds instead of truncating to whole seconds.
        return f'DateTime64({precision})'

    # A display width / precision suffix is meaningless for the remaining
    # types (e.g. ``double(10,2)``, ``varchar(255)``), so it is dropped.
    return _SCALAR_TYPES.get(base_type, 'String')


def generate_clickhouse_ddl(table_name, columns, indexes, primary_keys):
    """Render a ClickHouse MergeTree ``CREATE TABLE`` statement.

    Args:
        table_name: Name used verbatim in the ``CREATE TABLE`` clause.
        columns: Iterable of ``(column_name, data_type, column_type,
            is_nullable, column_default)`` rows.
        indexes: Iterable of ``(index_name, column_name, non_unique)`` rows,
            one row per indexed column.
        primary_keys: Primary-key column names; they become the sorting key.

    Returns:
        The DDL statement as a string.

    Raises:
        ValueError: If ``columns`` is empty, since a table without columns
            cannot be created.
    """
    column_defs = []
    for name, _data_type, column_type, is_nullable, _default in columns:
        ch_type = convert_to_clickhouse_type(column_type)
        if is_nullable == 'YES':
            ch_type = f'Nullable({ch_type})'
        column_defs.append(f'    {name} {ch_type}')

    if not column_defs:
        raise ValueError(f'no columns found for table {table_name!r}')

    # Collect the columns of each secondary index so that a composite index
    # yields exactly one definition instead of one clause per column (which
    # would repeat the index name).
    index_columns = {}
    for index_name, column_name, _non_unique in indexes:
        if index_name == 'PRIMARY':
            continue  # the primary key is expressed through ORDER BY
        if column_name is None:
            continue  # no plain column to index for this key part
        index_columns.setdefault(index_name, []).append(column_name)

    # ClickHouse has no UNIQUE indexes; every MySQL secondary index is
    # rendered as a data-skipping INDEX regardless of its uniqueness.
    index_defs = []
    for index_name, cols in index_columns.items():
        expression = cols[0] if len(cols) == 1 else '(' + ', '.join(cols) + ')'
        index_defs.append(f'    INDEX {index_name} {expression} TYPE minmax GRANULARITY 3')

    # MergeTree needs a sorting key; tuple() is the explicit "unsorted" key.
    if primary_keys:
        order_by = 'ORDER BY (' + ', '.join(primary_keys) + ')'
    else:
        order_by = 'ORDER BY tuple()'

    definitions = ',\n'.join(column_defs + index_defs)
    return (
        f'CREATE TABLE {table_name} (\n'
        f'{definitions}\n'
        ') ENGINE = MergeTree()\n'
        f'{order_by}\n'
        'SETTINGS index_granularity = 8192;'
    )


# Usage example
if __name__ == '__main__':
    # Connection settings come from config/db_config.py; only the MySQL
    # settings are needed here.
    from config.db_config import mysql_config

    table = 't_station'

    # Fetch the table structure from MySQL.
    columns, indexes, primary_keys = get_mysql_table_structure(
        mysql_config['host'],
        mysql_config['port'],
        mysql_config['user'],
        mysql_config['password'],
        mysql_config['database'],
        table,
    )

    # Generate and print the ClickHouse DDL.
    ddl = generate_clickhouse_ddl(table, columns, indexes, primary_keys)
    print(ddl)