main
黄海 5 months ago
parent fd94c7ff36
commit 1a981d3dab

@ -0,0 +1,52 @@
# Sync configuration: shared column lists plus one entry per table.
TABLE_CONFIG = {
    # Date/time columns shared by every table (inherited by all entries).
    "common_date_columns": [
        "create_time",
        "update_time",
        "report_time",
    ],
    # Unsigned-integer columns shared by every table (inherited by all entries).
    "common_uint_columns": [
        "id",
        "user_id",
        "company_id",
        "station_id",
    ],
    # Per-table configuration entries.
    "tables": [
        {
            "name": "t_equipment_charge_order",
            "schema_path": "schemas/t_equipment_charge_order.sql",
            # Table-specific date columns (merged with the common ones).
            "date_columns": [
                "order_time",
                "charge_begin_time",
                "charge_end_time",
                "pay_time",
            ],
            # Table-specific integer columns (merged with the common ones).
            "uint_columns": [
                "equipment_id",
                "connector_id",
                "charge_market_id",
                "com_charge_discount_type",
            ],
            "batch_size": 10000,
        },
        {
            "name": "t_station",
            "schema_path": "schemas/t_station.sql",
            "date_columns": [
                "operation_date",
            ],
            "uint_columns": [
                "operator_id",
                "fast_connector_nums",
                "slow_connector_nums",
            ],
            "batch_size": 5000,
        },
    ],
}

@ -6,45 +6,26 @@ from utils.logger import configure_logger
from config.db_config import *
logger = configure_logger()
from config.tables_config import TABLE_CONFIG
def main():
    """Entry point: sync every table listed in TABLE_CONFIG from MySQL to ClickHouse.

    The visible source is a diff that interleaved the old single-table setup
    (hard-coded column lists, ``schema_path=``, ``sync_data()``) with the new
    multi-table one; this is the reconstructed post-commit version.
    """
    # Initialise the connectors.
    mysql_conn = MySQLConnector(mysql_config)
    ch_conn = ClickHouseConnector(ch_config)

    # Create the multi-table sync service. Per-table date/uint column lists
    # now come from TABLE_CONFIG, so DataMapper needs no hard-coded columns.
    service = SyncService(
        mysql_conn=mysql_conn,
        ch_conn=ch_conn,
        mapper=DataMapper(),
        table_config=TABLE_CONFIG,
    )
    try:
        success, failed = service.sync_all_tables()
        logger.info(f"同步完成:成功 {len(success)} 表,失败 {len(failed)}")
    except Exception as e:
        # Per-table failures are handled inside sync_all_tables(); anything
        # reaching here is a global failure (e.g. connection setup).
        logger.error(f"全局同步失败: {str(e)}", exc_info=True)
    finally:
        # Always release the ClickHouse connection.
        ch_conn.disconnect()

