from tqdm import tqdm


class SyncService:
    def __init__(self, mysql_conn, ch_conn, mapper, schema_path):
        self.mysql_conn = mysql_conn
        self.ch_conn = ch_conn
        self.mapper = mapper
        self.schema_path = schema_path
        self.columns = None  # initialized to None; loaded before the first sync

    def create_ch_table(self):
        """Create the ClickHouse table from the schema file."""
        with open(self.schema_path, 'r', encoding='utf-8') as f:
            create_sql = f.read()
        client = self.ch_conn.connect()
        client.execute("DROP TABLE IF EXISTS t_equipment_charge_order")  # drop the table if it exists
        client.execute(create_sql)  # create the new table

    def _load_table_columns(self):
        """Load the column names of the target table."""
        result = self.ch_conn.connect().execute("DESCRIBE TABLE t_equipment_charge_order")
        return [row[0] for row in result]

    def sync_data(self, batch_size=5000):
        # Load the target table's column names
        self.columns = self._load_table_columns()

        count_conn = self.mysql_conn.create_count_connection()
        mysql_conn = self.mysql_conn.connect()

        with count_conn.cursor() as count_cursor:
            count_cursor.execute("SELECT COUNT(*) FROM t_equipment_charge_order")
            total = count_cursor.fetchone()[0]

        with mysql_conn.cursor() as cursor:
            cursor.execute("SELECT * FROM t_equipment_charge_order ORDER BY id")

            progress = tqdm(total=total, desc="Sync progress", unit="rec")
            batch = []

            while True:
                row = cursor.fetchone()
                if not row:
                    break
                mapped = self.mapper.map_row(self.columns, row)
                batch.append(mapped)

                if len(batch) >= batch_size:
                    self._insert_batch(batch)
                    progress.update(len(batch))
                    batch = []

            # Flush any remaining rows that did not fill a complete batch
            if batch:
                self._insert_batch(batch)
                progress.update(len(batch))

            progress.close()

    def _insert_batch(self, batch):
        """Insert a batch of rows into ClickHouse."""
        self.ch_conn.connect().execute(
            'INSERT INTO t_equipment_charge_order VALUES',
            batch,
            types_check=True,
            settings={
                'date_time_input_format': 'best_effort',
                'allow_experimental_analyzer': 0,
                'input_format_null_as_default': 1
            }
        )
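A minimal usage sketch of wiring the service together is shown below. The MySQLConnector, ClickHouseConnector, and RowMapper classes, along with their constructor arguments and the schema path, are assumptions for illustration; substitute whatever connection wrappers and mapper the rest of the project actually defines, as long as they expose the connect(), create_count_connection(), and map_row() methods used above.

# Usage sketch (helper class names and arguments are hypothetical, not part of the original code)
if __name__ == '__main__':
    mysql_conn = MySQLConnector(host='127.0.0.1', port=3306,
                                user='root', password='***', db='charging')   # assumed wrapper
    ch_conn = ClickHouseConnector(host='127.0.0.1', port=9000, db='default')  # assumed wrapper
    mapper = RowMapper()                                                      # assumed row mapper

    service = SyncService(mysql_conn, ch_conn, mapper,
                          schema_path='schema/t_equipment_charge_order.sql')  # assumed path
    service.create_ch_table()            # drop and recreate the target ClickHouse table
    service.sync_data(batch_size=5000)   # stream rows from MySQL and insert in batches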