You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

326 lines
12 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import sqlite3
import sys
import json
import psycopg2
import psycopg2.extras
import psycopg2.pool
import datetime
import json
import pymysql
from dbutils.pooled_db import PooledDB
from datetime import datetime
########################################## Generic helper utilities ##########################################
# Current time as a formatted string.
def getCurrentTime():
    """Return the current local time as 'YYYY-MM-DD HH:MM:SS'.

    Fix: the original format used '%X', whose output depends on the active
    locale; '%H:%M:%S' pins the unambiguous 24-hour form on every platform.
    """
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# Print a string prefixed with the current timestamp.
def printf(msg):
    """Print msg prefixed with the current time from getCurrentTime().

    Fix: the original parameter was named ``str``, shadowing the builtin;
    every caller in this file passes it positionally, so the rename is safe.
    """
    print(getCurrentTime() + " " + msg)
# ---- MySQL wrapper section ----
class MysqlClient(object):
    """MySQL client backed by a process-wide PooledDB connection pool (pymysql).

    The pool is created lazily by the first instance and shared by all later
    instances through the class attribute.
    """
    __pool = None

    def __init__(self, mincached=10, maxcached=20, maxshared=10, maxconnections=200, blocking=True,
                 maxusage=100, setsession=None, reset=True,
                 host="127.0.0.1", port=22066, db="dsideal_db",
                 user="root", passwd="DsideaL147258369", charset='utf8mb4'):
        """
        :param mincached: initial number of idle connections kept in the pool
        :param maxcached: maximum number of idle connections kept in the pool
        :param maxshared: maximum number of shared connections
        :param maxconnections: maximum number of connections the pool may create
        :param blocking: True to wait when maxconnections is reached, False to raise immediately
        :param maxusage: maximum number of reuses of a single connection
        :param setsession: optional list of SQL commands that may serve to prepare
            the session, e.g. ["set datestyle to ...", "set time zone ..."]
        :param reset: how connections should be reset when returned to the pool
            (False or None to rollback transactions started with begin(),
            True to always issue a rollback for safety's sake)
        :param host: database server address
        :param port: database server port
        :param db: database (schema) name
        :param user: user name
        :param passwd: password
        :param charset: character encoding
        """
        if not self.__pool:
            self.__class__.__pool = PooledDB(pymysql,
                                             mincached, maxcached,
                                             maxshared, maxconnections, blocking,
                                             maxusage, setsession, reset,
                                             host=host, port=port, db=db,
                                             user=user, passwd=passwd,
                                             charset=charset,
                                             cursorclass=pymysql.cursors.DictCursor)
        self._conn = None
        self._cursor = None
        self.__get_conn()

    def __get_conn(self):
        # Borrow a connection from the pool and open a dict-cursor on it.
        self._conn = self.__pool.connection()
        self._cursor = self._conn.cursor()

    def close(self):
        """Close the cursor and return the connection to the pool (best effort)."""
        try:
            self._cursor.close()
            self._conn.close()
        except Exception as e:
            print(e)

    def __execute(self, sql, param=()):
        # Execute and commit; returns the affected/selected row count.
        count = self._cursor.execute(sql, param)
        self._conn.commit()
        return count

    @staticmethod
    def __dict_datetime_obj_to_str(result_dict):
        """Convert datetime values in a row dict to strings so json conversion works."""
        if result_dict:
            result_replace = {k: v.__str__() for k, v in result_dict.items() if isinstance(v, datetime)}
            result_dict.update(result_replace)
        return result_dict

    def find(self, sql, param=()):
        """Run sql and return (row_count, first_row_dict_or_None)."""
        count = self.__execute(sql, param)
        result = self._cursor.fetchone()
        result = self.__dict_datetime_obj_to_str(result)
        return count, result

    def findList(self, sql, param=()):
        """Run sql and return (row_count, list_of_row_dicts).

        :param sql: SQL statement (with %s placeholders)
        :param param: statement parameters
        :return: row count and the full result set
        """
        count = self.__execute(sql, param)
        result = self._cursor.fetchall()
        for row_dict in result:
            self.__dict_datetime_obj_to_str(row_dict)
        return count, result

    def execute(self, sql, param=()):
        """Run a statement and return the affected row count."""
        return self.__execute(sql, param)

    def begin(self):
        """Start a transaction (disable autocommit)."""
        self._conn.autocommit(0)

    def end(self, option='commit'):
        """Finish a transaction started with begin().

        Fix: the original called self._conn.autocommit() with no argument,
        which raises TypeError in pymysql; commit explicitly instead.
        """
        if option == 'commit':
            self._conn.commit()
        else:
            self._conn.rollback()
