main
黄海 5 months ago
parent a9da6c1cb4
commit 1437f51e9f

@ -5,11 +5,12 @@ class MySQLConnector:
def __init__(self, config):
self.config = config
# 读取数据的连接
def connect(self):
return pymysql.connect(
cursorclass=pymysql.cursors.SSCursor,
**self.config
)
# 统计个数的连接
def create_count_connection(self):
return pymysql.connect(**self.config)

@ -40,12 +40,12 @@ def main():
mapper=mapper,
schema_path="schemas/t_equipment_charge_order.sql"
)
tableName = 't_equipment_charge_order'
try:
# 创建ClickHouse表
service.create_ch_table()
# 同步数据
service.sync_data()
service.sync_data(batch_size=5000, table=tableName)
except Exception as e:
logger.error(f"同步失败: {str(e)}", exc_info=True)
finally:

@ -1,5 +1,6 @@
from tqdm import tqdm
class SyncService:
def __init__(self, mysql_conn, ch_conn, mapper, schema_path):
self.mysql_conn = mysql_conn
@ -22,19 +23,21 @@ class SyncService:
result = self.ch_conn.connect().execute("DESCRIBE TABLE t_equipment_charge_order")
return [row[0] for row in result]
def sync_data(self, batch_size=5000):
# 同步数据
def sync_data(self, table, batch_size):
# 加载表的列信息
self.columns = self._load_table_columns()
# 统计个数的连接
count_conn = self.mysql_conn.create_count_connection()
# 读取数据的连接
mysql_conn = self.mysql_conn.connect()
with count_conn.cursor() as count_cursor:
count_cursor.execute("SELECT COUNT(*) FROM t_equipment_charge_order")
count_cursor.execute("SELECT COUNT(*) FROM " + table)
total = count_cursor.fetchone()[0]
with mysql_conn.cursor() as cursor:
cursor.execute("SELECT * FROM t_equipment_charge_order ORDER BY id")
cursor.execute("SELECT * FROM " + table + " ORDER BY id")
progress = tqdm(total=total, desc="同步进度", unit="rec")
batch = []
@ -47,20 +50,20 @@ class SyncService:
batch.append(mapped)
if len(batch) >= batch_size:
self._insert_batch(batch)
self._insert_batch(batch,table)
progress.update(len(batch))
batch = []
if batch:
self._insert_batch(batch)
self._insert_batch(batch,table)
progress.update(len(batch))
progress.close()
def _insert_batch(self, batch):
def _insert_batch(self, batch, table):
"""批量插入数据到ClickHouse"""
self.ch_conn.connect().execute(
'INSERT INTO t_equipment_charge_order VALUES',
'INSERT INTO ' + table + ' VALUES',
batch,
types_check=True,
settings={
@ -68,4 +71,4 @@ class SyncService:
'allow_experimental_analyzer': 0,
'input_format_null_as_default': 1
}
)
)

Loading…
Cancel
Save