|
|
|
@ -1,19 +1,15 @@
|
|
|
|
|
import logging
|
|
|
|
|
from enum import Enum
|
|
|
|
|
from connectors.mysql_connector import MySQLConnector
|
|
|
|
|
from connectors.clickhouse_connector import ClickHouseConnector
|
|
|
|
|
from mappers.data_mapper import DataMapper
|
|
|
|
|
from services.sync_service import SyncService
|
|
|
|
|
from services.sync_service import SyncService, SyncType
|
|
|
|
|
from utils.logger import configure_logger
|
|
|
|
|
from apscheduler.schedulers.background import BackgroundScheduler
|
|
|
|
|
import time
|
|
|
|
|
from config.db_config import MYSQL_CONFIG, CH_CONFIG, print_config
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 定义同步类型的枚举
|
|
|
|
|
class SyncType(Enum):
|
|
|
|
|
FULL = "full"
|
|
|
|
|
INCREMENTAL = "incremental"
|
|
|
|
|
# 要同步的表
|
|
|
|
|
tables = ['t_station', 't_equipment', 't_equipment_charge_order']
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 配置日志记录
|
|
|
|
@ -27,48 +23,43 @@ file_handler.setFormatter(formatter)
|
|
|
|
|
logger.addHandler(file_handler)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def sync(tableName, sync_type_id):
|
|
|
|
|
logger.info(f"开始同步表: {tableName}, 同步类型: {sync_type_id.value}")
|
|
|
|
|
# 同步函数
|
|
|
|
|
def sync(table, sync_type):
|
|
|
|
|
logger.info(f"开始同步表: {table}, 同步类型: {sync_type}")
|
|
|
|
|
|
|
|
|
|
# 初始化组件
|
|
|
|
|
mysql_conn = MySQLConnector(MYSQL_CONFIG)
|
|
|
|
|
ch_conn = ClickHouseConnector(CH_CONFIG)
|
|
|
|
|
|
|
|
|
|
if sync_type_id == SyncType.FULL:
|
|
|
|
|
if sync_type == SyncType.FULL:
|
|
|
|
|
# 清空目标表
|
|
|
|
|
ch_conn.connect().execute(f"TRUNCATE TABLE {tableName}")
|
|
|
|
|
logger.info(f"已清空目标表: {tableName}")
|
|
|
|
|
ch_conn.connect().execute(f"TRUNCATE TABLE {table}")
|
|
|
|
|
logger.info(f"已清空目标表: {table}")
|
|
|
|
|
ch_conn.disconnect()
|
|
|
|
|
|
|
|
|
|
# 创建数据映射器(不再需要手动指定列)
|
|
|
|
|
mapper = DataMapper(mysql_conn, tableName)
|
|
|
|
|
# 创建数据映射器
|
|
|
|
|
mapper = DataMapper(mysql_conn, table)
|
|
|
|
|
|
|
|
|
|
# 创建同步服务
|
|
|
|
|
service = SyncService(
|
|
|
|
|
mysql_conn=mysql_conn,
|
|
|
|
|
ch_conn=ch_conn,
|
|
|
|
|
mapper=mapper
|
|
|
|
|
)
|
|
|
|
|
service = SyncService(mysql_conn=mysql_conn, ch_conn=ch_conn, mapper=mapper)
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
# 同步数据
|
|
|
|
|
service.sync_data(batch_size=5000, table=tableName,
|
|
|
|
|
sync_type=sync_type_id) # 确保这里传递的是 SyncType 枚举
|
|
|
|
|
logger.info(f"同步成功: {tableName}")
|
|
|
|
|
service.sync_data(batch_size=5000, table=table, sync_type=sync_type) # 确保这里传递的是 SyncType 枚举
|
|
|
|
|
logger.info(f"同步成功: {table}")
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"同步失败: {str(e)}", exc_info=True)
|
|
|
|
|
finally:
|
|
|
|
|
ch_conn.disconnect()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
tables = ['t_station', 't_equipment', 't_equipment_charge_order']
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 全量同步
|
|
|
|
|
def full_sync():
|
|
|
|
|
for table in tables:
|
|
|
|
|
sync(table, SyncType.FULL)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 增量同步
|
|
|
|
|
def incremental_sync():
|
|
|
|
|
for table in tables:
|
|
|
|
|
sync(table, SyncType.INCREMENTAL)
|
|
|
|
@ -77,23 +68,19 @@ def incremental_sync():
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
|
# 输出配置信息
|
|
|
|
|
print_config()
|
|
|
|
|
|
|
|
|
|
# 先执行一次增量同步
|
|
|
|
|
incremental_sync()
|
|
|
|
|
|
|
|
|
|
# 创建调度器
|
|
|
|
|
scheduler = BackgroundScheduler()
|
|
|
|
|
# 每天2点执行全量同步
|
|
|
|
|
scheduler.add_job(full_sync, 'cron', hour=2, minute=0)
|
|
|
|
|
# 每小时执行增量同步,排除2点
|
|
|
|
|
scheduler.add_job(incremental_sync, 'cron', hour='*', minute=0, id='incremental_sync_job', replace_existing=True)
|
|
|
|
|
|
|
|
|
|
# 启动调度器
|
|
|
|
|
scheduler.start()
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
# 保持主线程运行
|
|
|
|
|
while True:
|
|
|
|
|
time.sleep(1)
|
|
|
|
|
except (KeyboardInterrupt, SystemExit):
|
|
|
|
|
scheduler.shutdown()
|
|
|
|
|
scheduler.shutdown()
|
|
|
|
|