You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

122 lines
4.4 KiB

from datetime import datetime
import logging
from dateutil.parser import parse
# Module-level logger named after this module; used by DataMapper below.
logger = logging.getLogger(__name__)
class DataMapper:
    """Convert raw MySQL row values into values suitable for ClickHouse.

    On construction the table schema is read from
    ``INFORMATION_SCHEMA.COLUMNS`` to find date/time columns and unsigned
    integer columns; :meth:`map_row` then normalizes each value according to
    the category of its column.

    Attributes:
        mysql_conn: Object whose ``connect()`` result provides a ``cursor()``
            usable as a context manager.
        table: Name of the MySQL table being mapped.
        date_columns: Columns whose DATA_TYPE is datetime/timestamp/date/time.
        uint_columns: Integer columns whose COLUMN_TYPE contains "unsigned".
        clickhouse_min_date / clickhouse_max_date: Bounds every mapped
            datetime is clamped into.
    """

    # strptime formats tried after dateutil parsing and timestamp detection
    # have both failed.
    _FALLBACK_FORMATS = (
        '%Y-%m-%d %H:%M:%S',
        '%Y/%m/%d %H:%M:%S',
        '%Y%m%d%H%M%S',
        '%Y-%m-%d',
        '%Y%m%d',
        '%Y-%m-%dT%H:%M:%S.%fZ',  # ISO 8601 with a literal 'Z' suffix
    )

    def __init__(self, mysql_conn, table):
        """Store the connection/table and classify the table's columns.

        Args:
            mysql_conn: Connection factory; see the class docstring.
            table: MySQL table name (looked up in the current DATABASE()).
        """
        self.mysql_conn = mysql_conn
        self.table = table
        self.date_columns = []
        self.uint_columns = []
        self._analyze_schema()
        # NOTE(review): min_date/max_date are not read by any method in this
        # class; presumably kept for external callers — confirm before removing.
        self.min_date = datetime(1970, 1, 1)
        self.max_date = datetime(2100, 1, 1)
        self.clickhouse_min_date = datetime(1970, 1, 1)
        self.clickhouse_max_date = datetime(2105, 12, 31, 23, 59, 59)

    def _analyze_schema(self):
        """Inspect the MySQL table schema and classify its columns.

        Appends to ``self.date_columns`` every column whose DATA_TYPE is
        datetime, timestamp, date or time, and to ``self.uint_columns`` every
        integer column whose COLUMN_TYPE is declared ``unsigned``.
        """
        # The table name is passed as a bound parameter rather than being
        # interpolated into the SQL text, so an unusual or hostile name cannot
        # break or alter the query. %s is the placeholder style of the MySQL
        # DB-API drivers (pymysql, MySQLdb, mysql-connector).
        schema_query = """
            SELECT COLUMN_NAME, DATA_TYPE, COLUMN_TYPE
            FROM INFORMATION_SCHEMA.COLUMNS
            WHERE TABLE_SCHEMA = DATABASE()
            AND TABLE_NAME = %s
        """
        # NOTE(review): the object returned by connect() is never closed here;
        # confirm whether mysql_conn.connect() returns a shared connection or
        # a fresh one that leaks.
        with self.mysql_conn.connect().cursor() as cursor:
            cursor.execute(schema_query, (self.table,))
            for col, dtype, ctype in cursor.fetchall():
                if dtype in ('datetime', 'timestamp', 'date', 'time'):
                    self.date_columns.append(col)
                if 'unsigned' in ctype.lower() and 'int' in dtype:
                    self.uint_columns.append(col)
        logger.debug("日期字段: %s", self.date_columns)
        logger.debug("无符号整数字段: %s", self.uint_columns)

    def map_row(self, columns, row):
        """Map one MySQL row to a ``{column: converted_value}`` dict.

        Args:
            columns: Column names, in the same order as ``row`` (must be a
                re-iterable sequence, not a one-shot iterator).
            row: Raw values for one row.

        Returns:
            Dict from column name to the value produced by ``_map_value``.

        Raises:
            Whatever a conversion raises; the raw row is logged at ERROR level
            first to aid debugging.
        """
        try:
            return {col: self._map_value(col, val) for col, val in zip(columns, row)}
        except Exception:
            # Build the debug view only on failure instead of for every row.
            debug_row = dict(zip(columns, row))
            logger.error(f"解析失败的行数据: {debug_row}")
            raise

    def _map_value(self, col, value):
        """Convert one value according to the category of column ``col``.

        Unsigned-int columns go through ``_handle_uint``, date columns through
        ``_handle_datetime``; other strings are stripped and truncated to 500
        characters; everything else is returned unchanged.
        """
        if col in self.uint_columns:
            return self._handle_uint(value)
        if col in self.date_columns:
            return self._handle_datetime(value)
        if isinstance(value, str):
            return value.strip()[:500]  # guard against over-long strings
        return value

    def _handle_uint(self, value):
        """Convert an unsigned-integer column value to ``int``.

        ``None``, ``''`` and the string ``'NULL'`` map to 0, as does any value
        that ``int()`` cannot convert.
        """
        try:
            if value in (None, '', 'NULL'):
                return 0
            return int(value)
        except (TypeError, ValueError, OverflowError):
            return 0

    def _handle_datetime(self, value):
        """Convert a date/time column value to a naive, clamped ``datetime``.

        Resolution order:
          1. None, 0, '0' and MySQL zero dates -> ``clickhouse_min_date``.
          2. ``datetime`` instances are clamped directly (keeps microseconds).
          3. ``dateutil.parser.parse`` on ``str(value)``.
          4. All-digit values of 10-13 digits as Unix timestamps
             (10 digits = seconds ... 13 digits = milliseconds).
          5. The explicit formats in ``_FALLBACK_FORMATS``.

        Anything still unparsed, or any unexpected error, is logged and yields
        ``clickhouse_min_date``.
        """
        try:
            if value in (None, 0, '0', '0000-00-00', '0000-00-00 00:00:00'):
                return self.clickhouse_min_date

            # Driver-native datetimes need no string round-trip (which would
            # otherwise drop values carrying microseconds).
            if isinstance(value, datetime):
                return self._clamp_datetime(value)

            text = str(value)

            # dateutil raises ParserError (a ValueError) or OverflowError on
            # input it cannot handle; fall through to the other strategies.
            try:
                parsed = parse(text)
            except (ValueError, OverflowError):
                parsed = None
            if parsed is not None:
                return self._clamp_datetime(parsed)

            if text.isdigit() and 10 <= len(text) <= 13:
                divisor = 10 ** (len(text) - 10)
                return self._clamp_datetime(datetime.fromtimestamp(int(value) / divisor))

            for fmt in self._FALLBACK_FORMATS:
                try:
                    return self._clamp_datetime(datetime.strptime(text, fmt))
                except ValueError:
                    continue

            logger.warning(f"无法解析的时间格式: {value}")
            return self.clickhouse_min_date
        except Exception as e:
            logger.error(f"时间解析错误: {value} - {str(e)}")
            return self.clickhouse_min_date

    def _clamp_datetime(self, dt):
        """Clamp ``dt`` into ``[clickhouse_min_date, clickhouse_max_date]``.

        Non-``datetime`` input yields ``clickhouse_min_date``. Timezone-aware
        values are first converted to naive UTC so they can be compared with
        the naive bounds.
        """
        if not isinstance(dt, datetime):
            return self.clickhouse_min_date
        offset = dt.utcoffset()
        if offset is not None:
            # Comparing aware and naive datetimes raises TypeError; normalize
            # to naive UTC, matching how the '...%fZ' fallback treats 'Z'.
            dt = dt.replace(tzinfo=None) - offset
        if dt < self.clickhouse_min_date:
            logger.debug(f"修正过小时间值: {dt}")
            return self.clickhouse_min_date
        if dt > self.clickhouse_max_date:
            logger.debug(f"修正过大时间值: {dt}")
            return self.clickhouse_max_date
        return dt