You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

307 lines
13 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import os
import datetime
from urllib import request
# pip install pandas
# pip install sqlalchemy
# pip install requests
# pip install pymysql
import pandas as pd
import requests
from sqlalchemy import create_engine
import json
import pymysql
from dbutils.pooled_db import PooledDB
class MysqlClient(object):
    """Small helper around a shared DBUtils ``PooledDB`` of PyMySQL connections.

    The pool is created by the first instance and stored on the class, so every
    later instance reuses it (pool arguments passed to later instances are
    ignored).  Each instance checks out one connection and one
    ``pymysql.cursors.DictCursor`` cursor, so rows are returned as dicts.

    Every statement is committed immediately after it runs, except between
    :meth:`begin` and :meth:`end`, where the commit/rollback is deferred to
    :meth:`end`.
    """

    # Connection pool shared by all instances; created by the first __init__.
    __pool = None
    # True while an explicit transaction opened by begin() is active.
    _in_transaction = False

    def __init__(self, mincached=10, maxcached=20, maxshared=10, maxconnections=200, blocking=True,
                 maxusage=100, setsession=None, reset=True,
                 host="gtzz.dsideal.com", port=22066, db="gtzz_base_db",
                 user="root", passwd="DsideaL147258369", charset='utf8mb4'):
        """Create the shared pool (first call only) and check out a connection.

        :param mincached: initial number of idle connections in the pool
        :param maxcached: maximum number of idle connections in the pool
        :param maxshared: maximum number of shared connections
        :param maxconnections: maximum number of connections the pool may open
        :param blocking: behaviour when the connection limit is reached:
            True waits until a connection is released, False raises an error
        :param maxusage: maximum number of times a single connection is reused
        :param setsession: optional list of SQL commands that may serve to prepare
            the session, e.g. ["set datestyle to ...", "set time zone ..."]
        :param reset: how connections should be reset when returned to the pool
            (False or None to rollback transactions started with begin(),
            True to always issue a rollback for safety's sake)
        :param host: database host name or IP address
        :param port: database port
        :param db: database (schema) name
        :param user: user name
        :param passwd: password
        :param charset: connection character set
        """
        # NOTE(review): real credentials are hard-coded as defaults; consider
        # loading them from configuration or environment variables instead.
        if not self.__pool:
            self.__class__.__pool = PooledDB(pymysql,
                                             mincached, maxcached,
                                             maxshared, maxconnections, blocking,
                                             maxusage, setsession, reset,
                                             host=host, port=port, db=db,
                                             user=user, passwd=passwd,
                                             charset=charset,
                                             cursorclass=pymysql.cursors.DictCursor
                                             )
        self._conn = None
        self._cursor = None
        self._in_transaction = False
        self.__get_conn()

    def __get_conn(self):
        """Check out a connection from the shared pool and open a cursor on it."""
        self._conn = self.__pool.connection()
        self._cursor = self._conn.cursor()

    def close(self):
        """Close the cursor and return the connection to the pool (best effort)."""
        try:
            self._cursor.close()
            self._conn.close()
        except Exception as e:
            # Deliberately best-effort: report and carry on.
            print(e)

    def __execute(self, sql, param=()):
        """Execute *sql* with *param* and return the cursor's row count.

        Outside an explicit transaction the statement is committed at once;
        inside one (see :meth:`begin`) the commit is left to :meth:`end`.
        """
        count = self._cursor.execute(sql, param)
        if not self._in_transaction:
            self._conn.commit()
        return count

    @staticmethod
    def __dict_datetime_obj_to_str(result_dict):
        """Replace ``datetime.datetime`` values in *result_dict* by their ``str()``.

        Done in place so the row can be serialized with ``json`` without errors.
        ``None`` / empty dicts are returned unchanged.
        """
        if result_dict:
            result_replace = {k: str(v) for k, v in result_dict.items() if isinstance(v, datetime.datetime)}
            result_dict.update(result_replace)
        return result_dict

    def find(self, sql, param=()):
        """Run a query and return ``(row_count, first_row_or_None)``."""
        count = self.__execute(sql, param)
        result = self._cursor.fetchone()
        result = self.__dict_datetime_obj_to_str(result)
        return count, result

    def findList(self, sql, param=()):
        """
        Run a query and return all rows.

        :param sql: SQL statement
        :param param: SQL parameters
        :return: ``(row_count, rows)``; datetime values in each row are converted to str
        """
        count = self.__execute(sql, param)
        result = self._cursor.fetchall()
        # Plain loop: the conversion mutates each row dict in place.
        for row_dict in result:
            self.__dict_datetime_obj_to_str(row_dict)
        return count, result

    def execute(self, sql, param=()):
        """Execute a non-query statement and return the affected row count."""
        count = self.__execute(sql, param)
        return count

    def begin(self):
        """Start an explicit transaction.

        Statements executed until :meth:`end` are no longer committed one by one.
        """
        self._conn.begin()
        self._in_transaction = True

    def end(self, option='commit'):
        """Finish the transaction started with :meth:`begin`.

        :param option: ``'commit'`` commits; any other value rolls back.
        """
        try:
            if option == 'commit':
                self._conn.commit()
            else:
                self._conn.rollback()
        finally:
            # Back to per-statement commits even if commit/rollback failed.
            self._in_transaction = False
