|
|
import sqlite3
|
|
|
import sys
|
|
|
import json
|
|
|
|
|
|
import psycopg2
|
|
|
import psycopg2.extras
|
|
|
import psycopg2.pool
|
|
|
|
|
|
import datetime
|
|
|
import json
|
|
|
|
|
|
import pymysql
|
|
|
from dbutils.pooled_db import PooledDB
|
|
|
from datetime import datetime
|
|
|
|
|
|
|
|
|
##########################################通用功能封装########################################################################################
|
|
|
# 当前时间
|
|
|
def getCurrentTime():
    """Return the current local time formatted as 'YYYY-MM-DD HH:MM:SS'."""
    # "%X" is the locale's time representation (HH:MM:SS under the C locale).
    return datetime.now().strftime("%Y-%m-%d %X")
|
|
|
|
|
|
|
|
|
# 输出带时间的字符串
|
|
|
def printf(msg):
    """Print *msg* prefixed with the current timestamp.

    BUG FIX: the parameter was named ``str``, shadowing the builtin; renamed
    to ``msg``. All callers in this file pass the argument positionally, so
    they are unaffected.

    :param msg: the message text to print
    """
    print(getCurrentTime() + " " + msg)
|
|
|
|
|
|
|
|
|
# 下面的Mysql封装部分开始
|
|
|
|
|
|
class MysqlClient(object):
    """PyMySQL client backed by a process-wide DBUtils PooledDB connection pool.

    The pool is created lazily on first instantiation and shared by every
    MysqlClient instance (class attribute). Each instance borrows one pooled
    connection and opens a DictCursor on it, so query results are dicts.
    """

    # Shared connection pool (one per process).
    __pool = None

    def __init__(self, mincached=10, maxcached=20, maxshared=10, maxconnections=200, blocking=True,
                 maxusage=100, setsession=None, reset=True,
                 host="127.0.0.1", port=22066, db="dsideal_db",
                 user="root", passwd="DsideaL147258369", charset='utf8mb4'):
        """
        :param mincached: initial number of idle connections kept in the pool
        :param maxcached: maximum number of idle connections kept in the pool
        :param maxshared: maximum number of shared connections
        :param maxconnections: maximum number of connections the pool may open
        :param blocking: when the pool is exhausted, True waits for a free
            connection to become available, False raises immediately
        :param maxusage: maximum number of times a single connection is reused
        :param setsession: optional list of SQL commands that may serve to prepare
            the session, e.g. ["set datestyle to ...", "set time zone ..."]
        :param reset: how connections should be reset when returned to the pool
            (False or None to rollback transactions started with begin(),
            True to always issue a rollback for safety's sake)
        :param host: database host address
        :param port: database port
        :param db: database (schema) name
        :param user: user name
        :param passwd: password
        :param charset: character encoding
        """
        if not self.__pool:
            self.__class__.__pool = PooledDB(pymysql,
                                             mincached, maxcached,
                                             maxshared, maxconnections, blocking,
                                             maxusage, setsession, reset,
                                             host=host, port=port, db=db,
                                             user=user, passwd=passwd,
                                             charset=charset,
                                             cursorclass=pymysql.cursors.DictCursor)
        self._conn = None
        self._cursor = None
        self.__get_conn()

    def __get_conn(self):
        # Borrow a connection from the shared pool and open a dict cursor on it.
        self._conn = self.__pool.connection()
        self._cursor = self._conn.cursor()

    def close(self):
        """Close the cursor and return the connection to the pool."""
        try:
            self._cursor.close()
            self._conn.close()
        except Exception as e:
            print(e)

    def __execute(self, sql, param=()):
        # Run one statement and commit immediately.
        # NOTE(review): this commits after every statement, which effectively
        # ends any transaction opened with begin() — confirm this is intended.
        count = self._cursor.execute(sql, param)
        self._conn.commit()
        return count

    @staticmethod
    def __dict_datetime_obj_to_str(result_dict):
        """Convert datetime values in a result row to strings so json.dumps works."""
        if result_dict:
            result_replace = {k: v.__str__() for k, v in result_dict.items() if isinstance(v, datetime)}
            result_dict.update(result_replace)
        return result_dict

    def find(self, sql, param=()):
        """Run a query and fetch a single row.

        :param sql: SQL statement
        :param param: SQL parameters
        :return: (row count, row dict or None)
        """
        count = self.__execute(sql, param)
        result = self._cursor.fetchone()
        result = self.__dict_datetime_obj_to_str(result)
        return count, result

    def findList(self, sql, param=()):
        """Run a query and fetch all rows.

        :param sql: SQL statement
        :param param: SQL parameters
        :return: (row count, list of row dicts)
        """
        count = self.__execute(sql, param)
        result = self._cursor.fetchall()
        # Stringify datetime values in place, row by row.
        for row_dict in result:
            self.__dict_datetime_obj_to_str(row_dict)
        return count, result

    def execute(self, sql, param=()):
        """Run a statement and return the affected row count."""
        return self.__execute(sql, param)

    def begin(self):
        """Start a transaction by turning autocommit off."""
        self._conn.autocommit(0)

    def end(self, option='commit'):
        """End a transaction: commit by default, roll back otherwise.

        BUG FIX: the original commit branch called self._conn.autocommit()
        with no argument, which raises TypeError (pymysql's autocommit()
        requires a value). Commit the pending work and restore autocommit
        mode instead.
        """
        if option == 'commit':
            self._conn.commit()
            self._conn.autocommit(1)
        else:
            self._conn.rollback()
