From 06f103e9896ece1bf7ff02896fc40702611a02c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E6=B5=B7?= <10402852@qq.com> Date: Fri, 17 Mar 2023 11:15:54 +0800 Subject: [PATCH] 'commit' --- ETL/Mars/MySQLHelper.py | 67 +++++++++++++++++++++++++++++++++++ ETL/Mars/fake.py | 77 +++++++++++++++++++++++++++++++++++++++++ ETL/Mars/pack.json | 8 ++--- ETL/Mars/pack.py | 73 ++------------------------------------ 4 files changed, 150 insertions(+), 75 deletions(-) create mode 100644 ETL/Mars/MySQLHelper.py create mode 100644 ETL/Mars/fake.py diff --git a/ETL/Mars/MySQLHelper.py b/ETL/Mars/MySQLHelper.py new file mode 100644 index 0000000..692a0d1 --- /dev/null +++ b/ETL/Mars/MySQLHelper.py @@ -0,0 +1,67 @@ +import pymysql.cursors + + +class MySQLHelper: + # 配置 + user = 'root' + password = 'DsideaL147258369' + host = '10.10.14.230' + port = 22066 + db = 'huanghai' + + def __init__(self, host=host, port=port, db=db, user=user, password=password, charset="utf8"): + self.host = host + self.user = user + self.port = port + self.password = password + self.charset = charset + self.db = db + + try: + self.conn = pymysql.connect(host=self.host, port=self.port, user=self.user, passwd=self.password, + db=self.db, charset=self.charset, cursorclass=pymysql.cursors.DictCursor) + self.cursor = self.conn.cursor() + + # 设置执行时间为8小时 + sql = "set session wait_timeout=288000" + self.cursor.execute(sql) + sql = "set session interactive_timeout=288000" + self.cursor.execute(sql) + + except Exception as e: + print('MySql Error : %d %s' % (e.args[0], e.args[1])) + + def query(self, sql: object) -> object: + try: + self.cursor.execute(sql) + result = self.cursor.fetchall() + return result + except Exception as e: + print('MySql Error: %s SQL: %s' % (e, sql)) + + def execute(self, sql): + try: + self.cursor.execute(sql) + self.conn.commit() + # 获取更新了多少条数据 + return self.cursor.rowcount + except Exception as e: + print('MySql Error: %s SQL: %s' % (e, sql)) + + def executeWithPara(self, sql, params): + try: + self.cursor.execute(sql, params) + self.conn.commit() + except Exception as e: + print('MySql Error: %s SQL: %s' % (e, sql)) + + def executemany(self, sql, data): + try: + self.cursor.executemany(sql, data) + self.conn.commit() + except Exception as e: + print('MySql Error: %s SQL: %s' % (e, sql)) + + def close(self): + self.cursor.close() + self.conn.close() diff --git a/ETL/Mars/fake.py b/ETL/Mars/fake.py new file mode 100644 index 0000000..d29bdaa --- /dev/null +++ b/ETL/Mars/fake.py @@ -0,0 +1,77 @@ +# pip install Faker -i https://pypi.douban.com/simple/ +import random + +# 导入生成数据类 +from faker import Faker +import datetime +from MySQLHelper import MySQLHelper + +maxN = 100000 + + +# 黄海定义的输出信息的办法,带当前时间 +def logInfo(msg): + i = datetime.datetime.now() + print(" %s %s" % (i, msg)) + + +# 设置显示的数据为中文 +shuju = Faker("zh_CN") +db = MySQLHelper() + +# 人员 +sql = 'truncate table t_base_person' +db.execute(sql) + +sql = "insert into t_base_person(person_name) values(%s)" +data = [] +for i in range(maxN): + obj = [shuju.name()] + data.append(obj) +db.executemany(sql, data) +logInfo("成功完成人员信息填充,共%s条!" % maxN) + +# 作业 +sql = 'truncate table t_zy_main' +db.execute(sql) + +sql = "insert into t_zy_main(zy_name) values(%s)" +data = [] +for i in range(maxN): + obj = [shuju.job()] + data.append(obj) +db.executemany(sql, data) +logInfo("成功完成作业信息填充,共%s条!" % maxN) + + +sql = 'truncate table t_zy_score' +db.execute(sql) + +# 存在的关系 +sql = "insert into t_zy_score(zy_id,person_id,score)values(%s,%s,%s)" +data = [] + +for x in range(1, 301): # 作业 + for y in range(1, 301): # 人员 + obj = [x, y, random.randint(1, 100)] + data.append(obj) +db.executemany(sql, data) +logInfo("成功完成作业与人员关系信息填充,共%s条!" % len(data)) + +# 不存在的关系A(作业不存在,人员存在) +data = [] +for x in range(1 + maxN, 301 + maxN): # 作业 + for y in range(1, 301): # 人员 + obj = [x, y, random.randint(1, 100)] + data.append(obj) +db.executemany(sql, data) +logInfo("成功完成作业不存在,人员存在的关系A填充,共%s条!" % len(data)) + +# 不存在的关系B(作业存在,人员不存在) +data = [] +for x in range(1, 301): # 作业 + for y in range(1 + maxN, 301 + maxN): # 人员 + obj = [x, y, random.randint(1, 100)] + data.append(obj) +db.executemany(sql, data) +logInfo("成功完成作业存在,人员不存在的关系B填充,共%s条!" % len(data)) diff --git a/ETL/Mars/pack.json b/ETL/Mars/pack.json index 545481f..1a7b4f4 100644 --- a/ETL/Mars/pack.json +++ b/ETL/Mars/pack.json @@ -1,19 +1,19 @@ { "t_zy_score": [ { - "sql": "update t_zy_score as t1 left join t_base_person as t2 on t1.person_id=t2.person_id set t1.check_flag=-1 where t2.person_id is null limit 100", + "sql": "update t_zy_score as t1 left join t_base_person as t2 on t1.person_id=t2.person_id set t1.check_flag=-1 where t2.person_id is null", "memo": "检查人员ID不存在的情况" }, { - "sql": "update t_zy_score as t1 left join t_zy_main as t2 on t1.zy_id=t2.zy_id set t1.zy_name=t2.zy_name where t1.zy_name is null limit 100", + "sql": "update t_zy_score as t1 left join t_zy_main as t2 on t1.zy_id=t2.zy_id set t1.zy_name=t2.zy_name where t1.zy_name is null", "memo": "扩展作业名称字段内容" }, { - "sql": "update t_zy_score as t1 set t1.check_flag=-1 where t1.zy_name is null limit 100", + "sql": "update t_zy_score as t1 set t1.check_flag=-1 where t1.zy_name is null", "memo": "没补上名称的就是坏人" }, { - "sql": "update t_zy_score as t1 set check_flag=1 where check_flag=0 limit 100", + "sql": "update t_zy_score as t1 set check_flag=1 where check_flag=0", "memo": "打上成功标识" } ] diff --git a/ETL/Mars/pack.py b/ETL/Mars/pack.py index a8771eb..6ed430b 100644 --- a/ETL/Mars/pack.py +++ b/ETL/Mars/pack.py @@ -3,73 +3,7 @@ import datetime import json -import pymysql.cursors - - -class MySQLHelper: - # 配置 - user = 'root' - password = 'DsideaL147258369' - host = '10.10.14.230' - port = 22066 - db = 'huanghai' - - def __init__(self, host=host, port=port, db=db, user=user, password=password, charset="utf8"): - self.host = host - self.user = user - self.port = port - self.password = password - self.charset = charset - self.db = db - - try: - self.conn = pymysql.connect(host=self.host, port=self.port, user=self.user, passwd=self.password, - db=self.db, charset=self.charset, cursorclass=pymysql.cursors.DictCursor) - self.cursor = self.conn.cursor() - - # 设置执行时间为8小时 - sql = "set session wait_timeout=288000" - self.cursor.execute(sql) - sql = "set session interactive_timeout=288000" - self.cursor.execute(sql) - - except Exception as e: - print('MySql Error : %d %s' % (e.args[0], e.args[1])) - - def query(self, sql: object) -> object: - try: - self.cursor.execute(sql) - result = self.cursor.fetchall() - return result - except Exception as e: - print('MySql Error: %s SQL: %s' % (e, sql)) - - def execute(self, sql): - try: - self.cursor.execute(sql) - self.conn.commit() - # 获取更新了多少条数据 - return self.cursor.rowcount - except Exception as e: - print('MySql Error: %s SQL: %s' % (e, sql)) - - def executeWithPara(self, sql, params): - try: - self.cursor.execute(sql, params) - self.conn.commit() - except Exception as e: - print('MySql Error: %s SQL: %s' % (e, sql)) - - def executemany(self, sql, data): - try: - self.cursor.executemany(sql, data) - self.conn.commit() - except Exception as e: - print('MySql Error: %s SQL: %s' % (e, sql)) - - def close(self): - self.cursor.close() - self.conn.close() +from MySQLHelper import MySQLHelper # 黄海定义的输出信息的办法,带当前时间 @@ -82,8 +16,6 @@ if __name__ == '__main__': with open("./pack.json", "r", encoding="utf-8") as f: _dict = json.load(f) - # - allcnt = 0 db = MySQLHelper() for _bean in _dict: actions = _dict[_bean] @@ -92,9 +24,8 @@ if __name__ == '__main__': sql = action["sql"] while True: cnt = db.execute(sql) - allcnt = allcnt + cnt if cnt == 0: break logInfo(memo + ",更新%s条" % cnt) db.close() - logInfo("恭喜,所有清洗工作成功完成,共清洗%s条" % allcnt) + logInfo("恭喜,所有清洗工作成功完成!")