main
黄海 5 months ago
parent 0ed1be3acc
commit 92601f455b

@ -121,6 +121,8 @@ class SyncService:
def _sync_with_cursor(self, cursor, total, table, batch_size):
"""通用同步流程"""
# 根据数据量动态调整批次大小
dynamic_batch_size = max(min(total // 100, 5000), 1000)
progress = tqdm(total=total, desc="同步进度", unit="rec")
batch = []
insert_count = 0
@ -139,7 +141,7 @@ class SyncService:
mapped = self.mapper.map_row(self.columns, row)
batch.append(mapped)
if len(batch) >= batch_size:
if len(batch) >= dynamic_batch_size:
self._insert_batch(batch, table)
insert_count += 1
progress.update(len(batch))
@ -184,11 +186,11 @@ class SyncService:
batch,
types_check=True,
settings={
'date_time_input_format': 'best_effort',
'allow_experimental_analyzer': 0,
'input_format_null_as_default': 1,
'max_partitions_per_insert_block': 1000,
'optimize_on_insert': 1
'max_insert_block_size': 100000, # 增大插入块大小
'insert_deduplicate': 0, # 关闭插入时去重
'max_threads': 8, # 增加处理线程
'parallel_distributed_insert_select': 1,
'optimize_on_insert': 0 # 关闭插入时优化
}
)
except Exception as e:

Loading…
Cancel
Save