import logging
import time
from datetime import datetime

import psutil
import pymysql
from pymysql import OperationalError, InterfaceError
from tqdm import tqdm

logger = logging.getLogger(__name__)


class SyncType:
    """Supported synchronization modes."""

    FULL = 'full'
    INCREMENTAL = 'incremental'


class SyncService:
    """Synchronizes a MySQL table into ClickHouse, in full or incremental mode."""

    def __init__(self, mysql_conn, ch_conn, mapper, schema_path=None):
        self.mysql_conn = mysql_conn
        self.ch_conn = ch_conn
        self.mapper = mapper
        self.schema_path = schema_path  # DDL file used by create_ch_table (was referenced but never set)
        self.primary_key = 'id'  # primary key column name
        self.columns = None
        self.tracking_columns = ['id', 'update_ts']  # columns used to track sync progress
        self.recent_check_count = 5000  # number of recent rows to re-check
        self.optimize_frequency = 100  # run OPTIMIZE TABLE every 100 batches
    def create_ch_table(self, table):
        """Create the ClickHouse table from the DDL file at self.schema_path."""
        with open(self.schema_path, 'r', encoding='utf-8') as f:
            create_sql = f.read()

        client = self.ch_conn.connect()
        client.execute(f"DROP TABLE IF EXISTS {table}")
        client.execute(create_sql)
        logger.info(f"Table {table} created")
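    # Illustrative only: a minimal DDL file that create_ch_table could load
    # from self.schema_path. The real schema ships outside this module and
    # will differ; a ReplacingMergeTree keyed on id would fit the
    # OPTIMIZE ... FINAL deduplication pattern used below.
    #
    #   CREATE TABLE example_table (
    #       id        UInt64,
    #       update_ts DateTime,
    #       payload   String
    #   ) ENGINE = ReplacingMergeTree(update_ts)
    #   ORDER BY id;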

    def _load_table_columns(self, table):
        """Load the column names of a ClickHouse table."""
        result = self.ch_conn.connect().execute(f"DESCRIBE TABLE {table}")
        return [row[0] for row in result]

    def _get_last_sync_point(self, table):
        """Return the last sync point as (max update_ts, max id).

        Currently not called by sync_data; kept for timestamp-based tracking
        alongside tracking_columns.
        """
        try:
            client = self.ch_conn.connect()
            result = client.execute(
                f"SELECT max(update_ts), max(id) FROM {table} "
                f"WHERE update_ts = (SELECT max(update_ts) FROM {table})"
            )
            # An empty table yields (None, None), which is truthy as a tuple,
            # so check the timestamp itself before falling back to the epoch.
            if result and result[0][0] is not None:
                return result[0]
            return (datetime(1970, 1, 1), 0)
        except Exception as e:
            logger.error(f"Failed to get sync point: {e}")
            return (datetime(1970, 1, 1), 0)

    def sync_data(self, table, batch_size, sync_type):
        """
        Run a data synchronization.

        :param table: table name
        :param batch_size: batch size
        :param sync_type: SyncType.FULL or SyncType.INCREMENTAL
        """
        self.columns = self._load_table_columns(table)

        if sync_type == SyncType.FULL:
            self._full_sync(table, batch_size)
        elif sync_type == SyncType.INCREMENTAL:
            self._incremental_sync(table, batch_size)
        else:
            raise ValueError(
                f"Invalid sync type: {sync_type}; "
                f"expected {SyncType.FULL} or {SyncType.INCREMENTAL}"
            )

    def _full_sync(self, table, batch_size):
        """Full synchronization of a table."""
        logger.info(f"🚀 Starting full sync of {table}")

        with self.mysql_conn.connect() as mysql_conn:
            try:
                with mysql_conn.cursor() as count_cursor:
                    count_cursor.execute(f"SELECT COUNT(*) FROM {table}")
                    total = count_cursor.fetchone()[0]

                # A server-side cursor streams rows instead of loading the
                # whole table into memory.
                with mysql_conn.cursor(pymysql.cursors.SSCursor) as cursor:
                    cursor.execute(f"SELECT * FROM {table} ORDER BY id")
                    self._sync_with_cursor(cursor, total, table, batch_size)

            except Exception as e:
                logger.error(f"Full sync failed: {e}")
                raise
            finally:
                self._optimize_table(table)

        logger.info(f"✅ Full sync of {table} complete")

    def _incremental_sync(self, table, batch_size):
        """Incremental synchronization of a table."""
        last_id = self._get_last_id_from_ch(table)
        logger.info(f"🔁 Starting incremental sync of {table} from id {last_id}")

        # Standard incremental pass: everything above the last synced id.
        with self.mysql_conn.connect() as mysql_conn:
            try:
                with mysql_conn.cursor() as count_cursor:
                    count_cursor.execute(
                        f"SELECT COUNT(*) FROM {table} WHERE id > {last_id}"
                    )
                    total = count_cursor.fetchone()[0]

                with mysql_conn.cursor(pymysql.cursors.SSCursor) as cursor:
                    cursor.execute(
                        f"SELECT * FROM {table} WHERE id > {last_id} ORDER BY id"
                    )
                    self._sync_with_cursor(cursor, total, table, batch_size)

            except Exception as e:
                logger.error(f"Incremental sync failed: {e}")
                raise

        # Re-check a window of recent rows in case of late updates.
        self._sync_recent_data(table, batch_size)
        logger.info(
            f"✅ Incremental sync of {table} complete, "
            f"last id: {self._get_last_id_from_ch(table)}"
        )

    def _sync_with_cursor(self, cursor, total, table, batch_size):
        """Common sync loop shared by the full and incremental passes."""
        # Scale the batch size with the row count, bounded below by the
        # caller-supplied batch_size (previously an unused parameter) and
        # above by 5000.
        dynamic_batch_size = max(min(total // 100, 5000), batch_size)
        progress = tqdm(total=total, desc="Sync progress", unit="rec")
        batch = []
        insert_count = 0
        last_success_time = time.time()

        while True:
            try:
                # Ping the connection if nothing has succeeded for 5 minutes.
                if time.time() - last_success_time > 300:
                    cursor.connection.ping(reconnect=True)
                    last_success_time = time.time()

                row = cursor.fetchone()
                if not row:
                    break

                mapped = self.mapper.map_row(self.columns, row)
                batch.append(mapped)

                if len(batch) >= dynamic_batch_size:
                    self._insert_batch(batch, table)
                    insert_count += 1
                    progress.update(len(batch))
                    logger.info(f"Processed {progress.n}/{total} records")
                    batch = []
                    last_success_time = time.time()

                    # Periodically optimize the target table; checking inside
                    # the flush branch ties this to completed batches rather
                    # than firing on every row while insert_count is 0.
                    if insert_count % self.optimize_frequency == 0:
                        self._optimize_table(table)

            except (OperationalError, InterfaceError):
                logger.warning("⚠️ Connection lost, reconnecting...")
                cursor.connection.ping(reconnect=True)
                time.sleep(5)
                continue

        if batch:
            self._insert_batch(batch, table)
            progress.update(len(batch))
            logger.info(f"Processed {progress.n}/{total} records")

        progress.close()

    def _get_last_id_from_ch(self, table):
        """Return the max id stored in ClickHouse."""
        try:
            result = self.ch_conn.connect().execute(f"SELECT max(id) FROM {table}")
            return result[0][0] or 0
        except Exception as e:
            logger.error(f"Failed to get max id: {e}")
            return 0

    def _insert_batch(self, batch, table):
        """Batch insert with a simple memory guard."""
        mem = psutil.virtual_memory()
        if mem.percent > 90:
            logger.warning("🛑 Memory usage above 90%, pausing for 60 seconds")
            time.sleep(60)

        try:
            self.ch_conn.connect().execute(
                f'INSERT INTO {table} VALUES',
                batch,
                types_check=True,
                settings={
                    'max_insert_block_size': 100000,  # larger insert blocks
                    'insert_deduplicate': 0,  # skip dedup on insert
                    'max_threads': 8,  # more processing threads
                    'parallel_distributed_insert_select': 1,
                    'optimize_on_insert': 0  # skip optimize on insert
                }
            )
        except Exception as e:
            logger.error(f"❌ Batch insert failed: {e}")
            # Retry logic could be added here; see the sketch below.
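
    # A minimal retry sketch, not part of the original flow: the source only
    # notes that retry logic could be added. Retry count and backoff delays
    # are illustrative assumptions.
    def _insert_batch_with_retry(self, batch, table, max_retries=3):
        """Hypothetical helper: retry a ClickHouse batch insert with backoff."""
        for attempt in range(1, max_retries + 1):
            try:
                self.ch_conn.connect().execute(
                    f'INSERT INTO {table} VALUES', batch, types_check=True
                )
                return
            except Exception as e:
                if attempt == max_retries:
                    logger.error(f"Insert failed after {max_retries} attempts: {e}")
                    raise
                delay = 2 ** attempt  # 2s, 4s, 8s backoff
                logger.warning(f"Insert attempt {attempt} failed, retrying in {delay}s")
                time.sleep(delay)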

    def _sync_recent_data(self, table, batch_size):
        """Safe re-sync of recent rows."""
        logger.info(f"🔄 Starting safe sync of recent data for {table}")

        # Fetch the max id on both sides.
        mysql_max_id = self._get_mysql_max_id(table)
        ch_max_id = self._get_last_id_from_ch(table)

        # Safe window start: re-check the last recent_check_count ids, but
        # never below what ClickHouse already holds.
        safe_start = max(mysql_max_id - self.recent_check_count, ch_max_id + 1)

        if safe_start > mysql_max_id:
            logger.info("⏩ No recent data to sync")
            return

        # Sync the difference window.
        with self.mysql_conn.connect() as mysql_conn:
            with mysql_conn.cursor(pymysql.cursors.SSCursor) as cursor:
                cursor.execute(
                    f"SELECT * FROM {table} "
                    f"WHERE id BETWEEN {safe_start} AND {mysql_max_id} "
                    f"ORDER BY id"
                )

                # Upper bound for progress; assumes roughly contiguous ids.
                total = mysql_max_id - safe_start + 1
                progress = tqdm(total=total, desc="Recent data", unit="rec")
                batch = []

                while True:
                    row = cursor.fetchone()
                    if not row:
                        break

                    mapped = self.mapper.map_row(self.columns, row)
                    batch.append(mapped)

                    if len(batch) >= batch_size:
                        self._insert_batch(batch, table)
                        progress.update(len(batch))
                        logger.info(f"Processed {progress.n}/{total} records")
                        batch = []

                if batch:
                    self._insert_batch(batch, table)
                    progress.update(len(batch))
                    logger.info(f"Processed {progress.n}/{total} records")

                progress.close()

        logger.info(f"🆗 Recent data synced for id range {safe_start}-{mysql_max_id}")

    def _get_mysql_max_id(self, table):
        """Return the max id in MySQL."""
        with self.mysql_conn.connect() as conn:
            with conn.cursor() as cursor:
                cursor.execute(f"SELECT MAX(id) FROM {table}")
                return cursor.fetchone()[0] or 0

    def _optimize_table(self, table):
        """Run OPTIMIZE TABLE ... FINAL on the ClickHouse table."""
        try:
            self.ch_conn.connect().execute(f"OPTIMIZE TABLE {table} FINAL")
            logger.debug(f"Table {table} optimized")
        except Exception as e:
            logger.warning(f"Table optimize failed: {e}")