You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

99 lines
2.7 KiB

5 months ago
import logging
from enum import Enum
5 months ago
from connectors.mysql_connector import MySQLConnector
from connectors.clickhouse_connector import ClickHouseConnector
from mappers.data_mapper import DataMapper
5 months ago
from services.sync_service import SyncService
5 months ago
from utils.logger import configure_logger
5 months ago
from apscheduler.schedulers.background import BackgroundScheduler
import time
5 months ago
from config.db_config import MYSQL_CONFIG, CH_CONFIG, print_config
# Enumeration of the supported sync modes.
class SyncType(Enum):
    """Mode in which ``sync`` copies a table from MySQL to ClickHouse.

    Member ``value`` strings appear in the "start sync" log line emitted by
    ``sync``; the member itself is forwarded to ``SyncService.sync_data``.
    """

    # sync() truncates the ClickHouse target table before copying.
    FULL = "full"
    # No truncate in sync(); the exact semantics live in SyncService.
    INCREMENTAL = "incremental"

# Configure logging.
logger = configure_logger()

# Additionally mirror INFO-and-above records into sync.log (relative to the
# current working directory). The encoding is pinned to UTF-8 because the log
# messages in this module are Chinese text; with encoding=None FileHandler
# falls back to the locale's preferred encoding, which on some hosts (e.g.
# cp1252) cannot encode them and every record would fail with a logging error.
file_handler = logging.FileHandler('sync.log', encoding='utf-8')
file_handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)

def sync(tableName, sync_type_id, batch_size=5000):
    """Synchronise one table from MySQL into ClickHouse.

    Args:
        tableName: Table name. It is interpolated directly into the ClickHouse
            ``TRUNCATE TABLE`` statement and passed to ``DataMapper`` and
            ``SyncService.sync_data``, so it must be a trusted identifier
            (callers in this module use the hard-coded ``tables`` list).
        sync_type_id: A ``SyncType`` member. ``SyncType.FULL`` truncates the
            ClickHouse table before copying; the member is also forwarded to
            ``SyncService.sync_data`` as ``sync_type``.
        batch_size: Batch size forwarded to ``SyncService.sync_data``
            (defaults to the previously hard-coded 5000).

    Any exception raised while creating the connectors, truncating, building
    the mapper or copying data is logged with its traceback and NOT re-raised,
    so a failure on one table does not abort a caller's loop over the others.
    """
    logger.info(f"开始同步表: {tableName}, 同步类型: {sync_type_id.value}")

    # Stays None if connector construction fails, so `finally` can tell
    # whether there is anything to disconnect.
    ch_conn = None
    try:
        # Initialise the connectors inside the try so that a connection
        # failure is logged like any other sync failure.
        mysql_conn = MySQLConnector(MYSQL_CONFIG)
        ch_conn = ClickHouseConnector(CH_CONFIG)

        if sync_type_id == SyncType.FULL:
            # Full sync: empty the target table before reloading it.
            # NOTE(review): if the copy below fails after this TRUNCATE, the
            # target table stays empty until the next successful run.
            ch_conn.connect().execute(f"TRUNCATE TABLE {tableName}")
            logger.info(f"已清空目标表: {tableName}")
            ch_conn.disconnect()

        # The data mapper works out the column list itself; no manual spec.
        mapper = DataMapper(mysql_conn, tableName)
        service = SyncService(
            mysql_conn=mysql_conn,
            ch_conn=ch_conn,
            mapper=mapper,
        )
        # sync_type must be the SyncType enum member, not its string value.
        service.sync_data(batch_size=batch_size, table=tableName,
                          sync_type=sync_type_id)
        logger.info(f"同步成功: {tableName}")
    except Exception as e:
        # Top-level boundary for one table's sync: log (with the table name so
        # interleaved scheduler runs stay attributable) and carry on.
        logger.error(f"同步失败: {tableName}, {e}", exc_info=True)
    finally:
        if ch_conn is not None:
            ch_conn.disconnect()
        # NOTE(review): mysql_conn is never explicitly closed here; if
        # MySQLConnector holds a live connection it should be released too —
        # confirm its API (e.g. a disconnect()/close() method).

# Tables processed, in this order, by full_sync() and incremental_sync().
tables = [
    't_station',
    't_equipment',
    't_equipment_charge_order',
]

def full_sync():
    """Run ``sync`` in ``SyncType.FULL`` mode for every name in ``tables``, in order."""
    mode = SyncType.FULL
    for name in tables:
        sync(name, mode)
def incremental_sync():
    """Run ``sync`` in ``SyncType.INCREMENTAL`` mode for every name in ``tables``, in order."""
    mode = SyncType.INCREMENTAL
    for name in tables:
        sync(name, mode)

if __name__ == "__main__":
    # Print the loaded configuration.
    print_config()

    # Run one incremental sync immediately, before the scheduler takes over.
    incremental_sync()

    # Create the scheduler.
    scheduler = BackgroundScheduler()

    # Full sync every day at 02:00.
    scheduler.add_job(full_sync, 'cron', hour=2, minute=0,
                      id='full_sync_job', replace_existing=True)

    # Incremental sync at the top of every hour EXCEPT 02:00, so it does not
    # start at the same moment as the full sync (which truncates the target
    # tables). The previous hour='*' also fired at 02:00, contrary to intent.
    # NOTE(review): a full sync lasting past 03:00 can still overlap the next
    # incremental run; add a shared lock if that becomes a problem.
    scheduler.add_job(incremental_sync, 'cron', hour='0-1,3-23', minute=0,
                      id='incremental_sync_job', replace_existing=True)

    # Start the scheduler; jobs run off the main thread.
    scheduler.start()
    try:
        # Keep the main thread alive so the process does not exit.
        while True:
            time.sleep(1)
    except (KeyboardInterrupt, SystemExit):
        scheduler.shutdown()