main
黄海 5 months ago
parent dda2372b11
commit 0cd0156909

@ -1,5 +1,6 @@
import pymysql
import re
from config.db_config import *
from config.db_config import MYSQL_CONFIG
def get_mysql_table_structure(config, table):
@ -134,8 +135,8 @@ def generate_clickhouse_ddl(table_name, columns, indexes, primary_keys):
# 使用示例
if __name__ == '__main__':
# 生成 t_station表的DDL
table = 't_station'
# 获取表结构
columns, indexes, primary_keys = get_mysql_table_structure(MYSQL_CONFIG, table)

@ -7,28 +7,48 @@ from utils.logger import configure_logger
logger = configure_logger()
from config.db_config import MYSQL_CONFIG, CH_CONFIG
if __name__ == "__main__":
def main():
# 初始化组件
mysql_conn = MySQLConnector(MYSQL_CONFIG)
ch_conn = ClickHouseConnector(CH_CONFIG)
mapper = DataMapper(
date_columns=[
'order_time', 'charge_begin_time', 'charge_end_time',
'pay_time', 'cancel_time', 'finish_time', 'report_time',
'create_time', 'update_time'
],
uint64_columns=[
'id', 'user_id', 'company_id', 'station_id', 'equipment_id',
'connector_id', 'user_coupon_id', 'invoice_id', 'charge_market_id',
'company_activity_id', 'charge_begin_soc', 'charge_end_soc',
'charge_cur_soc', 'charge_duration', 'charge_settle_type',
'order_type', 'pay_method', 'coupon_type', 'ticket_status',
'charge_discount_type', 'settle_method', 'settle_status',
'cancel_type', 'cancel_code', 'finish_type', 'finish_code',
'boot_mode', 'settlement_type', 'is_parking_coupon', 'exception_flag',
'has_shallow_report', 'has_deep_report', 'shallow_report_is_received',
'com_charge_discount_type', 'special_channel_order_source'
]
)
# 要处理的表名
tableName = 't_equipment_charge_order'
#tableName='t_station'
# 创建同步服务
service = SyncService(
mysql_conn=mysql_conn, ch_conn=ch_conn, mapper=DataMapper(ch_conn, tableName),
schema_path="schemas/" + tableName + ".sql"
mysql_conn=mysql_conn, ch_conn=ch_conn, mapper=mapper, schema_path="schemas/" + tableName + ".sql"
)
try:
# 创建ClickHouse表
service.create_ch_table(tableName)
# 同步数据
service.sync_data(table=tableName, batch_size=5000)
# 结束
print("同步完成")
service.sync_data(batch_size=5000, table=tableName)
except Exception as e:
logger.error(f"同步失败: {str(e)}", exc_info=True)
finally:
ch_conn.disconnect()
if __name__ == "__main__":
main()

@ -1,25 +1,13 @@
from datetime import datetime
class DataMapper:
def __init__(self, ch_conn, table_name):
self.ch_conn = ch_conn
self.table_name = table_name
self.date_columns = []
self.uint64_columns = []
def __init__(self, date_columns, uint64_columns):
self.date_columns = date_columns
self.uint64_columns = uint64_columns
self.min_date = datetime(1970, 1, 1)
self.max_date = datetime(2105, 12, 31, 23, 59, 59)
def _load_columns(self):
"""从ClickHouse表中加载列信息并设置日期和无符号整数列"""
result = self.ch_conn.connect().execute(f"DESCRIBE TABLE {self.table_name}")
for row in result:
column_name = row[0]
column_type = row[1]
if 'DateTime' in column_type or 'Date' in column_type:
self.date_columns.append(column_name)
elif 'UInt' in column_type:
self.uint64_columns.append(column_name)
def map_row(self, columns, row):
row_dict = dict(zip(columns, row))
return {col: self._map_value(col, val) for col, val in row_dict.items()}

