diff --git a/ETL/Mars/Test.py b/ETL/Mars/Test.py
new file mode 100644
index 0000000..d77084a
--- /dev/null
+++ b/ETL/Mars/Test.py
@@ -0,0 +1,15 @@
+import datetime
+import time
+
+
+def logInfo(msg):
+    """Print msg to stdout, prefixed with the current local date and time."""
+    now = datetime.datetime.now()
+    print(" %s %s" % (now, msg))
+
+
+if __name__ == '__main__':
+    # Heartbeat demo: log one message per second, 30 times, then exit.
+    for _ in range(30):
+        logInfo("我还活着!")
+        time.sleep(1)
diff --git a/ETL/Mars/Utils/MySQLHelper.py b/ETL/Mars/Utils/MySQLHelper.py
new file mode 100644
index 0000000..dc2ab01
--- /dev/null
+++ b/ETL/Mars/Utils/MySQLHelper.py
@@ -0,0 +1,97 @@
+
+import pymysql.cursors
+
+
+class MySQLHelper:
+    """Small wrapper around one pymysql connection that uses a DictCursor.
+
+    Database errors are printed instead of raised; a method whose statement
+    failed returns None.
+    """
+    myVersion = 0.1
+    # Default connection settings, used when no arguments are passed.
+    # NOTE(review): live credentials are hard-coded here; move them to
+    # configuration or environment variables before merging.
+    user = 'root'
+    password = 'DsideaL147258369'
+    host = '10.10.14.230'
+    port = 22066
+    db = 'test'
+
+    def __init__(self, host=host, port=port, db=db, user=user, password=password, charset="utf8"):
+        """Connect to MySQL and set the session timeouts.
+
+        A failed connection is printed, not raised; conn and cursor are then None.
+        """
+        self.host = host
+        self.user = user
+        self.port = port
+        self.password = password
+        self.charset = charset
+        self.db = db
+        # Defined before connecting so the attributes exist even if it fails.
+        self.conn = None
+        self.cursor = None
+
+        try:
+            self.conn = pymysql.connect(host=self.host, port=self.port, user=self.user, passwd=self.password,
+                                        db=self.db, charset=self.charset, cursorclass=pymysql.cursors.DictCursor)
+            self.cursor = self.conn.cursor()
+
+            # Session timeouts in seconds: 288000 s is 80 hours.
+            # NOTE(review): the original comment said 8 hours, which would be
+            # 28800 - confirm which value is intended.
+            sql = "set session wait_timeout=288000"
+            self.cursor.execute(sql)
+            sql = "set session interactive_timeout=288000"
+            self.cursor.execute(sql)
+
+        except Exception as e:
+            # Join the args with %s: the old '%d %s' % (args[0], args[1]) raised
+            # by itself for exceptions whose args are not an (int, str) pair.
+            print('MySql Error : %s' % ' '.join(str(arg) for arg in e.args))
+
+    def query(self, sql: object) -> object:
+        """Execute sql and return every fetched row, or None after an error."""
+        try:
+            self.cursor.execute(sql)
+            result = self.cursor.fetchall()
+            return result
+        except Exception as e:
+            print('MySql Error: %s SQL: %s' % (e, sql))
+
+    def execute(self, sql):
+        """Execute and commit sql; return the affected row count, or None after an error."""
+        try:
+            self.cursor.execute(sql)
+            self.conn.commit()
+            # Number of rows the statement changed.
+            return self.cursor.rowcount
+        except Exception as e:
+            print('MySql Error: %s SQL: %s' % (e, sql))
+
+    def executeWithPara(self, sql, params):
+        """Execute and commit sql with params; return the row count, or None after an error."""
+        try:
+            self.cursor.execute(sql, params)
+            self.conn.commit()
+            # Same return contract as execute().
+            return self.cursor.rowcount
+        except Exception as e:
+            print('MySql Error: %s SQL: %s' % (e, sql))
+
+    def executemany(self, sql, data):
+        """Execute sql for the parameter sets in data, commit, and return the row count (None after an error)."""
+        try:
+            self.cursor.executemany(sql, data)
+            self.conn.commit()
+            return self.cursor.rowcount
+        except Exception as e:
+            print('MySql Error: %s SQL: %s' % (e, sql))
+
+    def close(self):
+        """Close the cursor and the connection if they were opened."""
+        if self.cursor is not None:
+            self.cursor.close()
+        if self.conn is not None:
+            self.conn.close()
diff --git a/ETL/Mars/Utils/__pycache__/MySQLHelper.cpython-37.pyc b/ETL/Mars/Utils/__pycache__/MySQLHelper.cpython-37.pyc
new file mode 100644
index 0000000..94f2325
Binary files /dev/null and b/ETL/Mars/Utils/__pycache__/MySQLHelper.cpython-37.pyc differ
diff --git a/ETL/Mars/pack.json b/ETL/Mars/pack.json
new file mode 100644
index 0000000..545481f
--- /dev/null
+++ b/ETL/Mars/pack.json
@@ -0,0 +1,20 @@
+{
+  "t_zy_score": [
+    {
+      "sql": "update t_zy_score as t1 left join t_base_person as t2 on t1.person_id=t2.person_id set t1.check_flag=-1 where t2.person_id is null limit 100",
+      "memo": "检查人员ID不存在的情况"
+    },
+    {
+      "sql": "update t_zy_score as t1 left join t_zy_main as t2 on t1.zy_id=t2.zy_id set t1.zy_name=t2.zy_name where t1.zy_name is null limit 100",
+      "memo": "扩展作业名称字段内容"
+    },
+    {
+      "sql": "update t_zy_score as t1 set t1.check_flag=-1 where t1.zy_name is null limit 100",
+      "memo": "没补上名称的就是坏人"
+    },
+    {
+      "sql": "update t_zy_score as t1 set check_flag=1 where check_flag=0 limit 100",
+      "memo": "打上成功标识"
+    }
+  ]
+}
\ No newline at end of file
diff --git a/ETL/Mars/pack.py b/ETL/Mars/pack.py
index 28d33f2..a8771eb 100644
--- a/ETL/Mars/pack.py
+++ b/ETL/Mars/pack.py
@@ -1,15 +1,20 @@
 # python -m pip install --upgrade pip -i http://pypi.douban.com/simple/
 # pip install pymysql -i http://pypi.douban.com/simple/
 import datetime
