diff --git a/ClickHouse.rar b/ClickHouse.rar new file mode 100644 index 00000000..b7a0196d Binary files /dev/null and b/ClickHouse.rar differ diff --git a/ClickHouse/C2_SyncData.py b/ClickHouse/C2_SyncData.py index d37e8e71..cc63ad38 100644 --- a/ClickHouse/C2_SyncData.py +++ b/ClickHouse/C2_SyncData.py @@ -13,31 +13,17 @@ def main(): mysql_conn = MySQLConnector(MYSQL_CONFIG) ch_conn = ClickHouseConnector(CH_CONFIG) - mapper = DataMapper( - date_columns=[ - 'order_time', 'charge_begin_time', 'charge_end_time', - 'pay_time', 'cancel_time', 'finish_time', 'report_time', - 'create_time', 'update_time' - ], - uint64_columns=[ - 'id', 'user_id', 'company_id', 'station_id', 'equipment_id', - 'connector_id', 'user_coupon_id', 'invoice_id', 'charge_market_id', - 'company_activity_id', 'charge_begin_soc', 'charge_end_soc', - 'charge_cur_soc', 'charge_duration', 'charge_settle_type', - 'order_type', 'pay_method', 'coupon_type', 'ticket_status', - 'charge_discount_type', 'settle_method', 'settle_status', - 'cancel_type', 'cancel_code', 'finish_type', 'finish_code', - 'boot_mode', 'settlement_type', 'is_parking_coupon', 'exception_flag', - 'has_shallow_report', 'has_deep_report', 'shallow_report_is_received', - 'com_charge_discount_type', 'special_channel_order_source' - ] - ) # 要处理的表名 tableName = 't_equipment_charge_order' - #tableName="t_station" + # 创建数据映射器(不再需要手动指定列) + mapper = DataMapper(mysql_conn, tableName) + # 创建同步服务 service = SyncService( - mysql_conn=mysql_conn, ch_conn=ch_conn, mapper=mapper, schema_path="schemas/" + tableName + ".sql" + mysql_conn=mysql_conn, + ch_conn=ch_conn, + mapper=mapper, + schema_path="schemas/" + tableName + ".sql" ) try: diff --git a/ClickHouse/mappers/__pycache__/data_mapper.cpython-310.pyc b/ClickHouse/mappers/__pycache__/data_mapper.cpython-310.pyc index 6b456836..f716d9ad 100644 Binary files a/ClickHouse/mappers/__pycache__/data_mapper.cpython-310.pyc and b/ClickHouse/mappers/__pycache__/data_mapper.cpython-310.pyc differ diff --git a/ClickHouse/mappers/data_mapper.py b/ClickHouse/mappers/data_mapper.py index 9afed233..58be1e3d 100644 --- a/ClickHouse/mappers/data_mapper.py +++ b/ClickHouse/mappers/data_mapper.py @@ -1,13 +1,40 @@ from datetime import datetime - +from utils.logger import configure_logger +logger = configure_logger() class DataMapper: - def __init__(self, date_columns, uint64_columns): - self.date_columns = date_columns - self.uint64_columns = uint64_columns + def __init__(self, mysql_conn, table): + self.mysql_conn = mysql_conn + self.table = table + self.date_columns = [] + self.uint64_columns = [] + self._analyze_schema() self.min_date = datetime(1970, 1, 1) self.max_date = datetime(2105, 12, 31, 23, 59, 59) + def _analyze_schema(self): + """分析表结构自动识别字段类型""" + schema_query = f""" + SELECT COLUMN_NAME, DATA_TYPE, COLUMN_TYPE + FROM INFORMATION_SCHEMA.COLUMNS + WHERE TABLE_SCHEMA = DATABASE() + AND TABLE_NAME = '{self.table}' + """ + + with self.mysql_conn.connect().cursor() as cursor: + cursor.execute(schema_query) + for col_name, data_type, col_type in cursor.fetchall(): + # 识别日期时间字段 + if data_type in ('datetime', 'timestamp', 'date'): + self.date_columns.append(col_name) + + # 识别无符号整数字段(匹配BIGINT UNSIGNED等) + if 'unsigned' in col_type.lower() and 'int' in data_type: + self.uint64_columns.append(col_name) + + + #logger.info(f"自动识别字段类型 - 日期字段: {self.date_columns}") + #logger.info(f"自动识别字段类型 - 无符号整数字段: {self.uint64_columns}") def map_row(self, columns, row): row_dict = dict(zip(columns, row)) return {col: self._map_value(col, val) for col, val in row_dict.items()} diff --git a/ClickHouse/services/__pycache__/sync_service.cpython-310.pyc b/ClickHouse/services/__pycache__/sync_service.cpython-310.pyc index ad87b4f5..aea0f4d9 100644 Binary files a/ClickHouse/services/__pycache__/sync_service.cpython-310.pyc and b/ClickHouse/services/__pycache__/sync_service.cpython-310.pyc differ diff --git a/ClickHouse/services/sync_service.py b/ClickHouse/services/sync_service.py index eb558c78..64ae7751 100644 --- a/ClickHouse/services/sync_service.py +++ b/ClickHouse/services/sync_service.py @@ -18,6 +18,17 @@ class SyncService: client.execute("DROP TABLE IF EXISTS "+table) # 删除表(如果存在) client.execute(create_sql) # 创建新表 + def get_table_schema(self, table): + """获取表的字段类型信息""" + schema_query = f""" + SELECT COLUMN_NAME, DATA_TYPE, COLUMN_TYPE + FROM INFORMATION_SCHEMA.COLUMNS + WHERE TABLE_SCHEMA = DATABASE() + AND TABLE_NAME = '{table}' + """ + with self.mysql_conn.connect().cursor() as cursor: + cursor.execute(schema_query) + return {row[0]: (row[1], row[2]) for row in cursor.fetchall()} def _load_table_columns(self,table): """加载表的列信息""" result = self.ch_conn.connect().execute("DESCRIBE TABLE "+table)