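# NOTE: the lines below are repository web-UI text captured together with the
# file; they are not part of the program.
# You cannot select more than 25 topics. Topics must start with a letter or
# number, can include dashes ('-'), and can be up to 35 characters long.
#
# 73 lines
# 2.3 KiB
#
# 5 months ago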
from connectors.mysql_connector import MySQLConnector
from connectors.clickhouse_connector import ClickHouseConnector
from mappers.data_mapper import DataMapper
from services.sync_service import SyncService
from utils.logger import configure_logger
# Module-level logger obtained from the project's configure_logger() helper;
# main() uses it to report a failed sync.
logger = configure_logger()
def main():
    """Run one MySQL -> ClickHouse synchronisation pass.

    Builds the MySQL and ClickHouse connectors, the DataMapper and the
    SyncService, then calls ``create_ch_table()`` followed by ``sync_data()``.
    The ClickHouse connection is always disconnected afterwards.

    Connection settings default to the values that used to be hard-coded here
    and can be overridden with environment variables:

    * ``SYNC_MYSQL_HOST``, ``SYNC_MYSQL_PORT``, ``SYNC_MYSQL_USER``,
      ``SYNC_MYSQL_PASSWORD``, ``SYNC_MYSQL_DB``
    * ``SYNC_CH_HOST``, ``SYNC_CH_PORT``

    Returns:
        int: ``0`` when both service calls completed, ``1`` when either of
        them raised (the error is logged with its traceback).

    Raises:
        ValueError: if a ``*_PORT`` override is not a valid integer.
    """
    # Local import keeps this block self-contained; only main() needs it.
    import os

    env = os.environ

    # Database configuration.
    # NOTE(review): the defaults still embed real-looking credentials for
    # backward compatibility; prefer supplying them via the environment and
    # removing the literals from source control.
    mysql_config = {
        'host': env.get('SYNC_MYSQL_HOST', '10.10.14.101'),
        'port': int(env.get('SYNC_MYSQL_PORT', 3306)),
        'user': env.get('SYNC_MYSQL_USER', 'ylt'),
        'password': env.get('SYNC_MYSQL_PASSWORD', 'Ycharge666'),
        'db': env.get('SYNC_MYSQL_DB', 'yltcharge'),
        'charset': 'utf8mb4',
    }
    ch_config = {
        'host': env.get('SYNC_CH_HOST', '10.10.14.250'),
        'port': int(env.get('SYNC_CH_PORT', 9000)),
        # Passed through to the ClickHouse connector as-is. With the stock
        # ClickHouse meaning of these settings, inserts are asynchronous and
        # the client does not wait for them to be flushed -- TODO confirm the
        # connector forwards them unchanged.
        'settings': {
            'max_insert_block_size': 100000,
            'async_insert': 1,
            'wait_for_async_insert': 0,
        },
    }

    # Initialise components.
    mysql_conn = MySQLConnector(mysql_config)
    ch_conn = ClickHouseConnector(ch_config)
    mapper = DataMapper(
        # Column names handed to the mapper as its ``date_columns`` group.
        date_columns=[
            'order_time', 'charge_begin_time', 'charge_end_time',
            'pay_time', 'cancel_time', 'finish_time', 'report_time',
            'create_time', 'update_time',
        ],
        # Column names handed to the mapper as its ``uint64_columns`` group.
        uint64_columns=[
            'id', 'user_id', 'company_id', 'station_id', 'equipment_id',
            'connector_id', 'user_coupon_id', 'invoice_id', 'charge_market_id',
            'company_activity_id', 'charge_begin_soc', 'charge_end_soc',
            'charge_cur_soc', 'charge_duration', 'charge_settle_type',
            'order_type', 'pay_method', 'coupon_type', 'ticket_status',
            'charge_discount_type', 'settle_method', 'settle_status',
            'cancel_type', 'cancel_code', 'finish_type', 'finish_code',
            'boot_mode', 'settlement_type', 'is_parking_coupon', 'exception_flag',
            'has_shallow_report', 'has_deep_report', 'shallow_report_is_received',
            'com_charge_discount_type', 'special_channel_order_source',
        ],
    )

    # Create the sync service.
    service = SyncService(
        mysql_conn=mysql_conn,
        ch_conn=ch_conn,
        mapper=mapper,
        schema_path="schemas/table_schema.sql",
    )

    try:
        service.create_ch_table()
        service.sync_data()
    except Exception as e:
        # Top-level boundary: log with traceback and report failure to the
        # caller instead of silently finishing as if the sync had succeeded.
        logger.error(f"同步失败: {str(e)}", exc_info=True)
        return 1
    finally:
        # NOTE(review): only the ClickHouse connection is closed here; the
        # MySQLConnector API is not visible from this file, so whether
        # mysql_conn needs an explicit close should be checked.
        ch_conn.disconnect()
    return 0


if __name__ == "__main__":
    # Propagate main()'s status as the process exit code so that cron jobs and
    # other schedulers can tell a failed sync from a successful one.
    raise SystemExit(main())