|
|
|
|
|
|
|
|
|
# 获取总数
|
|
|
def getCount(sql):
    """Return the total number of rows the query *sql* would produce.

    Wraps *sql* in a COUNT(1) subquery and runs it through the module-level
    MySQL client ``db``.
    """
    count_sql = "select count(1) as c from (%s) as t100" % sql
    _, rows = db.findList(count_sql)
    return rows[0]["c"]
|
|
|
|
|
|
|
|
|
# 获取数据
|
|
|
def getPageData(sql, pageNum, page_size):
    """Fetch one page of the results of *sql*.

    :param sql: base SELECT statement (without LIMIT clause)
    :param pageNum: 1-based page number
    :param page_size: number of rows per page
    :return: list of row dicts for the requested page
    """
    offset = (pageNum - 1) * page_size
    paged_sql = "%s limit %s,%s" % (sql, offset, page_size)
    _, rows = db.findList(paged_sql)
    return rows
|
|
|
|
|
|
|
|
|
# 获取分页的页数
|
|
|
def getPageCnt(totalCount, page_size):
    """Return the number of pages needed to hold *totalCount* rows.

    BUG FIX: the original used ``int(totalCount / page_size)`` — true (float)
    division followed by truncation — which silently loses precision once
    totalCount exceeds 2**53. Integer ceiling division gives the exact result
    for any size.

    :param totalCount: total number of rows (non-negative)
    :param page_size: rows per page (positive)
    :return: number of pages (0 when totalCount is 0)
    """
    return (totalCount + page_size - 1) // page_size
|
|
|
|
|
|
|
|
|
# 获取 mysql 小写columns
|
|
|
def getLowFieldName(dbName, table):
    """Return a comma-separated, lower-cased column list for one MySQL table.

    SECURITY FIX: the original built the query by concatenating the schema
    and table names from the JSON config straight into the SQL string; this
    uses a parameterized query instead (pymysql %s placeholders via
    ``db.findList(sql, param)``).

    :param dbName: schema name to look the table up in
    :param table: table config dict; only table["name"] is used
    :return: string like "col_a,col_b,col_c" (lower-cased)
    """
    col_sql = ("select group_concat(lower(column_name)) as column_names "
               "from information_schema.columns "
               "where table_schema=%s and table_name=%s")
    lc = db.findList(col_sql, (dbName, table["name"]))
    return lc[1][0]['column_names']
