"""Load the daily T_PRO_* JSON export files into MySQL and refresh t_gtzz_subject.

The script downloads one JSON file per table for the current date, replaces
the matching MySQL table with its contents, patches the schema of the freshly
created tables, writes a sync-log row and finally inserts/updates rows in
``t_gtzz_subject`` and ``t_gtzz_subject_sub_type``.
"""
import datetime
import json
import os
from urllib import request

# pip install pandas
# pip install sqlalchemy
# pip install requests
# pip install pymysql
import pandas as pd
import pymysql
import requests
from dbutils.pooled_db import PooledDB
from sqlalchemy import create_engine

# Connection settings shared by the pooled client and the SQLAlchemy engine.
# NOTE(review): credentials are hard-coded in source; consider loading them
# from configuration or the environment instead.
_DB_HOST = "gtzz.dsideal.com"
_DB_PORT = 22066
_DB_NAME = "gtzz_base_db"
_DB_USER = "root"
_DB_PASSWD = "DsideaL147258369"

# Location of the export files; '#(date)' is replaced by the run date (YYYYMMDD)
# and '#(name)' by one of the entries of _EXPORT_FILES.
_EXPORT_URL_TEMPLATE = 'https://dsideal.obs.cn-north-1.myhuaweicloud.com/down/ccsdjg/#(date)/#(name).json'

# One export file per table; the lower-cased name is the target table name.
_EXPORT_FILES = (
    'T_PRO_PARA',
    'T_PRO_ATTACH',
    'T_PRO_FORM_VALUE',
    'T_PRO_DETAIL',
    'T_PRO_FORM_ITEM',
    'T_PRO_TARGET',
    'T_PRO_ATTACH_TYPE',
    'T_PRO_TASK',
    'T_PRO_FORM',
    'T_PRO_MESSAGE',
    'T_PRO_CONFIG',
)


class MysqlClient(object):
    """Small helper around a class-wide PooledDB pool of pymysql connections.

    The pool is created by the first instance and shared by all later ones;
    each instance checks out one connection and one DictCursor.
    """

    __pool = None

    def __init__(self, mincached=10, maxcached=20, maxshared=10, maxconnections=200, blocking=True,
                 maxusage=100, setsession=None, reset=True,
                 host=_DB_HOST, port=_DB_PORT, db=_DB_NAME,
                 user=_DB_USER, passwd=_DB_PASSWD, charset='utf8mb4'):
        """
        :param mincached: initial number of idle connections in the pool
        :param maxcached: maximum number of idle connections in the pool
        :param maxshared: maximum number of shared connections
        :param maxconnections: maximum number of connections the pool may create
        :param blocking: behaviour when the maximum is exceeded: True waits for a
            free connection, False raises an error
        :param maxusage: maximum number of times a single connection is reused
        :param setsession: optional list of SQL commands that may serve to prepare
            the session, e.g. ["set datestyle to ...", "set time zone ..."]
        :param reset: how connections should be reset when returned to the pool
            (False or None to rollback transactions started with begin(),
            True to always issue a rollback for safety's sake)
        :param host: database host
        :param port: database port
        :param db: database name
        :param user: user name
        :param passwd: password
        :param charset: character encoding
        """
        if self.__pool is None:
            self.__class__.__pool = PooledDB(pymysql, mincached, maxcached,
                                             maxshared, maxconnections, blocking,
                                             maxusage, setsession, reset,
                                             host=host, port=port, db=db,
                                             user=user, passwd=passwd,
                                             charset=charset,
                                             cursorclass=pymysql.cursors.DictCursor
                                             )
        self._conn = None
        self._cursor = None
        # True between begin() and end(); suppresses the per-statement commit.
        self._in_transaction = False
        self.__get_conn()

    def __get_conn(self):
        """Check a connection out of the pool and open a cursor on it."""
        self._conn = self.__pool.connection()
        self._cursor = self._conn.cursor()

    def close(self):
        """Close the cursor and hand the connection back; errors are only printed."""
        try:
            self._cursor.close()
            self._conn.close()
        except Exception as e:
            print(e)

    def __execute(self, sql, param=()):
        """Run one statement and return the cursor's row count.

        The statement is committed immediately unless a transaction opened
        with begin() is still in progress.
        """
        count = self._cursor.execute(sql, param)
        if not self._in_transaction:
            self._conn.commit()
        return count

    @staticmethod
    def __dict_datetime_obj_to_str(result_dict):
        """Replace datetime values of a row dict by their string form (in place)
        so the row can be serialised to JSON; a falsy row is returned unchanged."""
        if result_dict:
            converted = {}
            for key, value in result_dict.items():
                if isinstance(value, datetime.datetime):
                    converted[key] = str(value)
            result_dict.update(converted)
        return result_dict

    def find(self, sql, param=()):
        """Run a query and return ``(row_count, first_row_or_None)``."""
        count = self.__execute(sql, param)
        result = self.__dict_datetime_obj_to_str(self._cursor.fetchone())
        return count, result

    def findList(self, sql, param=()):
        """
        Run a query returning several rows.
        :param sql: SQL statement
        :param param: SQL parameters
        :return: ``(row_count, rows)``
        """
        count = self.__execute(sql, param)
        result = self._cursor.fetchall()
        for row_dict in result:
            self.__dict_datetime_obj_to_str(row_dict)
        return count, result

    def execute(self, sql, param=()):
        """Run a statement and return the affected row count."""
        return self.__execute(sql, param)

    def begin(self):
        """Start a transaction; statements are not committed until end()."""
        # The pooled connection's begin() also stops the pool from silently
        # replacing the connection in the middle of the transaction.
        self._conn.begin()
        self._in_transaction = True

    def end(self, option='commit'):
        """Finish the transaction: commit when option is 'commit', otherwise roll back."""
        try:
            if option == 'commit':
                self._conn.commit()
            else:
                self._conn.rollback()
        finally:
            self._in_transaction = False


