from datetime import datetime
import logging

from dateutil.parser import parse

logger = logging.getLogger(__name__)


class DataMapper:
    """Convert rows read from a MySQL table into values clamped for ClickHouse.

    On construction the mapper queries ``INFORMATION_SCHEMA.COLUMNS`` for the
    given table and records which columns are temporal and which are unsigned
    integers.  ``map_row`` then coerces each value according to that
    classification.
    """

    # Values that stand for "no date" and map to ``clickhouse_min_date``.
    _NULL_DATETIME_VALUES = (None, 0, '0', '0000-00-00', '0000-00-00 00:00:00')

    # Strict ``strptime`` patterns, tried in order before the lenient
    # dateutil fallback.
    _DATETIME_FORMATS = (
        '%Y-%m-%d %H:%M:%S',
        '%Y/%m/%d %H:%M:%S',
        '%Y%m%d%H%M%S',
        '%Y-%m-%d',
        '%Y%m%d',
        '%Y-%m-%dT%H:%M:%S.%fZ',  # ISO format with a literal trailing "Z"
    )

    def __init__(self, mysql_conn, table):
        """Store the connection and table name, then inspect the table schema.

        Args:
            mysql_conn: Object whose ``connect()`` returns a connection
                offering a context-manager ``cursor()``.
            table: Name of the table whose columns are classified.
        """
        self.mysql_conn = mysql_conn
        self.table = table
        self.date_columns = []
        self.uint_columns = []
        # All plain state is set before the schema query so the instance is
        # fully initialised before any I/O happens.
        self.min_date = datetime(1970, 1, 1)
        self.max_date = datetime(2100, 1, 1)
        self.clickhouse_min_date = datetime(1970, 1, 1)
        self.clickhouse_max_date = datetime(2105, 12, 31, 23, 59, 59)
        self._analyze_schema()

    def _analyze_schema(self):
        """Fill ``date_columns`` and ``uint_columns`` from the MySQL schema."""
        # The table name is passed as a bound parameter rather than being
        # interpolated into the SQL text.
        # NOTE(review): assumes the driver uses the "%s" paramstyle, as the
        # common MySQL drivers do - confirm for the driver behind mysql_conn.
        schema_query = """
            SELECT COLUMN_NAME, DATA_TYPE, COLUMN_TYPE
            FROM INFORMATION_SCHEMA.COLUMNS
            WHERE TABLE_SCHEMA = DATABASE()
              AND TABLE_NAME = %s
        """
        with self.mysql_conn.connect().cursor() as cursor:
            cursor.execute(schema_query, (self.table,))
            for col, dtype, ctype in cursor.fetchall():
                if dtype in ('datetime', 'timestamp', 'date', 'time'):
                    self.date_columns.append(col)
                if 'unsigned' in ctype.lower() and 'int' in dtype:
                    self.uint_columns.append(col)
        logger.debug("日期字段: %s", self.date_columns)
        logger.debug("无符号整数字段: %s", self.uint_columns)

    def map_row(self, columns, row):
        """Return a ``{column: converted value}`` dict for one row.

        Args:
            columns: Column names, in the same order as ``row``.
            row: Raw values for a single record.

        Raises:
            Exception: Whatever a conversion raised, re-raised after the
                offending row has been logged.
        """
        try:
            return {col: self._map_value(col, val) for col, val in zip(columns, row)}
        except Exception:
            # The debug snapshot is built only on failure, not for every row.
            logger.error(f"解析失败的行数据: {dict(zip(columns, row))}")
            raise

    def _map_value(self, col, value):
        """Convert one value according to the kind of column it belongs to."""
        if col in self.uint_columns:
            return self._handle_uint(value)
        if col in self.date_columns:
            return self._handle_datetime(value)
        if isinstance(value, str):
            return value.strip()[:500]  # guard against over-long strings
        return value

    def _handle_uint(self, value):
        """Coerce ``value`` to ``int``; null-like or unparsable input gives 0."""
        if value in (None, '', 'NULL'):
            return 0
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            return 0

    def _handle_datetime(self, value):
        """Convert ``value`` to a ``datetime`` inside the ClickHouse range.

        Resolution order:
          1. ``datetime`` instances are clamped directly.
          2. Null-like markers and blank strings give ``clickhouse_min_date``.
          3. All-digit strings of 10-13 characters are read as epoch
             timestamps (seconds up to milliseconds).
          4. The strict patterns in ``_DATETIME_FORMATS``.
          5. ``dateutil.parser.parse`` for any other non-numeric text.

        Anything that still cannot be parsed is logged and replaced by
        ``clickhouse_min_date``; this method does not raise.
        """
        original_value = value
        try:
            # Already a datetime: skip the str() round trip, which would not
            # match any strict pattern when microseconds are present.
            if isinstance(value, datetime):
                return self._clamp_datetime(value)

            if value in self._NULL_DATETIME_VALUES:
                return self.clickhouse_min_date

            text = str(value).strip()
            if not text:
                return self.clickhouse_min_date

            numeric = text.isdigit()
            if numeric and 10 <= len(text) <= 13:
                # 10 digits are seconds; each extra digit is one more decimal
                # place of sub-second precision.
                seconds = int(text) / 10 ** (len(text) - 10)
                return self._clamp_datetime(datetime.fromtimestamp(seconds))

            for fmt in self._DATETIME_FORMATS:
                try:
                    return self._clamp_datetime(datetime.strptime(text, fmt))
                except ValueError:
                    continue

            # Lenient fallback. Pure-digit text is excluded because dateutil
            # fills missing fields from today's date, which would turn e.g.
            # "5" into a plausible-looking but meaningless value.
            if not numeric:
                try:
                    return self._clamp_datetime(parse(text))
                except (ValueError, OverflowError, TypeError):
                    pass

            logger.warning(f"无法解析的时间格式: {original_value}")
            return self.clickhouse_min_date
        except Exception as e:
            logger.error(f"时间解析错误: {original_value} - {str(e)}")
            return self.clickhouse_min_date

    def _clamp_datetime(self, dt):
        """Clamp ``dt`` to ``[clickhouse_min_date, clickhouse_max_date]``.

        Non-``datetime`` input gives ``clickhouse_min_date``.
        """
        if not isinstance(dt, datetime):
            return self.clickhouse_min_date
        if dt.tzinfo is not None:
            # The bounds are naive, and comparing aware with naive datetimes
            # raises TypeError. The wall-clock fields are kept as-is, matching
            # how the strict "...Z" pattern treats its suffix.
            # NOTE(review): the UTC offset is discarded - confirm this matches
            # the timezone the target column expects.
            dt = dt.replace(tzinfo=None)
        if dt < self.clickhouse_min_date:
            logger.debug("修正过小时间值: %s", dt)
            return self.clickhouse_min_date
        if dt > self.clickhouse_max_date:
            logger.debug("修正过大时间值: %s", dt)
            return self.clickhouse_max_date
        return dt