|
|
|
|
|
|
|
|
import pymysql
|
|
|
|
|
from pymysql import OperationalError, InterfaceError
|
|
|
|
|
from tqdm import tqdm
|
|
|
|
|
import time
|
|
|
|
|
import psutil
|
|
|
|
|
import logging
|
|
|
|
|
|
|
|
|
|
# Module-level logger named after this module (stdlib logging convention).
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class SyncService:
    """Incrementally replicate rows from a MySQL table into ClickHouse.

    Rows whose primary key exceeds the largest key already present in
    ClickHouse are streamed with a server-side cursor, mapped, and
    inserted in batches, with connection keep-alive, reconnect-on-failure
    and host-memory monitoring.
    """

    def __init__(self, mysql_conn, ch_conn, mapper):
        # Connection providers: both expose .connect(); mysql_conn yields a
        # pymysql-style connection, ch_conn a clickhouse-driver-style client.
        self.mysql_conn = mysql_conn
        self.ch_conn = ch_conn
        # Maps a raw MySQL row to a ClickHouse-ready tuple via map_row().
        self.mapper = mapper
        # Name of the monotonically increasing primary-key column.
        self.primary_key = 'id'
        # Target-table column names; loaded lazily by sync_data().
        self.columns = None

    def get_table_schema(self, table):
        """Return ``{column_name: (data_type, column_type)}`` for *table*.

        The table name is passed as a bound parameter (it is a value in
        INFORMATION_SCHEMA, not an identifier), which avoids SQL injection.
        """
        schema_query = """
            SELECT COLUMN_NAME, DATA_TYPE, COLUMN_TYPE
            FROM INFORMATION_SCHEMA.COLUMNS
            WHERE TABLE_SCHEMA = DATABASE()
              AND TABLE_NAME = %s
        """
        with self.mysql_conn.connect().cursor() as cursor:
            cursor.execute(schema_query, (table,))
            return {row[0]: (row[1], row[2]) for row in cursor.fetchall()}

    def _load_table_columns(self, table):
        """Return the ordered column names of *table* in ClickHouse."""
        # NOTE(review): *table* is interpolated as an identifier and cannot
        # be parameterized — it must come from trusted configuration.
        result = self.ch_conn.connect().execute(f"DESCRIBE TABLE {table}")
        return [row[0] for row in result]

    def _get_last_id_from_ch(self, table):
        """Return the max primary key already in ClickHouse (0 when empty)."""
        try:
            client = self.ch_conn.connect()
            result = client.execute(f"SELECT max({self.primary_key}) FROM {table}")
            # max() over an empty table yields NULL/None -> start from 0.
            return result[0][0] or 0
        except Exception as e:
            logger.error(f"获取最大ID失败: {str(e)}")
            return 0

    def sync_data(self, table, batch_size):
        """Incrementally sync *table* from MySQL into ClickHouse.

        Rows with primary key greater than the last id already present in
        ClickHouse are read with a streaming (server-side) cursor, mapped
        through ``self.mapper`` and inserted in batches of *batch_size*.
        """
        # Load target column layout and determine the sync high-water mark.
        self.columns = self._load_table_columns(table)
        last_id = self._get_last_id_from_ch(table)
        logger.info(f"开始增量同步,起始ID: {last_id}")

        mysql_conn = self.mysql_conn.connect()
        progress = None
        try:
            # Count pending rows so the progress bar has a meaningful total.
            with mysql_conn.cursor() as count_cursor:
                count_cursor.execute(
                    f"SELECT COUNT(*) FROM {table} "
                    f"WHERE {self.primary_key} > %s",
                    (last_id,),
                )
                total = count_cursor.fetchone()[0]
            logger.info(f"待同步数据量: {total} 条")

            progress = tqdm(total=total, desc="同步进度", unit="rec")
            batch = []
            current_max_id = last_id
            last_success_time = time.time()

            # SSCursor streams rows instead of buffering the whole result,
            # keeping memory flat on large tables.
            with mysql_conn.cursor(pymysql.cursors.SSCursor) as cursor:
                cursor.execute(
                    f"SELECT * FROM {table} "
                    f"WHERE {self.primary_key} > %s "
                    f"ORDER BY {self.primary_key}",
                    (last_id,),
                )
                while True:
                    try:
                        # Ping every 5 minutes of inactivity to keep the
                        # connection alive during slow stretches.
                        if time.time() - last_success_time > 300:
                            mysql_conn.ping(reconnect=True)
                            last_success_time = time.time()

                        row = cursor.fetchone()
                        if not row:
                            break

                        # Track the high-water mark for recovery logging.
                        current_id = row[0]
                        if current_id > current_max_id:
                            current_max_id = current_id

                        batch.append(self.mapper.map_row(self.columns, row))

                        if len(batch) >= batch_size:
                            self._insert_batch(batch, table)
                            progress.update(len(batch))
                            batch = []
                            last_success_time = time.time()
                    except (OperationalError, InterfaceError):
                        logger.warning(f"连接中断,从ID {current_max_id} 恢复...")
                        mysql_conn.ping(reconnect=True)
                        time.sleep(5)
                        continue

            # Flush the final partial batch.
            if batch:
                self._insert_batch(batch, table)
                progress.update(len(batch))
            logger.info(f"同步完成,最后同步ID: {current_max_id}")
        finally:
            if progress is not None:
                progress.close()
            try:
                mysql_conn.close()
            except Exception as e:
                logger.error(f"关闭连接时发生错误: {str(e)}")

    def _insert_batch(self, batch, table):
        """Bulk-insert *batch* into ClickHouse, with memory monitoring."""
        # Back off when host memory pressure is high before allocating more.
        mem = psutil.virtual_memory()
        if mem.percent > 90:
            logger.warning("内存使用超过90%,暂停处理60秒")
            time.sleep(60)
        try:
            self.ch_conn.connect().execute(
                f'INSERT INTO {table} VALUES',
                batch,
                types_check=True,
                settings={
                    'date_time_input_format': 'best_effort',
                    'allow_experimental_analyzer': 0,
                    'input_format_null_as_default': 1,
                },
            )
        except Exception as e:
            logger.error(f"批量插入失败: {str(e)}")
            # Re-raise: silently dropping a batch would lose data.
            # TODO: implement retry logic before giving up.
            raise