From 154c2de3ec0afdc93a61559cc0d5fd8753f5dcef Mon Sep 17 00:00:00 2001
From: 黄海 <10402852@qq.com>
Date: Sat, 22 Feb 2025 18:32:02 +0800
Subject: [PATCH] 'commit'

---
 .../__pycache__/sync_service.cpython-310.pyc |  Bin 2803 -> 4108 bytes
 ClickHouse/services/sync_service.py          |  179 ++++++++++++------
 2 files changed, 128 insertions(+), 51 deletions(-)

diff --git a/ClickHouse/services/__pycache__/sync_service.cpython-310.pyc b/ClickHouse/services/__pycache__/sync_service.cpython-310.pyc
index 2eb9c3666b3e34ddac2beb2c1aff24e67ee7b1a4..3f0ce418aab6a2ec76d8aedb368c915776d8c278 100644
Binary files a/ClickHouse/services/__pycache__/sync_service.cpython-310.pyc and b/ClickHouse/services/__pycache__/sync_service.cpython-310.pyc differ
diff --git a/ClickHouse/services/sync_service.py b/ClickHouse/services/sync_service.py
index eb1b0de0..e610c76c 100644
--- a/ClickHouse/services/sync_service.py
+++ b/ClickHouse/services/sync_service.py
@@ -1,4 +1,11 @@
+import pymysql
+from pymysql import OperationalError, InterfaceError
 from tqdm import tqdm
+import time
+import psutil
+import logging
+
+logger = logging.getLogger(__name__)
 
 
 class SyncService:
@@ -6,70 +13,140 @@ class SyncService:
         self.mysql_conn = mysql_conn
         self.ch_conn = ch_conn
         self.mapper = mapper
-        self.columns = None  # Initialized to None; loaded later
-
-    def get_table_schema(self, table):
-        """Fetch column type information for the table"""
-        schema_query = f"""
-            SELECT COLUMN_NAME, DATA_TYPE, COLUMN_TYPE
-            FROM INFORMATION_SCHEMA.COLUMNS
-            WHERE TABLE_SCHEMA = DATABASE()
-              AND TABLE_NAME = '{table}'
-        """
-        with self.mysql_conn.connect().cursor() as cursor:
-            cursor.execute(schema_query)
-            return {row[0]: (row[1], row[2]) for row in cursor.fetchall()}
 
-    def _load_table_columns(self,table):
+        self.primary_key = 'id'  # Name of the primary key column
+        self.columns = None
+
+
+    def _load_table_columns(self, table):
         """Load the table's column names"""
-        result = self.ch_conn.connect().execute("DESCRIBE TABLE "+table)
+        result = self.ch_conn.connect().execute(f"DESCRIBE TABLE {table}")
         return [row[0] for row in result]
 
-    # Sync data
+    def _get_last_id_from_ch(self, table):
+        """Fetch the current max ID from ClickHouse"""
+        try:
+            client = self.ch_conn.connect()
+            result = client.execute(f"SELECT max({self.primary_key}) FROM {table}")
+            return result[0][0] or 0
+        except Exception as e:
+            logger.error(f"Failed to fetch max ID: {e}")
+            return 0
+
     def sync_data(self, table, batch_size):
-        # Load the table's column names
+        """Run an incremental data sync"""
+        # Load column metadata
         self.columns = self._load_table_columns(table)
-        # Connection used for counting rows
-        count_conn = self.mysql_conn.create_count_connection()
-        # Connection used for reading data
+
+        # Determine the last synced ID
+        last_id = self._get_last_id_from_ch(table)
+        logger.info(f"Starting incremental sync from ID {last_id}")
+
+        # Reuse a single MySQL connection
         mysql_conn = self.mysql_conn.connect()
-        with count_conn.cursor() as count_cursor:
-            count_cursor.execute("SELECT COUNT(*) FROM " + table)
-            total = count_cursor.fetchone()[0]
+        try:
+            # Count the rows that still need syncing
+            with mysql_conn.cursor() as count_cursor:
+                count_query = f"""
+                    SELECT COUNT(*)
+                    FROM {table}
+                    WHERE {self.primary_key} > {last_id}
+                """
+                count_cursor.execute(count_query)
+                total = count_cursor.fetchone()[0]
+                logger.info(f"Rows pending sync: {total}")
 
-        with mysql_conn.cursor() as cursor:
-            cursor.execute("SELECT * FROM " + table + " ORDER BY id")
-            progress = tqdm(total=total, desc="Sync progress", unit="rec")
-            batch = []
+            # Stream rows with a server-side cursor
+            with mysql_conn.cursor(pymysql.cursors.SSCursor) as cursor:
+                query = f"""
+                    SELECT *
+                    FROM {table}
+                    WHERE {self.primary_key} > {last_id}
+                    ORDER BY {self.primary_key}
+                """
+                cursor.execute(query)
 
-            while True:
-                row = cursor.fetchone()
-                if not row:
-                    break
+                progress = tqdm(total=total, desc="Sync progress", unit="rec")
+                batch = []
+                current_max_id = last_id
+                last_success_time = time.time()
 
-                mapped = self.mapper.map_row(self.columns, row)
-                batch.append(mapped)
+                while True:
+                    try:
+                        # Ping the connection every 5 minutes
+                        if time.time() - last_success_time > 300:
+                            mysql_conn.ping(reconnect=True)
+                            last_success_time = time.time()
 
-                if len(batch) >= batch_size:
-                    self._insert_batch(batch,table)
+                        row = cursor.fetchone()
+                        if not row:
+                            break
+
+                        # Track the current max ID
+                        current_id = row[0]
+                        if current_id > current_max_id:
+                            current_max_id = current_id
+
+                        # Map the row to ClickHouse columns
+                        mapped = self.mapper.map_row(self.columns, row)
+                        batch.append(mapped)
+
+                        # Flush a full batch
+                        if len(batch) >= batch_size:
+                            self._insert_batch(batch, table)
+                            progress.update(len(batch))
+                            batch = []
+                            last_success_time = time.time()
+
+                    except (OperationalError, InterfaceError) as e:
+                        logger.warning(f"Connection lost ({e}), resuming from ID {current_max_id}...")
+                        mysql_conn.ping(reconnect=True)
+                        time.sleep(5)
+                        # The streaming result set does not survive a reconnect,
+                        # so re-issue the query from the last processed ID.
+                        cursor.execute(f"""
+                            SELECT *
+                            FROM {table}
+                            WHERE {self.primary_key} > {current_max_id}
+                            ORDER BY {self.primary_key}
+                        """)
+                        continue
+
+            # Insert any remaining rows
+            if batch:
+                self._insert_batch(batch, table)
                 progress.update(len(batch))
-                    batch = []
-            if batch:
-                self._insert_batch(batch,table)
-                progress.update(len(batch))
+            progress.close()
+            logger.info(f"Sync complete, last synced ID: {current_max_id}")
 
-            progress.close()
+        finally:
+            if mysql_conn:
+                try:
+                    mysql_conn.close()
+                except Exception as e:
+                    logger.error(f"Error while closing connection: {e}")
 
     def _insert_batch(self, batch, table):
-        """Bulk-insert a batch of rows into ClickHouse"""
-        self.ch_conn.connect().execute(
-            'INSERT INTO ' + table + ' VALUES',
-            batch,
-            types_check=True,
-            settings={
-                'date_time_input_format': 'best_effort',
-                'allow_experimental_analyzer': 0,
-                'input_format_null_as_default': 1
-            }
-        )
+        """Batch insert with memory monitoring"""
+        # Back off when host memory is under pressure
+        mem = psutil.virtual_memory()
+        if mem.percent > 90:
+            logger.warning("Memory usage above 90%, pausing for 60 seconds")
+            time.sleep(60)
+
+        # Run the insert
+        try:
+            self.ch_conn.connect().execute(
+                f'INSERT INTO {table} VALUES',
+                batch,
+                types_check=True,
+                settings={
+                    'date_time_input_format': 'best_effort',
+                    'allow_experimental_analyzer': 0,
+                    'input_format_null_as_default': 1
+                }
+            )
+        except Exception as e:
+            logger.error(f"Batch insert failed: {e}")
+            raise  # Optional: retry instead of re-raising (see note below)
\ No newline at end of file
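
---

A note on the optional retry left open in _insert_batch: the patch logs a failed
insert but does not retry it. Below is a minimal sketch of what that retry could
look like, assuming the same clickhouse-driver client used above; the method name
_insert_batch_with_retry, the attempt count, and the backoff delays are
illustrative and not part of this patch:

    def _insert_batch_with_retry(self, batch, table, retries=3):
        """Insert a batch into ClickHouse, retrying transient failures."""
        for attempt in range(1, retries + 1):
            try:
                self.ch_conn.connect().execute(
                    f'INSERT INTO {table} VALUES',
                    batch,
                    types_check=True,
                )
                return  # success, stop retrying
            except Exception as e:
                logger.error(f"Insert attempt {attempt}/{retries} failed: {e}")
                if attempt == retries:
                    raise  # give up after the final attempt
                time.sleep(2 ** attempt)  # back off: 2s, then 4s

With such a helper in place, sync_data would call _insert_batch_with_retry(batch, table)
wherever it currently calls _insert_batch(batch, table).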