main
黄海 5 months ago
parent fbc1b7083b
commit a2bb5c4ec1

@ -2,6 +2,8 @@ import pymysql
import re
from config.db_config import *
from config.db_config import MYSQL_CONFIG
from connectors.clickhouse_connector import ClickHouseConnector
def get_mysql_table_structure(config, table):
"""获取MySQL表结构信息"""
@ -82,6 +84,7 @@ def convert_to_clickhouse_type(mysql_type):
return type_map.get(type_str, 'String')
def generate_clickhouse_ddl(table_name, columns, indexes, primary_keys):
"""生成ClickHouse建表语句"""
# 处理字段定义
@ -132,10 +135,22 @@ def generate_clickhouse_ddl(table_name, columns, indexes, primary_keys):
return ddl
# 创建ClickHouse表
def create_ch_table(table):
    """Drop and recreate a ClickHouse table from its saved DDL file.

    Reads the CREATE statement from ``schemas/<table>.sql`` (UTF-8), drops
    any existing table named ``table``, then executes the statement read
    from the file.

    Args:
        table: Table name. It selects the schema file and is concatenated
            unescaped into the DROP statement, so it must be a trusted
            identifier.

    Raises:
        OSError: If the schema file cannot be opened or read.
    """
    ch_conn = ClickHouseConnector(CH_CONFIG)
    with open("schemas/" + table + ".sql", 'r', encoding='utf-8') as f:
        create_sql = f.read()

    client = ch_conn.connect()
    try:
        # Drop first so the table always matches the DDL file; any data
        # already in the table is discarded.
        client.execute("DROP TABLE IF EXISTS " + table)
        client.execute(create_sql)
    finally:
        # Release the connection even when one of the statements fails.
        ch_conn.disconnect()
# 使用示例
if __name__ == '__main__':
table = 't_station'
#table='t_equipment_charge_order'
# 获取表结构
columns, indexes, primary_keys = get_mysql_table_structure(MYSQL_CONFIG, table)
@ -144,6 +159,9 @@ if __name__ == '__main__':
ddl = generate_clickhouse_ddl(table, columns, indexes, primary_keys)
print(ddl)
# 保存到文件中 schemas/t_station.sql
with open(f'schemas/{table}.sql', 'w',encoding='utf-8') as f:
with open(f'schemas/{table}.sql', 'w', encoding='utf-8') as f:
f.write(ddl)
print(f'✅ DDL语句已保存至schemas/{table}.sql')
# 创建ClickHouse表
create_ch_table(table)

@ -22,13 +22,10 @@ def main():
service = SyncService(
mysql_conn=mysql_conn,
ch_conn=ch_conn,
mapper=mapper,
schema_path="schemas/" + tableName + ".sql"
mapper=mapper
)
try:
# 创建ClickHouse表
service.create_ch_table(tableName)
# 同步数据
service.sync_data(batch_size=5000, table=tableName)
except Exception as e:

