From 2f23e7541aafb3e248035231696dcab3ae494bb3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E6=B5=B7?= <10402852@qq.com> Date: Fri, 17 Mar 2023 10:24:28 +0800 Subject: [PATCH] 'commit' --- ETL/Mars/Test.py | 18 +++++ ETL/Mars/Utils/MySQLHelper.py | 69 ++++++++++++++++++ .../__pycache__/MySQLHelper.cpython-37.pyc | Bin 0 -> 2421 bytes ETL/Mars/pack.json | 20 +++++ ETL/Mars/pack.py | 54 +++++++------- 5 files changed, 133 insertions(+), 28 deletions(-) create mode 100644 ETL/Mars/Test.py create mode 100644 ETL/Mars/Utils/MySQLHelper.py create mode 100644 ETL/Mars/Utils/__pycache__/MySQLHelper.cpython-37.pyc create mode 100644 ETL/Mars/pack.json diff --git a/ETL/Mars/Test.py b/ETL/Mars/Test.py new file mode 100644 index 0000000..d77084a --- /dev/null +++ b/ETL/Mars/Test.py @@ -0,0 +1,18 @@ +import datetime +import time + + +# 黄海定义的输出信息的办法,带当前时间 +def logInfo(msg): + i = datetime.datetime.now() + print(" %s %s" % (i, msg)) + + +if __name__ == '__main__': + cnt = 0 + while True: + logInfo("我还活着!") + time.sleep(1) + cnt = cnt + 1 + if cnt == 30: + break diff --git a/ETL/Mars/Utils/MySQLHelper.py b/ETL/Mars/Utils/MySQLHelper.py new file mode 100644 index 0000000..dc2ab01 --- /dev/null +++ b/ETL/Mars/Utils/MySQLHelper.py @@ -0,0 +1,69 @@ + +import pymysql.cursors + + +class MySQLHelper: + myVersion = 0.1 + # 配置 + user = 'root' + password = 'DsideaL147258369' + host = '10.10.14.230' + port = 22066 + db = 'test' + + def __init__(self, host=host, port=port, db=db, user=user, password=password, charset="utf8"): + self.host = host + self.user = user + self.port = port + self.password = password + self.charset = charset + self.db = db + + try: + self.conn = pymysql.connect(host=self.host, port=self.port, user=self.user, passwd=self.password, + db=self.db, charset=self.charset, cursorclass=pymysql.cursors.DictCursor) + self.cursor = self.conn.cursor() + + # 设置执行时间为8小时 + sql = "set session wait_timeout=288000" + self.cursor.execute(sql) + sql = "set session interactive_timeout=288000" + self.cursor.execute(sql) + + except Exception as e: + print('MySql Error : %d %s' % (e.args[0], e.args[1])) + + def query(self, sql: object) -> object: + try: + self.cursor.execute(sql) + result = self.cursor.fetchall() + return result + except Exception as e: + print('MySql Error: %s SQL: %s' % (e, sql)) + + def execute(self, sql): + try: + self.cursor.execute(sql) + self.conn.commit() + # 获取更新了多少条数据 + return self.cursor.rowcount + except Exception as e: + print('MySql Error: %s SQL: %s' % (e, sql)) + + def executeWithPara(self, sql, params): + try: + self.cursor.execute(sql, params) + self.conn.commit() + except Exception as e: + print('MySql Error: %s SQL: %s' % (e, sql)) + + def executemany(self, sql, data): + try: + self.cursor.executemany(sql, data) + self.conn.commit() + except Exception as e: + print('MySql Error: %s SQL: %s' % (e, sql)) + + def close(self): + self.cursor.close() + self.conn.close() diff --git a/ETL/Mars/Utils/__pycache__/MySQLHelper.cpython-37.pyc b/ETL/Mars/Utils/__pycache__/MySQLHelper.cpython-37.pyc new file mode 100644 index 0000000000000000000000000000000000000000..94f2325f8eb24dad5a1af984df4d56aa049f6efe GIT binary patch literal 2421 zcmd5-Pj4GV6rY(L@2>wOaY&ldKNKG*k|h+()B zkg!h>)D9fM0xq1OBzWM8Qs4nm43`D{?Tvb2c*Ym3WQmy+d2gaq73Z7lDZ?UmRA>#>xfB2agr>OVo` zG$$d+IY^o_kSs5ObaEFY&mEAZeqh{|my;HO3OX&=(}I;2Xtp@4`K_{`KY~VBftkvF z^#C-(s$eg1S9QOZdk2&P)W^pgq;rcCGxfBh{-ETgPsyNRrUlRa){H2fBSq_;Bin3& zyoUW7czK}r%70A0Cmr?}MgWz!*7Pjj$G}=+-O`Djmuh5K=%!bW?hw_QFsRm`DFdVv`h(w_o0BV#CqL+yRUcCz>Bj^N5%#^ zg{?ZM+)ZM0rx;j{tJWPZSx>%@anDFwzV{-QT?3BTQdfbgmPhJ|Zk8?AGTj~;fZr}Y zHZ-M*Lo~oT@LfygZ$vG?@b9a4Z)^&^k*TMfTalLQQawvzxq0v5>gIibaq~NqqI}{xq(gBQdbYIN zldAs#baf2`p$q`2AS84!I|)dkcj!LmutWDDx=H`i3HRB)b3lebw0HqQ2_Sd?5U>IQ z7a(B&f`Gk(Fq#Oaw?W_?L7=9<1fXHoZYL)2RrWm2dL1)?hjWC-X*d)>egub1qQHDa z0i$W0_!dV5kI{cD$7t{nCi)l0IOOaAJcZE+2}m!?Km-mbtQ``e^q+~~occg1iM9 z%SB|O(|}Qz0D_yRfPtZTk{9@?7?Ug(Y%yIFkK4P@88TxO6>OhkuXr-(TGJ`6(U#&g zgVzG-+@wRw9uHX67@vsLBM_>`!?f7H)~sU!>_lxDhPD!h?M(Di?AO9@w-=?um7v`2 zKavXHn~4-$JPt>>84b5?w$*|+KwW@A5i06E>>)ITK@(Y}Fc(vcLf`Q)jYs&DGY)B` s__2kBy6=~L&-V(grW4GJ-kZhIE2A(Q6^o~xZT*`=mu}%lGz#DO2L(e7DF6Tf literal 0 HcmV?d00001 diff --git a/ETL/Mars/pack.json b/ETL/Mars/pack.json new file mode 100644 index 0000000..545481f --- /dev/null +++ b/ETL/Mars/pack.json @@ -0,0 +1,20 @@ +{ + "t_zy_score": [ + { + "sql": "update t_zy_score as t1 left join t_base_person as t2 on t1.person_id=t2.person_id set t1.check_flag=-1 where t2.person_id is null limit 100", + "memo": "检查人员ID不存在的情况" + }, + { + "sql": "update t_zy_score as t1 left join t_zy_main as t2 on t1.zy_id=t2.zy_id set t1.zy_name=t2.zy_name where t1.zy_name is null limit 100", + "memo": "扩展作业名称字段内容" + }, + { + "sql": "update t_zy_score as t1 set t1.check_flag=-1 where t1.zy_name is null limit 100", + "memo": "没补上名称的就是坏人" + }, + { + "sql": "update t_zy_score as t1 set check_flag=1 where check_flag=0 limit 100", + "memo": "打上成功标识" + } + ] +} \ No newline at end of file diff --git a/ETL/Mars/pack.py b/ETL/Mars/pack.py index 28d33f2..a8771eb 100644 --- a/ETL/Mars/pack.py +++ b/ETL/Mars/pack.py @@ -1,15 +1,20 @@ # python -m pip install --upgrade pip -i http://pypi.douban.com/simple/ # pip install pymysql -i http://pypi.douban.com/simple/ import datetime -import sys +import json import pymysql.cursors class MySQLHelper: - myVersion = 0.1 + # 配置 + user = 'root' + password = 'DsideaL147258369' + host = '10.10.14.230' + port = 22066 + db = 'huanghai' - def __init__(self, host, port, db, user, password, charset="utf8"): + def __init__(self, host=host, port=port, db=db, user=user, password=password, charset="utf8"): self.host = host self.user = user self.port = port @@ -73,30 +78,23 @@ def logInfo(msg): print(" %s %s" % (i, msg)) -# 准备扩展 -_dict = { - "t_resource_base": "update t_resource_base t1,t_dm_stage t2 set t1.CHECK_MESSAGE=t2.stage_name where t1.stage_id=t2.stage_id and t1.CHECK_MESSAGE is null limit 100"} - if __name__ == '__main__': - if len(sys.argv) == 1: - print("没有正确输入参数,请检查!输入样例: python pack.py t_resource_base") - sys.exit() - tableName = sys.argv[1] - - # 配置 - user = 'root' - password = 'DsideaL147258369' - host = '10.10.14.230' - port = 22066 - database = 'test' - - db = MySQLHelper(user=user, host=host, port=port, password=password, db=database) - - while True: - sql = _dict[tableName] - cnt = db.execute(sql) - logInfo("成功更新%s条" % cnt) - if cnt == 0: - break - + with open("./pack.json", "r", encoding="utf-8") as f: + _dict = json.load(f) + + # + allcnt = 0 + db = MySQLHelper() + for _bean in _dict: + actions = _dict[_bean] + for action in actions: + memo = action["memo"] + sql = action["sql"] + while True: + cnt = db.execute(sql) + allcnt = allcnt + cnt + if cnt == 0: + break + logInfo(memo + ",更新%s条" % cnt) db.close() + logInfo("恭喜,所有清洗工作成功完成,共清洗%s条" % allcnt)