main
黄海 5 months ago
parent 2b7908798f
commit 3b41ed639d

@@ -7,21 +7,20 @@ from utils.logger import configure_logger
 logger = configure_logger()
 from config.db_config import MYSQL_CONFIG, CH_CONFIG

+# 1 = full sync
+# 2 = incremental sync
+SYNC_FULL = 1
+SYNC_INCREMENTAL = 2

-# type_id = 1 full sync
-# type_id = 2 incremental sync
-def sync(tableName, type_id):
+def sync(tableName, sync_type_id):
     # Initialize components
     mysql_conn = MySQLConnector(MYSQL_CONFIG)
     ch_conn = ClickHouseConnector(CH_CONFIG)
-    if type_id == 1:
+    if sync_type_id == 1:
+        logger.info(f"Starting full sync of table {tableName}")
         # Truncate the target table
         ch_conn.connect().execute(f"TRUNCATE TABLE {tableName}")
         ch_conn.disconnect()
     else:
+        logger.info(f"Starting incremental sync of table {tableName}")
         # Create the data mapper (columns no longer need to be specified manually)
@@ -46,4 +45,4 @@ def sync(tableName, type_id):
 if __name__ == "__main__":
     # Table to process
     tableName = 't_equipment_charge_order'
-    sync(tableName, 1)
+    sync(tableName, SYNC_FULL)

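Side note on the new constants: `sync()` treats any value other than `1` as incremental, so a mistyped argument silently falls through to an incremental sync. A minimal sketch of a stricter entry point (not part of this commit; the argparse wiring is my assumption, the names `sync`, `SYNC_FULL`, and `SYNC_INCREMENTAL` come from the diff above):

```python
import argparse

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Sync a MySQL table into ClickHouse")
    parser.add_argument("table", help="table name, e.g. t_equipment_charge_order")
    parser.add_argument("--mode", choices=("full", "incremental"), default="incremental",
                        help="'full' truncates the target table before syncing")
    args = parser.parse_args()
    # Only two explicit modes are accepted; anything else fails at parse time
    sync(args.table, SYNC_FULL if args.mode == "full" else SYNC_INCREMENTAL)
```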
@@ -13,121 +13,127 @@ class SyncService:
         self.mysql_conn = mysql_conn
         self.ch_conn = ch_conn
         self.mapper = mapper
-        self.primary_key = 'id'  # primary key column name
         self.columns = None
+        self.full_sync_interval = 24 * 3600  # run a full sync every 24 hours
+        self.last_full_sync = 0

-    def _load_table_columns(self, table):
-        """Load the table's column names"""
-        result = self.ch_conn.connect().execute(f"DESCRIBE TABLE {table}")
-        return [row[0] for row in result]
-
-    def _get_last_id_from_ch(self, table):
-        """Get the max ID from ClickHouse"""
-        try:
-            client = self.ch_conn.connect()
-            result = client.execute(f"SELECT max({self.primary_key}) FROM {table}")
-            return result[0][0] or 0
-        except Exception as e:
-            logger.error(f"Failed to get max ID: {str(e)}")
-            return 0
+    def sync_data(self, table, batch_size):
+        """Smart sync strategy: periodic full sync, otherwise incremental"""
+        current_time = time.time()
+        self.columns = self._load_table_columns(table)
+        if current_time - self.last_full_sync > self.full_sync_interval:
+            self._full_sync(table, batch_size)
+            self.last_full_sync = current_time
+        else:
+            self._incremental_sync_by_id(table, batch_size)

-    def sync_data(self, table, batch_size):
-        """Run an incremental data sync"""
-        # Load column info
-        self.columns = self._load_table_columns(table)
+    def _full_sync(self, table, batch_size):
+        """Full sync"""
+        logger.info(f"Starting full sync of {table}")
+        mysql_conn = self.mysql_conn.connect()
+        try:
+            with mysql_conn.cursor() as count_cursor:
+                count_cursor.execute(f"SELECT COUNT(*) FROM {table}")
+                total = count_cursor.fetchone()[0]
+            with mysql_conn.cursor(pymysql.cursors.SSCursor) as cursor:
+                cursor.execute(f"SELECT * FROM {table} ORDER BY id")
+                self._sync_with_cursor(cursor, total, table, batch_size)
+        except Exception as e:
+            logger.error(f"Full sync failed: {str(e)}")
+        finally:
+            mysql_conn.close()
+        logger.info(f"Full sync of {table} finished")

-        # Get the last synced ID
+    def _incremental_sync_by_id(self, table, batch_size):
+        """ID-based incremental sync"""
         last_id = self._get_last_id_from_ch(table)
         logger.info(f"Starting incremental sync of {table}, from ID: {last_id}")
         # Reuse a single MySQL connection
         mysql_conn = self.mysql_conn.connect()
         try:
             # Count the rows that need to be synced
             with mysql_conn.cursor() as count_cursor:
-                count_query = f"""
-                    SELECT COUNT(*)
-                    FROM {table}
-                    WHERE {self.primary_key} > {last_id}
-                """
-                count_cursor.execute(count_query)
+                count_cursor.execute(
+                    f"SELECT COUNT(*) FROM {table} WHERE id > {last_id}"
+                )
                 total = count_cursor.fetchone()[0]
                 logger.info(f"Rows to sync: {total}")
             # Read rows with a streaming (server-side) cursor
             with mysql_conn.cursor(pymysql.cursors.SSCursor) as cursor:
-                query = f"""
-                    SELECT *
-                    FROM {table}
-                    WHERE {self.primary_key} > {last_id}
-                    ORDER BY {self.primary_key}
-                """
-                cursor.execute(query)
-                progress = tqdm(total=total, desc="sync progress", unit="rec")
-                batch = []
-                current_max_id = last_id
-                last_success_time = time.time()
-                while True:
-                    try:
-                        # Check the connection every 5 minutes
-                        if time.time() - last_success_time > 300:
-                            mysql_conn.ping(reconnect=True)
-                            last_success_time = time.time()
-                        row = cursor.fetchone()
-                        if not row:
-                            break
-                        # Track the current max ID
-                        current_id = row[0]
-                        if current_id > current_max_id:
-                            current_max_id = current_id
-                        # Map the row
-                        mapped = self.mapper.map_row(self.columns, row)
-                        batch.append(mapped)
-                        # Batch insert
-                        if len(batch) >= batch_size:
-                            self._insert_batch(batch, table)
-                            progress.update(len(batch))
-                            batch = []
-                            last_success_time = time.time()
-                    except (OperationalError, InterfaceError) as e:
-                        logger.warning(f"Connection lost, resuming from ID {current_max_id}...")
-                        mysql_conn.ping(reconnect=True)
-                        time.sleep(5)
-                        continue
-                # Insert the remaining rows
-                if batch:
-                    self._insert_batch(batch, table)
-                    progress.update(len(batch))
-                progress.close()
-                logger.info(f"Sync finished, last synced ID: {current_max_id}")
-        finally:
-            if mysql_conn:
-                try:
-                    mysql_conn.close()
-                except Exception as e:
-                    logger.error(f"Error while closing connection: {str(e)}")
+                cursor.execute(
+                    f"SELECT * FROM {table} WHERE id > {last_id} ORDER BY id"
+                )
+                self._sync_with_cursor(cursor, total, table, batch_size)
+        except Exception as e:
+            logger.error(f"Incremental sync failed: {str(e)}")
+        finally:
+            mysql_conn.close()
+        logger.info(f"Incremental sync of {table} finished, last ID: {self._get_last_id_from_ch(table)}")

+    def _sync_with_cursor(self, cursor, total, table, batch_size):
+        """Shared sync loop"""
+        progress = tqdm(total=total, desc="sync progress", unit="rec")
+        batch = []
+        last_success_time = time.time()
+        while True:
+            try:
+                # Check the connection every 5 minutes
+                if time.time() - last_success_time > 300:
+                    cursor.connection.ping(reconnect=True)
+                    last_success_time = time.time()
+                row = cursor.fetchone()
+                if not row:
+                    break
+                mapped = self.mapper.map_row(self.columns, row)
+                batch.append(mapped)
+                if len(batch) >= batch_size:
+                    self._insert_batch(batch, table)
+                    progress.update(len(batch))
+                    batch = []
+                    last_success_time = time.time()
+            except (OperationalError, InterfaceError) as e:
+                logger.warning("Connection lost, reconnecting...")
+                cursor.connection.ping(reconnect=True)
+                time.sleep(5)
+                continue
+        if batch:
+            self._insert_batch(batch, table)
+            progress.update(len(batch))
+        progress.close()

+    def _get_last_id_from_ch(self, table):
+        """Get the max ID from ClickHouse"""
+        try:
+            result = self.ch_conn.connect().execute(
+                f"SELECT max(id) FROM {table}"
+            )
+            return result[0][0] or 0
+        except Exception as e:
+            logger.error(f"Failed to get max ID: {str(e)}")
+            return 0

+    def _load_table_columns(self, table):
+        """Load the table's column names"""
+        result = self.ch_conn.connect().execute(f"DESCRIBE TABLE {table}")
+        return [row[0] for row in result]

     def _insert_batch(self, batch, table):
         """Batch insert with memory monitoring"""
         # Memory check
         mem = psutil.virtual_memory()
         if mem.percent > 90:
             logger.warning("Memory usage above 90%, pausing for 60 seconds")
             time.sleep(60)
         # Perform the insert
         try:
             self.ch_conn.connect().execute(
                 f'INSERT INTO {table} VALUES',

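The page view cuts the diff off mid-call inside `_insert_batch`. For reference, a plausible completion, assuming the clickhouse-driver API where `Client.execute(query, params)` takes the row batch as its second argument for `INSERT ... VALUES`; the error handling is my assumption, modeled on the logging pattern used elsewhere in this class:

```python
    def _insert_batch(self, batch, table):
        """Batch insert with memory monitoring (hypothetical completion)."""
        mem = psutil.virtual_memory()
        if mem.percent > 90:
            logger.warning("Memory usage above 90%, pausing for 60 seconds")
            time.sleep(60)
        try:
            # clickhouse-driver sends the mapped rows as the params argument
            self.ch_conn.connect().execute(
                f'INSERT INTO {table} VALUES',
                batch
            )
        except Exception as e:
            # Assumed handling: log in the class's style and re-raise so the
            # calling sync loop's except clause can decide what to do
            logger.error(f"Batch insert failed: {str(e)}")
            raise
```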
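One further hedged suggestion: both sync paths interpolate `last_id` directly into the SQL text. The value comes from ClickHouse's `max(id)`, so the risk here is low, but pymysql supports placeholder binding for values, which also avoids quoting issues. A minimal sketch (table names cannot be bound as parameters, so `table` should still come from a trusted allow-list):

```python
# Bind last_id with a %s placeholder instead of f-string interpolation.
count_sql = f"SELECT COUNT(*) FROM {table} WHERE id > %s"
select_sql = f"SELECT * FROM {table} WHERE id > %s ORDER BY id"

with mysql_conn.cursor() as count_cursor:
    count_cursor.execute(count_sql, (last_id,))
    total = count_cursor.fetchone()[0]

with mysql_conn.cursor(pymysql.cursors.SSCursor) as cursor:
    cursor.execute(select_sql, (last_id,))
```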