You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

75 lines
2.5 KiB

5 months ago
from tqdm import tqdm
5 months ago
5 months ago
class SyncService:
    """Copy the rows of a MySQL table into a ClickHouse table in batches.

    Collaborators are injected and only used through the calls listed here:

    * ``mysql_conn`` -- provides ``connect()`` and ``create_count_connection()``;
      both must return objects whose ``cursor()`` is a context manager.
    * ``ch_conn`` -- provides ``connect()``, returning a client with
      ``execute()`` (presumably a clickhouse-driver ``Client`` -- confirm).
    * ``mapper`` -- provides ``map_row(columns, row)``, which turns one MySQL
      row into the value inserted into ClickHouse.
    * ``schema_path`` -- path of a UTF-8 text file holding the CREATE TABLE
      statement used by :meth:`create_ch_table`.

    NOTE(review): table names are concatenated into SQL text in several
    methods, so they must come from trusted configuration, never from user
    input.
    """

    def __init__(self, mysql_conn, ch_conn, mapper, schema_path):
        self.mysql_conn = mysql_conn
        self.ch_conn = ch_conn
        self.mapper = mapper
        self.schema_path = schema_path
        # Filled in by sync_data() with the target table's column names.
        self.columns = None

    def create_ch_table(self, table: str) -> None:
        """Recreate the ClickHouse table.

        Drops ``table`` if it exists, then executes the statement stored in
        ``self.schema_path``. The statement is run verbatim, so it is the
        schema file (not ``table``) that decides which table gets created.
        """
        with open(self.schema_path, 'r', encoding='utf-8') as f:
            create_sql = f.read()
        client = self.ch_conn.connect()
        client.execute("DROP TABLE IF EXISTS " + table)  # drop the table if present
        client.execute(create_sql)  # create the new table

    def load_table_columns(self, table: str) -> list:
        """Return the first field of every ``DESCRIBE TABLE`` row for ``table``.

        ClickHouse reports the column name in that first field, so the result
        is the list of column names in table order.
        """
        result = self.ch_conn.connect().execute("DESCRIBE TABLE " + table)
        return [row[0] for row in result]

    def sync_data(self, table: str, batch_size: int) -> None:
        """Stream every row of MySQL ``table`` into ClickHouse ``table``.

        Rows are read one at a time with ``SELECT * ... ORDER BY id`` (so the
        source table needs an ``id`` column), mapped through
        ``self.mapper.map_row`` and flushed to ClickHouse whenever
        ``batch_size`` rows have accumulated; a final partial batch is flushed
        at the end. Progress is shown with a tqdm bar sized by a separate
        ``SELECT COUNT(*)``.

        Side effect: ``self.columns`` is replaced with the ClickHouse column
        list of ``table``.

        Exceptions raised by the database clients or the mapper propagate to
        the caller; the progress bar is closed in every case.
        """
        # Column names of the target table, handed to the mapper for each row.
        self.columns = self.load_table_columns(table)
        # Separate connections: one for counting, one for reading the rows.
        # NOTE(review): neither connection is closed here -- ownership is
        # assumed to stay with self.mysql_conn; confirm.
        count_conn = self.mysql_conn.create_count_connection()
        mysql_conn = self.mysql_conn.connect()

        with count_conn.cursor() as count_cursor:
            count_cursor.execute("SELECT COUNT(*) FROM " + table)
            total = count_cursor.fetchone()[0]

        with mysql_conn.cursor() as cursor:
            cursor.execute("SELECT * FROM " + table + " ORDER BY id")
            progress = tqdm(total=total, desc="同步进度", unit="rec")
            try:
                batch = []
                while True:
                    row = cursor.fetchone()
                    if not row:
                        break
                    batch.append(self.mapper.map_row(self.columns, row))
                    if len(batch) >= batch_size:
                        self._insert_batch(batch, table)
                        progress.update(len(batch))
                        batch = []
                # Flush whatever is left after the last full batch.
                if batch:
                    self._insert_batch(batch, table)
                    progress.update(len(batch))
            finally:
                # Close the bar even when mapping or inserting fails, so a
                # failed run does not leave a dangling progress bar behind.
                progress.close()

    def _insert_batch(self, batch, table):
        """Insert ``batch`` (a list of mapped rows) into ClickHouse ``table``.

        NOTE(review): ``connect()`` is called for every batch; confirm that it
        hands back a reusable client rather than opening a new connection.
        """
        self.ch_conn.connect().execute(
            'INSERT INTO ' + table + ' VALUES',
            batch,
            types_check=True,
            settings={
                'date_time_input_format': 'best_effort',
                'allow_experimental_analyzer': 0,
                'input_format_null_as_default': 1
            }
        )