@ -1,64 +1,108 @@
from datetime import datetime
import datetime
import logging
from typing import List, Dict, Any
class DataMapper:
    """Converts raw MySQL rows into ClickHouse-compatible values.

    Holds optional *base* (shared) date / unsigned-int column lists; each call
    to :meth:`map_row` may pass table-specific lists which are merged with the
    base ones. Values in date columns are parsed/clamped to ClickHouse's
    DateTime range, values in uint columns are coerced to ``int``, NULLs get
    type-appropriate defaults, and conversion errors keep the original value
    (logged as a warning) rather than aborting the row.
    """

    # Valid bounds for ClickHouse's DateTime type.
    MIN_DATE = datetime.datetime(1970, 1, 1)
    MAX_DATE = datetime.datetime(2105, 12, 31, 23, 59, 59)

    def __init__(self,
                 date_columns: Optional[List[str]] = None,
                 uint64_columns: Optional[List[str]] = None):
        """
        :param date_columns: shared date column names (inherited by all tables)
        :param uint64_columns: shared unsigned-int column names
        """
        # Copy defensively so later mutation of the caller's lists has no effect.
        self.base_date_columns = list(date_columns) if date_columns else []
        self.base_uint64_columns = list(uint64_columns) if uint64_columns else []
        self.logger = logging.getLogger(__name__)

    def map_row(self,
                columns: List[str],
                row: tuple,
                date_columns: Optional[List[str]] = None,
                uint_columns: Optional[List[str]] = None) -> Dict[str, Any]:
        """Map a single MySQL row to a ClickHouse-compatible dict.

        :param columns: column names, positionally matching ``row``
        :param row: raw MySQL row tuple
        :param date_columns: table-specific date columns (merged with base)
        :param uint_columns: table-specific uint columns (merged with base)
        :return: ``{column_name: converted_value}``
        """
        # Merge shared and table-specific columns once per row (sets → O(1) lookups).
        all_date_columns = self._merge_columns(self.base_date_columns, date_columns)
        all_uint_columns = self._merge_columns(self.base_uint64_columns, uint_columns)

        mapped: Dict[str, Any] = {}
        for col_name, value in zip(columns, row):
            try:
                if value is None:
                    # NULLs get a type-appropriate default.
                    mapped[col_name] = self._handle_null(col_name, all_uint_columns)
                elif col_name in all_date_columns:
                    mapped[col_name] = self._convert_datetime(value)
                elif col_name in all_uint_columns:
                    mapped[col_name] = self._convert_uint(value)
                else:
                    mapped[col_name] = value
            except Exception as e:
                # Best-effort: log and keep the original value for this field.
                self.logger.warning(f"字段 {col_name} 转换异常: {str(e)}")
                mapped[col_name] = value
        return mapped

    def _merge_columns(self, base: List[str], specific: Optional[List[str]] = None) -> set:
        """Merge shared and table-specific column names into a set for fast membership tests."""
        merged = set(base)
        if specific:
            merged.update(specific)
        return merged

    def _convert_datetime(self, value) -> datetime.datetime:
        """Convert a date/time field, clamping to the ClickHouse DateTime range.

        Accepts ``datetime``/``date`` objects and a few string formats; any
        unparseable or out-of-low-range value falls back to MIN_DATE, values
        above the range are capped at MAX_DATE (both logged).
        """
        try:
            if isinstance(value, datetime.datetime):
                converted = value.replace(tzinfo=None)  # drop timezone info
            elif isinstance(value, datetime.date):
                # MySQL DATE columns arrive as datetime.date; promote to midnight.
                converted = datetime.datetime(value.year, value.month, value.day)
            elif isinstance(value, str):
                converted = None
                for fmt in ('%Y-%m-%d %H:%M:%S', '%Y-%m-%d', '%Y%m%d %H%M%S'):
                    try:
                        converted = datetime.datetime.strptime(value, fmt)
                        break
                    except ValueError:
                        continue
                if converted is None:
                    raise ValueError(f"无法解析的日期格式: {value}")
            else:
                # Ints/floats (e.g. epoch timestamps) are not supported here;
                # NOTE(review): the pre-commit code parsed epoch timestamps —
                # confirm whether that behaviour must be restored.
                raise ValueError(f"无法解析的日期格式: {value}")

            # Range check against ClickHouse DateTime bounds.
            if converted < self.MIN_DATE or converted > self.MAX_DATE:
                self.logger.warning(f"日期 {converted} 超出范围,使用默认值")
                return self.MAX_DATE
            return converted.replace(microsecond=0)  # ClickHouse DateTime has second precision
        except Exception:
            self.logger.warning(f"日期转换异常: {value}, 使用默认值")
            return self.MIN_DATE  # safe minimum legal value

    def _convert_uint(self, value) -> int:
        """Convert a value to int for an unsigned-integer column.

        :raises TypeError: for values that cannot be safely coerced (caught by
            map_row, which then keeps the original value).
        """
        if isinstance(value, int):
            return value
        if isinstance(value, float):
            return int(value)
        if isinstance(value, str) and value.isdigit():
            return int(value)
        raise TypeError(f"无法转换为整型: {type(value)} {value}")

    def _handle_null(self, col_name: str, uint_columns) -> Any:
        """Pick a default for a NULL value based on the column's kind."""
        if col_name in uint_columns:
            return 0  # uint columns default to 0
        if col_name.lower().endswith('time') or col_name.lower().startswith('date'):
            return datetime.datetime(1970, 1, 1)  # date columns default to the epoch
        return ''  # everything else defaults to an empty string