-import sys
+import json
 
 import pymysql.cursors
 
 
 class MySQLHelper:
-    myVersion = 0.1
+    # Default connection settings. NOTE(review): live credentials are hard-coded; move them to configuration.
+    user = 'root'
+    password = 'DsideaL147258369'
+    host = '10.10.14.230'
+    port = 22066
+    db = 'huanghai'
 
-    def __init__(self, host, port, db, user, password, charset="utf8"):
+    def __init__(self, host=host, port=port, db=db, user=user, password=password, charset="utf8"):
         self.host = host
         self.user = user
         self.port = port
@@ -73,30 +78,23 @@ def logInfo(msg):
     print(" %s %s" % (i, msg))
 
 
-# 准备扩展
-_dict = {
-    "t_resource_base": "update t_resource_base t1,t_dm_stage t2 set t1.CHECK_MESSAGE=t2.stage_name where t1.stage_id=t2.stage_id and t1.CHECK_MESSAGE is null limit 100"}
-
 if __name__ == '__main__':
-    if len(sys.argv) == 1:
-        print("没有正确输入参数,请检查!输入样例: python pack.py t_resource_base")
-        sys.exit()
-    tableName = sys.argv[1]
-
-    # 配置
-    user = 'root'
-    password = 'DsideaL147258369'
-    host = '10.10.14.230'
-    port = 22066
-    database = 'test'
-
-    db = MySQLHelper(user=user, host=host, port=port, password=password, db=database)
-
-    while True:
-        sql = _dict[tableName]
-        cnt = db.execute(sql)
-        logInfo("成功更新%s条" % cnt)
-        if cnt == 0:
-            break
-
+    with open("./pack.json", "r", encoding="utf-8") as f:
+        _dict = json.load(f)
+
+    # For each table's list of actions, repeat the action's SQL until it reports 0 rows, summing the row counts.
+    allcnt = 0
+    db = MySQLHelper()
+    for _bean in _dict:
+        actions = _dict[_bean]
+        for action in actions:
+            memo = action["memo"]
+            sql = action["sql"]
+            while True:
+                cnt = db.execute(sql)
+                allcnt = allcnt + cnt
+                if cnt == 0:
+                    break
+                logInfo(memo + ",更新%s条" % cnt)
     db.close()
+    logInfo("恭喜,所有清洗工作成功完成,共清洗%s条" % allcnt)