# Total row count of a query.
def getCount(sql):
    """Return how many rows the given SELECT statement yields.

    Wraps sql in a count subquery and runs it through the module-level db.
    """
    wrapped = "select count(1) as c from ({}) as t100".format(sql)
    _, rows = db.findList(wrapped)
    return rows[0]["c"]
# Fetch one page of data.
def getPageData(sql, pageNum, page_size):
    """Return the rows for page pageNum (1-based) of the given query.

    Appends a MySQL LIMIT offset,count clause and queries via the
    module-level db.
    """
    offset = (pageNum - 1) * page_size
    paged_sql = "{} limit {},{}".format(sql, offset, page_size)
    _, rows = db.findList(paged_sql)
    return rows
# Number of pages needed to cover totalCount rows.
def getPageCnt(totalCount, page_size):
    """Return the number of pages of size page_size needed for totalCount rows.

    Fix: the original used int(totalCount / page_size) — float division,
    which silently loses precision for counts above 2**53. Ceiling integer
    division is exact for any size and handles the 0-rows case the same way.
    """
    return -(-totalCount // page_size)
# Lower-cased column list of a MySQL table.
def getLowFieldName(dbName, table):
    """Return a comma-separated, lower-cased list of the columns of table["name"].

    Fix: the original interpolated dbName and the table name straight into the
    SQL string; bind them as parameters instead to avoid SQL injection and
    quoting issues (db.findList forwards params to cursor.execute).
    """
    col_sql = ("select group_concat(lower(column_name)) as column_names "
               "from information_schema.columns "
               "where table_schema=%s and table_name=%s")
    _, rows = db.findList(col_sql, (dbName, table["name"]))
    return rows[0]['column_names']
################################### End of the MySQL wrapper section ###########################################################################################
#################################### Greenplum wrapper section begins ##########################################################################################
# Open a connection to the Greenplum server.
def gp_connect():
    """Return a psycopg2 connection to Greenplum, or None on failure.

    Fix: the original created a SimpleConnectionPool (up to 5 connections) on
    every call, took one connection and abandoned the pool object, leaking it.
    A direct psycopg2.connect() gives the caller the same object without the
    leak. Credentials are kept as in the original.
    """
    try:
        return psycopg2.connect(dbname="dsideal_db",
                                user="root",
                                password="DsideaL147258369",
                                host="10.10.14.206",
                                port=5432)
    except psycopg2.DatabaseError as e:
        print("could not connect to Greenplum server", e)
        return None
# Module-level Greenplum connection, opened at import time.
# gp_connect() falls through and returns None when the connect fails;
# the __main__ block checks for None before using pgconn.
pgconn = gp_connect()
# Save one page of rows to the Greenplum database.
def saveGp(l1, pk, table, column_names):
    """Upsert a page of rows: delete existing rows by primary key, then bulk insert.

    :param l1: list of row dicts fetched from MySQL (key order matches column_names)
    :param pk: primary-key column name
    :param table: table config dict; table["name"] is the target table
    :param column_names: comma-separated lower-cased column list
    """
    cur = pgconn.cursor()
    gp_list = []
    ids_list = []
    for x in l1:
        # Normalize MySQL zero-dates and datetime objects so Postgres accepts
        # them. (The original also had a NoneType branch that assigned None to
        # None — a no-op, removed.)
        for key in x:
            if x[key] == '0000-00-00 00:00:00':
                x[key] = '2000-01-01 00:00:00'
            elif x[key] == '0000-00-00':
                x[key] = '2000-01-01'
            elif type(x[key]).__name__ == 'datetime':
                x[key] = x[key].strftime("%Y-%m-%d %H:%M:%S")
        gp_list.append(tuple(x.values()))
        ids_list.append(x[pk])
    # Delete the incoming rows by primary key.
    # Fix: the original passed the full row tuples (gp_list) to execute_values,
    # so the pk lookup only worked when pk happened to be the first column;
    # pass one-element pk tuples (ids_list) instead, which is what ids_list
    # was collected for.
    del_sql = 'delete from ' + table[
        'name'] + ' where ' + pk + ' in ( select ' + pk + ' from (values %s) as t1(' + pk + '))'
    psycopg2.extras.execute_values(cur, del_sql, [(row_id,) for row_id in ids_list])
    # Bulk insert the page.
    pg_sql = 'insert into public.' + table["name"] + ' (' + column_names + ') values %s'
    psycopg2.extras.execute_values(cur, pg_sql, gp_list)
    # Commit the delete+insert as one transaction.
    pgconn.commit()
# Load a JSON configuration file.
def readJson(sqlFileName):
    """Read ./json/<sqlFileName> as UTF-8 JSON and return the parsed object."""
    config_path = "./json/" + sqlFileName
    with open(config_path, 'r', encoding='utf-8') as fh:
        return json.load(fh)
#################################### End of the Greenplum wrapper section #############################################################################################
if __name__ == '__main__':
    # Validate command-line input.
    if len(sys.argv) == 1:
        # Production behavior: print a usage hint (pass the config json file
        # name only, no directory) and sys.exit(). During this testing phase a
        # fixed config is appended instead. Config files are per business
        # vendor, not per table.
        sys.argv.append("dsideal_ypt.json")
    # Abort if the module-level Greenplum connection could not be opened.
    if pgconn is None:
        printf("无法创建到Gp的链接请检查配置")
        exit(0)
    # Read the JSON config describing how to reach MySQL and what to sync.
    jo = readJson(sys.argv[1])
    # Initialize the MySQL client (shared by the helper functions above via the module-level name `db`).
    db = MysqlClient(host=jo['host'], port=int(jo['port']), db=jo['db'], user=jo['user'], passwd=jo['passwd'])
    # Open the local sqlite3 progress database (created if it does not exist).
    sqlite_conn = sqlite3.connect('./db/progress.db')
    sqlite_cursor = sqlite_conn.cursor()
    # Page size for batched fetching.
    page_size = jo["page_size"]
    # Tables to process.
    tables = jo["tables"]
    # Iterate over the configured tables.
    for table in tables:
        # Only handle incremental-update tables (type_id == 2).
        if table["type_id"] != 2:
            continue
        # Look up this table's sync progress; fall back to the defaults below.
        dw_update_ts = '2000-01-01 00:00:00'  # default watermark timestamp
        dw_id = 0  # default watermark id
        sqlite_sql = "select * from t_sync_progress where table_name=?"
        sqlite_cursor.execute(sqlite_sql, (table["name"],))
        results = sqlite_cursor.fetchall()
        if len(results) == 1:
            # Row layout matches the insert below: (table_name, dw_update_ts, dw_id).
            dw_update_ts = results[0][1]
            dw_id = results[0][2]
        # Primary-key column name.
        pk = table["pk"]
        # MySQL schema name.
        dbName = jo['db']
        # All column names, lower-cased.
        column_names = getLowFieldName(dbName, table)
        # Build the extraction SQL, substituting the watermark placeholders
        # from the table's configured condition.
        sql = 'select ' + column_names + ' from ' + table["name"] + " " + table["condition"]
        sql = sql.replace("#dw_update_ts#", dw_update_ts)
        sql = sql.replace("#dw_id#", str(dw_id))
        # Total number of rows to sync.
        totalCount = getCount(sql)
        # Report tables with nothing to do.
        if totalCount == 0:
            printf("%s需要同步的数据条数为0。" % (table["name"]))
        # Total number of pages.
        pageCnt = getPageCnt(totalCount, page_size)
        # Fetch and upload page by page.
        finishCnt = 0
        for i in range(1, pageCnt + 1):
            l1 = getPageData(sql, i, page_size)
            # Upload this page to Greenplum.
            saveGp(l1, pk, table, column_names)
            # One page of data reported.
            finishCnt = finishCnt + len(l1)
            # Advance the watermark to the last row's dw_update_ts / dw_id.
            if len(l1) > 0:
                dw_update_ts = l1[len(l1) - 1][table["dw_update_ts"]]
                dw_id = l1[len(l1) - 1][table["pk"]]
            # Insert or update the progress record after every page so a crash resumes here.
            sqlite_sql = "insert or replace into t_sync_progress(table_name,dw_update_ts,dw_id) values(?,?,?)"
            sqlite_cursor.execute(sqlite_sql, (table["name"], dw_update_ts, str(dw_id)))
            sqlite_conn.commit()  # commit the progress transaction
            printf("正在同步表%s,成功获取完%d条,共%d条,第%d页,共%d页。" % (table["name"], finishCnt, totalCount, i, pageCnt))
    # Close sqlite3.
    sqlite_cursor.close()
    sqlite_conn.close()
    # Close the MySQL client.
    db.close()
    # Close the Greenplum connection.
    pgconn.close()
    # Done.
    printf("恭喜,本轮同步任务已完成!")