main
黄海 5 months ago
parent ba61be7d77
commit 0ed1be3acc

@ -1,16 +1,12 @@
from connectors.mysql_connector import MySQLConnector
from connectors.clickhouse_connector import ClickHouseConnector
from mappers.data_mapper import DataMapper
from services.sync_service import SyncService
from services.sync_service import SyncService, SyncType
from utils.logger import configure_logger
logger = configure_logger()
from config.db_config import MYSQL_CONFIG, CH_CONFIG
# Legacy integer sync-mode flags: 1 = full sync, 2 = incremental sync.
# NOTE(review): these appear superseded by the string constants
# SyncType.FULL / SyncType.INCREMENTAL used by SyncService.sync_data —
# confirm nothing else reads them, then remove.
SYNC_FULL=1
SYNC_INCREMENTAL=2
def sync(tableName, sync_type_id):
# 初始化组件
@ -22,7 +18,6 @@ def sync(tableName, sync_type_id):
ch_conn.connect().execute(f"TRUNCATE TABLE {tableName}")
ch_conn.disconnect()
# 创建数据映射器(不再需要手动指定列)
mapper = DataMapper(mysql_conn, tableName)
@ -35,7 +30,8 @@ def sync(tableName, sync_type_id):
try:
# 同步数据
service.sync_data(batch_size=5000, table=tableName)
service.sync_data(batch_size=5000, table=tableName,
sync_type=sync_type_id)
except Exception as e:
logger.error(f"同步失败: {str(e)}", exc_info=True)
finally:
@ -45,4 +41,4 @@ def sync(tableName, sync_type_id):
if __name__ == "__main__":
# 要处理的表名
tableName = 't_equipment_charge_order'
sync(tableName, SYNC_FULL)
sync(tableName, SyncType.INCREMENTAL)

@ -9,27 +9,65 @@ from datetime import datetime
logger = logging.getLogger(__name__)
class SyncType:
    """Supported synchronisation modes, expressed as plain string constants."""

    # Rebuild the target table from scratch.
    FULL = 'full'
    # Copy only rows newer than the last recorded sync point.
    INCREMENTAL = 'incremental'
class SyncService:
def __init__(self, mysql_conn, ch_conn, mapper):
self.mysql_conn = mysql_conn
self.ch_conn = ch_conn
self.mapper = mapper
self.primary_key = 'id' # 主键字段名
self.columns = None
self.full_sync_interval = 24 * 3600 # 全量同步间隔
self.last_full_sync = 0
self.tracking_columns = ['id', 'update_ts'] # 新增跟踪字段
self.recent_check_count = 5000 # 近期数据检查量
self.optimize_frequency = 100 # 每100批优化一次
def sync_data(self, table, batch_size):
    # NOTE(review): stale pre-refactor stub left behind by a bad merge — the
    # active entry point is sync_data(table, batch_size, sync_type) defined
    # further down this class. current_time is computed and never used here.
    # Confirm and remove.
    """Smart sync entry point (superseded)."""
    current_time = time.time()
def create_ch_table(self, table):
    """(Re)create the ClickHouse *table* from the DDL file at self.schema_path."""
    # Load the CREATE TABLE statement from disk.
    # NOTE(review): self.schema_path is not assigned in the visible __init__ —
    # confirm a caller sets it before this method runs.
    with open(self.schema_path, 'r', encoding='utf-8') as schema_file:
        ddl = schema_file.read()
    ch_client = self.ch_conn.connect()
    # Drop-and-recreate so the live schema always matches the DDL file.
    ch_client.execute(f"DROP TABLE IF EXISTS {table}")
    ch_client.execute(ddl)
    logger.info(f"{table} 创建完成")
