You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
122 lines
4.4 KiB
122 lines
4.4 KiB
from datetime import datetime
|
|
import logging
|
|
from dateutil.parser import parse
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class DataMapper:
    """Normalise rows read from a MySQL table before they are written out.

    On construction the mapper queries ``INFORMATION_SCHEMA.COLUMNS`` for the
    given table and records which columns are temporal and which are unsigned
    integers.  ``map_row`` then coerces values in those columns (and trims
    strings) so that every value falls inside the configured bounds.
    """

    # Fallback formats tried with ``datetime.strptime`` when neither dateutil
    # nor the numeric-timestamp branch could interpret the value.
    _DATETIME_FORMATS = (
        '%Y-%m-%d %H:%M:%S',
        '%Y/%m/%d %H:%M:%S',
        '%Y%m%d%H%M%S',
        '%Y-%m-%d',
        '%Y%m%d',
        '%Y-%m-%dT%H:%M:%S.%fZ',  # ISO format
    )

    def __init__(self, mysql_conn, table):
        """Remember the connection wrapper and inspect the table schema.

        Args:
            mysql_conn: Object whose ``connect()`` returns a connection that
                offers ``cursor()`` usable as a context manager.
            table: Name of the MySQL table whose rows will be mapped.
        """
        self.mysql_conn = mysql_conn
        self.table = table
        self.date_columns = []
        self.uint_columns = []
        self.min_date = datetime(1970, 1, 1)
        self.max_date = datetime(2100, 1, 1)
        # Bounds every mapped datetime is clamped to.
        self.clickhouse_min_date = datetime(1970, 1, 1)
        self.clickhouse_max_date = datetime(2105, 12, 31, 23, 59, 59)
        self._analyze_schema()

    def _analyze_schema(self):
        """Analyse the MySQL table structure.

        Fills ``self.date_columns`` with the names of datetime/timestamp/
        date/time columns and ``self.uint_columns`` with the names of
        unsigned integer columns.
        """
        # The table name is bound as a parameter instead of being formatted
        # into the statement, so quotes in the name cannot alter the query.
        schema_query = """
            SELECT COLUMN_NAME, DATA_TYPE, COLUMN_TYPE
            FROM INFORMATION_SCHEMA.COLUMNS
            WHERE TABLE_SCHEMA = DATABASE()
              AND TABLE_NAME = %s
        """

        with self.mysql_conn.connect().cursor() as cursor:
            cursor.execute(schema_query, (self.table,))
            for col, dtype, ctype in cursor.fetchall():
                dtype = dtype.lower()
                if dtype in ('datetime', 'timestamp', 'date', 'time'):
                    self.date_columns.append(col)
                if 'unsigned' in ctype.lower() and 'int' in dtype:
                    self.uint_columns.append(col)

    def map_row(self, columns, row):
        """Map one row to a ``{column: converted value}`` dict.

        Args:
            columns: Column names, in the same order as ``row``.
            row: Raw values for a single record.

        Returns:
            dict keyed by column name holding the converted values.

        Raises:
            Exception: Whatever a value conversion raised; the offending row
                is logged before the exception is re-raised.
        """
        # Materialise once: the pairs are needed both for the mapping and for
        # the error log, and ``columns``/``row`` may be one-shot iterators.
        pairs = list(zip(columns, row))
        try:
            return {col: self._map_value(col, val) for col, val in pairs}
        except Exception:
            # Only build the debug dict when something actually failed.
            logger.error("解析失败的行数据: %s", dict(pairs))
            raise

    def _map_value(self, col, value):
        """Convert a single field value according to its column kind.

        Unsigned-integer columns go through ``_handle_uint``, temporal
        columns through ``_handle_datetime``; any other string is stripped
        and cut to 500 characters; everything else is returned unchanged.
        """
        if col in self.uint_columns:
            return self._handle_uint(value)
        if col in self.date_columns:
            return self._handle_datetime(value)
        if isinstance(value, str):
            return value.strip()[:500]  # guard against over-long strings
        return value

    def _handle_uint(self, value):
        """Coerce an unsigned-integer field to ``int``.

        ``None``, ``''``, ``'NULL'`` and anything ``int()`` cannot convert
        all become ``0``.
        """
        if value in (None, '', 'NULL'):
            return 0
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            # Best-effort by design: unparseable input maps to 0.
            return 0

    def _handle_datetime(self, value):
        """Coerce a temporal field to a naive ``datetime`` within bounds.

        Resolution order:
          1. empty / zero-date sentinels -> ``clickhouse_min_date``
          2. ``datetime`` instances -> clamped as-is
          3. ``dateutil`` parsing of ``str(value)``
          4. 10-13 digit numeric timestamps (seconds up to milliseconds)
          5. the explicit ``_DATETIME_FORMATS`` list

        Never raises: any failure is logged and ``clickhouse_min_date`` is
        returned instead.
        """
        try:
            # Empty and invalid placeholder values.
            if value in (None, 0, '0', '0000-00-00', '0000-00-00 00:00:00'):
                return self.clickhouse_min_date

            # Already a datetime: skip the string round-trip, which would
            # also fail the strptime formats whenever microseconds are set.
            if isinstance(value, datetime):
                return self._clamp_datetime(value)

            text = str(value)

            # Try automatic parsing of assorted formats.  Missing fields are
            # filled from the epoch rather than from today's date so that the
            # result does not depend on when the job runs.
            try:
                parsed = parse(text, default=self.clickhouse_min_date)
                return self._clamp_datetime(parsed)
            except (ValueError, OverflowError, TypeError):
                pass

            # Numeric timestamps: 10 digits are seconds, each extra digit is
            # one more decimal place, up to 13 digits (milliseconds).
            # NOTE(review): fromtimestamp() yields local time - confirm that
            # this is the intended zone.
            if text.isdigit() and 10 <= len(text) <= 13:
                divisor = 10 ** (len(text) - 10)
                return self._clamp_datetime(
                    datetime.fromtimestamp(int(text) / divisor)
                )

            # Try the common explicit formats.
            for fmt in self._DATETIME_FORMATS:
                try:
                    return self._clamp_datetime(datetime.strptime(text, fmt))
                except ValueError:
                    continue

            logger.warning("无法解析的时间格式: %s", value)
            return self.clickhouse_min_date
        except Exception as e:
            # Deliberate best-effort boundary: one bad value must not abort
            # the whole row.
            logger.error("时间解析错误: %s - %s", value, e)
            return self.clickhouse_min_date

    def _clamp_datetime(self, dt):
        """Strictly limit ``dt`` to the configured datetime range.

        Non-``datetime`` input yields ``clickhouse_min_date``.  Timezone-aware
        input is first converted to a naive UTC value, because the bounds are
        naive and Python refuses to compare naive with aware datetimes.
        """
        if not isinstance(dt, datetime):
            return self.clickhouse_min_date

        if dt.tzinfo is not None:
            # NOTE(review): assumes the sink expects naive UTC - confirm.
            offset = dt.utcoffset()
            if offset is not None:
                dt = dt - offset
            dt = dt.replace(tzinfo=None)

        if dt < self.clickhouse_min_date:
            logger.debug("修正过小时间值: %s", dt)
            return self.clickhouse_min_date
        if dt > self.clickhouse_max_date:
            logger.debug("修正过大时间值: %s", dt)
            return self.clickhouse_max_date
        return dt
|