from connectors.mysql_connector import MySQLConnector from connectors.clickhouse_connector import ClickHouseConnector from mappers.data_mapper import DataMapper from services.sync_service import SyncService from utils.logger import configure_logger logger = configure_logger() def main(): # 数据库配置 mysql_config = { 'host': '10.10.14.101', 'port': 3306, 'user': 'ylt', 'password': 'Ycharge666', 'db': 'yltcharge', 'charset': 'utf8mb4' } ch_config = { 'host': '10.10.14.250', 'port': 9000, 'settings': { 'max_insert_block_size': 100000, 'async_insert': 1, 'wait_for_async_insert': 0 } } # 初始化组件 mysql_conn = MySQLConnector(mysql_config) ch_conn = ClickHouseConnector(ch_config) mapper = DataMapper( date_columns=[ 'order_time', 'charge_begin_time', 'charge_end_time', 'pay_time', 'cancel_time', 'finish_time', 'report_time', 'create_time', 'update_time' ], uint64_columns=[ 'id', 'user_id', 'company_id', 'station_id', 'equipment_id', 'connector_id', 'user_coupon_id', 'invoice_id', 'charge_market_id', 'company_activity_id', 'charge_begin_soc', 'charge_end_soc', 'charge_cur_soc', 'charge_duration', 'charge_settle_type', 'order_type', 'pay_method', 'coupon_type', 'ticket_status', 'charge_discount_type', 'settle_method', 'settle_status', 'cancel_type', 'cancel_code', 'finish_type', 'finish_code', 'boot_mode', 'settlement_type', 'is_parking_coupon', 'exception_flag', 'has_shallow_report', 'has_deep_report', 'shallow_report_is_received', 'com_charge_discount_type', 'special_channel_order_source' ] ) # 创建同步服务 service = SyncService( mysql_conn=mysql_conn, ch_conn=ch_conn, mapper=mapper, schema_path="schemas/t_equipment_charge_order.sql" ) try: service.create_ch_table() service.sync_data() except Exception as e: logger.error(f"同步失败: {str(e)}", exc_info=True) finally: ch_conn.disconnect() if __name__ == "__main__": main()