main
黄海 5 months ago
parent c59c68393b
commit 48aa4aea69

@ -13,42 +13,23 @@ def main():
mysql_conn = MySQLConnector(MYSQL_CONFIG)
ch_conn = ClickHouseConnector(CH_CONFIG)
mapper = DataMapper(
date_columns=[
'order_time', 'charge_begin_time', 'charge_end_time',
'pay_time', 'cancel_time', 'finish_time', 'report_time',
'create_time', 'update_time'
],
uint64_columns=[
'id', 'user_id', 'company_id', 'station_id', 'equipment_id',
'connector_id', 'user_coupon_id', 'invoice_id', 'charge_market_id',
'company_activity_id', 'charge_begin_soc', 'charge_end_soc',
'charge_cur_soc', 'charge_duration', 'charge_settle_type',
'order_type', 'pay_method', 'coupon_type', 'ticket_status',
'charge_discount_type', 'settle_method', 'settle_status',
'cancel_type', 'cancel_code', 'finish_type', 'finish_code',
'boot_mode', 'settlement_type', 'is_parking_coupon', 'exception_flag',
'has_shallow_report', 'has_deep_report', 'shallow_report_is_received',
'com_charge_discount_type', 'special_channel_order_source'
]
)
# 要处理的表名
tableName = 't_equipment_charge_order'
# 创建同步服务
service = SyncService(
mysql_conn=mysql_conn, ch_conn=ch_conn, mapper=mapper, schema_path="schemas/" + tableName + ".sql"
mysql_conn=mysql_conn, ch_conn=ch_conn, mapper=DataMapper(ch_conn, tableName),
schema_path="schemas/" + tableName + ".sql"
)
try:
# 创建ClickHouse表
service.create_ch_table()
# 同步数据
service.sync_data(batch_size=5000, table=tableName)
service.sync_data(table=tableName, batch_size=5000)
except Exception as e:
logger.error(f"同步失败: {str(e)}", exc_info=True)
finally:
ch_conn.disconnect()
if __name__ == "__main__":
main()

@ -1,12 +1,25 @@
from datetime import datetime
class DataMapper:
def __init__(self, date_columns, uint64_columns):
self.date_columns = date_columns
self.uint64_columns = uint64_columns
def __init__(self, ch_conn, table_name):
self.ch_conn = ch_conn
self.table_name = table_name
self.date_columns = []
self.uint64_columns = []
self.min_date = datetime(1970, 1, 1)
self.max_date = datetime(2105, 12, 31, 23, 59, 59)
self._load_columns()
def _load_columns(self):
"""从ClickHouse表中加载列信息并设置日期和无符号整数列"""
result = self.ch_conn.connect().execute(f"DESCRIBE TABLE {self.table_name}")
for row in result:
column_name = row[0]
column_type = row[1]
if 'DateTime' in column_type or 'Date' in column_type:
self.date_columns.append(column_name)
elif 'UInt' in column_type:
self.uint64_columns.append(column_name)
def map_row(self, columns, row):
row_dict = dict(zip(columns, row))

Binary file not shown.
Loading…
Cancel
Save