You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

326 lines
12 KiB

2 years ago
import sqlite3
import sys
import json
import psycopg2
import psycopg2.extras
import psycopg2.pool
import datetime
import json
import pymysql
from dbutils.pooled_db import PooledDB
from datetime import datetime
##########################################通用功能封装########################################################################################
# Current time, rendered as a string.
def getCurrentTime():
    """Return the current local time formatted as "YYYY-MM-DD HH:MM:SS" (%X)."""
    return datetime.now().strftime("%Y-%m-%d %X")
# Print a message prefixed with the current timestamp.
def printf(msg):
    """Print *msg* prefixed with the current time.

    The parameter was renamed from ``str`` so the builtin is no longer
    shadowed; every caller in this file passes it positionally, so the
    rename is backward-compatible in practice.
    """
    print(getCurrentTime() + " " + msg)
# ---- MySQL access wrapper (shared connection pool + dict cursors) ----
class MysqlClient(object):
    """Thin wrapper around a process-wide PooledDB pool for MySQL.

    The pool is created once (class-level singleton) on first
    instantiation; each instance then borrows one pooled connection and
    opens a DictCursor on it, so query results come back as dicts.
    """

    __pool = None  # class-level singleton pool shared by all instances

    def __init__(self, mincached=10, maxcached=20, maxshared=10, maxconnections=200, blocking=True,
                 maxusage=100, setsession=None, reset=True,
                 host="127.0.0.1", port=22066, db="dsideal_db",
                 user="root", passwd="DsideaL147258369", charset='utf8mb4'):
        """
        :param mincached: initial number of idle connections kept in the pool
        :param maxcached: maximum number of idle connections kept in the pool
        :param maxshared: maximum number of shared connections
        :param maxconnections: maximum number of connections the pool may create
        :param blocking: True = wait when the pool is exhausted; False = raise
        :param maxusage: maximum number of reuses of a single connection
        :param setsession: optional list of SQL commands that may serve to prepare
            the session, e.g. ["set datestyle to ...", "set time zone ..."]
        :param reset: how connections should be reset when returned to the pool
            (False or None to rollback transactions started with begin(),
            True to always issue a rollback for safety's sake)
        :param host: database host address
        :param port: database port
        :param db: database (schema) name
        :param user: user name
        :param passwd: password
        :param charset: character encoding
        """
        if not self.__pool:
            self.__class__.__pool = PooledDB(pymysql,
                                             mincached, maxcached,
                                             maxshared, maxconnections, blocking,
                                             maxusage, setsession, reset,
                                             host=host, port=port, db=db,
                                             user=user, passwd=passwd,
                                             charset=charset,
                                             cursorclass=pymysql.cursors.DictCursor)
        self._conn = None
        self._cursor = None
        self.__get_conn()

    def __get_conn(self):
        # Borrow a connection from the pool and open a cursor on it.
        self._conn = self.__pool.connection()
        self._cursor = self._conn.cursor()

    def close(self):
        """Return the cursor/connection to the pool; errors are printed, not raised."""
        try:
            self._cursor.close()
            self._conn.close()
        except Exception as e:
            print(e)

    def __execute(self, sql, param=()):
        # Execute + commit, returning the affected/selected row count.
        count = self._cursor.execute(sql, param)
        self._conn.commit()
        return count

    @staticmethod
    def __dict_datetime_obj_to_str(result_dict):
        """Convert datetime values in a row dict to strings so JSON encoding works."""
        if result_dict:
            result_replace = {k: v.__str__() for k, v in result_dict.items() if isinstance(v, datetime)}
            result_dict.update(result_replace)
        return result_dict

    def find(self, sql, param=()):
        """Run *sql* and return ``(row_count, first_row_dict_or_None)``."""
        count = self.__execute(sql, param)
        result = self._cursor.fetchone()
        result = self.__dict_datetime_obj_to_str(result)
        return count, result

    def findList(self, sql, param=()):
        """
        Run *sql* and return every row.

        :param sql: SQL statement
        :param param: SQL parameters
        :return: ``(row_count, list_of_row_dicts)``
        """
        count = self.__execute(sql, param)
        result = self._cursor.fetchall()
        for row_dict in result:
            # Normalize datetimes in-place, row by row.
            self.__dict_datetime_obj_to_str(row_dict)
        return count, result

    def execute(self, sql, param=()):
        """Run a DML statement and return the affected row count."""
        return self.__execute(sql, param)

    def begin(self):
        """Start a transaction (turn autocommit off)."""
        self._conn.autocommit(0)

    def end(self, option='commit'):
        """End the transaction started by :meth:`begin`.

        Bug fix: the original called ``self._conn.autocommit()`` with no
        argument on the commit path, which raises TypeError in pymysql
        (``autocommit`` requires a value); committing the transaction is
        what was intended.
        """
        if option == 'commit':
            self._conn.commit()
        else:
            self._conn.rollback()
