import json

import psycopg2
import psycopg2.extras
import psycopg2.pool


def gp_connect():
    """Open a connection to the Greenplum server through a connection pool.

    A new ``SimpleConnectionPool`` (1 to 5 connections) is created on every
    call and a single connection is checked out of it.

    Returns:
        The psycopg2 connection, or ``None`` when connecting raised
        ``psycopg2.DatabaseError`` (the error is printed, not re-raised).
    """
    # NOTE(review): credentials are hard-coded in source; consider loading
    # them from configuration or the environment instead.
    dbname = "dsideal_db"
    user = "root"
    password = "DsideaL147258369"
    host = "10.10.14.206"
    port = 5432
    try:
        simple_conn_pool = psycopg2.pool.SimpleConnectionPool(
            minconn=1,
            maxconn=5,
            dbname=dbname,
            user=user,
            password=password,
            host=host,
            port=port,
        )
        # Check a connection out of the pool.
        return simple_conn_pool.getconn()
    except psycopg2.DatabaseError as e:
        print("could not connect to Greenplum server", e)
        return None


def saveGp(l1, pk, table, column_names):
    """Upsert rows into a Greenplum table by delete-then-insert.

    Every row whose primary key appears in ``l1`` is deleted from the target
    table, then all rows of ``l1`` are bulk-inserted, and the transaction is
    committed on the module-level connection ``pgconn``.

    Args:
        l1: Iterable of dicts, one per row. ``datetime`` values are replaced
            in place by ``"%Y-%m-%d %H:%M:%S"`` strings, so the caller's
            dicts are modified.
        pk: Name of the primary-key column; must be a key of every row dict.
        table: Mapping whose ``"name"`` entry is the target table name.
        column_names: Comma-separated column list for the INSERT; assumed to
            be in the same order as the values of each row dict.

    Raises:
        psycopg2.Error: If the delete, insert or commit fails. The
            transaction is rolled back before the error is re-raised.

    Note:
        ``pgconn`` is not defined in this part of the file; it must exist as
        a module-level connection before this function is called. Table and
        column names are interpolated into the SQL text unquoted, so they
        must come from trusted configuration.
    """
    row_values = []
    pk_values = []
    for row in l1:
        # Normalise datetime values to a plain timestamp string.
        for key, value in row.items():
            if type(value).__name__ == 'datetime':
                row[key] = value.strftime("%Y-%m-%d %H:%M:%S")
        row_values.append(tuple(row.values()))
        # One-element tuples: the VALUES list of the delete statement has a
        # single column holding the primary key.
        pk_values.append((row[pk],))

    del_sql = (
        'delete from {table} where {pk} in '
        '( select {pk} from (values %s) as t1({pk}))'
    ).format(table=table['name'], pk=pk)
    pg_sql = 'insert into public.{table} ({columns}) values %s'.format(
        table=table["name"], columns=column_names
    )

    try:
        with pgconn.cursor() as cur:
            # Delete by primary key. Only the key values are sent, so the
            # match no longer depends on the key being the first column of
            # each row.
            psycopg2.extras.execute_values(cur, del_sql, pk_values)
            # Insert the fresh rows.
            psycopg2.extras.execute_values(cur, pg_sql, row_values)
        pgconn.commit()
    except psycopg2.Error:
        # Leave the shared connection usable for later calls instead of
        # stuck in an aborted transaction.
        pgconn.rollback()
        raise


def readJson(sqlFileName):
    """Load and return the parsed content of a JSON configuration file.

    Args:
        sqlFileName: File name appended to the ``./json/`` directory prefix
            (relative to the current working directory).

    Returns:
        The object decoded by ``json.load`` from the UTF-8 encoded file.
    """
    with open("./json/" + sqlFileName, 'r', encoding='utf-8') as load_f:
        return json.load(load_f)