def _load_table_columns(self, table):
"""加载表的列信息"""
result = self.ch_conn.connect().execute(f"DESCRIBE TABLE {table}")
return [row[0] for row in result]
def _get_last_sync_point(self, table):
"""获取最后同步点时间戳和ID"""
try:
client = self.ch_conn.connect()
result = client.execute(
f"SELECT max(update_ts), max(id) FROM {table} "
f"WHERE update_ts = (SELECT max(update_ts) FROM {table})"
)
return result[0] or (datetime(1970, 1, 1), 0)
except Exception as e:
logger.error(f"获取同步点失败: {str(e)}")
return (datetime(1970, 1, 1), 0)
def sync_data(self, table, batch_size, sync_type):
    """
    Run one synchronisation pass for *table*.

    :param table: table name (same on the MySQL and ClickHouse side)
    :param batch_size: rows per insert batch
    :param sync_type: SyncType.FULL or SyncType.INCREMENTAL
    :raises ValueError: if sync_type is not a recognised mode
    """
    # Bug fix: removed-side diff lines from the old time-based dispatcher
    # (`if current_time - self.last_full_sync ...`, `self.last_full_sync =
    # current_time`, stray `else:`) were left interleaved here, breaking the
    # method and referencing an undefined `current_time`.
    # Cache the target schema once; batch mapping reuses it.
    self.columns = self._load_table_columns(table)
    if sync_type == SyncType.FULL:
        self._full_sync(table, batch_size)
    elif sync_type == SyncType.INCREMENTAL:
        self._incremental_sync(table, batch_size)
    else:
        raise ValueError(f"无效的同步类型: {sync_type},可选值:{SyncType.FULL}/{SyncType.INCREMENTAL}")
def _full_sync(self, table, batch_size):
"""全量数据同步"""
@ -81,61 +119,6 @@ class SyncService:
self._sync_recent_data(table, batch_size)
logger.info(f"✅ 增量同步{table}完成最后ID: {self._get_last_id_from_ch(table)}")
def _sync_recent_data(self, table, batch_size):
    """Re-sync the recent tail of *table* from MySQL into ClickHouse.

    Streams rows whose id falls in a "safe" window covering both the last
    recent_check_count MySQL ids and anything ClickHouse has not yet
    received, inserting them in batches of *batch_size*.
    """
    logger.info(f"🔄 开始安全同步{table}近期数据")
    mysql_max_id = self._get_mysql_max_id(table)
    ch_max_id = self._get_last_id_from_ch(table)
    # Start at whichever is higher: the floor of the recent window or the
    # first id ClickHouse is missing — avoids re-copying settled rows.
    safe_start = max(mysql_max_id - self.recent_check_count, ch_max_id + 1)
    if safe_start > mysql_max_id:
        logger.info("⏩ 无需近期数据同步")
        return
    with self.mysql_conn.connect() as mysql_conn:
        try:
            # SSCursor streams rows server-side instead of buffering them all.
            with mysql_conn.cursor(pymysql.cursors.SSCursor) as cursor:
                cursor.execute(
                    f"SELECT * FROM {table} "
                    f"WHERE id BETWEEN {safe_start} AND {mysql_max_id} "
                    f"ORDER BY id"
                )
                total = mysql_max_id - safe_start + 1
                progress = tqdm(total=total, desc="近期数据", unit="rec")
                batch = []
                insert_count = 0
                while True:
                    row = cursor.fetchone()
                    if not row:
                        break
                    mapped = self.mapper.map_row(self.columns, row)
                    batch.append(mapped)
                    if len(batch) >= batch_size:
                        self._insert_batch(batch, table)
                        insert_count += 1
                        progress.update(len(batch))
                        batch = []
                        # Periodically merge ClickHouse parts to keep reads fast.
                        if insert_count % self.optimize_frequency == 0:
                            self._optimize_table(table)
                # Flush the final partial batch.
                if batch:
                    self._insert_batch(batch, table)
                    progress.update(len(batch))
                progress.close()
        except Exception as e:
            logger.error(f"近期数据同步失败: {str(e)}")
            raise
    logger.info(f"🆗 近期数据同步范围ID: {safe_start}-{mysql_max_id}")