@ -1,20 +1,20 @@
CREATE TABLE t_equipment_charge_order (
id Int64,
user_id Int64,
company_id Int64,
station_id Int64,
equipment_id Int64,
connector_id Int64,
id UInt64,
user_id UInt64,
company_id UInt64,
station_id UInt64,
equipment_id UInt64,
connector_id UInt64,
user_card_no Nullable(String),
state Nullable(Int8),
operate_state Int8 DEFAULT 0,
state Nullable(UInt8),
operate_state UInt8 DEFAULT 0,
order_no String,
platform_order_no Nullable(String),
platform_sale_type Int8 DEFAULT 1,
platform_sale_type UInt8 DEFAULT 1,
platform_driver_id Nullable(String),
platform_phone Nullable(String),
charging_amt Decimal(20,2) DEFAULT 0.00,
order_type Nullable(Int8),
order_type Nullable(UInt8),
order_time DateTime,
charge_begin_time DateTime,
charge_end_time DateTime,
@ -23,11 +23,11 @@ CREATE TABLE t_equipment_charge_order (
charge_degree Float64 DEFAULT 0.000,
charge_ah Float64 DEFAULT 0.00,
charge_times_degree Nullable(String),
charge_begin_soc Int32 DEFAULT 0,
charge_end_soc Int32 DEFAULT 0,
charge_cur_soc Int32 DEFAULT 0,
charge_duration Int32 DEFAULT 0,
charge_settle_type Nullable(Int8),
charge_begin_soc UInt32 DEFAULT 0,
charge_end_soc UInt32 DEFAULT 0,
charge_cur_soc UInt32 DEFAULT 0,
charge_duration UInt32 DEFAULT 0,
charge_settle_type Nullable(UInt8),
charge_unit_price Decimal(10,7),
charge_unit_cost Decimal(10,7),
charge_unit_service_fee Decimal(20,7),
@ -40,76 +40,76 @@ CREATE TABLE t_equipment_charge_order (
actual_pay_amount Decimal(20,2) DEFAULT 0.00,
pay_order_no Nullable(String),
pay_amount Decimal(20,2) DEFAULT 0.00,
pay_method Nullable(Int8),
pay_status Int8 DEFAULT 0,
pay_method Nullable(UInt8),
pay_status UInt8 DEFAULT 0,
pay_time Nullable(DateTime),
user_coupon_id Nullable(Int64),
coupon_name Nullable(String),
coupon_type Nullable(Int8),
coupon_type Nullable(UInt8),
coupon_amount Decimal(20,2) DEFAULT 0.00,
elecfee_coupon_amount Decimal(20,2) DEFAULT 0.00,
servicefee_coupon_amount Decimal(20,2) DEFAULT 0.00,
invoice_id Nullable(Int64),
invoice_id Nullable(UInt64),
invoice_fee Decimal(20,2) DEFAULT 0.00,
ticket_status Int8 DEFAULT 0,
ticket_status UInt8 DEFAULT 0,
activity_electric_fee Decimal(20,7) DEFAULT 0.0000000,
activity_service_fee Decimal(20,2) DEFAULT 0.00,
charge_plate_no Nullable(String),
charge_vin Nullable(String),
charge_strategy Nullable(String),
charge_strategy_param Nullable(String),
operator_id Nullable(Int8),
operator_id Nullable(UInt8),
operator_income Decimal(20,2) DEFAULT 0.00,
plat_service_fee Decimal(20,2) DEFAULT 0.00,
charge_discount_type Int8 DEFAULT 0,
charge_discount_type UInt8 DEFAULT 0,
settle_electric_fee Decimal(20,2) DEFAULT 0.00,
settle_service_fee Decimal(20,2) DEFAULT 0.00,
settle_fee Decimal(20,2) DEFAULT 0.00,
settle_method Int8 DEFAULT 2,
settle_method UInt8 DEFAULT 2,
settle_coupon_amount Decimal(20,2) DEFAULT 0.00,
settle_coupon_electric_fee Decimal(20,2) DEFAULT 0.00,
settle_coupon_service_fee Decimal(20,2) DEFAULT 0.00,
settle_status Int8 DEFAULT 0,
cancel_type Nullable(Int32),
cancel_code Nullable(Int32),
settle_status UInt8 DEFAULT 0,
cancel_type Nullable(UInt32),
cancel_code Nullable(UInt32),
cancel_msg Nullable(String),
cancel_time Nullable(DateTime),
finish_type Nullable(Int32),
finish_code Nullable(Int32),
finish_type Nullable(UInt32),
finish_code Nullable(UInt32),
finish_msg Nullable(String),
finish_time Nullable(DateTime),
boot_mode Int8 DEFAULT 1,
settlement_type Int8 DEFAULT 0,
is_parking_coupon Int8 DEFAULT 0,
boot_mode UInt8 DEFAULT 1,
settlement_type UInt8 DEFAULT 0,
is_parking_coupon UInt8 DEFAULT 0,
control_source Nullable(String),
exception_flag Int8 DEFAULT 0,
exception_flag UInt8 DEFAULT 0,
exception_msg Nullable(String),
report_time DateTime,
com_service_fee Decimal(10,2) DEFAULT 0.00,
com_electric_fee Decimal(10,2) DEFAULT 0.00,
com_total_fee Decimal(10,2) DEFAULT 0.00,
com_charge_discount_type Nullable(Int8),
com_charge_discount_type Nullable(UInt8),
com_charge_discount Float64 DEFAULT 0.00,
has_shallow_report Int8 DEFAULT 0,
has_deep_report Int8 DEFAULT 0,
is_exception Int8 DEFAULT 0,
shallow_report_is_received Int8 DEFAULT 0,
has_shallow_report UInt8 DEFAULT 0,
has_deep_report UInt8 DEFAULT 0,
is_exception UInt8 DEFAULT 0,
shallow_report_is_received UInt8 DEFAULT 0,
create_time DateTime,
update_time DateTime,
charge_market_id Nullable(Int64),
charge_market_id Nullable(UInt64),
coupon_total_amount Decimal(20,2) DEFAULT 0.00,
coupon_service_total_amount Decimal(20,2) DEFAULT 0.00,
coupon_elecfee_total_amount Decimal(20,2) DEFAULT 0.00,
activity_name String DEFAULT '',
activity_total_fee Decimal(20,2) DEFAULT 0.00,
company_activity_id Nullable(Int64),
company_activity_id Nullable(UInt64),
company_activity_name Nullable(String),
company_activity_total_fee Decimal(20,2) DEFAULT 0.00,
company_activity_service_fee Decimal(20,2) DEFAULT 0.00,
company_activity_electric_fee Decimal(32,7) DEFAULT 0.0000000,
subsidy_degree Decimal(20,3) DEFAULT 0.000,
subsidy_fee Decimal(20,2) DEFAULT 0.00,
special_channel_order_source Nullable(Int8)
special_channel_order_source Nullable(UInt8)
) ENGINE = MergeTree()
ORDER BY (create_time, id)
SETTINGS index_granularity = 8192

@ -18,7 +18,7 @@ class SyncService:
client.execute("DROP TABLE IF EXISTS "+table) # 删除表(如果存在)
client.execute(create_sql) # 创建新表
def load_table_columns(self,table):
def _load_table_columns(self,table):
"""加载表的列信息"""
result = self.ch_conn.connect().execute("DESCRIBE TABLE "+table)
return [row[0] for row in result]
@ -26,7 +26,7 @@ class SyncService:
# 同步数据
def sync_data(self, table, batch_size):
# 加载表的列信息
self.columns = self.load_table_columns(table)
self.columns = self._load_table_columns(table)
# 统计个数的连接
count_conn = self.mysql_conn.create_count_connection()
# 读取数据的连接

Loading…
Cancel
Save