diff --git a/ClickHouse/C1_CreateTable.py b/ClickHouse/C1_CreateTable.py
index ada64b48..6b023c73 100644
--- a/ClickHouse/C1_CreateTable.py
+++ b/ClickHouse/C1_CreateTable.py
@@ -129,7 +129,7 @@ def generate_clickhouse_ddl(table_name, columns, indexes, primary_keys):
     ddl += ',\n'.join(column_defs)
     if index_defs:
         ddl += ',\n' + ',\n'.join(index_defs)
-    ddl += '\n) ENGINE = MergeTree()\n'
+    ddl += '\n) ENGINE = ReplacingMergeTree()\n'
     ddl += order_by
     ddl += '\nSETTINGS index_granularity = 8192;'
diff --git a/ClickHouse/mappers/__pycache__/data_mapper.cpython-310.pyc b/ClickHouse/mappers/__pycache__/data_mapper.cpython-310.pyc
index 8ebc0ea0..02cf9fa3 100644
Binary files a/ClickHouse/mappers/__pycache__/data_mapper.cpython-310.pyc and b/ClickHouse/mappers/__pycache__/data_mapper.cpython-310.pyc differ
diff --git a/ClickHouse/mappers/data_mapper.py b/ClickHouse/mappers/data_mapper.py
index 58be1e3d..bc42f697 100644
--- a/ClickHouse/mappers/data_mapper.py
+++ b/ClickHouse/mappers/data_mapper.py
@@ -1,19 +1,24 @@
 from datetime import datetime
-from utils.logger import configure_logger
-logger = configure_logger()
+import logging
+from dateutil import parser
+
+logger = logging.getLogger(__name__)
+
 
 class DataMapper:
     def __init__(self, mysql_conn, table):
         self.mysql_conn = mysql_conn
         self.table = table
         self.date_columns = []
-        self.uint64_columns = []
+        self.uint_columns = []
         self._analyze_schema()
         self.min_date = datetime(1970, 1, 1)
-        self.max_date = datetime(2105, 12, 31, 23, 59, 59)
+        self.max_date = datetime(2100, 1, 1)
+        self.clickhouse_min_date = datetime(1970, 1, 1)
+        self.clickhouse_max_date = datetime(2105, 12, 31, 23, 59, 59)
 
     def _analyze_schema(self):
-        """Analyze the table schema and auto-detect column types"""
+        """Analyze the MySQL table schema"""
         schema_query = f"""
             SELECT COLUMN_NAME, DATA_TYPE, COLUMN_TYPE
             FROM INFORMATION_SCHEMA.COLUMNS
@@ -23,69 +28,95 @@ class DataMapper:
 
         with self.mysql_conn.connect().cursor() as cursor:
             cursor.execute(schema_query)
-            for col_name, data_type, col_type in cursor.fetchall():
-                # Detect date/time columns
-                if data_type in ('datetime', 'timestamp', 'date'):
-                    self.date_columns.append(col_name)
-
-                # Detect unsigned integer columns (matches BIGINT UNSIGNED etc.)
-                if 'unsigned' in col_type.lower() and 'int' in data_type:
-                    self.uint64_columns.append(col_name)
+            for col, dtype, ctype in cursor.fetchall():
+                if dtype in ('datetime', 'timestamp', 'date', 'time'):
+                    self.date_columns.append(col)
+                if 'unsigned' in ctype.lower() and 'int' in dtype:
+                    self.uint_columns.append(col)
+        #logger.info(f"Date columns: {self.date_columns}")
+        #logger.info(f"Unsigned integer columns: {self.uint_columns}")
 
-        #logger.info(f"Auto-detected column types - date columns: {self.date_columns}")
-        #logger.info(f"Auto-detected column types - unsigned integer columns: {self.uint64_columns}")
 
     def map_row(self, columns, row):
-        row_dict = dict(zip(columns, row))
-        return {col: self._map_value(col, val) for col, val in row_dict.items()}
+        """Map one row (log the row on failure for debugging)"""
+        debug_row = {col: val for col, val in zip(columns, row)}
+        try:
+            return {col: self._map_value(col, val) for col, val in zip(columns, row)}
+        except Exception as e:
+            logger.error(f"Row that failed to map: {debug_row}")
+            raise
 
     def _map_value(self, col, value):
-        if col in self.uint64_columns:
-            return self._handle_uint64(value)
-        elif col in self.date_columns:
+        """Convert a single column value"""
+        if col in self.uint_columns:
+            return self._handle_uint(value)
+        if col in self.date_columns:
             return self._handle_datetime(value)
-        elif isinstance(value, str):
-            return value.strip()
+        if isinstance(value, str):
+            return value.strip()[:500]  # guard against overly long strings
        return value
 
-    def _handle_uint64(self, value):
+    def _handle_uint(self, value):
+        """Handle unsigned integer values"""
         try:
-            return int(float(value)) if value not in (None, '', 'NULL') else 0
+            return int(value) if value not in (None, '', 'NULL') else 0
         except:
             return 0
 
     def _handle_datetime(self, value):
-        dt = self._parse_datetime(value)
-        return dt if dt else self.min_date
+        """Handle date/time columns (enhanced)"""
+        original_value = value
+        try:
+            # Handle null and invalid values
+            if value in (None, 0, '0', '0000-00-00', '0000-00-00 00:00:00'):
+                return self.clickhouse_min_date
 
-    def _parse_datetime(self, value):
-        if value in (None, 0, '0', '0.0', '0.00', '', 'null', 'NULL'):
-            return self.min_date
+            # Try automatic parsing of arbitrary formats
+            try:
+                dt = parser.parse(str(value))
+                return self._clamp_datetime(dt)
+            except:
+                pass
 
-        try:
-            str_value = str(value).strip()
-            for fmt in ('%Y-%m-%d %H:%M:%S', '%Y-%m-%d', '%Y%m%d%H%M%S', '%Y/%m/%d %H:%M:%S'):
+            # Handle numeric timestamps
+            if str(value).isdigit():
+                ts = int(value)
+                length = len(str(value))
+                if 10 <= length <= 13:  # second- to millisecond-precision
+                    divisor = 10 ** (length - 10)
+                    return self._clamp_datetime(datetime.fromtimestamp(ts / divisor))
+
+            # Try common explicit formats
+            formats = [
+                '%Y-%m-%d %H:%M:%S',
+                '%Y/%m/%d %H:%M:%S',
+                '%Y%m%d%H%M%S',
+                '%Y-%m-%d',
+                '%Y%m%d',
+                '%Y-%m-%dT%H:%M:%S.%fZ'  # ISO format
+            ]
+
+            for fmt in formats:
                 try:
-                    parsed = datetime.strptime(str_value, fmt)
-                    return self._clamp_datetime(parsed)
+                    return self._clamp_datetime(datetime.strptime(str(value), fmt))
                 except ValueError:
                     continue
 
-            if str_value.isdigit():
-                ts = int(str_value)
-                if 1e12 < ts < 1e13:  # millisecond timestamp
-                    parsed = datetime.fromtimestamp(ts / 1000)
-                elif 1e9 < ts < 1e10:  # second timestamp
-                    parsed = datetime.fromtimestamp(ts)
-                return self._clamp_datetime(parsed)
-
-            return self.min_date
-        except:
-            return self.min_date
+            logger.warning(f"Unparseable datetime format: {original_value}")
+            return self.clickhouse_min_date
+        except Exception as e:
+            logger.error(f"Datetime parsing error: {original_value} - {str(e)}")
+            return self.clickhouse_min_date
 
     def _clamp_datetime(self, dt):
-        if dt < self.min_date:
-            return self.min_date
-        elif dt > self.max_date:
-            return self.max_date
+        """Strictly clamp datetimes to the supported range"""
+        if not isinstance(dt, datetime):
+            return self.clickhouse_min_date
+
+        if dt < self.clickhouse_min_date:
+            logger.debug(f"Clamping too-small datetime: {dt}")
+            return self.clickhouse_min_date
+        if dt > self.clickhouse_max_date:
+            logger.debug(f"Clamping too-large datetime: {dt}")
+            return self.clickhouse_max_date
         return dt
\ No newline at end of file
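For reference, a standalone sketch of the parse-and-clamp behaviour introduced by the new `_handle_datetime` / `_clamp_datetime` pair. It mirrors the logic above rather than importing `DataMapper` (constructing the class needs a live MySQL connection), uses the same `dateutil` dependency the patch adds, and the sample values are made up:

```python
# Standalone sketch only: mirrors the clamp-to-ClickHouse-range logic above.
from datetime import datetime
from dateutil import parser

CH_MIN = datetime(1970, 1, 1)
CH_MAX = datetime(2105, 12, 31, 23, 59, 59)

def clamp(dt):
    # Clip out-of-range values into ClickHouse's supported DateTime window.
    return max(CH_MIN, min(dt, CH_MAX))

samples = ['0000-00-00 00:00:00', '2024/05/01 12:30:00', '2999-01-01', '1715000000']
for raw in samples:
    if raw in ('0000-00-00', '0000-00-00 00:00:00'):
        result = CH_MIN                            # MySQL zero-dates -> minimum
    elif raw.isdigit() and 10 <= len(raw) <= 13:
        ts = int(raw) / (10 ** (len(raw) - 10))    # second or millisecond epoch
        result = clamp(datetime.fromtimestamp(ts))
    else:
        result = clamp(parser.parse(raw))          # free-form date strings
    print(f"{raw!r:>25} -> {result}")
```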
diff --git a/ClickHouse/schemas/t_equipment_charge_order.sql b/ClickHouse/schemas/t_equipment_charge_order.sql
index c2324536..c77efc48 100644
--- a/ClickHouse/schemas/t_equipment_charge_order.sql
+++ b/ClickHouse/schemas/t_equipment_charge_order.sql
@@ -132,6 +132,6 @@ CREATE TABLE t_equipment_charge_order (
     INDEX idx_stationid_chargedegree (station_id, charge_degree) TYPE minmax GRANULARITY 3,
     INDEX platform_order_no (platform_order_no) TYPE minmax GRANULARITY 3,
     INDEX idx_cover_condition (station_id, report_time, state, charge_degree) TYPE minmax GRANULARITY 3
-) ENGINE = MergeTree()
+) ENGINE = ReplacingMergeTree()
 ORDER BY (id)
 SETTINGS index_granularity = 8192;
\ No newline at end of file
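With the engine switched to `ReplacingMergeTree()` and `ORDER BY (id)` as the sorting key, rows that share an `id` collapse to a single row when parts are merged, so re-inserting ranges that were already synced (which the sync service below now does deliberately) only produces transient duplicates. A minimal sketch of that behaviour, assuming a clickhouse-driver style client such as the `ch_conn` wrapper used in `sync_service.py` (host and demo table are placeholders):

```python
# Minimal sketch of ReplacingMergeTree deduplication (assumes clickhouse-driver;
# the host and demo table are placeholders, not part of this repository).
from clickhouse_driver import Client

client = Client(host='localhost')
client.execute("""
    CREATE TABLE IF NOT EXISTS demo_dedup (
        id UInt64,
        state UInt8
    ) ENGINE = ReplacingMergeTree()
    ORDER BY (id)
""")

# Re-insert an existing id, as an overlapping "recent data" pass would.
client.execute('INSERT INTO demo_dedup (id, state) VALUES', [(1, 0)])
client.execute('INSERT INTO demo_dedup (id, state) VALUES', [(1, 1)])

print(client.execute('SELECT count() FROM demo_dedup'))        # may still report 2
client.execute('OPTIMIZE TABLE demo_dedup FINAL')               # force the merge now
print(client.execute('SELECT count() FROM demo_dedup FINAL'))   # one row per id
```

Note that without a version column ClickHouse keeps the last row in the selection when it merges duplicates (in practice the most recently inserted one), and reads that must be exact before a merge happens should use `FINAL`.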
diff --git a/ClickHouse/services/__pycache__/sync_service.cpython-310.pyc b/ClickHouse/services/__pycache__/sync_service.cpython-310.pyc
index 8155cb9e..164682fb 100644
Binary files a/ClickHouse/services/__pycache__/sync_service.cpython-310.pyc and b/ClickHouse/services/__pycache__/sync_service.cpython-310.pyc differ
diff --git a/ClickHouse/services/sync_service.py b/ClickHouse/services/sync_service.py
index 2f775cd6..bfba8e3e 100644
--- a/ClickHouse/services/sync_service.py
+++ b/ClickHouse/services/sync_service.py
@@ -4,6 +4,7 @@
 from tqdm import tqdm
 import time
 import psutil
 import logging
+from datetime import datetime
 
 logger = logging.getLogger(__name__)
@@ -14,12 +15,13 @@ class SyncService:
         self.ch_conn = ch_conn
         self.mapper = mapper
         self.columns = None
-        self.full_sync_interval = 24 * 3600  # run a full sync every 24 hours
+        self.full_sync_interval = 24 * 3600  # interval between full syncs
         self.last_full_sync = 0
-        self.recent_check_count = 5000  # number of recent rows to check
+        self.recent_check_count = 5000  # number of recent rows to re-check
+        self.optimize_frequency = 100  # run OPTIMIZE every 100 batches
 
     def sync_data(self, table, batch_size):
-        """Adaptive sync strategy"""
+        """Adaptive sync entry point"""
         current_time = time.time()
         self.columns = self._load_table_columns(table)
@@ -30,111 +32,121 @@ class SyncService:
             self._incremental_sync(table, batch_size)
 
     def _full_sync(self, table, batch_size):
-        """Full sync"""
+        """Full data sync"""
         logger.info(f"🚀 Starting full sync of {table}")
-        mysql_conn = self.mysql_conn.connect()
-        try:
-            with mysql_conn.cursor() as count_cursor:
-                count_cursor.execute(f"SELECT COUNT(*) FROM {table}")
-                total = count_cursor.fetchone()[0]
+        with self.mysql_conn.connect() as mysql_conn:
+            try:
+                with mysql_conn.cursor() as count_cursor:
+                    count_cursor.execute(f"SELECT COUNT(*) FROM {table}")
+                    total = count_cursor.fetchone()[0]
 
-            with mysql_conn.cursor(pymysql.cursors.SSCursor) as cursor:
-                cursor.execute(f"SELECT * FROM {table} ORDER BY id")
-                self._sync_with_cursor(cursor, total, table, batch_size)
+                with mysql_conn.cursor(pymysql.cursors.SSCursor) as cursor:
+                    cursor.execute(f"SELECT * FROM {table} ORDER BY id")
+                    self._sync_with_cursor(cursor, total, table, batch_size)
+
+            except Exception as e:
+                logger.error(f"Full sync failed: {str(e)}")
+                raise
+            finally:
+                self._optimize_table(table)
 
-        except Exception as e:
-            logger.error(f"Full sync failed: {str(e)}")
-        finally:
-            mysql_conn.close()
         logger.info(f"✅ Full sync of {table} finished")
 
     def _incremental_sync(self, table, batch_size):
-        """Incremental sync (with a recent-data safety pass)"""
+        """Incremental data sync"""
         last_id = self._get_last_id_from_ch(table)
         logger.info(f"🔁 Starting incremental sync of {table}, starting ID: {last_id}")
 
         # Standard incremental sync
-        mysql_conn = self.mysql_conn.connect()
-        try:
-            with mysql_conn.cursor() as count_cursor:
-                count_cursor.execute(
-                    f"SELECT COUNT(*) FROM {table} WHERE id > {last_id}"
-                )
-                total = count_cursor.fetchone()[0]
-
-            with mysql_conn.cursor(pymysql.cursors.SSCursor) as cursor:
-                cursor.execute(
-                    f"SELECT * FROM {table} WHERE id > {last_id} ORDER BY id"
-                )
-                self._sync_with_cursor(cursor, total, table, batch_size)
-
-        finally:
-            mysql_conn.close()
-
-        # Recent-data safety sync
+        with self.mysql_conn.connect() as mysql_conn:
+            try:
+                with mysql_conn.cursor() as count_cursor:
+                    count_cursor.execute(
+                        f"SELECT COUNT(*) FROM {table} WHERE id > {last_id}"
+                    )
+                    total = count_cursor.fetchone()[0]
+
+                with mysql_conn.cursor(pymysql.cursors.SSCursor) as cursor:
+                    cursor.execute(
+                        f"SELECT * FROM {table} WHERE id > {last_id} ORDER BY id"
+                    )
+                    self._sync_with_cursor(cursor, total, table, batch_size)
+
+            except Exception as e:
+                logger.error(f"Incremental sync failed: {str(e)}")
+                raise
+
+        # Safe recent-data sync
         self._sync_recent_data(table, batch_size)
         logger.info(f"✅ Incremental sync of {table} finished, last ID: {self._get_last_id_from_ch(table)}")
 
     def _sync_recent_data(self, table, batch_size):
-        """Recent-data safety sync"""
-        logger.info(f"🔄 Starting sync of the most recent {self.recent_check_count} rows of {table}")
+        """Safe recent-data sync"""
+        logger.info(f"🔄 Starting safe sync of recent data for {table}")
 
-        mysql_conn = self.mysql_conn.connect()
-        try:
-            with mysql_conn.cursor() as cursor:
-                cursor.execute(f"SELECT MAX(id) FROM {table}")
-                max_id = cursor.fetchone()[0] or 0
+        mysql_max_id = self._get_mysql_max_id(table)
+        ch_max_id = self._get_last_id_from_ch(table)
+        safe_start = max(mysql_max_id - self.recent_check_count, ch_max_id + 1)
 
-            if max_id <= 0:
-                return
+        if safe_start > mysql_max_id:
+            logger.info("⏩ No recent-data sync needed")
+            return
 
-            start_id = max(max_id - self.recent_check_count, 0)
+        with self.mysql_conn.connect() as mysql_conn:
+            try:
+                with mysql_conn.cursor(pymysql.cursors.SSCursor) as cursor:
+                    cursor.execute(
+                        f"SELECT * FROM {table} "
+                        f"WHERE id BETWEEN {safe_start} AND {mysql_max_id} "
+                        f"ORDER BY id"
+                    )
+
+                    total = mysql_max_id - safe_start + 1
+                    progress = tqdm(total=total, desc="Recent data", unit="rec")
+                    batch = []
+                    insert_count = 0
 
-            with mysql_conn.cursor(pymysql.cursors.SSCursor) as cursor:
-                cursor.execute(
-                    f"SELECT * FROM {table} WHERE id >= {start_id} ORDER BY id"
-                )
+                    while True:
+                        row = cursor.fetchone()
+                        if not row:
+                            break
 
-                total = max_id - start_id + 1
-                progress = tqdm(total=total, desc="Recent data", unit="rec")
-                batch = []
+                        mapped = self.mapper.map_row(self.columns, row)
+                        batch.append(mapped)
 
-                while True:
-                    row = cursor.fetchone()
-                    if not row:
-                        break
+                        if len(batch) >= batch_size:
+                            self._insert_batch(batch, table)
+                            insert_count += 1
+                            progress.update(len(batch))
+                            batch = []
 
-                    mapped = self.mapper.map_row(self.columns, row)
-                    batch.append(mapped)
+                            if insert_count % self.optimize_frequency == 0:
+                                self._optimize_table(table)
 
-                    if len(batch) >= batch_size:
+                    if batch:
                         self._insert_batch(batch, table)
                         progress.update(len(batch))
-                        batch = []
-
-                if batch:
-                    self._insert_batch(batch, table)
-                    progress.update(len(batch))
-                progress.close()
+                    progress.close()
 
-        except Exception as e:
-            logger.error(f"Recent-data sync failed: {str(e)}")
-        finally:
-            mysql_conn.close()
+            except Exception as e:
+                logger.error(f"Recent-data sync failed: {str(e)}")
+                raise
 
-        logger.info(f"🆗 Recent-data sync covered ID range: {start_id}-{max_id}")
+        logger.info(f"🆗 Recent-data sync covered ID range: {safe_start}-{mysql_max_id}")
 
     def _sync_with_cursor(self, cursor, total, table, batch_size):
         """Common sync loop"""
         progress = tqdm(total=total, desc="Sync progress", unit="rec")
         batch = []
+        insert_count = 0
         last_success_time = time.time()
 
         while True:
             try:
-                if time.time() - last_success_time > 300:  # connection check every 5 minutes
+                # Check the connection every 5 minutes
+                if time.time() - last_success_time > 300:
                     cursor.connection.ping(reconnect=True)
                     last_success_time = time.time()
 
@@ -147,12 +159,17 @@ class SyncService:
 
                 if len(batch) >= batch_size:
                     self._insert_batch(batch, table)
+                    insert_count += 1
                     progress.update(len(batch))
                     batch = []
                     last_success_time = time.time()
 
+                    # Periodically optimize the table
+                    if insert_count % self.optimize_frequency == 0:
+                        self._optimize_table(table)
+
             except (OperationalError, InterfaceError) as e:
-                logger.warning("Connection lost, trying to reconnect...")
+                logger.warning("⚠️ Connection lost, trying to reconnect...")
                 cursor.connection.ping(reconnect=True)
                 time.sleep(5)
                 continue
 
@@ -163,25 +180,11 @@ class SyncService:
 
         progress.close()
 
-    def _get_last_id_from_ch(self, table):
-        """Get the max ID from ClickHouse"""
-        try:
-            result = self.ch_conn.connect().execute(f"SELECT max(id) FROM {table}")
-            return result[0][0] or 0
-        except Exception as e:
-            logger.error(f"Failed to get max ID: {str(e)}")
-            return 0
-
-    def _load_table_columns(self, table):
-        """Load the table's column list"""
-        result = self.ch_conn.connect().execute(f"DESCRIBE TABLE {table}")
-        return [row[0] for row in result]
-
     def _insert_batch(self, batch, table):
-        """Batch insert with memory monitoring"""
+        """Safe batch insert"""
         mem = psutil.virtual_memory()
         if mem.percent > 90:
-            logger.warning("Memory usage above 90%, pausing for 60 seconds")
+            logger.warning("🛑 Memory usage above 90%, pausing for 60 seconds")
             time.sleep(60)
 
         try:
@@ -193,9 +196,39 @@ class SyncService:
                     'date_time_input_format': 'best_effort',
                     'allow_experimental_analyzer': 0,
                     'input_format_null_as_default': 1,
-                    'max_partitions_per_insert_block': 500
+                    'max_partitions_per_insert_block': 1000,
+                    'optimize_on_insert': 1
                 }
             )
         except Exception as e:
-            logger.error(f"Batch insert failed: {str(e)}")
-            # optionally add retry logic
\ No newline at end of file
+            logger.error(f"❌ Batch insert failed: {str(e)}")
+            # retry logic can be added here
+
+    def _optimize_table(self, table):
+        """Optimize the ClickHouse table"""
+        try:
+            self.ch_conn.connect().execute(f"OPTIMIZE TABLE {table} FINAL")
+            logger.debug(f"Table {table} optimized")
+        except Exception as e:
+            logger.warning(f"Table optimization failed: {str(e)}")
+
+    def _get_last_id_from_ch(self, table):
+        """Get the max ID from ClickHouse"""
+        try:
+            result = self.ch_conn.connect().execute(f"SELECT max(id) FROM {table}")
+            return result[0][0] or 0
+        except Exception as e:
+            logger.error(f"Failed to get max ID: {str(e)}")
+            return 0
+
+    def _get_mysql_max_id(self, table):
+        """Get the max ID from MySQL"""
+        with self.mysql_conn.connect() as conn:
+            with conn.cursor() as cursor:
+                cursor.execute(f"SELECT MAX(id) FROM {table}")
+                return cursor.fetchone()[0] or 0
+
+    def _load_table_columns(self, table):
+        """Load the table's column list"""
+        result = self.ch_conn.connect().execute(f"DESCRIBE TABLE {table}")
+        return [row[0] for row in result]
\ No newline at end of file
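A rough way to sanity-check a finished sync pass (not part of this patch; connection parameters are placeholders) is to compare the MySQL row count against a deduplicated ClickHouse count. Reading with `FINAL` keeps transient duplicates from the overlapping recent-data pass from skewing the comparison:

```python
# Post-sync sanity check sketch: compare MySQL and ClickHouse row counts for one table.
# Hosts, credentials and database names below are placeholders, not from the repository.
import pymysql
from clickhouse_driver import Client

TABLE = 't_equipment_charge_order'

mysql_conn = pymysql.connect(host='127.0.0.1', user='root', password='***', database='source_db')
with mysql_conn.cursor() as cursor:
    cursor.execute(f"SELECT COUNT(*) FROM {TABLE}")
    mysql_total = cursor.fetchone()[0]
mysql_conn.close()

ch = Client(host='127.0.0.1')
# FINAL collapses any duplicates ReplacingMergeTree has not merged away yet.
ch_total = ch.execute(f"SELECT count() FROM {TABLE} FINAL")[0][0]

print(f"MySQL={mysql_total}, ClickHouse={ch_total}, diff={mysql_total - ch_total}")
```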