import pymysql
from pymysql import OperationalError, InterfaceError
from tqdm import tqdm
import time
import psutil
import logging
logger = logging.getLogger(__name__)


class SyncService:
    def __init__(self, mysql_conn, ch_conn, mapper):
        self.mysql_conn = mysql_conn
        self.ch_conn = ch_conn
        self.mapper = mapper
        self.columns = None
        self.full_sync_interval = 24 * 3600  # run a full sync once every 24 hours
        self.last_full_sync = 0
        self.recent_check_count = 5000  # number of recent rows to re-check

    def sync_data(self, table, batch_size):
        """Adaptive sync strategy: periodic full sync, incremental in between."""
        current_time = time.time()
        self.columns = self._load_table_columns(table)
        if current_time - self.last_full_sync > self.full_sync_interval:
            self._full_sync(table, batch_size)
            self.last_full_sync = current_time
        else:
            self._incremental_sync(table, batch_size)

    def _full_sync(self, table, batch_size):
        """Full sync: stream every row of the table from MySQL into ClickHouse."""
        logger.info(f"🚀 Starting full sync of {table}")
        mysql_conn = self.mysql_conn.connect()
        try:
            with mysql_conn.cursor() as count_cursor:
                count_cursor.execute(f"SELECT COUNT(*) FROM {table}")
                total = count_cursor.fetchone()[0]
            with mysql_conn.cursor(pymysql.cursors.SSCursor) as cursor:
                cursor.execute(f"SELECT * FROM {table} ORDER BY id")
                self._sync_with_cursor(cursor, total, table, batch_size)
        except Exception as e:
            logger.error(f"Full sync failed: {e}")
        finally:
            mysql_conn.close()
        logger.info(f"✅ Full sync of {table} complete")

    def _incremental_sync(self, table, batch_size):
        """Incremental sync (with a safety pass over recent rows)."""
        last_id = self._get_last_id_from_ch(table)
        logger.info(f"🔁 Starting incremental sync of {table} from id {last_id}")
        # Standard incremental sync: everything newer than ClickHouse's max id
        mysql_conn = self.mysql_conn.connect()
        try:
            with mysql_conn.cursor() as count_cursor:
                count_cursor.execute(
                    f"SELECT COUNT(*) FROM {table} WHERE id > {last_id}"
                )
                total = count_cursor.fetchone()[0]
            with mysql_conn.cursor(pymysql.cursors.SSCursor) as cursor:
                cursor.execute(
                    f"SELECT * FROM {table} WHERE id > {last_id} ORDER BY id"
                )
                self._sync_with_cursor(cursor, total, table, batch_size)
        finally:
            mysql_conn.close()
        # Safety pass: re-sync the most recent rows to catch late changes
        self._sync_recent_data(table, batch_size)
        logger.info(
            f"✅ Incremental sync of {table} complete, "
            f"last id: {self._get_last_id_from_ch(table)}"
        )

    def _sync_recent_data(self, table, batch_size):
        """Safety pass: re-sync the most recent rows."""
        logger.info(f"🔄 Syncing the last {self.recent_check_count} rows of {table}")
        start_id = max_id = 0  # pre-set so the final log is safe if the query fails
        mysql_conn = self.mysql_conn.connect()
        try:
            with mysql_conn.cursor() as cursor:
                cursor.execute(f"SELECT MAX(id) FROM {table}")
                max_id = cursor.fetchone()[0] or 0
                if max_id <= 0:
                    return
            start_id = max(max_id - self.recent_check_count, 0)
            with mysql_conn.cursor(pymysql.cursors.SSCursor) as cursor:
                cursor.execute(
                    f"SELECT * FROM {table} WHERE id >= {start_id} ORDER BY id"
                )
                total = max_id - start_id + 1
                progress = tqdm(total=total, desc="recent rows", unit="rec")
                batch = []
                while True:
                    row = cursor.fetchone()
                    if not row:
                        break
                    mapped = self.mapper.map_row(self.columns, row)
                    batch.append(mapped)
                    if len(batch) >= batch_size:
                        self._insert_batch(batch, table)
                        progress.update(len(batch))
                        batch = []
                if batch:
                    self._insert_batch(batch, table)
                    progress.update(len(batch))
                progress.close()
        except Exception as e:
            logger.error(f"Recent-data sync failed: {e}")
        finally:
            mysql_conn.close()
        logger.info(f"🆗 Recent-data sync covered ids {start_id}-{max_id}")

    def _sync_with_cursor(self, cursor, total, table, batch_size):
        """Shared streaming loop: drain a cursor into ClickHouse in batches."""
        progress = tqdm(total=total, desc="sync progress", unit="rec")
        batch = []
        last_success_time = time.time()
        while True:
            try:
                if time.time() - last_success_time > 300:  # ping after 5 idle minutes
                    cursor.connection.ping(reconnect=True)
                    last_success_time = time.time()
                row = cursor.fetchone()
                if not row:
                    break
                mapped = self.mapper.map_row(self.columns, row)
                batch.append(mapped)
                if len(batch) >= batch_size:
                    self._insert_batch(batch, table)
                    progress.update(len(batch))
                    batch = []
                    last_success_time = time.time()
            except (OperationalError, InterfaceError) as e:
                # NOTE: an unbuffered SSCursor result set cannot be resumed after a
                # reconnect, so rows in flight may be skipped until the next pass.
                logger.warning(f"Connection lost ({e}), reconnecting...")
                cursor.connection.ping(reconnect=True)
                time.sleep(5)
                continue
        if batch:
            self._insert_batch(batch, table)
            progress.update(len(batch))
        progress.close()

    def _get_last_id_from_ch(self, table):
        """Fetch the current max id from ClickHouse."""
        try:
            result = self.ch_conn.connect().execute(f"SELECT max(id) FROM {table}")
            return result[0][0] or 0
        except Exception as e:
            logger.error(f"Failed to fetch max id: {e}")
            return 0

    def _load_table_columns(self, table):
        """Load the table's column names from ClickHouse."""
        result = self.ch_conn.connect().execute(f"DESCRIBE TABLE {table}")
        return [row[0] for row in result]

    def _insert_batch(self, batch, table):
        """Batch insert with memory-pressure backoff."""
        mem = psutil.virtual_memory()
        if mem.percent > 90:
            logger.warning("Memory usage above 90%, pausing for 60 seconds")
            time.sleep(60)
        try:
            self.ch_conn.connect().execute(
                f'INSERT INTO {table} VALUES',
                batch,
                types_check=True,
                settings={
                    'date_time_input_format': 'best_effort',
                    'allow_experimental_analyzer': 0,
                    'input_format_null_as_default': 1,
                    'max_partitions_per_insert_block': 500
                }
            )
        except Exception as e:
            logger.error(f"Batch insert failed: {e}")
            # Optional: add retry logic here
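

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the original service). It assumes
# hypothetical connection-holder objects whose connect() returns a live
# pymysql connection / clickhouse_driver.Client respectively, and a mapper
# exposing map_row(columns, row); adapt the names to your actual wiring.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import clickhouse_driver

    class MySQLHolder:
        """Hypothetical holder: hands out a fresh pymysql connection."""
        def connect(self):
            return pymysql.connect(host="127.0.0.1", user="sync",
                                   password="secret", database="app")

    class CHHolder:
        """Hypothetical holder: hands out a clickhouse-driver client."""
        def connect(self):
            return clickhouse_driver.Client(host="127.0.0.1")

    class IdentityMapper:
        """Hypothetical mapper: pairs column names with row values as-is."""
        def map_row(self, columns, row):
            return dict(zip(columns, row))

    logging.basicConfig(level=logging.INFO)
    service = SyncService(MySQLHolder(), CHHolder(), IdentityMapper())
    service.sync_data("orders", batch_size=10000)  # "orders" is an assumed table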