main
黄海 5 months ago
parent 69a8f5af97
commit ba61be7d77

@@ -129,7 +129,7 @@ def generate_clickhouse_ddl(table_name, columns, indexes, primary_keys):
    ddl += ',\n'.join(column_defs)
    if index_defs:
        ddl += ',\n' + ',\n'.join(index_defs)
-   ddl += '\n) ENGINE = MergeTree()\n'
+   ddl += '\n) ENGINE = ReplacingMergeTree()\n'
    ddl += order_by
    ddl += '\nSETTINGS index_granularity = 8192;'

@@ -1,19 +1,24 @@
from datetime import datetime
-from utils.logger import configure_logger
-logger = configure_logger()
+import logging
+from dateutil import parser
+
+logger = logging.getLogger(__name__)

class DataMapper:
    def __init__(self, mysql_conn, table):
        self.mysql_conn = mysql_conn
        self.table = table
        self.date_columns = []
-        self.uint64_columns = []
+        self.uint_columns = []
        self._analyze_schema()
        self.min_date = datetime(1970, 1, 1)
-        self.max_date = datetime(2105, 12, 31, 23, 59, 59)
+        self.max_date = datetime(2100, 1, 1)
+        self.clickhouse_min_date = datetime(1970, 1, 1)
+        self.clickhouse_max_date = datetime(2105, 12, 31, 23, 59, 59)

    def _analyze_schema(self):
-        """Analyze the table schema and auto-detect column types"""
+        """Analyze the MySQL table schema"""
        schema_query = f"""
            SELECT COLUMN_NAME, DATA_TYPE, COLUMN_TYPE
            FROM INFORMATION_SCHEMA.COLUMNS
@@ -23,69 +28,95 @@ class DataMapper:
        with self.mysql_conn.connect().cursor() as cursor:
            cursor.execute(schema_query)
-            for col_name, data_type, col_type in cursor.fetchall():
-                # Detect date/time columns
-                if data_type in ('datetime', 'timestamp', 'date'):
-                    self.date_columns.append(col_name)
-
-                # Detect unsigned integer columns (BIGINT UNSIGNED, etc.)
-                if 'unsigned' in col_type.lower() and 'int' in data_type:
-                    self.uint64_columns.append(col_name)
-
-        #logger.info(f"Date columns: {self.date_columns}")
-        #logger.info(f"Unsigned integer columns: {self.uint_columns}")
-        #logger.info(f"Auto-detected column types - date columns: {self.date_columns}")
-        #logger.info(f"Auto-detected column types - unsigned integer columns: {self.uint64_columns}")
+            for col, dtype, ctype in cursor.fetchall():
+                if dtype in ('datetime', 'timestamp', 'date', 'time'):
+                    self.date_columns.append(col)
+                if 'unsigned' in ctype.lower() and 'int' in dtype:
+                    self.uint_columns.append(col)

    def map_row(self, columns, row):
-        row_dict = dict(zip(columns, row))
-        return {col: self._map_value(col, val) for col, val in row_dict.items()}
+        """Map one row (logs the row on failure)"""
+        debug_row = {col: val for col, val in zip(columns, row)}
+        try:
+            return {col: self._map_value(col, val) for col, val in zip(columns, row)}
+        except Exception as e:
+            logger.error(f"Row that failed to map: {debug_row}")
+            raise

    def _map_value(self, col, value):
-        if col in self.uint64_columns:
-            return self._handle_uint64(value)
-        elif col in self.date_columns:
-            return self._handle_datetime(value)
-        elif isinstance(value, str):
-            return value.strip()
+        """Convert a single column value"""
+        if col in self.uint_columns:
+            return self._handle_uint(value)
+        if col in self.date_columns:
+            return self._handle_datetime(value)
+        if isinstance(value, str):
+            return value.strip()[:500]  # guard against overly long strings
        return value

-    def _handle_uint64(self, value):
+    def _handle_uint(self, value):
+        """Handle unsigned integers"""
        try:
-            return int(float(value)) if value not in (None, '', 'NULL') else 0
+            return int(value) if value not in (None, '', 'NULL') else 0
        except:
            return 0

    def _handle_datetime(self, value):
-        dt = self._parse_datetime(value)
-        return dt if dt else self.min_date
-
-    def _parse_datetime(self, value):
-        if value in (None, 0, '0', '0.0', '0.00', '', 'null', 'NULL'):
-            return self.min_date
-
-        try:
-            str_value = str(value).strip()
-            for fmt in ('%Y-%m-%d %H:%M:%S', '%Y-%m-%d', '%Y%m%d%H%M%S', '%Y/%m/%d %H:%M:%S'):
-                try:
-                    parsed = datetime.strptime(str_value, fmt)
-                    return self._clamp_datetime(parsed)
-                except ValueError:
-                    continue
-
-            if str_value.isdigit():
-                ts = int(str_value)
-                if 1e12 < ts < 1e13:  # millisecond timestamp
-                    parsed = datetime.fromtimestamp(ts / 1000)
-                elif 1e9 < ts < 1e10:  # second timestamp
-                    parsed = datetime.fromtimestamp(ts)
-                return self._clamp_datetime(parsed)
-
-            return self.min_date
-        except:
-            return self.min_date
+        """Handle datetime columns (enhanced)"""
+        original_value = value
+        try:
+            # Empty or invalid values
+            if value in (None, 0, '0', '0000-00-00', '0000-00-00 00:00:00'):
+                return self.clickhouse_min_date
+
+            # Try automatic parsing of assorted formats
+            try:
+                dt = parser.parse(str(value))
+                return self._clamp_datetime(dt)
+            except:
+                pass
+
+            # Unix timestamps
+            if str(value).isdigit():
+                ts = int(value)
+                length = len(str(value))
+                if 10 <= length <= 13:  # second- to millisecond-precision
+                    divisor = 10 ** (length - 10)
+                    return self._clamp_datetime(datetime.fromtimestamp(ts / divisor))
+
+            # Try common explicit formats
+            formats = [
+                '%Y-%m-%d %H:%M:%S',
+                '%Y/%m/%d %H:%M:%S',
+                '%Y%m%d%H%M%S',
+                '%Y-%m-%d',
+                '%Y%m%d',
+                '%Y-%m-%dT%H:%M:%S.%fZ'  # ISO format
+            ]
+            for fmt in formats:
+                try:
+                    return self._clamp_datetime(datetime.strptime(str(value), fmt))
+                except ValueError:
+                    continue
+
+            logger.warning(f"Unparseable datetime value: {original_value}")
+            return self.clickhouse_min_date
+        except Exception as e:
+            logger.error(f"Datetime parsing error: {original_value} - {str(e)}")
+            return self.clickhouse_min_date

    def _clamp_datetime(self, dt):
-        if dt < self.min_date:
-            return self.min_date
-        elif dt > self.max_date:
-            return self.max_date
+        """Strictly clamp the datetime range"""
+        if not isinstance(dt, datetime):
+            return self.clickhouse_min_date
+        if dt < self.clickhouse_min_date:
+            logger.debug(f"Clamped too-small datetime: {dt}")
+            return self.clickhouse_min_date
+        if dt > self.clickhouse_max_date:
+            logger.debug(f"Clamped too-large datetime: {dt}")
+            return self.clickhouse_max_date
        return dt
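The timestamp branch in _handle_datetime infers precision from the digit count: 10 digits are read as seconds, and up to 13 digits are scaled down by a power of ten before datetime.fromtimestamp. A minimal standalone sketch of that heuristic (values are illustrative, not from the repository):

from datetime import datetime

def ts_to_datetime(raw: str) -> datetime:
    # Same digit-count heuristic as DataMapper._handle_datetime
    ts, length = int(raw), len(raw)
    divisor = 10 ** (length - 10)       # 10 digits -> 1, 13 digits -> 1000
    return datetime.fromtimestamp(ts / divisor)

print(ts_to_datetime("1718000000"))     # second precision
print(ts_to_datetime("1718000000123"))  # millisecond precision, same instant

Note that dateutil's parser.parse is tried first, so digit strings such as "20240101" are usually consumed by that step before this branch is reached.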

@@ -132,6 +132,6 @@ CREATE TABLE t_equipment_charge_order (
    INDEX idx_stationid_chargedegree (station_id, charge_degree) TYPE minmax GRANULARITY 3,
    INDEX platform_order_no (platform_order_no) TYPE minmax GRANULARITY 3,
    INDEX idx_cover_condition (station_id, report_time, state, charge_degree) TYPE minmax GRANULARITY 3
-) ENGINE = MergeTree()
+) ENGINE = ReplacingMergeTree()
ORDER BY (id)
SETTINGS index_granularity = 8192;
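With ReplacingMergeTree and ORDER BY (id), rows inserted more than once for the same id (for example by the overlapping recent-data sync) are collapsed during background merges instead of accumulating. Duplicates can still be visible before a merge happens, which is why the sync service also issues OPTIMIZE TABLE ... FINAL; exact reads can use FINAL directly. A minimal sketch, assuming the clickhouse_driver client (the project's own ch_conn wrapper may differ):

from clickhouse_driver import Client

client = Client(host='localhost')  # assumed connection parameters

# A plain count may still include not-yet-merged duplicates
raw_count = client.execute("SELECT count() FROM t_equipment_charge_order")[0][0]

# FINAL deduplicates by the ORDER BY key (id) at query time
exact_count = client.execute("SELECT count() FROM t_equipment_charge_order FINAL")[0][0]

# Forcing the merge makes plain reads exact again
client.execute("OPTIMIZE TABLE t_equipment_charge_order FINAL")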

@@ -4,6 +4,7 @@ from tqdm import tqdm
import time
import psutil
import logging
+from datetime import datetime

logger = logging.getLogger(__name__)

@@ -14,12 +15,13 @@ class SyncService:
        self.ch_conn = ch_conn
        self.mapper = mapper
        self.columns = None
-        self.full_sync_interval = 24 * 3600  # run a full sync every 24 hours
+        self.full_sync_interval = 24 * 3600  # full-sync interval
        self.last_full_sync = 0
-        self.recent_check_count = 5000  # number of recent rows to re-check
+        self.recent_check_count = 5000  # recent-data check size
+        self.optimize_frequency = 100  # run OPTIMIZE once every 100 batches

    def sync_data(self, table, batch_size):
-        """Smart sync strategy"""
+        """Smart sync entry point"""
        current_time = time.time()
        self.columns = self._load_table_columns(table)
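The branch that chooses between the two strategies falls between the hunks and is not shown; presumably sync_data compares current_time with last_full_sync, roughly like this hypothetical sketch (only _full_sync, _incremental_sync and full_sync_interval are taken from this file):

        # Hypothetical dispatch, not part of the visible diff
        if current_time - self.last_full_sync >= self.full_sync_interval:
            self._full_sync(table, batch_size)
            self.last_full_sync = current_time
        else:
            self._incremental_sync(table, batch_size)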
@@ -30,111 +32,121 @@ class SyncService:
            self._incremental_sync(table, batch_size)

    def _full_sync(self, table, batch_size):
-        """Full sync"""
+        """Full data sync"""
        logger.info(f"🚀 Starting full sync of {table}")
-        mysql_conn = self.mysql_conn.connect()
-        try:
-            with mysql_conn.cursor() as count_cursor:
-                count_cursor.execute(f"SELECT COUNT(*) FROM {table}")
-                total = count_cursor.fetchone()[0]
-
-            with mysql_conn.cursor(pymysql.cursors.SSCursor) as cursor:
-                cursor.execute(f"SELECT * FROM {table} ORDER BY id")
-                self._sync_with_cursor(cursor, total, table, batch_size)
-        except Exception as e:
-            logger.error(f"Full sync failed: {str(e)}")
-        finally:
-            mysql_conn.close()
+        with self.mysql_conn.connect() as mysql_conn:
+            try:
+                with mysql_conn.cursor() as count_cursor:
+                    count_cursor.execute(f"SELECT COUNT(*) FROM {table}")
+                    total = count_cursor.fetchone()[0]
+
+                with mysql_conn.cursor(pymysql.cursors.SSCursor) as cursor:
+                    cursor.execute(f"SELECT * FROM {table} ORDER BY id")
+                    self._sync_with_cursor(cursor, total, table, batch_size)
+            except Exception as e:
+                logger.error(f"Full sync failed: {str(e)}")
+                raise
+            finally:
+                self._optimize_table(table)

        logger.info(f"✅ Full sync of {table} complete")
    def _incremental_sync(self, table, batch_size):
-        """Incremental sync (with recent-data safety check)"""
+        """Incremental data sync"""
        last_id = self._get_last_id_from_ch(table)
        logger.info(f"🔁 Starting incremental sync of {table}, starting id: {last_id}")

        # Standard incremental sync
-        mysql_conn = self.mysql_conn.connect()
-        try:
-            with mysql_conn.cursor() as count_cursor:
-                count_cursor.execute(
-                    f"SELECT COUNT(*) FROM {table} WHERE id > {last_id}"
-                )
-                total = count_cursor.fetchone()[0]
-
-            with mysql_conn.cursor(pymysql.cursors.SSCursor) as cursor:
-                cursor.execute(
-                    f"SELECT * FROM {table} WHERE id > {last_id} ORDER BY id"
-                )
-                self._sync_with_cursor(cursor, total, table, batch_size)
-        finally:
-            mysql_conn.close()
-
-        # Recent-data safety sync
+        with self.mysql_conn.connect() as mysql_conn:
+            try:
+                with mysql_conn.cursor() as count_cursor:
+                    count_cursor.execute(
+                        f"SELECT COUNT(*) FROM {table} WHERE id > {last_id}"
+                    )
+                    total = count_cursor.fetchone()[0]
+
+                with mysql_conn.cursor(pymysql.cursors.SSCursor) as cursor:
+                    cursor.execute(
+                        f"SELECT * FROM {table} WHERE id > {last_id} ORDER BY id"
+                    )
+                    self._sync_with_cursor(cursor, total, table, batch_size)
+            except Exception as e:
+                logger.error(f"Incremental sync failed: {str(e)}")
+                raise
+
+        # Safe recent-data sync
        self._sync_recent_data(table, batch_size)
        logger.info(f"✅ Incremental sync of {table} complete, last id: {self._get_last_id_from_ch(table)}")
    def _sync_recent_data(self, table, batch_size):
-        """Recent-data safety sync"""
-        logger.info(f"🔄 Starting sync of the most recent {self.recent_check_count} rows of {table}")
-        mysql_conn = self.mysql_conn.connect()
-        try:
-            with mysql_conn.cursor() as cursor:
-                cursor.execute(f"SELECT MAX(id) FROM {table}")
-                max_id = cursor.fetchone()[0] or 0
-
-            if max_id <= 0:
-                return
-
-            start_id = max(max_id - self.recent_check_count, 0)
-
-            with mysql_conn.cursor(pymysql.cursors.SSCursor) as cursor:
-                cursor.execute(
-                    f"SELECT * FROM {table} WHERE id >= {start_id} ORDER BY id"
-                )
-                total = max_id - start_id + 1
-                progress = tqdm(total=total, desc="recent data", unit="rec")
-                batch = []
-
-                while True:
-                    row = cursor.fetchone()
-                    if not row:
-                        break
-
-                    mapped = self.mapper.map_row(self.columns, row)
-                    batch.append(mapped)
-
-                    if len(batch) >= batch_size:
-                        self._insert_batch(batch, table)
-                        progress.update(len(batch))
-                        batch = []
-
-                if batch:
-                    self._insert_batch(batch, table)
-                    progress.update(len(batch))
-
-                progress.close()
-        except Exception as e:
-            logger.error(f"Recent-data sync failed: {str(e)}")
-        finally:
-            mysql_conn.close()
-
-        logger.info(f"🆗 Recent-data sync covered ids: {start_id}-{max_id}")
+        """Safe recent-data sync"""
+        logger.info(f"🔄 Starting safe recent-data sync of {table}")
+        mysql_max_id = self._get_mysql_max_id(table)
+        ch_max_id = self._get_last_id_from_ch(table)
+        safe_start = max(mysql_max_id - self.recent_check_count, ch_max_id + 1)
+
+        if safe_start > mysql_max_id:
+            logger.info("⏩ No recent data to sync")
+            return
+
+        with self.mysql_conn.connect() as mysql_conn:
+            try:
+                with mysql_conn.cursor(pymysql.cursors.SSCursor) as cursor:
+                    cursor.execute(
+                        f"SELECT * FROM {table} "
+                        f"WHERE id BETWEEN {safe_start} AND {mysql_max_id} "
+                        f"ORDER BY id"
+                    )
+                    total = mysql_max_id - safe_start + 1
+                    progress = tqdm(total=total, desc="recent data", unit="rec")
+                    batch = []
+                    insert_count = 0
+
+                    while True:
+                        row = cursor.fetchone()
+                        if not row:
+                            break
+
+                        mapped = self.mapper.map_row(self.columns, row)
+                        batch.append(mapped)
+
+                        if len(batch) >= batch_size:
+                            self._insert_batch(batch, table)
+                            insert_count += 1
+                            progress.update(len(batch))
+                            batch = []
+
+                            if insert_count % self.optimize_frequency == 0:
+                                self._optimize_table(table)
+
+                    if batch:
+                        self._insert_batch(batch, table)
+                        progress.update(len(batch))
+
+                    progress.close()
+            except Exception as e:
+                logger.error(f"Recent-data sync failed: {str(e)}")
+                raise
+
+        logger.info(f"🆗 Recent-data sync covered ids: {safe_start}-{mysql_max_id}")
    def _sync_with_cursor(self, cursor, total, table, batch_size):
        """Shared sync loop"""
        progress = tqdm(total=total, desc="sync progress", unit="rec")
        batch = []
+        insert_count = 0
        last_success_time = time.time()

        while True:
            try:
-                if time.time() - last_success_time > 300:  # 5-minute connection check
+                # Check the connection every 5 minutes
+                if time.time() - last_success_time > 300:
                    cursor.connection.ping(reconnect=True)
                    last_success_time = time.time()
@@ -147,12 +159,17 @@ class SyncService:
                if len(batch) >= batch_size:
                    self._insert_batch(batch, table)
+                    insert_count += 1
                    progress.update(len(batch))
                    batch = []
                    last_success_time = time.time()
+
+                    # Periodically optimize the table
+                    if insert_count % self.optimize_frequency == 0:
+                        self._optimize_table(table)

            except (OperationalError, InterfaceError) as e:
-                logger.warning("Connection lost, trying to reconnect...")
+                logger.warning("⚠️ Connection lost, trying to reconnect...")
                cursor.connection.ping(reconnect=True)
                time.sleep(5)
                continue
@@ -163,25 +180,11 @@ class SyncService:
        progress.close()

-    def _get_last_id_from_ch(self, table):
-        """Get the current max id from ClickHouse"""
-        try:
-            result = self.ch_conn.connect().execute(f"SELECT max(id) FROM {table}")
-            return result[0][0] or 0
-        except Exception as e:
-            logger.error(f"Failed to get max id: {str(e)}")
-            return 0
-
-    def _load_table_columns(self, table):
-        """Load the table's column list"""
-        result = self.ch_conn.connect().execute(f"DESCRIBE TABLE {table}")
-        return [row[0] for row in result]
-
    def _insert_batch(self, batch, table):
-        """Batch insert with memory monitoring"""
+        """Safe batch insert"""
        mem = psutil.virtual_memory()
        if mem.percent > 90:
-            logger.warning("Memory usage above 90%, pausing for 60 seconds")
+            logger.warning("🛑 Memory usage above 90%, pausing for 60 seconds")
            time.sleep(60)

        try:
@@ -193,9 +196,39 @@ class SyncService:
                    'date_time_input_format': 'best_effort',
                    'allow_experimental_analyzer': 0,
                    'input_format_null_as_default': 1,
-                    'max_partitions_per_insert_block': 500
+                    'max_partitions_per_insert_block': 1000,
+                    'optimize_on_insert': 1
                }
            )
        except Exception as e:
-            logger.error(f"Batch insert failed: {str(e)}")
-            # Retry logic could optionally be added here
+            logger.error(f"❌ Batch insert failed: {str(e)}")
+            # Retry logic could be added here
+
+    def _optimize_table(self, table):
+        """Optimize the ClickHouse table"""
+        try:
+            self.ch_conn.connect().execute(f"OPTIMIZE TABLE {table} FINAL")
+            logger.debug(f"{table} optimized")
+        except Exception as e:
+            logger.warning(f"Table optimize failed: {str(e)}")
+
+    def _get_last_id_from_ch(self, table):
+        """Get the current max id from ClickHouse"""
+        try:
+            result = self.ch_conn.connect().execute(f"SELECT max(id) FROM {table}")
+            return result[0][0] or 0
+        except Exception as e:
+            logger.error(f"Failed to get max id: {str(e)}")
+            return 0
+
+    def _get_mysql_max_id(self, table):
+        """Get the current max id from MySQL"""
+        with self.mysql_conn.connect() as conn:
+            with conn.cursor() as cursor:
+                cursor.execute(f"SELECT MAX(id) FROM {table}")
+                return cursor.fetchone()[0] or 0
+
+    def _load_table_columns(self, table):
+        """Load the table schema"""
+        result = self.ch_conn.connect().execute(f"DESCRIBE TABLE {table}")
+        return [row[0] for row in result]
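For context, the INSERT these settings are attached to is not fully visible in the hunk. A minimal sketch of what such a batched insert looks like with the clickhouse_driver client (table name, column list and connection parameters here are assumptions, not taken from the commit):

from datetime import datetime
from clickhouse_driver import Client

client = Client(host='localhost')              # assumed connection parameters
table = 't_equipment_charge_order'             # example target table
columns = ['id', 'station_id', 'report_time']  # illustrative subset of columns
batch = [{'id': 1, 'station_id': 7, 'report_time': datetime(2024, 6, 1, 12, 0)}]

client.execute(
    f"INSERT INTO {table} ({', '.join(columns)}) VALUES",
    [[row[c] for c in columns] for row in batch],
    settings={
        'date_time_input_format': 'best_effort',
        'input_format_null_as_default': 1,
        'max_partitions_per_insert_block': 1000,
        'optimize_on_insert': 1,
    },
)

clickhouse_driver sends data over the native protocol, so values should already be Python objects of the right type (hence the datetime object above).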