|
|
|
|
|
|
|
|
|
################################### 上面的 Mysql封装部分完成###########################################################################################
|
|
|
|
|
|
####################################下面的 Pg封装部分开始#############################################################################################
|
|
|
# 获取GreenPlum连接池
|
|
|
def gp_connect():
    """Open a connection to the Greenplum server.

    BUG FIX: the original constructed a SimpleConnectionPool on every call,
    borrowed a single connection from it and then abandoned the pool object,
    so the pool (and any connections it held) leaked. A direct
    ``psycopg2.connect`` has the same external behavior — it returns one
    connection — without the leak.

    :return: an open psycopg2 connection, or None if the connection fails
        (a message is printed in that case).
    """
    try:
        return psycopg2.connect(dbname="dsideal_db",
                                user="root",
                                password="DsideaL147258369",
                                host="10.10.14.206",
                                port=5432)
    except psycopg2.DatabaseError as e:
        print("could not connect to Greenplum server", e)
        return None
|
|
|
|
|
|
|
|
|
# Module-level Greenplum connection, created once at import time.
# May be None on failure; __main__ checks for that before use.
pgconn = gp_connect()
|
|
|
|
|
|
|
|
|
# 保存到Gp数据库
|
|
|
# Save one page of rows to the Greenplum database.
def saveGp(l1, pk, table, column_names):
    """Upsert the rows *l1* into Greenplum table ``public.<table["name"]>``.

    Rows whose primary key already exists are deleted first, then the whole
    page is bulk-inserted with ``psycopg2.extras.execute_values``; the
    transaction is committed on the module-level ``pgconn``.

    Fixes over the original:
    - the cursor was never closed (now closed in a finally block);
    - removed the dead ``ids_list`` that was built but never used;
    - removed the no-op ``NoneType`` branch (assigning None to an
      already-None value);
    - ``isinstance(v, datetime)`` instead of comparing the type name string.

    :param l1: list of row dicts fetched from MySQL
    :param pk: primary-key column name
    :param table: table config dict; only table["name"] is used here
    :param column_names: comma-separated column list matching the row dicts
    """
    cur = pgconn.cursor()
    try:
        gp_list = []
        for x in l1:
            # Normalize values that MySQL hands back but Postgres rejects.
            for key in x:
                if x[key] == '0000-00-00 00:00:00':
                    # MySQL "zero" timestamp is not a valid Postgres timestamp.
                    x[key] = '2000-01-01 00:00:00'
                elif x[key] == '0000-00-00':
                    x[key] = '2000-01-01'
                elif isinstance(x[key], datetime):
                    # Send datetimes as plain strings.
                    x[key] = x[key].strftime("%Y-%m-%d %H:%M:%S")
            gp_list.append(tuple(x.values()))

        # Delete rows about to be re-inserted, matched on the primary key.
        del_sql = 'delete from ' + table[
            'name'] + ' where ' + pk + ' in ( select ' + pk + ' from (values %s) as t1(' + pk + '))'
        psycopg2.extras.execute_values(cur, del_sql, gp_list)

        # Bulk insert the whole page.
        pg_sql = 'insert into public.' + table["name"] + ' (' + column_names + ') values %s'
        psycopg2.extras.execute_values(cur, pg_sql, gp_list)
        # Commit the delete+insert as one transaction.
        pgconn.commit()
    finally:
        cur.close()
|
|
|
|
|
|
|
|
|
# 读取json配置文件
|
|
|
# Load a JSON configuration file.
def readJson(sqlFileName):
    """Parse and return the JSON config file ``./json/<sqlFileName>``.

    :param sqlFileName: file name only (no directory component)
    :return: the parsed JSON object
    """
    path = "./json/" + sqlFileName
    with open(path, 'r', encoding='utf-8') as fh:
        return json.load(fh)