def printf(str):
    """Print *str* on one line, prefixed with the current local time.

    The timestamp format is ``%Y-%m-%d %X`` (date plus the locale's time).
    The parameter name shadows the ``str`` builtin; it is kept unchanged for
    compatibility with keyword callers.
    """
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %X")
    print(" ".join((timestamp, str)))
# Daily export files published by the finance system.  Each file is fetched from
# _EXPORT_URL_TEMPLATE (export date as YYYYMMDD, then the file name) and loaded
# into the table of the same name in lower case.
_EXPORT_URL_TEMPLATE = 'https://dsideal.obs.cn-north-1.myhuaweicloud.com/down/ccsdjg/%s/%s.json'
_EXPORT_FILES = (
    'T_PRO_PARA',
    'T_PRO_ATTACH',
    'T_PRO_FORM_VALUE',
    'T_PRO_DETAIL',
    'T_PRO_FORM_ITEM',
    'T_PRO_TARGET',
    'T_PRO_ATTACH_TYPE',
    'T_PRO_TASK',
    'T_PRO_FORM',
    'T_PRO_MESSAGE',
    'T_PRO_CONFIG',
)
# Seconds to wait for the export server; requests has no timeout by default.
_HTTP_TIMEOUT = 60

# Candidate projects: finished, non-draft tasks of organisations listed in t_gtzz_school.
_SUBJECT_QUERY = (
    "select t1.sn,"
    "(select itemvalue20 from t_pro_form_value where task_sn=t1.sn) as money,"
    "(select itemvalue11 from t_pro_form_value where task_sn=t1.sn) as xmjj ,"
    "t1.subject_id,t1.task_number,t1.task_title,t1.task_year,t1.ex_send_org_name,"
    "(select class_name from t_pro_detail where task_sn=t1.sn order by class_name limit 1 ) as subject_type_name,"
    " t1.create_time from t_pro_task as t1 left join t_pro_form_value as t2 on t2.task_sn=t1.sn"
    " where t1.ex_send_org_name in (select org_name from t_gtzz_school)"
    " and t1.use_flag=1 and t1.draft_flag=0 and t1.close_flag=1 and t1.data_type=0"
    " order by t1.create_time desc"
)


def _import_export_files(engine, date_str):
    """Download every export file for *date_str* and replace its table.

    :param engine: SQLAlchemy engine used by ``DataFrame.to_sql``
    :param date_str: export date formatted as ``YYYYMMDD``
    :return: number of files processed
    Raises ``SystemExit(0)`` as soon as one export file is not available.
    """
    file_count = 0
    for file_name in _EXPORT_FILES:
        file_count += 1
        url = _EXPORT_URL_TEMPLATE % (date_str, file_name)
        table_name = file_name.lower()
        # A single GET both checks availability and downloads the file.
        response = requests.get(url, timeout=_HTTP_TIMEOUT)
        if response.status_code != 200:
            printf("没有找到指定的下载文件,程序无法继续!")
            raise SystemExit(0)
        # The export is GBK-encoded; drop undecodable bytes instead of failing.
        payload = json.loads(response.content.decode('gbk', 'ignore'))
        try:
            frame = pd.DataFrame(payload["DATA"])
            # https://www.jianshu.com/p/d615699ff254
            # if_exists='replace' drops and recreates the table, so the original
            # column types, keys and indexes are not preserved.
            frame.to_sql(table_name, con=engine, if_exists='replace', index=False)
            printf("成功完成表%s的数据导入!" % table_name)
        except Exception as err:
            # Best effort: report the failure and continue with the next file.
            print(err)
    return file_count


def _restore_pro_schema(db):
    """Re-add the key, columns and index that to_sql dropped, and fill org_code."""
    statements = (
        # Primary key on t_pro_task.sn
        "alter table `gtzz_base_db`.`t_pro_task` modify column `sn` varchar(128) character set utf8mb4 collate utf8mb4_general_ci not null first,add primary key (`sn`)",
        # Extra column subject_id (the t_gtzz_subject primary key)
        "alter table `gtzz_base_db`.`t_pro_task` add column `subject_id` int null default 0 comment '与t_gtzz_subject对应的主键' after `reiterate_task_sn`",
        # Extra column org_code (organisation code)
        "alter table `gtzz_base_db`.`t_pro_task` add column `org_code` varchar(36) null comment '组织机构码' after `subject_id`",
        # Fill org_code by matching the sender name to the organisation name
        "update t_pro_task as t1 ,t_base_organization as t2 set t1.org_code=t2.org_code where t1.send_user_name=t2.org_name",
        # Convert t_pro_message.task_sn to varchar(128), then index it
        "alter table `gtzz_base_db`.`t_pro_message` modify column `task_sn` varchar(128) character set utf8mb4 collate utf8mb4_bin null default null after `modify_time`",
        "alter table `gtzz_base_db`.`t_pro_message` add index(`task_sn`)",
    )
    for sql in statements:
        db.execute(sql)


