main
黄海 5 months ago
parent 8443387a6e
commit fbc1b7083b

Binary file not shown.

@ -13,31 +13,17 @@ def main():
mysql_conn = MySQLConnector(MYSQL_CONFIG)
ch_conn = ClickHouseConnector(CH_CONFIG)
mapper = DataMapper(
date_columns=[
'order_time', 'charge_begin_time', 'charge_end_time',
'pay_time', 'cancel_time', 'finish_time', 'report_time',
'create_time', 'update_time'
],
uint64_columns=[
'id', 'user_id', 'company_id', 'station_id', 'equipment_id',
'connector_id', 'user_coupon_id', 'invoice_id', 'charge_market_id',
'company_activity_id', 'charge_begin_soc', 'charge_end_soc',
'charge_cur_soc', 'charge_duration', 'charge_settle_type',
'order_type', 'pay_method', 'coupon_type', 'ticket_status',
'charge_discount_type', 'settle_method', 'settle_status',
'cancel_type', 'cancel_code', 'finish_type', 'finish_code',
'boot_mode', 'settlement_type', 'is_parking_coupon', 'exception_flag',
'has_shallow_report', 'has_deep_report', 'shallow_report_is_received',
'com_charge_discount_type', 'special_channel_order_source'
]
)
# 要处理的表名
tableName = 't_equipment_charge_order'
#tableName="t_station"
# 创建数据映射器(不再需要手动指定列)
mapper = DataMapper(mysql_conn, tableName)
# 创建同步服务
service = SyncService(
mysql_conn=mysql_conn, ch_conn=ch_conn, mapper=mapper, schema_path="schemas/" + tableName + ".sql"
mysql_conn=mysql_conn,
ch_conn=ch_conn,
mapper=mapper,
schema_path="schemas/" + tableName + ".sql"
)
try:

@ -1,13 +1,40 @@
from datetime import datetime
from utils.logger import configure_logger
logger = configure_logger()
class DataMapper:
def __init__(self, date_columns, uint64_columns):
self.date_columns = date_columns
self.uint64_columns = uint64_columns
def __init__(self, mysql_conn, table):
self.mysql_conn = mysql_conn
self.table = table
self.date_columns = []
self.uint64_columns = []
self._analyze_schema()
self.min_date = datetime(1970, 1, 1)
self.max_date = datetime(2105, 12, 31, 23, 59, 59)
def _analyze_schema(self):
"""分析表结构自动识别字段类型"""
schema_query = f"""
SELECT COLUMN_NAME, DATA_TYPE, COLUMN_TYPE
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME = '{self.table}'
"""
with self.mysql_conn.connect().cursor() as cursor:
cursor.execute(schema_query)
for col_name, data_type, col_type in cursor.fetchall():
# 识别日期时间字段
if data_type in ('datetime', 'timestamp', 'date'):
self.date_columns.append(col_name)
# 识别无符号整数字段匹配BIGINT UNSIGNED等
if 'unsigned' in col_type.lower() and 'int' in data_type:
self.uint64_columns.append(col_name)
#logger.info(f"自动识别字段类型 - 日期字段: {self.date_columns}")
#logger.info(f"自动识别字段类型 - 无符号整数字段: {self.uint64_columns}")
def map_row(self, columns, row):
row_dict = dict(zip(columns, row))
return {col: self._map_value(col, val) for col, val in row_dict.items()}

@ -18,6 +18,17 @@ class SyncService:
client.execute("DROP TABLE IF EXISTS "+table) # 删除表(如果存在)
client.execute(create_sql) # 创建新表
def get_table_schema(self, table):
"""获取表的字段类型信息"""
schema_query = f"""
SELECT COLUMN_NAME, DATA_TYPE, COLUMN_TYPE
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME = '{table}'
"""
with self.mysql_conn.connect().cursor() as cursor:
cursor.execute(schema_query)
return {row[0]: (row[1], row[2]) for row in cursor.fetchall()}
def _load_table_columns(self,table):
"""加载表的列信息"""
result = self.ch_conn.connect().execute("DESCRIBE TABLE "+table)

Loading…
Cancel
Save