"""Entry point for syncing one table from MySQL into ClickHouse.

Supports a full sync (the target table is truncated first) and an
incremental sync (the target table is left as-is before syncing).
"""

from connectors.mysql_connector import MySQLConnector
from connectors.clickhouse_connector import ClickHouseConnector
from mappers.data_mapper import DataMapper
from services.sync_service import SyncService
from utils.logger import configure_logger
from config.db_config import MYSQL_CONFIG, CH_CONFIG

logger = configure_logger()

# Values accepted for ``type_id`` in sync().
FULL_SYNC = 1         # full sync: truncate the target table, then sync
INCREMENTAL_SYNC = 2  # incremental sync: sync without truncating


def sync(tableName: str, type_id: int, batch_size: int = 5000) -> None:
    """Sync ``tableName`` using the MySQL and ClickHouse connectors.

    Args:
        tableName: Name of the table to sync. The same name is used for the
            mapper, the sync service, and the TRUNCATE statement.
            NOTE(review): the name is interpolated into SQL unquoted, so
            only trusted identifiers should be passed.
        type_id: ``FULL_SYNC`` (1) truncates the target table before
            syncing. Any other value (nominally ``INCREMENTAL_SYNC``, 2) is
            treated as an incremental sync and skips the truncation.
        batch_size: Batch size forwarded to ``SyncService.sync_data``.
            Defaults to 5000, the value that was previously hard-coded.

    Raises:
        Exception: Whatever the ClickHouse connector raises while truncating
            the target table during a full sync. Errors raised by
            ``sync_data`` itself are logged and NOT re-raised.
    """
    # Set up the connectors.
    mysql_conn = MySQLConnector(MYSQL_CONFIG)
    ch_conn = ClickHouseConnector(CH_CONFIG)

    if type_id == FULL_SYNC:
        logger.info(f"开始全量同步表{tableName}")
        # Empty the target table. The disconnect lives in ``finally`` so the
        # connection is released even when the TRUNCATE fails; the error
        # itself still propagates to the caller.
        try:
            ch_conn.connect().execute(f"TRUNCATE TABLE {tableName}")
        finally:
            ch_conn.disconnect()
    else:
        logger.info(f"开始增量同步表{tableName}")

    # Build the data mapper (columns no longer need to be listed by hand).
    mapper = DataMapper(mysql_conn, tableName)

    # Build the sync service.
    service = SyncService(
        mysql_conn=mysql_conn,
        ch_conn=ch_conn,
        mapper=mapper,
    )

    # NOTE(review): mysql_conn is never explicitly closed in this function;
    # confirm whether SyncService / MySQLConnector releases it.
    try:
        # Sync the data.
        service.sync_data(batch_size=batch_size, table=tableName)
    except Exception as e:
        # Best-effort: log the failure with its traceback and carry on.
        logger.error(f"同步失败: {str(e)}", exc_info=True)
    finally:
        ch_conn.disconnect()


if __name__ == "__main__":
    # Table to process.
    tableName = 't_equipment_charge_order'
    sync(tableName, FULL_SYNC)