import sys
import json
import psycopg2
import psycopg2.extras
import psycopg2.pool
import pymysql
from dbutils.pooled_db import PooledDB
from datetime import datetime

# ###################################### 通用功能封装 (generic helpers) ######################################


def getCurrentTime():
    """Return the current local time formatted as a string.

    NOTE(review): "%X" is locale-dependent; on common locales it renders
    HH:MM:SS, but "%H:%M:%S" would guarantee the format.
    """
    now_time = datetime.now()
    return now_time.strftime("%Y-%m-%d %X")


def printf(str):
    """Print *str* prefixed with the current timestamp.

    The parameter name shadows the builtin ``str``; kept unchanged so the
    call interface is identical to the original.
    """
    print(getCurrentTime() + " " + str)


# ###################################### MySQL wrapper ######################################
class MysqlClient(object):
    """Thin wrapper around a class-level shared PooledDB MySQL pool.

    Every instance borrows one connection (with a DictCursor) from the
    shared pool; rows come back as dicts.
    """

    __pool = None  # class-level shared connection pool (created lazily)

    def __init__(self, mincached=10, maxcached=20, maxshared=10, maxconnections=200,
                 blocking=True, maxusage=100, setsession=None, reset=True,
                 host="127.0.0.1", port=22066, db="dsideal_db",
                 user="root", passwd="DsideaL147258369", charset='utf8mb4'):
        """
        :param mincached: initial number of idle connections in the pool
        :param maxcached: maximum number of idle connections in the pool
        :param maxshared: maximum number of shared connections
        :param maxconnections: maximum number of connections in the pool
        :param blocking: when the pool is exhausted, True waits for a free
            connection, False raises immediately
        :param maxusage: maximum number of reuses of a single connection
        :param setsession: optional list of SQL commands that may serve to
            prepare the session, e.g. ["set datestyle to ...", "set time zone ..."]
        :param reset: how connections should be reset when returned to the
            pool (False or None to rollback transactions started with
            begin(), True to always issue a rollback for safety's sake)
        :param host: database host address
        :param port: database port
        :param db: database (schema) name
        :param user: user name
        :param passwd: password
        :param charset: character encoding
        """
        if not self.__pool:
            self.__class__.__pool = PooledDB(
                pymysql, mincached, maxcached, maxshared, maxconnections,
                blocking, maxusage, setsession, reset,
                host=host, port=port, db=db, user=user, passwd=passwd,
                charset=charset, cursorclass=pymysql.cursors.DictCursor)
        self._conn = None
        self._cursor = None
        self.__get_conn()

    def __get_conn(self):
        # Borrow a connection from the shared pool and open a cursor on it.
        self._conn = self.__pool.connection()
        self._cursor = self._conn.cursor()

    def close(self):
        """Close the cursor and return the connection to the pool (best effort)."""
        try:
            self._cursor.close()
            self._conn.close()
        except Exception as e:
            print(e)

    def __execute(self, sql, param=()):
        # Execute and commit immediately; returns the affected/matched row count.
        count = self._cursor.execute(sql, param)
        self._conn.commit()
        return count

    @staticmethod
    def __dict_datetime_obj_to_str(result_dict):
        """Convert datetime values in a row dict to strings so the row can be
        JSON-serialized without errors. Mutates and returns *result_dict*."""
        if result_dict:
            result_replace = {k: v.__str__() for k, v in result_dict.items()
                              if isinstance(v, datetime)}
            result_dict.update(result_replace)
        return result_dict

    def find(self, sql, param=()):
        """Run *sql* and return (row_count, first_row_dict_or_None)."""
        count = self.__execute(sql, param)
        result = self._cursor.fetchone()
        """:type result:dict"""
        result = self.__dict_datetime_obj_to_str(result)
        return count, result

    def findList(self, sql, param=()):
        """Run *sql* and return (row_count, list_of_row_dicts).

        :param sql: SQL statement
        :param param: SQL parameters
        :return: row count and the full result set
        """
        count = self.__execute(sql, param)
        result = self._cursor.fetchall()
        """:type result:list"""
        for row_dict in result:
            self.__dict_datetime_obj_to_str(row_dict)
        return count, result

    def execute(self, sql, param=()):
        """Execute a statement and return the affected row count."""
        return self.__execute(sql, param)

    def begin(self):
        """Start a transaction by disabling autocommit."""
        self._conn.autocommit(0)

    def end(self, option='commit'):
        """End a transaction started with begin().

        BUGFIX: the original called self._conn.autocommit() with no
        argument, which raises TypeError in pymysql (autocommit requires a
        value); commit explicitly instead.
        """
        if option == 'commit':
            self._conn.commit()
        else:
            self._conn.rollback()


def getCount(sql):
    """Return the total number of rows produced by *sql* (COUNT wrapper).

    NOTE(review): *sql* is concatenated, not parameterized — it must come
    from trusted configuration only. Uses the module-level ``db`` client.
    """
    sqlCount = "select count(1) as c from (" + sql + ") as t100"
    result = db.findList(sqlCount)
    total_count = result[1][0]["c"]
    return total_count


def getPageData(sql, pageNum, page_size):
    """Return one page (list of row dicts) of *sql*; *pageNum* is 1-based."""
    start = (pageNum - 1) * page_size
    pageSql = sql + " limit " + str(start) + "," + str(page_size)
    l1 = db.findList(pageSql)
    return l1[1]