|
|
|
|
|
|
|
|
|
####################################上面的 Pg封装部分完成#############################################################################################
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
    # Validate the command-line arguments.
    if len(sys.argv) == 1:
        # print("请输入需要处理的配置json文件名称,注意不用带目录,只有文件名称即可!")
        # sys.exit()
        # Testing phase: fall back to a default config. Eventually the JSON
        # file must be supplied explicitly — one config per business vendor,
        # not one per table.
        sys.argv.append("dsideal_ypt.json")

    # Abort if the module-level Greenplum connection failed at import time.
    if pgconn is None:
        printf("无法创建到Gp的链接,请检查配置!")
        exit(0)
    # Read the JSON config to learn how to reach the source MySQL database.
    jo = readJson(sys.argv[1])
    # Initialize the MySQL client (backed by a connection pool).
    db = MysqlClient(host=jo['host'], port=int(jo['port']), db=jo['db'], user=jo['user'], passwd=jo['passwd'])

    # Open the sqlite3 progress database.
    sqlite_conn = sqlite3.connect('./db/progress.db')  # connect; the file is created if missing
    sqlite_cursor = sqlite_conn.cursor()  # create a cursor
    # Rows per page.
    page_size = jo["page_size"]
    # Which tables to process.
    tables = jo["tables"]
    # Iterate over the configured tables.
    for table in tables:
        # Only incremental-update tables (type_id == 2) are handled here.
        if table["type_id"] != 2:
            continue
        # Look up this table's saved sync progress, falling back to defaults.
        dw_update_ts = '2000-01-01 00:00:00'  # default watermark timestamp
        dw_id = 0  # default watermark id
        sqlite_sql = "select * from t_sync_progress where table_name=?"
        sqlite_cursor.execute(sqlite_sql, (table["name"],))
        results = sqlite_cursor.fetchall()
        if len(results) == 1:
            dw_update_ts = results[0][1]
            dw_id = results[0][2]
        # Primary key column.
        pk = table["pk"]
        # Source database (schema) name.
        dbName = jo['db']
        # All column names, lower-cased.
        column_names = getLowFieldName(dbName, table)
        # Build the incremental query, substituting the watermark placeholders
        # in the per-table condition template.
        sql = 'select ' + column_names + ' from ' + table["name"] + " " + table["condition"]
        sql = sql.replace("#dw_update_ts#", dw_update_ts)
        sql = sql.replace("#dw_id#", str(dw_id))
        # Total number of rows to sync.
        totalCount = getCount(sql)
        # Report tables with nothing to do.
        if totalCount == 0:
            printf("表%s需要同步的数据条数为0。" % (table["name"]))
        # Total number of pages.
        pageCnt = getPageCnt(totalCount, page_size)
        # Fetch and upload page by page.
        finishCnt = 0
        for i in range(1, pageCnt + 1):
            l1 = getPageData(sql, i, page_size)
            # Upload this page to Greenplum.
            saveGp(l1, pk, table, column_names)
            # Rows uploaded so far.
            finishCnt = finishCnt + len(l1)
            # Record the last row's dw_update_ts / dw_id as the new watermark.
            if len(l1) > 0:
                dw_update_ts = l1[len(l1) - 1][table["dw_update_ts"]]
                dw_id = l1[len(l1) - 1][table["pk"]]
            # Insert or update the progress record for this table.
            sqlite_sql = "insert or replace into t_sync_progress(table_name,dw_update_ts,dw_id) values(?,?,?)"
            sqlite_cursor.execute(sqlite_sql, (table["name"], dw_update_ts, str(dw_id)))
            sqlite_conn.commit()  # commit the progress transaction
            printf("正在同步表%s,成功获取完%d条,共%d条,第%d页,共%d页。" % (table["name"], finishCnt, totalCount, i, pageCnt))

    # Close sqlite3.
    sqlite_cursor.close()  # close the cursor
    sqlite_conn.close()  # close the connection
    # Close the MySQL client.
    db.close()
    # Close the Greenplum connection.
    pgconn.close()
    # Report completion.
    printf("恭喜,本轮同步任务已完成!")
|