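"""
Periodically sync MySQL tables into ClickHouse.

Runs one incremental sync at startup, then uses APScheduler to run a full
sync every day at 02:00 and an incremental sync at the top of every other
hour. Each run fans the configured tables out across a thread pool.
"""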
import logging
import time
from concurrent.futures import ThreadPoolExecutor

from apscheduler.schedulers.background import BackgroundScheduler

from config.db_config import MYSQL_CONFIG, CH_CONFIG, print_config
from connectors.mysql_connector import MySQLConnector
from connectors.clickhouse_connector import ClickHouseConnector
from mappers.data_mapper import DataMapper
from services.sync_service import SyncService, SyncType
from utils.logger import configure_logger

# Tables to sync
tables = ['t_station', 't_equipment', 't_equipment_charge_order']

# Thread pool size
MAX_WORKERS = 8

# Configure logging
logger = configure_logger()

# Add a file handler so sync activity is also written to sync.log
file_handler = logging.FileHandler('sync.log')
file_handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)


# Sync a single table
def sync(table, sync_type):
    logger.info(f"Starting sync for table: {table}, sync type: {sync_type}")

    # Initialize the connectors
    mysql_conn = MySQLConnector(MYSQL_CONFIG)
    ch_conn = ClickHouseConnector(CH_CONFIG)

    if sync_type == SyncType.FULL:
        # Truncate the target table before a full load
        ch_conn.connect().execute(f"TRUNCATE TABLE {table}")
        logger.info(f"Truncated target table: {table}")
        ch_conn.disconnect()
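        # NOTE: this assumes SyncService / ClickHouseConnector reopens the
        # connection on its own when sync_data runs, so disconnecting right
        # after the TRUNCATE is safe.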

    # Create the data mapper
    mapper = DataMapper(mysql_conn, table)

    # Create the sync service
    service = SyncService(mysql_conn=mysql_conn, ch_conn=ch_conn, mapper=mapper)

    try:
        # Sync the data; sync_type must be a SyncType enum member
        service.sync_data(batch_size=5000, table=table, sync_type=sync_type)
        logger.info(f"Sync succeeded: {table}")
    except Exception as e:
        logger.error(f"Sync failed: {str(e)}", exc_info=True)
    finally:
        ch_conn.disconnect()


# Full sync of all configured tables
def full_sync():
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:  # cap the worker count
        executor.map(lambda table: sync(table, SyncType.FULL), tables)


# Incremental sync of all configured tables
def incremental_sync():
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:  # cap the worker count
        executor.map(lambda table: sync(table, SyncType.INCREMENTAL), tables)


if __name__ == "__main__":
    # Print the configuration
    print_config()

    # Run one incremental sync immediately on startup
    incremental_sync()

    # Create the scheduler
    scheduler = BackgroundScheduler()

    # Full sync every day at 02:00
    scheduler.add_job(full_sync, 'cron', hour=2, minute=0)

    # Incremental sync at the top of every hour except 02:00 (when the full sync runs)
    scheduler.add_job(incremental_sync, 'cron', hour='0-1,3-23', minute=0,
                      id='incremental_sync_job', replace_existing=True)

    # Start the scheduler
    scheduler.start()

    try:
        # Keep the main thread alive
        while True:
            time.sleep(1)
    except (KeyboardInterrupt, SystemExit):
        scheduler.shutdown()