def _write_sync_log(db, file_count, expected_count):
    """Record in t_gtzz_sync_log how many export files were processed."""
    # NOTE(review): a missing export file aborts the run before this point,
    # so the failure message is currently never written.
    if file_count == expected_count:
        message = '正常同步成功!'
    else:
        message = '财务系统未授权同步文件!'
    db.execute("insert into t_gtzz_sync_log(file_count,message) values(%s,%s)", (file_count, message))


def _sync_subjects(db):
    """Insert or update one t_gtzz_subject row per candidate t_pro_task row."""
    _, rows = db.findList(_SUBJECT_QUERY)
    inserted = 0
    updated = 0
    for row in rows:
        subject_id = row['subject_id']
        task_sn = row['sn']
        xmdw = row['ex_send_org_name']       # organisation (unit) name
        xmmc = row['task_title']             # project title
        sbnd = row['task_year']              # declaration year
        lixiang_time = row["create_time"]    # project approval time
        task_number = row["task_number"]     # project number
        money = row["money"]                 # project amount
        xmjj = row["xmjj"]                   # project summary

        # Organisation code; NULL when the organisation is unknown.
        _, org_row = db.find("select org_code from t_base_organization where org_name=%s", (xmdw,))
        org_code = org_row["org_code"] if org_row else None

        # Project categories: parent names give the main type, class names the sub-types.
        _, detail_rows = db.findList("select class_name,parent_name from t_pro_detail where task_sn=%s",
                                     (task_sn,))
        parent_names = list(dict.fromkeys(r["parent_name"] for r in detail_rows))
        class_names = list(dict.fromkeys(r["class_name"] for r in detail_rows))
        subject_type_name = parent_names[0] if parent_names else '其他'

        if subject_id and subject_id > 0:
            # Already known subject: refresh it.
            db.execute(
                "update t_gtzz_subject set finish_scan=1,task_sn=%s,org_code=%s,xmdw=%s,xmmc=%s,sbnd=%s,"
                "lixiang_time=%s,subject_type_id='-1',subject_type_name=%s where subject_id=%s",
                (task_sn, org_code, xmdw, xmmc, sbnd, lixiang_time, subject_type_name, subject_id))
            updated += 1
            printf('成功更新%s条记录!' % updated)
        else:
            # New subject: insert it together with its sub-type names.
            db.execute(
                "insert into t_gtzz_subject(finish_scan,task_sn,org_code,xmdw,xmmc,sbnd,lixiang_time,"
                "subject_type_id,subject_type_name,money,xmjj,task_number) "
                "values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
                (0, task_sn, org_code, xmdw, xmmc, sbnd, lixiang_time, '-1', subject_type_name, money, xmjj,
                 task_number))
            # NOTE(review): the source's indentation was lost; sub-types are assumed
            # to be recorded only for newly inserted subjects - confirm.
            for class_name in class_names:
                db.execute("insert into t_gtzz_subject_sub_type(task_sn,sub_type_name,year) values(%s,%s,%s)",
                           (task_sn, class_name, sbnd))
            inserted += 1
            printf('成功插入%s条记录!' % inserted)

    if rows:
        # Both statements update whole tables, so running them once after the loop
        # leaves the same final state as running them after every row.
        # Fill subject_type_id from the type name (the writes above set it to '-1').
        db.execute('update t_gtzz_subject as t1,t_gtzz_subject_type as t2 set t1.subject_type_id=t2.subject_type_id where t1.subject_type_name=t2.subject_type_name')
        # Link sub-type rows to their subject via task_sn.
        db.execute("update t_gtzz_subject_sub_type as t1,t_gtzz_subject as t2 set t1.subject_id=t2.subject_id where t1.task_sn=t2.task_sn")


def SyncData():
    """Import today's T_PRO_* export files and refresh the t_gtzz_subject tables.

    1. Download each export file for the current date and replace its table.
    2. Restore the schema changes lost by the table replacement, write a
       t_gtzz_sync_log entry and write existing subject ids back to t_pro_task.
    3. Insert new / update existing t_gtzz_subject rows from t_pro_task.

    Exits the process with status 0 if an export file is missing.
    """
    connect_string = 'mysql+pymysql://%s:%s@%s:%s/%s?charset=utf8' % (
        "root", "DsideaL147258369", "gtzz.dsideal.com", 22066, "gtzz_base_db")
    engine = create_engine(connect_string)  # use sqlalchemy to build link-engine
    db = MysqlClient()
    try:
        date_str = datetime.datetime.now().strftime('%Y%m%d')
        file_count = _import_export_files(engine, date_str)
        _restore_pro_schema(db)
        _write_sync_log(db, file_count, len(_EXPORT_FILES))
        # Write known subject ids back to t_pro_task; on the first sync nothing
        # matches, so subject_id keeps its default 0.
        db.execute("update t_pro_task as t1 ,t_gtzz_subject as t2 set t1.subject_id=t2.subject_id where t1.sn=t2.task_sn")
        _sync_subjects(db)
        printf('恭喜,所有操作成功完成!')
    finally:
        db.close()
if __name__ == '__main__':
    # Script entry point: run one synchronisation pass.
    SyncData()