# Row count of an arbitrary SELECT, via a wrapping subquery.
def getCount(sql):
    """Return the number of rows *sql* would produce (COUNT(1) over a subquery)."""
    sqlCount = "select count(1) as c from (" + sql + ") as t100"
    _, rows = db.findList(sqlCount)
    return rows[0]["c"]
# Fetch one page of rows for *sql*.
def getPageData(sql, pageNum, page_size):
    """Return page *pageNum* (1-based) of *sql*, *page_size* rows per page."""
    offset = (pageNum - 1) * page_size
    pageSql = sql + " limit " + str(offset) + "," + str(page_size)
    _, rows = db.findList(pageSql)
    return rows
# Number of pages needed to cover *totalCount* rows.
def getPageCnt(totalCount, page_size):
    """Return how many pages of *page_size* rows cover *totalCount* rows.

    Uses exact integer ceiling division instead of the original
    ``int(total / size)`` float dance, which can round incorrectly for
    very large counts due to float precision.
    """
    return (totalCount + page_size - 1) // page_size
# Comma-joined, lower-cased column names of a MySQL table.
def getLowFieldName(dbName, table):
    """Return the lower-cased column names of ``table["name"]`` in schema
    *dbName*, joined with commas (MySQL ``group_concat``).

    Security fix: the original spliced the schema and table names from the
    JSON config straight into the SQL string; this version passes them as
    query parameters through ``db.findList`` instead.
    """
    col_sql = ("select group_concat(lower(column_name)) as column_names "
               "from information_schema.columns "
               "where table_schema=%s and table_name=%s")
    lc = db.findList(col_sql, (dbName, table["name"]))
    return lc[1][0]['column_names']
################################### 上面的 Msql封装部分完成###########################################################################################
####################################下面的 Pg封装部分开始#############################################################################################
# Obtain a Greenplum connection (None on failure).
def gp_connect():
    """Build a small psycopg2 connection pool and hand back one connection.

    Returns None implicitly when the server cannot be reached; the caller
    at module level checks for that.  Credentials are hard-coded for the
    target environment.
    """
    conn_params = {
        "dbname": "dsideal_db",
        "user": "root",
        "password": "DsideaL147258369",
        "host": "10.10.14.206",
        "port": 5432,
    }
    try:
        conn_pool = psycopg2.pool.SimpleConnectionPool(minconn=1, maxconn=5, **conn_params)
        # Borrow one connection from the freshly created pool.
        return conn_pool.getconn()
    except psycopg2.DatabaseError as e:
        print("could not connect to Greenplum server", e)
# Acquire the shared Greenplum connection at import time (None on failure).
pgconn = gp_connect()
# Upsert one page of rows into the Greenplum table.
def saveGp(l1, pk, table, column_names):
    """Delete-then-insert the row dicts in *l1* into ``public.<table name>``.

    :param l1: list of row dicts fetched from MySQL (keys = column names);
        values are normalized in place
    :param pk: primary-key column name used for the delete step
    :param table: table config dict; ``table["name"]`` is the target table
    :param column_names: comma-joined column list matching the dict key order

    Cleanups vs. the original: the cursor is now closed via a ``with``
    block, the unused ``ids_list`` accumulator and the no-op
    ``NoneType -> None`` branch were removed, and the datetime check uses
    ``isinstance`` instead of comparing ``type(...).__name__``.
    """
    with pgconn.cursor() as cur:
        gp_list = []
        for x in l1:
            # Normalize values MySQL can hold but Postgres rejects.
            for key in x:
                if x[key] == '0000-00-00 00:00:00':
                    x[key] = '2000-01-01 00:00:00'
                elif x[key] == '0000-00-00':
                    x[key] = '2000-01-01'
                elif isinstance(x[key], datetime):
                    # Render datetimes as strings Postgres parses directly.
                    x[key] = x[key].strftime("%Y-%m-%d %H:%M:%S")
            gp_list.append(tuple(x.values()))
        # Delete rows that already carry these primary keys.
        # NOTE(review): "as t1(<pk>)" names only the FIRST column of each
        # VALUES tuple, so this assumes pk is the first column in the row
        # dicts — same assumption as the original; confirm against config.
        del_sql = ('delete from ' + table['name'] + ' where ' + pk +
                   ' in ( select ' + pk + ' from (values %s) as t1(' + pk + '))')
        psycopg2.extras.execute_values(cur, del_sql, gp_list)
        # Insert the fresh copies.
        pg_sql = 'insert into public.' + table["name"] + ' (' + column_names + ') values %s'
        psycopg2.extras.execute_values(cur, pg_sql, gp_list)
        # Commit both statements together.
        pgconn.commit()