@ -1,115 +1,137 @@
CREATE TABLE t_equipment_charge_order (
id UInt64,
user_id UInt64,
company_id UInt64,
station_id UInt64,
equipment_id UInt64,
connector_id UInt64,
id Int64,
user_id Int64,
company_id Nullable(Int64),
station_id Nullable(Int64),
equipment_id Int64,
connector_id Int64,
user_card_no Nullable(String),
state Nullable(UInt8),
operate_state UInt8 DEFAULT 0,
state Nullable(Int8),
operate_state Nullable(Int8),
order_no String,
platform_order_no Nullable(String),
platform_sale_type UInt8 DEFAULT 1,
platform_sale_type Nullable(Int8),
platform_driver_id Nullable(String),
platform_phone Nullable(String),
charging_amt Decimal(20,2) DEFAULT 0.00,
order_type Nullable(UInt8),
order_time DateTime,
charge_begin_time DateTime,
charge_end_time DateTime,
charge_begin_degree Float64 DEFAULT 0.00,
charge_end_degree Float64 DEFAULT 0.00,
charge_degree Float64 DEFAULT 0.000,
charge_ah Float64 DEFAULT 0.00,
charging_amt Nullable(Decimal(20,2)),
order_type Nullable(Int8),
order_time Nullable(DateTime),
charge_begin_time Nullable(DateTime),
charge_end_time Nullable(DateTime),
charge_begin_degree Nullable(Float64),
charge_end_degree Nullable(Float64),
charge_degree Nullable(Float64),
charge_ah Nullable(Float64),
charge_times_degree Nullable(String),
charge_begin_soc UInt32 DEFAULT 0,
charge_end_soc UInt32 DEFAULT 0,
charge_cur_soc UInt32 DEFAULT 0,
charge_duration UInt32 DEFAULT 0,
charge_settle_type Nullable(UInt8),
charge_unit_price Decimal(10,7),
charge_unit_cost Decimal(10,7),
charge_unit_service_fee Decimal(20,7),
receivable_electric_fee Decimal(20,7) DEFAULT 0.0000000,
receivable_service_fee Decimal(20,7) DEFAULT 0.0000000,
receivable_total_fee Decimal(20,7) DEFAULT 0.0000000,
charge_elecfee_amount Decimal(20,7) DEFAULT 0.0000000,
charge_elecfee_cost_amount Decimal(20,7) DEFAULT 0.0000000,
charge_service_amount Decimal(20,7) DEFAULT 0.0000000,
actual_pay_amount Decimal(20,2) DEFAULT 0.00,
charge_begin_soc Nullable(Int32),
charge_end_soc Nullable(Int32),
charge_cur_soc Nullable(Int32),
charge_duration Nullable(Int32),
charge_settle_type Nullable(Int8),
charge_unit_price Nullable(Decimal(10,7)),
charge_unit_cost Nullable(Decimal(10,7)),
charge_unit_service_fee Nullable(Decimal(20,7)),
receivable_electric_fee Nullable(Decimal(20,7)),
receivable_service_fee Nullable(Decimal(20,7)),
receivable_total_fee Nullable(Decimal(20,7)),
charge_elecfee_amount Nullable(Decimal(20,7)),
charge_elecfee_cost_amount Nullable(Decimal(20,7)),
charge_service_amount Nullable(Decimal(20,7)),
actual_pay_amount Nullable(Decimal(20,2)),
pay_order_no Nullable(String),
pay_amount Decimal(20,2) DEFAULT 0.00,
pay_method Nullable(UInt8),
pay_status UInt8 DEFAULT 0,
pay_amount Nullable(Decimal(20,2)),
pay_method Nullable(Int8),
pay_status Nullable(Int8),
pay_time Nullable(DateTime),
user_coupon_id Nullable(Int64),
coupon_name Nullable(String),
coupon_type Nullable(UInt8),
coupon_amount Decimal(20,2) DEFAULT 0.00,
elecfee_coupon_amount Decimal(20,2) DEFAULT 0.00,
servicefee_coupon_amount Decimal(20,2) DEFAULT 0.00,
invoice_id Nullable(UInt64),
invoice_fee Decimal(20,2) DEFAULT 0.00,
ticket_status UInt8 DEFAULT 0,
activity_electric_fee Decimal(20,7) DEFAULT 0.0000000,
activity_service_fee Decimal(20,2) DEFAULT 0.00,
coupon_type Nullable(Int8),
coupon_amount Nullable(Decimal(20,2)),
elecfee_coupon_amount Nullable(Decimal(20,2)),
servicefee_coupon_amount Nullable(Decimal(20,2)),
invoice_id Nullable(Int64),
invoice_fee Nullable(Decimal(20,2)),
ticket_status Nullable(Int8),
activity_electric_fee Nullable(Decimal(20,7)),
activity_service_fee Nullable(Decimal(20,2)),
charge_plate_no Nullable(String),
charge_vin Nullable(String),
charge_strategy Nullable(String),
charge_strategy_param Nullable(String),
operator_id Nullable(UInt8),
operator_income Decimal(20,2) DEFAULT 0.00,
plat_service_fee Decimal(20,2) DEFAULT 0.00,
charge_discount_type UInt8 DEFAULT 0,
settle_electric_fee Decimal(20,2) DEFAULT 0.00,
settle_service_fee Decimal(20,2) DEFAULT 0.00,
settle_fee Decimal(20,2) DEFAULT 0.00,
settle_method UInt8 DEFAULT 2,
settle_coupon_amount Decimal(20,2) DEFAULT 0.00,
settle_coupon_electric_fee Decimal(20,2) DEFAULT 0.00,
settle_coupon_service_fee Decimal(20,2) DEFAULT 0.00,
settle_status UInt8 DEFAULT 0,
cancel_type Nullable(UInt32),
cancel_code Nullable(UInt32),
operator_id Nullable(Int64),
operator_income Nullable(Decimal(20,2)),
plat_service_fee Nullable(Decimal(20,2)),
charge_discount_type Nullable(Int8),
settle_electric_fee Nullable(Decimal(20,2)),
settle_service_fee Nullable(Decimal(20,2)),
settle_fee Nullable(Decimal(20,2)),
settle_method Nullable(Int8),
settle_coupon_amount Nullable(Decimal(20,2)),
settle_coupon_electric_fee Nullable(Decimal(20,2)),
settle_coupon_service_fee Nullable(Decimal(20,2)),
settle_status Nullable(Int8),
cancel_type Nullable(Int32),
cancel_code Nullable(Int32),
cancel_msg Nullable(String),
cancel_time Nullable(DateTime),
finish_type Nullable(UInt32),
finish_code Nullable(UInt32),
finish_type Nullable(Int32),
finish_code Nullable(Int32),
finish_msg Nullable(String),
finish_time Nullable(DateTime),
boot_mode UInt8 DEFAULT 1,
settlement_type UInt8 DEFAULT 0,
is_parking_coupon UInt8 DEFAULT 0,
boot_mode Nullable(Int8),
settlement_type Nullable(Int8),
is_parking_coupon Nullable(Int8),
control_source Nullable(String),
exception_flag UInt8 DEFAULT 0,
exception_flag Nullable(Int8),
exception_msg Nullable(String),
report_time DateTime,
com_service_fee Decimal(10,2) DEFAULT 0.00,
com_electric_fee Decimal(10,2) DEFAULT 0.00,
com_total_fee Decimal(10,2) DEFAULT 0.00,
com_charge_discount_type Nullable(UInt8),
com_charge_discount Float64 DEFAULT 0.00,
has_shallow_report UInt8 DEFAULT 0,
has_deep_report UInt8 DEFAULT 0,
is_exception UInt8 DEFAULT 0,
shallow_report_is_received UInt8 DEFAULT 0,
create_time DateTime,
update_time DateTime,
charge_market_id Nullable(UInt64),
coupon_total_amount Decimal(20,2) DEFAULT 0.00,
coupon_service_total_amount Decimal(20,2) DEFAULT 0.00,
coupon_elecfee_total_amount Decimal(20,2) DEFAULT 0.00,
activity_name String DEFAULT '',
activity_total_fee Decimal(20,2) DEFAULT 0.00,
company_activity_id Nullable(UInt64),
report_time Nullable(DateTime),
com_service_fee Nullable(Decimal(10,2)),
com_electric_fee Nullable(Decimal(10,2)),
com_total_fee Nullable(Decimal(10,2)),
com_charge_discount_type Nullable(Int8),
com_charge_discount Nullable(Float64),
has_shallow_report Nullable(Int8),
has_deep_report Nullable(Int8),
is_exception Nullable(Int8),
shallow_report_is_received Nullable(Int8),
create_time Nullable(DateTime),
update_time Nullable(DateTime),
charge_market_id Nullable(Int64),
coupon_total_amount Decimal(20,2),
coupon_service_total_amount Decimal(20,2),
coupon_elecfee_total_amount Decimal(20,2),
activity_name String,
activity_total_fee Decimal(20,2),
company_activity_id Nullable(Int64),
company_activity_name Nullable(String),
company_activity_total_fee Decimal(20,2) DEFAULT 0.00,
company_activity_service_fee Decimal(20,2) DEFAULT 0.00,
company_activity_electric_fee Decimal(32,7) DEFAULT 0.0000000,
subsidy_degree Decimal(20,3) DEFAULT 0.000,
subsidy_fee Decimal(20,2) DEFAULT 0.00,
special_channel_order_source Nullable(UInt8)
company_activity_total_fee Decimal(20,2),
company_activity_service_fee Decimal(20,2),
company_activity_electric_fee Decimal(20,7),
subsidy_degree Decimal(20,3),
subsidy_fee Decimal(20,2),
special_channel_order_source Nullable(Int8),
INDEX idx_uounique (equipment_id, charge_begin_time, charge_end_time, connector_id) TYPE minmax GRANULARITY 3,
INDEX id (id) TYPE minmax GRANULARITY 3,
INDEX order_no (order_no) TYPE minmax GRANULARITY 3,
INDEX user_coupon_id (user_coupon_id) TYPE minmax GRANULARITY 3,
INDEX user_id (user_id) TYPE minmax GRANULARITY 3,
INDEX user_card_no (user_card_no) TYPE minmax GRANULARITY 3,
INDEX INDEX_ORDER_TIME (order_time) TYPE minmax GRANULARITY 3,
INDEX station_id (station_id) TYPE minmax GRANULARITY 3,
INDEX exception_flag (exception_flag) TYPE minmax GRANULARITY 3,
INDEX state (state) TYPE minmax GRANULARITY 3,
INDEX idx_state_reporttime (state, report_time) TYPE minmax GRANULARITY 3,
INDEX idx_actualpayamount_userid (user_id, actual_pay_amount) TYPE minmax GRANULARITY 3,
INDEX idx_stationid_state_chargeendtime_chargebeginsoc (station_id, state, charge_end_time, charge_begin_soc) TYPE minmax GRANULARITY 3,
INDEX idx_e40168caafb4d724214ad41b4d52b666 (charge_end_time, state, pay_status, actual_pay_amount, charge_degree, station_id) TYPE minmax GRANULARITY 3,
INDEX create_time (create_time) TYPE minmax GRANULARITY 3,
INDEX index_company_id (company_id) TYPE minmax GRANULARITY 3,
INDEX vin (charge_vin) TYPE minmax GRANULARITY 3,
INDEX idx_reporttime (report_time) TYPE minmax GRANULARITY 3,
INDEX idx_state_chargedegree (state, charge_degree) TYPE minmax GRANULARITY 3,
INDEX idx_stationid_chargedegree (station_id, charge_degree) TYPE minmax GRANULARITY 3,
INDEX platform_order_no (platform_order_no) TYPE minmax GRANULARITY 3,
INDEX idx_cover_condition (station_id, report_time, state, charge_degree) TYPE minmax GRANULARITY 3
) ENGINE = MergeTree()
ORDER BY (create_time, id)
SETTINGS index_granularity = 8192
ORDER BY (id)
SETTINGS index_granularity = 8192;

@ -2,22 +2,12 @@ from tqdm import tqdm
class SyncService:
def __init__(self, mysql_conn, ch_conn, mapper, schema_path):
def __init__(self, mysql_conn, ch_conn, mapper):
self.mysql_conn = mysql_conn
self.ch_conn = ch_conn
self.mapper = mapper
self.schema_path = schema_path
self.columns = None # 初始化为None稍后加载
def create_ch_table(self, table):
    """Recreate the ClickHouse table from the DDL stored at ``self.schema_path``.

    The schema file is read as UTF-8, any existing ``table`` is dropped,
    and the file's contents are then passed to a single ``execute`` call.
    """
    with open(self.schema_path, 'r', encoding='utf-8') as schema_file:
        ddl = schema_file.read()

    ch_client = self.ch_conn.connect()
    drop_stmt = "DROP TABLE IF EXISTS " + table
    ch_client.execute(drop_stmt)  # remove the table if it already exists
    ch_client.execute(ddl)  # create the table anew
def get_table_schema(self, table):
"""获取表的字段类型信息"""
schema_query = f"""

Loading…
Cancel
Save