You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

66 lines
2.2 KiB

from tqdm import tqdm
class SyncService:
def __init__(self, mysql_conn, ch_conn, mapper, schema_path):
self.mysql_conn = mysql_conn
self.ch_conn = ch_conn
self.mapper = mapper
self.schema_path = schema_path
self.columns = self._load_table_columns()
def _load_table_columns(self):
result = self.ch_conn.connect().execute("DESCRIBE TABLE t_equipment_charge_order")
return [row[0] for row in result]
def create_ch_table(self):
with open(self.schema_path, 'r', encoding='utf-8') as f:
create_sql = f.read()
client = self.ch_conn.connect()
client.execute("DROP TABLE IF EXISTS t_equipment_charge_order")
client.execute(create_sql)
def sync_data(self, batch_size=5000):
count_conn = self.mysql_conn.create_count_connection()
mysql_conn = self.mysql_conn.connect()
with count_conn.cursor() as count_cursor:
count_cursor.execute("SELECT COUNT(*) FROM t_equipment_charge_order")
total = count_cursor.fetchone()[0]
with mysql_conn.cursor() as cursor:
cursor.execute("SELECT * FROM t_equipment_charge_order ORDER BY id")
progress = tqdm(total=total, desc="同步进度", unit="rec")
batch = []
while True:
row = cursor.fetchone()
if not row:
break
mapped = self.mapper.map_row(self.columns, row)
batch.append(mapped)
if len(batch) >= batch_size:
self._insert_batch(batch)
progress.update(len(batch))
batch = []
if batch:
self._insert_batch(batch)
progress.update(len(batch))
progress.close()
def _insert_batch(self, batch):
self.ch_conn.connect().execute(
'INSERT INTO t_equipment_charge_order VALUES',
batch,
types_check=True,
settings={
'date_time_input_format': 'best_effort',
'allow_experimental_analyzer': 0,
'input_format_null_as_default': 1
}
)