main
黄海 5 months ago
parent d6d5085041
commit a8051455a8

@ -1,16 +1,35 @@
import logging
from enum import Enum
from connectors.mysql_connector import MySQLConnector
from connectors.clickhouse_connector import ClickHouseConnector
from mappers.data_mapper import DataMapper
from services.sync_service import SyncService, SyncType
from services.sync_service import SyncService
from utils.logger import configure_logger
from apscheduler.schedulers.background import BackgroundScheduler
import time
from config.db_config import MYSQL_CONFIG, CH_CONFIG, print_config
# 定义同步类型的枚举
class SyncType(Enum):
FULL = "full"
INCREMENTAL = "incremental"
# 配置日志记录
logger = configure_logger()
from config.db_config import MYSQL_CONFIG, CH_CONFIG, print_config
# 添加文件处理器
file_handler = logging.FileHandler('sync.log')
file_handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
def sync(tableName, sync_type_id):
logger.info(f"开始同步表: {tableName}, 同步类型: {sync_type_id.value}")
# 初始化组件
mysql_conn = MySQLConnector(MYSQL_CONFIG)
ch_conn = ClickHouseConnector(CH_CONFIG)
@ -18,6 +37,7 @@ def sync(tableName, sync_type_id):
if sync_type_id == SyncType.FULL:
# 清空目标表
ch_conn.connect().execute(f"TRUNCATE TABLE {tableName}")
logger.info(f"已清空目标表: {tableName}")
ch_conn.disconnect()
# 创建数据映射器(不再需要手动指定列)
@ -33,13 +53,17 @@ def sync(tableName, sync_type_id):
try:
# 同步数据
service.sync_data(batch_size=5000, table=tableName,
sync_type=sync_type_id)
sync_type=sync_type_id) # 确保这里传递的是 SyncType 枚举
logger.info(f"同步成功: {tableName}")
except Exception as e:
logger.error(f"同步失败: {str(e)}", exc_info=True)
finally:
ch_conn.disconnect()
tables = ['t_station', 't_equipment', 't_equipment_charge_order']
def full_sync():
for table in tables:
sync(table, SyncType.FULL)

@ -92,10 +92,6 @@ def main():
# 激活虚拟环境并安装依赖
cmd = f'{venv_path}/bin/pip install -r {remote_path}/requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple'
execute_remote_command(ssh, cmd)
cmd = f'/usr/local/SyncData/venv/bin/python3 -m pip install --upgrade pip'
execute_remote_command(ssh, cmd)
finally:
# 关闭 SFTP 和 SSH 连接
sftp.close()

@ -8,12 +8,10 @@ from datetime import datetime
logger = logging.getLogger(__name__)
class SyncType:
FULL = 'full'
INCREMENTAL = 'incremental'
class SyncService:
def __init__(self, mysql_conn, ch_conn, mapper):
self.mysql_conn = mysql_conn
@ -62,9 +60,9 @@ class SyncService:
"""
self.columns = self._load_table_columns(table)
if sync_type == SyncType.FULL:
if sync_type.value == SyncType.FULL:
self._full_sync(table, batch_size)
elif sync_type == SyncType.INCREMENTAL:
elif sync_type.value == SyncType.INCREMENTAL:
self._incremental_sync(table, batch_size)
else:
raise ValueError(f"无效的同步类型: {sync_type},可选值:{SyncType.FULL}/{SyncType.INCREMENTAL}")
@ -259,4 +257,4 @@ class SyncService:
self.ch_conn.connect().execute(f"OPTIMIZE TABLE {table} FINAL")
logger.debug(f"{table}优化完成")
except Exception as e:
logger.warning(f"表优化失败: {str(e)}")
logger.warning(f"表优化失败: {str(e)}")
Loading…
Cancel
Save