import pymysql
from pymysql import OperationalError, InterfaceError
from tqdm import tqdm
import time
import psutil
import logging
from datetime import datetime

logger = logging.getLogger(__name__)


class SyncType:
    FULL = 'full'
    INCREMENTAL = 'incremental'


class SyncService:
    def __init__(self, mysql_conn, ch_conn, mapper, schema_path=None):
        self.mysql_conn = mysql_conn
        self.ch_conn = ch_conn
        self.mapper = mapper
        self.schema_path = schema_path  # DDL file used by create_ch_table
        self.primary_key = 'id'  # primary key column name
        self.columns = None
        self.tracking_columns = ['id', 'update_ts']  # columns used to track incremental progress
        self.recent_check_count = 5000  # number of recent rows to re-check
        self.optimize_frequency = 100  # run OPTIMIZE TABLE every 100 inserted batches

    def create_ch_table(self, table):
        """Create the ClickHouse table from the DDL file at self.schema_path."""
        with open(self.schema_path, 'r', encoding='utf-8') as f:
            create_sql = f.read()
        client = self.ch_conn.connect()
        client.execute(f"DROP TABLE IF EXISTS {table}")
        client.execute(create_sql)
        logger.info(f"{table} created")

    def _load_table_columns(self, table):
        """Load the column names of a ClickHouse table."""
        result = self.ch_conn.connect().execute(f"DESCRIBE TABLE {table}")
        return [row[0] for row in result]

    def _get_last_sync_point(self, table):
        """Get the timestamp and ID of the last sync point."""
        try:
            client = self.ch_conn.connect()
            result = client.execute(
                f"SELECT max(update_ts), max(id) FROM {table} "
                f"WHERE update_ts = (SELECT max(update_ts) FROM {table})"
            )
            # On an empty table max() returns NULLs; fall back to the epoch start
            if result and result[0][0] is not None:
                return result[0]
            return (datetime(1970, 1, 1), 0)
        except Exception as e:
            logger.error(f"Failed to fetch last sync point: {str(e)}")
            return (datetime(1970, 1, 1), 0)

    def sync_data(self, table, batch_size, sync_type):
        """
        Run a data sync.
        :param table: table name
        :param batch_size: batch size
        :param sync_type: SyncType.FULL or SyncType.INCREMENTAL
        """
        self.columns = self._load_table_columns(table)
        if sync_type == SyncType.FULL:
            self._full_sync(table, batch_size)
        elif sync_type == SyncType.INCREMENTAL:
            self._incremental_sync(table, batch_size)
        else:
            raise ValueError(
                f"Invalid sync type: {sync_type}, expected {SyncType.FULL} or {SyncType.INCREMENTAL}"
            )

    def _full_sync(self, table, batch_size):
        """Full table sync."""
        logger.info(f"🚀 Starting full sync of {table}")
        with self.mysql_conn.connect() as mysql_conn:
            try:
                with mysql_conn.cursor() as count_cursor:
                    count_cursor.execute(f"SELECT COUNT(*) FROM {table}")
                    total = count_cursor.fetchone()[0]
                with mysql_conn.cursor(pymysql.cursors.SSCursor) as cursor:
                    cursor.execute(f"SELECT * FROM {table} ORDER BY id")
                    self._sync_with_cursor(cursor, total, table, batch_size)
            except Exception as e:
                logger.error(f"Full sync failed: {str(e)}")
                raise
            finally:
                self._optimize_table(table)
        logger.info(f"✅ Full sync of {table} finished")

    def _incremental_sync(self, table, batch_size):
        """Incremental sync."""
        last_id = self._get_last_id_from_ch(table)
        logger.info(f"🔁 Starting incremental sync of {table}, start ID: {last_id}")
        # Standard incremental sync: everything with an ID above the ClickHouse maximum
        with self.mysql_conn.connect() as mysql_conn:
            try:
                with mysql_conn.cursor() as count_cursor:
                    count_cursor.execute(
                        f"SELECT COUNT(*) FROM {table} WHERE id > {last_id}"
                    )
                    total = count_cursor.fetchone()[0]
                with mysql_conn.cursor(pymysql.cursors.SSCursor) as cursor:
                    cursor.execute(
                        f"SELECT * FROM {table} WHERE id > {last_id} ORDER BY id"
                    )
                    self._sync_with_cursor(cursor, total, table, batch_size)
            except Exception as e:
                logger.error(f"Incremental sync failed: {str(e)}")
                raise
        # Safety pass over recently written rows
        self._sync_recent_data(table, batch_size)
        logger.info(f"✅ Incremental sync of {table} finished, last ID: {self._get_last_id_from_ch(table)}")

    def _sync_with_cursor(self, cursor, total, table, batch_size):
        """Common sync loop shared by full and incremental sync."""
        # Derive the batch size from the total row count (this overrides the requested
        # batch_size), clamped to the range 1,000..5,000
        dynamic_batch_size = max(min(total // 100, 5000), 1000)
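        # For example, total=1,000,000 gives min(10,000, 5,000) = 5,000 rows per batch,
        # while total=50,000 gives max(500, 1,000) = 1,000 rows per batch.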
        progress = tqdm(total=total, desc="sync progress", unit="rec")
        batch = []
        insert_count = 0
        last_success_time = time.time()
        while True:
            try:
                if time.time() - last_success_time > 300:  # ping the connection every 5 minutes
                    cursor.connection.ping(reconnect=True)
                    last_success_time = time.time()
                row = cursor.fetchone()
                if not row:
                    break
                mapped = self.mapper.map_row(self.columns, row)
                batch.append(mapped)
                if len(batch) >= dynamic_batch_size:
                    self._insert_batch(batch, table)
                    insert_count += 1
                    progress.update(len(batch))
                    logger.info(f"Processed {progress.n}/{total} records")
                    batch = []
                    last_success_time = time.time()
                    # Periodically optimize the table
                    if insert_count % self.optimize_frequency == 0:
                        self._optimize_table(table)
            except (OperationalError, InterfaceError):
                # Note: the open server-side result set is lost on reconnect, so the
                # query may need to be re-issued before fetching can continue
                logger.warning("⚠️ Connection lost, trying to reconnect...")
                cursor.connection.ping(reconnect=True)
                time.sleep(5)
                continue
        if batch:
            self._insert_batch(batch, table)
            progress.update(len(batch))
            logger.info(f"Processed {progress.n}/{total} records")
        progress.close()

    def _get_last_id_from_ch(self, table):
        """Return the current max ID in ClickHouse."""
        try:
            result = self.ch_conn.connect().execute(f"SELECT max(id) FROM {table}")
            return result[0][0] or 0
        except Exception as e:
            logger.error(f"Failed to fetch max ID: {str(e)}")
            return 0

    def _insert_batch(self, batch, table):
        """Batch insert with memory usage monitoring."""
        mem = psutil.virtual_memory()
        if mem.percent > 90:
            logger.warning("🛑 Memory usage above 90%, pausing for 60 seconds")
            time.sleep(60)
        try:
            self.ch_conn.connect().execute(
                f'INSERT INTO {table} VALUES',
                batch,
                types_check=True,
                settings={
                    'max_insert_block_size': 100000,  # larger insert blocks
                    'insert_deduplicate': 0,          # disable deduplication on insert
                    'max_threads': 8,                 # more processing threads
                    'parallel_distributed_insert_select': 1,
                    'optimize_on_insert': 0           # skip merge/optimize on insert
                }
            )
        except Exception as e:
            logger.error(f"❌ Batch insert failed: {str(e)}")
            # Optional: add retry logic here
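            # A minimal retry sketch (assumes these errors are transient and that
            # re-inserting the same rows is acceptable for the target table engine):
            #
            #     for attempt in range(3):
            #         try:
            #             self.ch_conn.connect().execute(f'INSERT INTO {table} VALUES', batch)
            #             break
            #         except Exception:
            #             time.sleep(2 ** attempt)
            #     else:
            #         raise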

    def _sync_recent_data(self, table, batch_size):
        """Safety pass that re-syncs the most recently written rows."""
        logger.info(f"🔄 Starting safe sync of recent data for {table}")
        # Fetch the max ID on both sides
        mysql_max_id = self._get_mysql_max_id(table)
        ch_max_id = self._get_last_id_from_ch(table)
        # Start from whichever is later: the recent-data window or the first ID
        # that ClickHouse does not have yet
        safe_start = max(mysql_max_id - self.recent_check_count, ch_max_id + 1)
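        # Worked example (illustrative numbers): with mysql_max_id=1,000,000,
        # recent_check_count=5,000 and ch_max_id=999,990, safe_start =
        # max(995,000, 999,991) = 999,991, so only IDs 999,991..1,000,000 are re-checked.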
        if safe_start > mysql_max_id:
            logger.info("⏩ No recent data to sync")
            return
        # Sync the difference interval
        with self.mysql_conn.connect() as mysql_conn:
            cursor = mysql_conn.cursor(pymysql.cursors.SSCursor)
            cursor.execute(
                f"SELECT * FROM {table} "
                f"WHERE id BETWEEN {safe_start} AND {mysql_max_id} "
                f"ORDER BY id"
            )
            total = mysql_max_id - safe_start + 1
            progress = tqdm(total=total, desc="recent data", unit="rec")
            batch = []
            while True:
                row = cursor.fetchone()
                if not row:
                    break
                mapped = self.mapper.map_row(self.columns, row)
                batch.append(mapped)
                if len(batch) >= batch_size:
                    self._insert_batch(batch, table)
                    progress.update(len(batch))
                    logger.info(f"Processed {progress.n}/{total} records")
                    batch = []
            if batch:
                self._insert_batch(batch, table)
                progress.update(len(batch))
                logger.info(f"Processed {progress.n}/{total} records")
            progress.close()
        logger.info(f"🆗 Recent data sync covered IDs {safe_start}-{mysql_max_id}")

    def _get_mysql_max_id(self, table):
        """Return the current max ID in MySQL."""
        with self.mysql_conn.connect() as conn:
            with conn.cursor() as cursor:
                cursor.execute(f"SELECT MAX(id) FROM {table}")
                return cursor.fetchone()[0] or 0

    def _optimize_table(self, table):
        """Run OPTIMIZE TABLE ... FINAL on the ClickHouse table."""
        try:
            self.ch_conn.connect().execute(f"OPTIMIZE TABLE {table} FINAL")
            logger.debug(f"{table} optimized")
        except Exception as e:
            logger.warning(f"Table optimize failed: {str(e)}")