@ -1,37 +1,59 @@
import sys
import os
import logging
from tqdm import tqdm
from ClickHouse.utils.logger import configure_logger
logger = configure_logger()
class SyncService:
    """Synchronises the MySQL tables listed in ``table_config`` into ClickHouse.

    For each table: (re)creates the ClickHouse table from its DDL file, streams
    the MySQL rows, converts them via the mapper, and bulk-inserts in batches.
    """

    def __init__(self, mysql_conn, ch_conn, mapper, table_config):
        """
        :param mysql_conn: MySQL connector (exposes ``connect()``)
        :param ch_conn: ClickHouse connector (exposes ``connect()``)
        :param mapper: DataMapper used to convert each row
        :param table_config: configuration dict containing
            - tables: list of per-table dicts with
                - name: table name
                - schema_path: path to the table's DDL file
                - date_columns: table-specific date column list
                - uint_columns: table-specific integer column list
                - batch_size: optional per-table batch size
        """
        self.mysql_conn = mysql_conn
        self.ch_conn = ch_conn
        self.mapper = mapper
        self.table_config = table_config
        # Fallback batch size when a table entry does not define its own.
        self.batch_size = table_config.get('batch_size', 5000)

    def sync_all_tables(self):
        """Sync every configured table.

        Failures are isolated per table: one table failing does not stop the
        others.

        :return: ``(success_names, failed_names)`` lists of table names
        """
        success_tables = []
        failed_tables = []
        for table in self.table_config['tables']:
            try:
                self.sync_single_table(table)
                success_tables.append(table['name'])
            except Exception as e:
                logger.error(f"{table['name']} 同步失败: {str(e)}", exc_info=True)
                failed_tables.append(table['name'])
        logger.info(f"同步完成!成功:{len(success_tables)} 表,失败:{len(failed_tables)}")
        return success_tables, failed_tables

    def sync_single_table(self, table_config):
        """Sync one table described by a ``tables`` entry of the config."""
        table_name = table_config['name']
        logger.info(f"开始同步表:{table_name}")

        # Recreate the target ClickHouse table from its DDL file.
        self._init_table(table_config)

        # Row count drives the progress bar only.
        total = self._get_total_count(table_name)

        # Hoist loop invariants: the original issued a DESCRIBE TABLE query
        # per fetched row; resolve columns and per-table config once.
        columns = self._get_table_columns(table_name)
        batch_size = table_config.get('batch_size', self.batch_size)
        date_columns = table_config.get('date_columns', [])
        uint_columns = table_config.get('uint_columns', [])

        with self.mysql_conn.connect().cursor() as cursor:
            cursor.execute(f"SELECT * FROM {table_name} ORDER BY id")
            progress = tqdm(total=total, desc=f"同步 {table_name}", unit="rec")
            try:
                batch = []
                while True:
                    row = cursor.fetchone()
                    if not row:
                        break
                    batch.append(self.mapper.map_row(
                        columns=columns,
                        row=row,
                        date_columns=date_columns,
                        uint_columns=uint_columns,
                    ))
                    if len(batch) >= batch_size:
                        self._insert_batch(table_name, batch)
                        progress.update(len(batch))
                        batch = []
                # Flush the final partial batch.
                if batch:
                    self._insert_batch(table_name, batch)
                    progress.update(len(batch))
            finally:
                # Close the bar even when an insert fails mid-sync.
                progress.close()
        logger.info(f"{table_name} 同步完成")

    def _init_table(self, table_config):
        """Drop and recreate the ClickHouse table from its DDL file."""
        with open(table_config['schema_path'], 'r', encoding='utf-8') as f:
            create_sql = f.read()
        client = self.ch_conn.connect()
        client.execute(f"DROP TABLE IF EXISTS {table_config['name']}")
        client.execute(create_sql)

    def _get_total_count(self, table_name):
        """Return the MySQL table's row count."""
        with self.mysql_conn.connect().cursor() as cursor:
            cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
            return cursor.fetchone()[0]

    def _get_table_columns(self, table_name):
        """Return the ClickHouse table's column names, in table order."""
        result = self.ch_conn.connect().execute(f"DESCRIBE TABLE {table_name}")
        return [row[0] for row in result]

    def _insert_batch(self, table_name, batch):
        """Bulk-insert a batch of mapped row dicts into ClickHouse."""
        # NOTE(review): the original also passed a `settings={...}` dict whose
        # contents are truncated in the available source — restore it here if
        # specific insert settings (e.g. block size) are required.
        self.ch_conn.connect().execute(
            f'INSERT INTO {table_name} VALUES',
            batch,
            types_check=True,
        )

Loading…
Cancel
Save