def printf(str):
    """Print the message prefixed with the current local time (YYYY-MM-DD HH:MM:SS style)."""
    # The parameter name shadows the builtin but is kept for caller compatibility.
    stamp = datetime.datetime.now().strftime("%Y-%m-%d %X")
    print(stamp + " " + str)


def _download_tables(engine, date_str):
    """Download every export file for date_str into its table; return the number of files handled."""
    file_count = 0
    for name in _EXPORT_FILES:
        file_count = file_count + 1
        url = _EXPORT_URL_TEMPLATE.replace('#(date)', date_str).replace('#(name)', name)
        table_name = os.path.basename(url).split(".")[0].lower()

        # Probe with HEAD first; a missing file aborts the whole run.
        probe = requests.head(url)
        if probe.status_code != 200:
            printf("没有找到指定的下载文件,程序无法继续!")
            raise SystemExit(0)

        # The export files are GBK encoded.
        with request.urlopen(request.Request(url)) as response:
            payload = json.loads(response.read().decode('gbk'))

        try:
            frame = pd.DataFrame(payload["DATA"])
            # https://www.jianshu.com/p/d615699ff254
            # to_sql drops and recreates an existing table, so the column types
            # of the previous table are not preserved.
            frame.to_sql(table_name, con=engine, if_exists='replace', index=False)
            printf("成功完成表%s的数据导入!" % table_name)
        except Exception as err:
            # Best effort: a failed table import is reported and the run continues.
            print(err)
    return file_count


def _prepare_task_tables(db):
    """Re-apply keys, extra columns and indexes to the freshly recreated t_pro_task / t_pro_message."""
    statements = (
        # primary key
        "alter table `gtzz_base_db`.`t_pro_task` modify column `sn` varchar(128) character set utf8mb4 collate utf8mb4_general_ci not null first,add primary key (`sn`)",
        # extra column: subject_id
        "alter table `gtzz_base_db`.`t_pro_task` add column `subject_id` int null default 0 comment '与t_gtzz_subject对应的主键' after `reiterate_task_sn`",
        # extra column: organisation code
        "alter table `gtzz_base_db`.`t_pro_task` add column `org_code` varchar(36) null comment '组织机构码' after `subject_id`",
        # fill the organisation code
        "update t_pro_task as t1 ,t_base_organization as t2 set t1.org_code=t2.org_code where t1.send_user_name=t2.org_name",
        # index on t_pro_message.task_sn
        "alter table `gtzz_base_db`.`t_pro_message` modify column `task_sn` varchar(128) character set utf8mb4 collate utf8mb4_bin null default null after `modify_time`",
        "alter table `gtzz_base_db`.`t_pro_message` add index(`task_sn`)",
    )
    for statement in statements:
        db.execute(statement)


def _write_sync_log(db, file_count):
    """Insert one t_gtzz_sync_log row describing how many files were handled."""
    if file_count == len(_EXPORT_FILES):
        message = '正常同步成功!'
    else:
        message = '财务系统未授权同步文件!'
    db.execute("insert into t_gtzz_sync_log(file_count,message) values(%s,%s)", (file_count, message))


