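# NOTE: the lines below are residue from the repository web page this file was
# copied from; they are not part of the program.
# You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
#
# 307 lines
# 13 KiB
#
# 2 years ago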
import os
import datetime
from urllib import request
# pip install pandas
# pip install sqlalchemy
# pip install requests
# pip install pymysql
import pandas as pd
import requests
from sqlalchemy import create_engine
import json
import pymysql
from dbutils.pooled_db import PooledDB
class MysqlClient(object):
    """Small convenience wrapper around a pooled PyMySQL connection.

    Every instance borrows one connection (and one ``DictCursor``) from a
    ``PooledDB`` pool that is created once and shared by all instances.
    Outside an explicit transaction each statement is committed immediately;
    between :meth:`begin` and :meth:`end` the commit is deferred so that
    ``end('rollback')`` can actually undo the work.
    """

    # Pool shared by all instances; built lazily by the first __init__ call.
    __pool = None
    # True between begin() and end(); suppresses the per-statement commit.
    _in_transaction = False

    def __init__(self, mincached=10, maxcached=20, maxshared=10, maxconnections=200, blocking=True,
                 maxusage=100, setsession=None, reset=True,
                 host="gtzz.dsideal.com", port=22066, db="gtzz_base_db",
                 user="root", passwd="DsideaL147258369", charset='utf8mb4'):
        """
        Create the shared pool if needed and borrow a connection from it.

        NOTE(review): the default host/user/password are hard-coded in source;
        they are kept for backward compatibility but should be moved to
        configuration.

        :param mincached: initial number of idle connections in the pool
        :param maxcached: maximum number of idle connections in the pool
        :param maxshared: maximum number of shared connections
        :param maxconnections: maximum number of connections the pool creates
        :param blocking: behaviour when the maximum is exceeded: True waits
            for the connection count to drop, False raises an error
        :param maxusage: maximum number of times a single connection is reused
        :param setsession: optional list of SQL commands that may serve to
            prepare the session, e.g. ["set datestyle to ...", "set time zone ..."]
        :param reset: how connections should be reset when returned to the
            pool (False or None to rollback transactions started with
            begin(), True to always issue a rollback for safety's sake)
        :param host: database host address
        :param port: database port
        :param db: database name
        :param user: user name
        :param passwd: password
        :param charset: character encoding
        """
        # Only the first instance's arguments configure the pool; later
        # instances reuse it regardless of what they pass in.
        if self.__class__.__pool is None:
            self.__class__.__pool = PooledDB(pymysql,
                                             mincached, maxcached,
                                             maxshared, maxconnections, blocking,
                                             maxusage, setsession, reset,
                                             host=host, port=port, db=db,
                                             user=user, passwd=passwd,
                                             charset=charset,
                                             cursorclass=pymysql.cursors.DictCursor
                                             )
        self._conn = None
        self._cursor = None
        self.__get_conn()

    def __get_conn(self):
        """Borrow a connection from the pool and open a cursor on it."""
        self._conn = self.__pool.connection()
        self._cursor = self._conn.cursor()

    def close(self):
        """Close the cursor and the connection, reporting (not raising) errors.

        The two are closed independently so that a failure while closing the
        cursor no longer prevents the connection from being released.
        """
        for resource in (self._cursor, self._conn):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception as e:
                print(e)

    def __execute(self, sql, param=()):
        """Run one statement and return the cursor's affected/row count.

        Commits right away unless a transaction opened with begin() is active.
        """
        count = self._cursor.execute(sql, param)
        if not self._in_transaction:
            self._conn.commit()
        return count

    @staticmethod
    def __dict_datetime_obj_to_str(result_dict):
        """Replace datetime values in a row dict by their string form (in place).

        This keeps a later JSON conversion of the row from failing. A falsy
        argument (``None`` or an empty dict) is returned unchanged.
        """
        if result_dict:
            result_replace = {k: str(v) for k, v in result_dict.items() if isinstance(v, datetime.datetime)}
            result_dict.update(result_replace)
        return result_dict

    def find(self, sql, param=()):
        """Query a single row.

        :param sql: SQL statement
        :param param: SQL parameters
        :return: ``(count, row)`` where ``row`` is a dict or ``None``
        """
        count = self.__execute(sql, param)
        result = self.__dict_datetime_obj_to_str(self._cursor.fetchone())
        return count, result

    def findList(self, sql, param=()):
        """
        Query multiple rows.

        :param sql: SQL statement
        :param param: SQL parameters
        :return: ``(count, rows)``: the row count and the fetched result set
        """
        count = self.__execute(sql, param)
        result = self._cursor.fetchall()
        # Rows are dicts and are converted in place.
        for row_dict in result:
            self.__dict_datetime_obj_to_str(row_dict)
        return count, result

    def execute(self, sql, param=()):
        """Run a statement and return the count reported by the cursor."""
        return self.__execute(sql, param)

    def begin(self):
        """Start a transaction: turn autocommit off and defer commits to end()."""
        self._conn.autocommit(0)
        self._in_transaction = True

    def end(self, option='commit'):
        """Finish a transaction.

        :param option: ``'commit'`` commits the pending work; any other value
            rolls it back
        """
        try:
            if option == 'commit':
                # The original called autocommit() without its required
                # argument here, which raised TypeError and never committed.
                self._conn.commit()
            else:
                self._conn.rollback()
        finally:
            self._in_transaction = False
def printf(str):
    """Print the message *str* prefixed with the current local date and time."""
    # "%X" is the locale's time representation, e.g. 08:30:00.
    stamp = datetime.datetime.now().strftime("%Y-%m-%d %X")
    print(stamp + " " + str)
# URL pattern of the daily JSON exports; "#(date)" is replaced by YYYYMMDD
# and "%s" by the export (table) name.
_EXPORT_URL_TEMPLATE = 'https://dsideal.obs.cn-north-1.myhuaweicloud.com/down/ccsdjg/#(date)/%s.json'

# Exports to import, in processing order. Each becomes a lower-cased table.
_EXPORT_TABLES = (
    'T_PRO_PARA',
    'T_PRO_ATTACH',
    'T_PRO_FORM_VALUE',
    'T_PRO_DETAIL',
    'T_PRO_FORM_ITEM',
    'T_PRO_TARGET',
    'T_PRO_ATTACH_TYPE',
    'T_PRO_TASK',
    'T_PRO_FORM',
    'T_PRO_MESSAGE',
    'T_PRO_CONFIG',
)

# Seconds to wait on a stalled HTTP connection before giving up.
_HTTP_TIMEOUT = 60

# Schema adjustments applied to the freshly re-created tables, in order.
_SCHEMA_STATEMENTS = (
    # Primary key on t_pro_task.sn
    "alter table `gtzz_base_db`.`t_pro_task` modify column `sn` varchar(128) character set utf8mb4 collate utf8mb4_general_ci not null first,add primary key (`sn`)",
    # Extra column: subject_id
    "alter table `gtzz_base_db`.`t_pro_task` add column `subject_id` int null default 0 comment '与t_gtzz_subject对应的主键' after `reiterate_task_sn`",
    # Extra column: organization code
    "alter table `gtzz_base_db`.`t_pro_task` add column `org_code` varchar(36) null comment '组织机构码' after `subject_id`",
    # Fill the organization code from the organization table
    "update t_pro_task as t1 ,t_base_organization as t2 set t1.org_code=t2.org_code where t1.send_user_name=t2.org_name",
    # Index on t_pro_message.task_sn
    "alter table `gtzz_base_db`.`t_pro_message` modify column `task_sn` varchar(128) character set utf8mb4 collate utf8mb4_bin null default null after `modify_time`",
    "alter table `gtzz_base_db`.`t_pro_message` add index(`task_sn`)",
)

# Tasks that should be mirrored into t_gtzz_subject.
_SUBJECT_QUERY = (
    "select t1.sn,"
    "(select itemvalue20 from t_pro_form_value where task_sn=t1.sn) as money,"
    "(select itemvalue11 from t_pro_form_value where task_sn=t1.sn) as xmjj ,"
    "t1.subject_id,t1.task_number,t1.task_title,t1.task_year,t1.ex_send_org_name,"
    "(select class_name from t_pro_detail where task_sn=t1.sn order by class_name limit 1 ) as subject_type_name,"
    " t1.create_time from t_pro_task as t1"
    " left join t_pro_form_value as t2 on t2.task_sn=t1.sn"
    " where t1.ex_send_org_name in (select org_name from t_gtzz_school)"
    " and t1.use_flag=1 and t1.draft_flag=0 and t1.close_flag=1 and t1.data_type=0"
    " order by t1.create_time desc"
)


def _download_export(url):
    """Download one export file and return the parsed JSON document."""
    with request.urlopen(request.Request(url), timeout=_HTTP_TIMEOUT) as response:
        raw = response.read()
    # The exports are GBK encoded. The encode/decode round trip with
    # errors='ignore' is kept from the original code; presumably it is a
    # defensive filter for characters GBK cannot represent.
    text = raw.decode('gbk')
    text = text.encode('gbk', 'ignore').decode('gbk')
    return json.loads(text)


def _import_exports(engine, date_str):
    """Load every export of *date_str* into MySQL and return the file count.

    Each table is dropped and re-created by ``DataFrame.to_sql`` (so original
    column types are not preserved). A missing file terminates the program;
    a failed import of a single table is reported and skipped.
    """
    processed = 0
    for export_name in _EXPORT_TABLES:
        processed += 1
        url = (_EXPORT_URL_TEMPLATE % export_name).replace('#(date)', date_str)
        table_name = export_name.lower()
        # Probe with HEAD first so a missing file is detected cheaply.
        probe = requests.head(url, timeout=_HTTP_TIMEOUT)
        if probe.status_code != 200:
            printf("没有找到指定的下载文件,程序无法继续!")
            # Same exit status as the original exit(0), without relying on
            # the interactive-only `exit` builtin.
            raise SystemExit(0)
        document = _download_export(url)
        try:
            frame = pd.DataFrame(document["DATA"])
            frame.to_sql(table_name, con=engine, if_exists='replace', index=False)
            printf("成功完成表%s的数据导入!" % table_name)
        except Exception as err:
            # Best effort, as in the original: report and go on.
            printf("表%s的数据导入失败:%s" % (table_name, err))
    return processed


def _sync_subjects(db):
    """Insert or update one t_gtzz_subject row per qualifying task.

    All values are passed as query parameters, so quotes in names or
    descriptions no longer break the statements, and ``None`` is stored as
    NULL instead of the text 'None'.
    """
    _, tasks = db.findList(_SUBJECT_QUERY)
    inserted = 0
    updated = 0
    for task in tasks:
        subject_id = task['subject_id']            # project id (0 = not linked yet)
        task_sn = task['sn']
        xmdw = task['ex_send_org_name']            # organization name
        xmmc = task['task_title']                  # project name
        sbnd = task['task_year']                   # declaration year
        lixiang_time = task["create_time"]         # project start time
        task_number = task["task_number"]          # project number
        money = task["money"]                      # project amount
        xmjj = task["xmjj"]                        # project summary

        # Look up the organization code; the original crashed with a
        # TypeError when the organization was unknown.
        _, organization = db.find(
            "select org_code from t_base_organization where org_name=%s", (xmdw,))
        if not organization:
            printf('未找到单位%s的组织机构码,已跳过项目%s!' % (xmdw, task_sn))
            continue
        org_code = organization["org_code"]

        # Project types: parent_name is the main type, class_name the sub type.
        _, details = db.findList(
            "select class_name,parent_name from t_pro_detail where task_sn=%s", (task_sn,))
        # dict.fromkeys removes duplicates while keeping first-seen order.
        main_types = list(dict.fromkeys(d["parent_name"] for d in details))
        sub_types = list(dict.fromkeys(d["class_name"] for d in details))
        subject_type_name = main_types[0] if main_types else '其他'

        if subject_id and subject_id > 0:
            # Already linked to a subject: refresh it.
            db.execute(
                "update t_gtzz_subject set finish_scan=1,task_sn=%s,org_code=%s,xmdw=%s,xmmc=%s,sbnd=%s,"
                "lixiang_time=%s,subject_type_id='-1',subject_type_name=%s where subject_id=%s",
                (task_sn, org_code, xmdw, xmmc, sbnd, lixiang_time, subject_type_name, subject_id))
            updated += 1
            printf('成功更新%s条记录!' % updated)
        else:
            # Not linked yet: create the subject.
            db.execute(
                "insert into t_gtzz_subject(finish_scan,task_sn,org_code,xmdw,xmmc,sbnd,lixiang_time,"
                "subject_type_id,subject_type_name,money,xmjj,task_number) "
                "values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
                (0, task_sn, org_code, xmdw, xmmc, sbnd, lixiang_time, '-1', subject_type_name,
                 money, xmjj, task_number))
            # NOTE(review): the source this was restored from had lost its
            # indentation, so it is not certain whether sub types were saved
            # only for new subjects or for every row. Saving them only here
            # avoids duplicate rows on each re-sync - confirm against the
            # intended behaviour.
            for sub_type in sub_types:
                db.execute(
                    "insert into t_gtzz_subject_sub_type(task_sn,sub_type_name,year) values(%s,%s,%s)",
                    (task_sn, sub_type, sbnd))
            inserted += 1
            printf('成功插入%s条记录!' % inserted)

    # Table-wide fix-ups. They do not depend on the current row, so they run
    # once here instead of once per row.
    # Resolve subject_type_id from the type name.
    db.execute('update t_gtzz_subject as t1,t_gtzz_subject_type as t2 set t1.subject_type_id=t2.subject_type_id where t1.subject_type_name=t2.subject_type_name')
    # Fill subject_id of the sub-type rows.
    db.execute("update t_gtzz_subject_sub_type as t1,t_gtzz_subject as t2 set t1.subject_id=t2.subject_id where t1.task_sn=t2.task_sn")


def SyncData():
    """Run one full synchronisation for today's exports.

    Steps: import the JSON exports into MySQL, adjust the schema of the
    re-created tables, write a sync-log entry, write known subject ids back
    to t_pro_task, then insert/update t_gtzz_subject from the tasks.

    Raises SystemExit (status 0) when an export file is not available.

    NOTE(review): the database credentials are hard-coded here (and again in
    MysqlClient's defaults); they should be moved to configuration.
    """
    connectString = 'mysql+pymysql://%s:%s@%s:%s/%s?charset=utf8' % (
        "root", "DsideaL147258369", "gtzz.dsideal.com", 22066, "gtzz_base_db")
    engine = create_engine(connectString)  # SQLAlchemy engine used by DataFrame.to_sql
    db = MysqlClient()
    try:
        dateStr = datetime.datetime.now().strftime('%Y%m%d')
        cnt = _import_exports(engine, dateStr)

        for statement in _SCHEMA_STATEMENTS:
            db.execute(statement)

        # Write the sync log.
        if cnt == len(_EXPORT_TABLES):
            message = '正常同步成功!'
        else:
            message = '财务系统未授权同步文件!'
        db.execute("insert into t_gtzz_sync_log(file_count,message) values(%s,%s)", (cnt, message))

        # Write subject_id back to the tasks; it is 0 after the initial
        # import and only set for tasks that already have a subject.
        db.execute("update t_pro_task as t1 ,t_gtzz_subject as t2 set t1.subject_id=t2.subject_id where t1.sn=t2.task_sn")

        _sync_subjects(db)
        printf('恭喜,所有操作成功完成!')
    finally:
        # Always hand the pooled connection back, including on SystemExit.
        db.close()
# Script entry point: run one synchronisation when executed directly.
if __name__ == "__main__":
    SyncData()