import logging
import time

import psutil
import pymysql
from pymysql import OperationalError, InterfaceError
from tqdm import tqdm

logger = logging.getLogger(__name__)


class SyncService:
    def __init__(self, mysql_conn, ch_conn, mapper):
        self.mysql_conn = mysql_conn
        self.ch_conn = ch_conn
        self.mapper = mapper
        self.primary_key = 'id'  # Primary key column name
        self.columns = None

    def _load_table_columns(self, table):
        """Load the column names of the target table."""
        result = self.ch_conn.connect().execute(f"DESCRIBE TABLE {table}")
        return [row[0] for row in result]

    def _get_last_id_from_ch(self, table):
        """Fetch the maximum already-synced ID from ClickHouse."""
        try:
            client = self.ch_conn.connect()
            result = client.execute(f"SELECT max({self.primary_key}) FROM {table}")
            return result[0][0] or 0
        except Exception as e:
            logger.error(f"Failed to fetch max ID: {e}")
            return 0

    def sync_data(self, table, batch_size):
        """Run an incremental sync from MySQL into ClickHouse."""
        # Load column metadata
        self.columns = self._load_table_columns(table)

        # Determine where the previous sync stopped
        last_id = self._get_last_id_from_ch(table)
        logger.info(f"Starting incremental sync for {table}, from ID: {last_id}")

        # Reuse a single MySQL connection for the whole run
        mysql_conn = self.mysql_conn.connect()
        try:
            # Count the rows that still need to be synced
            with mysql_conn.cursor() as count_cursor:
                count_query = f"""
                    SELECT COUNT(*) FROM {table}
                    WHERE {self.primary_key} > %s
                """
                count_cursor.execute(count_query, (last_id,))
                total = count_cursor.fetchone()[0]
                logger.info(f"Rows pending sync: {total}")

            # Stream rows with a server-side (unbuffered) cursor; managed
            # manually so it can be recreated after a reconnect
            query = f"""
                SELECT * FROM {table}
                WHERE {self.primary_key} > %s
                ORDER BY {self.primary_key}
            """
            cursor = mysql_conn.cursor(pymysql.cursors.SSCursor)
            cursor.execute(query, (last_id,))

            progress = tqdm(total=total, desc="sync progress", unit="rec")
            batch = []
            current_max_id = last_id
            last_success_time = time.time()

            while True:
                try:
                    # Ping the connection every 5 minutes to keep it alive
                    if time.time() - last_success_time > 300:
                        mysql_conn.ping(reconnect=True)
                        last_success_time = time.time()

                    row = cursor.fetchone()
                    if not row:
                        break

                    # Track the highest ID seen so far for recovery
                    current_id = row[0]
                    if current_id > current_max_id:
                        current_max_id = current_id

                    # Map the MySQL row to the ClickHouse schema
                    mapped = self.mapper.map_row(self.columns, row)
                    batch.append(mapped)

                    # Flush a full batch
                    if len(batch) >= batch_size:
                        self._insert_batch(batch, table)
                        progress.update(len(batch))
                        batch = []
                        last_success_time = time.time()

                except (OperationalError, InterfaceError):
                    # Reconnect and resume from the last ID we saw. The old
                    # streaming cursor is invalid after a reconnect, so a new
                    # cursor must re-execute the query.
                    logger.warning(f"Connection lost, resuming from ID {current_max_id}...")
                    mysql_conn.ping(reconnect=True)
                    time.sleep(5)
                    try:
                        cursor.close()
                    except Exception:
                        pass
                    cursor = mysql_conn.cursor(pymysql.cursors.SSCursor)
                    cursor.execute(query, (current_max_id,))
                    continue

            # Flush any remaining rows
            if batch:
                self._insert_batch(batch, table)
                progress.update(len(batch))

            cursor.close()
            progress.close()
            logger.info(f"Sync finished, last synced ID: {current_max_id}")
        finally:
            if mysql_conn:
                try:
                    mysql_conn.close()
                except Exception as e:
                    logger.error(f"Error while closing connection: {e}")

    def _insert_batch(self, batch, table):
        """Batch insert with a simple memory guard."""
        # Back off when host memory is nearly exhausted
        mem = psutil.virtual_memory()
        if mem.percent > 90:
            logger.warning("Memory usage above 90%, pausing for 60 seconds")
            time.sleep(60)

        # Execute the insert
        try:
            self.ch_conn.connect().execute(
                f'INSERT INTO {table} VALUES',
                batch,
                types_check=True,
                settings={
                    'date_time_input_format': 'best_effort',
                    'allow_experimental_analyzer': 0,
                    'input_format_null_as_default': 1
                }
            )
        except Exception as e:
            logger.error(f"Batch insert failed: {e}")
            # Optional: implement retry logic here instead of re-raising
            raise
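

# --- Usage sketch (not part of the original module) ---
# A minimal example of wiring SyncService together, assuming connection
# wrappers that expose a .connect() method (one returning a pymysql
# connection, one returning a clickhouse_driver Client) and a mapper with
# a .map_row(columns, row) method, as implied by the calls above. The
# names MySQLConn, CHConn, RowMapper, the credentials, and the 'orders'
# table are all hypothetical stand-ins.
#
# import pymysql
# from clickhouse_driver import Client
#
#
# class MySQLConn:
#     """Hypothetical wrapper returning a live pymysql connection."""
#     def connect(self):
#         return pymysql.connect(host='127.0.0.1', user='sync',
#                                password='secret', database='app')
#
#
# class CHConn:
#     """Hypothetical wrapper returning a clickhouse_driver client."""
#     def connect(self):
#         return Client(host='127.0.0.1')
#
#
# class RowMapper:
#     """Hypothetical mapper; here it passes rows through unchanged."""
#     def map_row(self, columns, row):
#         return list(row)
#
#
# if __name__ == '__main__':
#     logging.basicConfig(level=logging.INFO)
#     service = SyncService(MySQLConn(), CHConn(), RowMapper())
#     service.sync_data('orders', batch_size=10000)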