From ea738b77e24ebc664458aa58781cd495ac56a873 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E6=B5=B7?= <10402852@qq.com> Date: Mon, 24 Feb 2025 08:07:05 +0800 Subject: [PATCH] 'commit' --- AI/SyncData/C2_SyncData.py | 47 +++++++----------- .../__pycache__/sync_service.cpython-310.pyc | Bin 8516 -> 8508 bytes AI/SyncData/services/sync_service.py | 4 +- 3 files changed, 19 insertions(+), 32 deletions(-) diff --git a/AI/SyncData/C2_SyncData.py b/AI/SyncData/C2_SyncData.py index dcdb0f94..cfd05beb 100644 --- a/AI/SyncData/C2_SyncData.py +++ b/AI/SyncData/C2_SyncData.py @@ -1,19 +1,15 @@ import logging -from enum import Enum from connectors.mysql_connector import MySQLConnector from connectors.clickhouse_connector import ClickHouseConnector from mappers.data_mapper import DataMapper -from services.sync_service import SyncService +from services.sync_service import SyncService, SyncType from utils.logger import configure_logger from apscheduler.schedulers.background import BackgroundScheduler import time from config.db_config import MYSQL_CONFIG, CH_CONFIG, print_config - -# 定义同步类型的枚举 -class SyncType(Enum): - FULL = "full" - INCREMENTAL = "incremental" +# 要同步的表 +tables = ['t_station', 't_equipment', 't_equipment_charge_order'] # 配置日志记录 @@ -27,48 +23,43 @@ file_handler.setFormatter(formatter) logger.addHandler(file_handler) -def sync(tableName, sync_type_id): - logger.info(f"开始同步表: {tableName}, 同步类型: {sync_type_id.value}") +# 同步函数 +def sync(table, sync_type): + logger.info(f"开始同步表: {table}, 同步类型: {sync_type}") # 初始化组件 mysql_conn = MySQLConnector(MYSQL_CONFIG) ch_conn = ClickHouseConnector(CH_CONFIG) - if sync_type_id == SyncType.FULL: + if sync_type == SyncType.FULL: # 清空目标表 - ch_conn.connect().execute(f"TRUNCATE TABLE {tableName}") - logger.info(f"已清空目标表: {tableName}") + ch_conn.connect().execute(f"TRUNCATE TABLE {table}") + logger.info(f"已清空目标表: {table}") ch_conn.disconnect() - # 创建数据映射器(不再需要手动指定列) - mapper = DataMapper(mysql_conn, tableName) + # 创建数据映射器 + mapper = DataMapper(mysql_conn, table) # 创建同步服务 - service = SyncService( - mysql_conn=mysql_conn, - ch_conn=ch_conn, - mapper=mapper - ) + service = SyncService(mysql_conn=mysql_conn, ch_conn=ch_conn, mapper=mapper) try: # 同步数据 - service.sync_data(batch_size=5000, table=tableName, - sync_type=sync_type_id) # 确保这里传递的是 SyncType 枚举 - logger.info(f"同步成功: {tableName}") + service.sync_data(batch_size=5000, table=table, sync_type=sync_type) # 确保这里传递的是 SyncType 枚举 + logger.info(f"同步成功: {table}") except Exception as e: logger.error(f"同步失败: {str(e)}", exc_info=True) finally: ch_conn.disconnect() -tables = ['t_station', 't_equipment', 't_equipment_charge_order'] - - +# 全量同步 def full_sync(): for table in tables: sync(table, SyncType.FULL) +# 增量同步 def incremental_sync(): for table in tables: sync(table, SyncType.INCREMENTAL) @@ -77,23 +68,19 @@ def incremental_sync(): if __name__ == "__main__": # 输出配置信息 print_config() - # 先执行一次增量同步 incremental_sync() - # 创建调度器 scheduler = BackgroundScheduler() # 每天2点执行全量同步 scheduler.add_job(full_sync, 'cron', hour=2, minute=0) # 每小时执行增量同步,排除2点 scheduler.add_job(incremental_sync, 'cron', hour='*', minute=0, id='incremental_sync_job', replace_existing=True) - # 启动调度器 scheduler.start() - try: # 保持主线程运行 while True: time.sleep(1) except (KeyboardInterrupt, SystemExit): - scheduler.shutdown() \ No newline at end of file + scheduler.shutdown() diff --git a/AI/SyncData/services/__pycache__/sync_service.cpython-310.pyc b/AI/SyncData/services/__pycache__/sync_service.cpython-310.pyc index 4299a0d952826f05380fb8d6d2bfb750d0092c0f..f9d70fd921dd260b60154ecdcfc7a0776c51981a 100644 GIT binary patch delta 711 zcmZ9|&ubGw6bJBqlcdqawZW_*O+u0udWiUg3bsUTo5a*MwrbTvHKvJ?g;2LP?IZ}r z1woIZ2j5A(w10ufUIe`eiYKAHR6Kg{YL5znINvFvxQEZa-S3;7ee2x&mo~^X8sz%A;+D@5 zGOFJCU2?^SAj9@wF?148NmeC2|0dm_t=2s^Y=O1vj&B6m-upAWM;=id$hd-6L-zwA z5uuRxBeX6k&ri*?e;mnM>mUt#TjnpK)4bUm?-|9SG5*$jCxZq#01BW8O5hAA^MlCo z{=6!y!KG%W+gP-vh2Sih1b=qmP1En~M2dnYc_{jV3j8H{o+kL1bvU9o(cObsl4r+q zsO){Ro)JxXGBzv13kc>woj=BN)a38+0jhe5J}GFFA0;!%F{J0fv|>(PM0ob~1+omGC4 z8ch71^afg*;1IaUTd62jysxP`g>~2Nq4R}wjK+1i;;p5pi5B>MCZ`T=CsRLq3QLE< m2q*(Rrv9x89#_Bw*pIL9d)l^#w0&56RA-rM_b}yQE zH8Jm|m-a6(W^YF0#fvB7(I!1;j2`uFV*0+%rkd{Iv)_5&nR##KP4<0O>uZ`<5ug9d z>(ksbn zw)e5irv}Jp|MG6Dl;+R0I2pVgj?k3-LHk3A%Od-sPhT$NZ{+p1sb_RX;^;cdpw|rl zKDwDOAd|o>kOi&)IUvus0_WmsAy+)>^^IYzZL$!It3Uzxd*ihIEs#^Gz*E7OG|N8+ zuhATz2%QNShl!`4(!3e!Qr`X;dQMcdJK;suUxjfCs0naAe4Ot^#;C!6Mv}B@kDp*F zW%!e5B07Vl1)wBwlPyAC0LuJbwB#;AF7dHgaLs@`GL0P52}8~rhPn-4t?Ul40;~cR zU=6q{kkVKb`e2AhvB^mTv0OnNssWq^nt=ZhqfqnqUaU$N4QTlcyddTL*L2s}?Du+1 zCLF09*n{{2QI)?