def _sync_with_cursor(self, cursor, total, table, batch_size):
"""通用同步流程"""
progress = tqdm(total=total, desc="同步进度", unit="rec")
@ -145,8 +128,7 @@ class SyncService:
while True:
try:
# 每5分钟检查连接
if time.time() - last_success_time > 300:
if time.time() - last_success_time > 300: # 5分钟连接检查
cursor.connection.ping(reconnect=True)
last_success_time = time.time()
@ -180,8 +162,17 @@ class SyncService:
progress.close()
def _get_last_id_from_ch(self, table):
"""获取ClickHouse最大ID"""
try:
result = self.ch_conn.connect().execute(f"SELECT max(id) FROM {table}")
return result[0][0] or 0
except Exception as e:
logger.error(f"获取最大ID失败: {str(e)}")
return 0
def _insert_batch(self, batch, table):
"""安全批量插入"""
"""带内存监控的批量插入"""
mem = psutil.virtual_memory()
if mem.percent > 90:
logger.warning("🛑 内存使用超过90%暂停处理60秒")
@ -202,24 +193,56 @@ class SyncService:
)
except Exception as e:
logger.error(f"❌ 批量插入失败: {str(e)}")
# 可添加重试逻辑
# 可添加重试逻辑
def _optimize_table(self, table):
    """Ask ClickHouse to merge all parts of *table* (OPTIMIZE ... FINAL)."""
    try:
        client = self.ch_conn.connect()
        client.execute(f"OPTIMIZE TABLE {table} FINAL")
        logger.debug(f"{table}优化完成")
    except Exception as e:
        # Optimisation is best-effort; failure must not abort the sync run.
        logger.warning(f"表优化失败: {str(e)}")
def _sync_recent_data(self, table, batch_size):
    """Safely re-sync the recent tail of *table* from MySQL to ClickHouse.

    Covers the last recent_check_count MySQL ids plus any ids ClickHouse
    has not received yet, streaming them in batches of *batch_size*.

    Bug fix: a copy of _get_last_id_from_ch had been spliced into the middle
    of this method by a bad merge, cutting it off right after the opening
    log line; the stray method body has been removed and the real body
    restored.
    """
    logger.info(f"🔄 开始安全同步{table}近期数据")
    # Largest id on each side.
    mysql_max_id = self._get_mysql_max_id(table)
    ch_max_id = self._get_last_id_from_ch(table)
    # Safe starting id: skip anything ClickHouse already holds.
    safe_start = max(mysql_max_id - self.recent_check_count, ch_max_id + 1)
    if safe_start > mysql_max_id:
        logger.info("⏩ 无需近期数据同步")
        return
    # Stream the gap interval with a server-side cursor.
    with self.mysql_conn.connect() as mysql_conn:
        cursor = mysql_conn.cursor(pymysql.cursors.SSCursor)
        cursor.execute(
            f"SELECT * FROM {table} "
            f"WHERE id BETWEEN {safe_start} AND {mysql_max_id} "
            f"ORDER BY id"
        )
        total = mysql_max_id - safe_start + 1
        progress = tqdm(total=total, desc="近期数据", unit="rec")
        batch = []
        while True:
            row = cursor.fetchone()
            if not row:
                break
            mapped = self.mapper.map_row(self.columns, row)
            batch.append(mapped)
            if len(batch) >= batch_size:
                self._insert_batch(batch, table)
                progress.update(len(batch))
                batch = []
        # Flush the final partial batch.
        if batch:
            self._insert_batch(batch, table)
            progress.update(len(batch))
        progress.close()
    logger.info(f"🆗 近期数据同步范围ID: {safe_start}-{mysql_max_id}")
def _get_mysql_max_id(self, table):
"""获取MySQL最大ID"""
@ -228,7 +251,10 @@ class SyncService:
cursor.execute(f"SELECT MAX(id) FROM {table}")
return cursor.fetchone()[0] or 0
def _load_table_columns(self, table):
    # NOTE(review): duplicate of the _load_table_columns defined earlier in
    # this class (merge artifact — Python silently keeps the later
    # definition); deduplicate.
    """Return the column names of *table* from DESCRIBE TABLE."""
    result = self.ch_conn.connect().execute(f"DESCRIBE TABLE {table}")
    return [row[0] for row in result]
def _optimize_table(self, table):
    # NOTE(review): duplicate of the _optimize_table defined earlier in this
    # class (merge artifact — the later definition wins); deduplicate.
    """Run OPTIMIZE TABLE ... FINAL on *table*; best-effort, never raises."""
    try:
        self.ch_conn.connect().execute(f"OPTIMIZE TABLE {table} FINAL")
        logger.debug(f"{table}优化完成")
    except Exception as e:
        # Optimisation failure is logged but deliberately not propagated.
        logger.warning(f"表优化失败: {str(e)}")
Loading…
Cancel
Save