# Load a sync-job config from the ./json directory.
def readJson(sqlFileName):
    """Parse and return ``./json/<sqlFileName>`` as a dict (UTF-8 JSON)."""
    config_path = "./json/" + sqlFileName
    with open(config_path, 'r', encoding='utf-8') as fh:
        return json.load(fh)
####################################上面的 Pg封装部分完成#############################################################################################
if __name__ == '__main__':
    # Validate command-line input.
    if len(sys.argv) == 1:
        # print("Please supply the config JSON file name (name only, no directory)")
        # sys.exit()
        # Testing shortcut: default to a fixed vendor config.  Each vendor
        # has its own JSON file — the sync is organized per vendor, not per table.
        sys.argv.append("dsideal_ypt.json")
    # Abort if the Greenplum connection could not be established at import time.
    if pgconn is None:
        printf("无法创建到Gp的链接请检查配置")
        exit(0)
    # Read the JSON config that describes the MySQL source.
    jo = readJson(sys.argv[1])
    # Initialize the MySQL connection (pool).
    db = MysqlClient(host=jo['host'], port=int(jo['port']), db=jo['db'], user=jo['user'], passwd=jo['passwd'])
    # Open the sqlite3 progress database (created on first use).
    sqlite_conn = sqlite3.connect('./db/progress.db')  # connect; file is created if missing
    sqlite_cursor = sqlite_conn.cursor()  # cursor for progress reads/writes
    # Rows fetched per page.
    page_size = jo["page_size"]
    # Tables to process.
    tables = jo["tables"]
    # Iterate over the configured tables.
    for table in tables:
        # Only incremental-update tables (type_id == 2) are handled here.
        if table["type_id"] != 2:
            continue
        # Look up this table's recorded sync progress, if any.
        dw_update_ts = '2000-01-01 00:00:00'  # default watermark timestamp
        dw_id = 0  # default last-synced primary key
        sqlite_sql = "select * from t_sync_progress where table_name=?"
        sqlite_cursor.execute(sqlite_sql, (table["name"],))
        results = sqlite_cursor.fetchall()
        if len(results) == 1:
            dw_update_ts = results[0][1]
            dw_id = results[0][2]
        # Primary-key column name.
        pk = table["pk"]
        # Source database (schema) name.
        dbName = jo['db']
        # All column names, lower-cased and comma-joined.
        column_names = getLowFieldName(dbName, table)
        # Build the incremental query; substitute the progress watermarks
        # into the config-supplied condition template.
        sql = 'select ' + column_names + ' from ' + table["name"] + " " + table["condition"]
        sql = sql.replace("#dw_update_ts#", dw_update_ts)
        sql = sql.replace("#dw_id#", str(dw_id))
        # Total number of rows to sync.
        totalCount = getCount(sql)
        # Report when nothing needs syncing (processing continues; the page
        # loop below simply runs zero times).
        if totalCount == 0:
            printf("%s需要同步的数据条数为0。" % (table["name"]))
        # Total number of pages.
        pageCnt = getPageCnt(totalCount, page_size)
        # Fetch and upload page by page.
        finishCnt = 0
        for i in range(1, pageCnt + 1):
            l1 = getPageData(sql, i, page_size)
            # Upload this page to Greenplum.
            saveGp(l1, pk, table, column_names)
            # Tally the rows uploaded so far.
            finishCnt = finishCnt + len(l1)
            # Take the watermarks (dw_update_ts, dw_id) from the last row of the page.
            if len(l1) > 0:
                dw_update_ts = l1[len(l1) - 1][table["dw_update_ts"]]
                dw_id = l1[len(l1) - 1][table["pk"]]
            # Persist progress (insert or overwrite) after every page.
            sqlite_sql = "insert or replace into t_sync_progress(table_name,dw_update_ts,dw_id) values(?,?,?)"
            sqlite_cursor.execute(sqlite_sql, (table["name"], dw_update_ts, str(dw_id)))
            sqlite_conn.commit()  # commit the progress record
            printf("正在同步表%s,成功获取完%d条,共%d条,第%d页,共%d页。" % (table["name"], finishCnt, totalCount, i, pageCnt))
    # Close sqlite3.
    sqlite_cursor.close()  # close cursor
    sqlite_conn.close()  # close connection
    # Close the MySQL connection.
    db.close()
    # Close the Greenplum connection.
    pgconn.close()
    # Done.
    printf("恭喜,本轮同步任务已完成!")