def getPageCnt(totalCount, page_size):
    """Return the number of pages needed for *totalCount* rows.

    Exact ceiling integer division — equivalent to the original
    modulo/float branch but precise for arbitrarily large counts.
    """
    return -(-totalCount // page_size)


def getLowFieldName(dbName, table):
    """Return the comma-separated, lower-cased column list of table["name"].

    BUGFIX: uses a parameterized query instead of string concatenation,
    removing the SQL-injection surface.
    """
    col_sql = ("select group_concat(lower(column_name)) as column_names "
               "from information_schema.columns "
               "where table_schema=%s and table_name=%s")
    lc = db.findList(col_sql, (dbName, table["name"]))
    return lc[1][0]['column_names']


# ################################### 上面的 Msql封装部分完成 ###################################
# ################################### 下面的 Pg封装部分开始 ###################################


def gp_connect():
    """Return a Greenplum/PostgreSQL connection, or None on failure.

    NOTE(review): the SimpleConnectionPool object is discarded after one
    getconn(), so pooling is effectively unused; kept to preserve the
    original behavior.
    """
    dbname = "dsideal_db"
    user = "root"
    password = "DsideaL147258369"
    host = "10.10.14.206"
    port = 5432
    try:
        simple_conn_pool = psycopg2.pool.SimpleConnectionPool(
            minconn=1, maxconn=5, dbname=dbname, user=user,
            password=password, host=host, port=port)
        # Take one connection from the pool.
        conn = simple_conn_pool.getconn()
        return conn
    except psycopg2.DatabaseError as e:
        print("could not connect to Greenplum server", e)


# Module-level Greenplum connection (None if the connect failed).
pgconn = gp_connect()


def saveGp(l1, pk, table, column_names):
    """Upsert one page of MySQL rows into the same-named Greenplum table.

    Incoming keys are deleted first, then the rows are bulk-inserted, so
    the operation behaves like an upsert.

    :param l1: list of row dicts fetched from MySQL (mutated in place)
    :param pk: primary-key column name
    :param table: table config dict; uses table["name"]
    :param column_names: comma-separated column list matching the dict order
    """
    cur = pgconn.cursor()
    gp_list = []
    ids_list = []
    for x in l1:
        # Normalize MySQL zero-dates and datetime objects for Greenplum.
        for key in x:
            if x[key] == '0000-00-00 00:00:00':
                x[key] = '2000-01-01 00:00:00'
            elif x[key] == '0000-00-00':
                x[key] = '2000-01-01'
            elif type(x[key]).__name__ == 'datetime':
                x[key] = x[key].strftime("%Y-%m-%d %H:%M:%S")
            elif type(x[key]).__name__ == 'NoneType':
                x[key] = None  # no-op; kept from the original for clarity
        gp_list.append(tuple(x.values()))
        ids_list.append(x[pk])
    # Delete rows matching the incoming primary keys.
    del_sql = ('delete from ' + table['name'] + ' where ' + pk +
               ' in ( select ' + pk + ' from (values %s) as t1(' + pk + '))')
    # BUGFIX: the original passed the full row tuples (gp_list) here, which
    # only works when *pk* happens to be the table's first column; use the
    # collected primary-key values instead.
    psycopg2.extras.execute_values(cur, del_sql, [(v,) for v in ids_list])
    # Bulk insert the normalized rows.
    pg_sql = ('insert into public.' + table["name"] +
              ' (' + column_names + ') values %s')
    psycopg2.extras.execute_values(cur, pg_sql, gp_list)
    pgconn.commit()


def readJson(sqlFileName):
    """Load and return the JSON config file ./json/<sqlFileName>."""
    with open("./json/" + sqlFileName, 'r', encoding='utf-8') as load_f:
        jo = json.load(load_f)
    return jo


# ################################### 上面的 Pg封装部分完成 ###################################

if __name__ == '__main__':
    # During testing a default config is assumed; in production the caller
    # must pass the vendor-specific JSON file name (file name only, no
    # directory) — config files are per vendor, not per table.
    if len(sys.argv) == 1:
        sys.argv.append("dsideal_ypt.json")
    # Abort early when the Greenplum connection could not be established.
    if pgconn is None:
        printf("无法创建到Gp的链接,请检查配置!")
        sys.exit(0)
    # Read the JSON config to obtain the MySQL connection settings.
    jo = readJson(sys.argv[1])
    db = MysqlClient(host=jo['host'], port=int(jo['port']), db=jo['db'],
                     user=jo['user'], passwd=jo['passwd'])
    page_size = jo["page_size"]
    tables = jo["tables"]
    for table in tables:
        # Only full-sync tables (type_id == 1) are handled here.
        if table["type_id"] != 1:
            continue
        pk = table["pk"]
        dbName = jo['db']
        # Lower-cased column list drives both the SELECT and the GP insert.
        column_names = getLowFieldName(dbName, table)
        sql = 'select ' + column_names + ' from ' + table["name"]
        totalCount = getCount(sql)
        if totalCount == 0:
            printf("表%s需要同步的数据条数为0。" % (table["name"]))
        pageCnt = getPageCnt(totalCount, page_size)
        # Page through the table and upsert each page into Greenplum.
        finishCnt = 0
        for i in range(1, pageCnt + 1):
            l1 = getPageData(sql, i, page_size)
            saveGp(l1, pk, table, column_names)
            finishCnt = finishCnt + len(l1)
            printf("正在同步表%s,成功获取完%d条,共%d条,第%d页,共%d页。" %
                   (table["name"], finishCnt, totalCount, i, pageCnt))
    # Close both connections and report completion.
    pgconn.close()
    db.close()
    printf("恭喜,本轮同步任务已完成!")