from connectors.mysql_connector import MySQLConnector
from connectors.clickhouse_connector import ClickHouseConnector
from mappers.data_mapper import DataMapper
from services.sync_service import SyncService, SyncType
from utils.logger import configure_logger

# The logger is configured before the config module is imported; this
# ordering is kept as in the original file.
logger = configure_logger()

from config.db_config import MYSQL_CONFIG, CH_CONFIG, print_config


def sync(tableName: str, sync_type_id) -> None:
    """Copy one table from MySQL to ClickHouse via ``SyncService``.

    When ``sync_type_id == 1`` the ClickHouse target table is truncated
    before the sync starts. Errors raised by ``SyncService.sync_data`` are
    logged (with traceback) and not re-raised; errors raised while
    truncating the target table propagate to the caller.

    Args:
        tableName: Name of the table to synchronise. It is interpolated
            directly into the TRUNCATE statement, so it must come from a
            trusted source.
        sync_type_id: Sync mode, forwarded to ``SyncService.sync_data`` as
            ``sync_type``.
    """
    # Initialise the connectors.
    mysql_conn = MySQLConnector(MYSQL_CONFIG)
    ch_conn = ClickHouseConnector(CH_CONFIG)
    # NOTE(review): mysql_conn is never explicitly closed in this function;
    # confirm whether SyncService / MySQLConnector manages its lifetime.

    # NOTE(review): 1 presumably denotes a full sync; confirm against the
    # SyncType definition (the caller below passes a SyncType member).
    if sync_type_id == 1:
        # Empty the target table. The disconnect lives in ``finally`` so the
        # ClickHouse connection is released even if the TRUNCATE fails.
        try:
            ch_conn.connect().execute(f"TRUNCATE TABLE {tableName}")
        finally:
            ch_conn.disconnect()

    # Build the data mapper (columns no longer need to be listed by hand).
    mapper = DataMapper(mysql_conn, tableName)

    # Build the sync service.
    service = SyncService(
        mysql_conn=mysql_conn,
        ch_conn=ch_conn,
        mapper=mapper,
    )

    try:
        # Run the sync.
        service.sync_data(batch_size=5000, table=tableName, sync_type=sync_type_id)
    except Exception as e:
        # Top-level boundary: log the failure with traceback instead of crashing.
        logger.error(f"同步失败: {str(e)}", exc_info=True)
    finally:
        ch_conn.disconnect()


if __name__ == "__main__":
    # Print the configuration.
    print_config()

    # Table to process.
    tableName = 't_equipment_charge_order'

    sync(tableName, SyncType.INCREMENTAL)