def _sync_subjects(db):
    """Insert or update one t_gtzz_subject row per qualifying t_pro_task row."""
    # Write known subject ids back to t_pro_task; rows without a match keep the
    # column default 0 and are treated as new subjects below.
    db.execute("update t_pro_task as t1 ,t_gtzz_subject as t2 set t1.subject_id=t2.subject_id where t1.sn=t2.task_sn")

    sql = ("select t1.sn,(select itemvalue20 from t_pro_form_value where task_sn=t1.sn) as money,"
           "(select itemvalue11 from t_pro_form_value where task_sn=t1.sn) as xmjj ,"
           "t1.subject_id,t1.task_number,t1.task_title,t1.task_year,t1.ex_send_org_name,"
           "(select class_name from t_pro_detail where task_sn=t1.sn order by class_name limit 1 ) as subject_type_name, "
           "t1.create_time from t_pro_task as t1 left join t_pro_form_value as t2 on t2.task_sn=t1.sn "
           "where t1.ex_send_org_name in (select org_name from t_gtzz_school) "
           "and t1.use_flag=1 and t1.draft_flag=0 and t1.close_flag=1 and t1.data_type=0 "
           "order by t1.create_time desc")
    _, rows = db.findList(sql)

    inserted = 0
    updated = 0
    for row in rows:
        subject_id = row['subject_id']
        task_sn = row['sn']
        xmdw = row['ex_send_org_name']      # organisation name
        xmmc = row['task_title']            # project name
        sbnd = row['task_year']             # application year
        lixiang_time = row["create_time"]   # project creation time
        task_number = row["task_number"]
        money = row["money"]
        xmjj = row["xmjj"]                  # project description

        # Organisation code; None (stored as NULL) when the organisation is unknown.
        _, org_row = db.find("select org_code from t_base_organization where org_name=%s", (xmdw,))
        org_code = org_row["org_code"] if org_row else None

        # Project types: parent names give the main type, class names the sub types.
        _, details = db.findList("select class_name,parent_name from t_pro_detail where task_sn=%s", (task_sn,))
        parent_names = list(dict.fromkeys(d["parent_name"] for d in details))
        class_names = list(dict.fromkeys(d["class_name"] for d in details))
        subject_type_name = parent_names[0] if parent_names else '其他'

        if subject_id > 0:
            # Known subject: refresh its row.
            db.execute(
                "update t_gtzz_subject set finish_scan=1,task_sn=%s,org_code=%s,xmdw=%s,xmmc=%s,sbnd=%s,"
                "lixiang_time=%s,subject_type_id='-1',subject_type_name=%s where subject_id=%s",
                (task_sn, org_code, xmdw, xmmc, sbnd, lixiang_time, subject_type_name, subject_id))
            updated = updated + 1
            printf('成功更新%s条记录!' % updated)
        else:
            # New subject: create its row ...
            db.execute(
                "insert into t_gtzz_subject(finish_scan,task_sn,org_code,xmdw,xmmc,sbnd,lixiang_time,"
                "subject_type_id,subject_type_name,money,xmjj,task_number) "
                "values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
                (0, task_sn, org_code, xmdw, xmmc, sbnd, lixiang_time, '-1',
                 subject_type_name, money, xmjj, task_number))
            # ... and its sub types.
            # NOTE(review): sub types are stored only for newly inserted subjects so
            # that reruns do not duplicate them - confirm this matches the intended
            # behaviour for subjects whose t_pro_detail rows change later.
            for class_name in class_names:
                db.execute(
                    "insert into t_gtzz_subject_sub_type(task_sn,sub_type_name,year) values(%s,%s,%s)",
                    (task_sn, class_name, sbnd))
            inserted = inserted + 1
            printf('成功插入%s条记录!' % inserted)

    # Table-wide fix-ups, issued once after all rows are written:
    # resolve subject_type_id from the type name ...
    db.execute('update t_gtzz_subject as t1,t_gtzz_subject_type as t2 set t1.subject_type_id=t2.subject_type_id where t1.subject_type_name=t2.subject_type_name')
    # ... and link the sub-type rows to their subject.
    db.execute("update t_gtzz_subject_sub_type as t1,t_gtzz_subject as t2 set t1.subject_id=t2.subject_id where t1.task_sn=t2.task_sn")


def SyncData():
    """Run the full daily sync: download, load, patch schema, log, refresh subjects.

    Exits the process with status 0 when one of the export files for today is
    not available. The pooled connection and the SQLAlchemy engine are always
    released, also on failure.
    """
    connectString = 'mysql+pymysql://%s:%s@%s:%s/%s?charset=utf8' % (
        _DB_USER, _DB_PASSWD, _DB_HOST, _DB_PORT, _DB_NAME)
    engine = create_engine(connectString)  # use sqlalchemy to build link-engine
    db = MysqlClient()
    try:
        dateStr = datetime.datetime.now().strftime('%Y%m%d')
        # dateStr = '20211115'
        file_count = _download_tables(engine, dateStr)
        _prepare_task_tables(db)
        _write_sync_log(db, file_count)
        _sync_subjects(db)
        printf('恭喜,所有操作成功完成!')
    finally:
        db.close()
        engine.dispose()


if __name__ == '__main__':
    SyncData()