You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

63 lines
2.0 KiB

import json
import psycopg2
import psycopg2.extras
import psycopg2.pool
# Obtain a connection from a Greenplum connection pool
def gp_connect(dbname="dsideal_db", user="root", password="DsideaL147258369",
               host="10.10.14.206", port=5432, minconn=1, maxconn=5):
    """Open a Greenplum connection through a psycopg2 connection pool.

    Every call builds a new ``psycopg2.pool.SimpleConnectionPool`` and
    checks one connection out of it. Called without arguments, the function
    connects with the same settings as before.

    Args:
        dbname: Database name.
        user: Login role.
        password: Password for ``user``.
        host: Server host name or IP address.
        port: Server TCP port.
        minconn: Number of connections the pool opens immediately.
        maxconn: Upper bound on connections held by the pool.

    Returns:
        A psycopg2 connection, or ``None`` when ``psycopg2.DatabaseError``
        is raised while creating the pool or fetching the connection (the
        error is printed, not re-raised).
    """
    # NOTE(review): the credentials remain as defaults only for backward
    # compatibility; prefer passing them in from configuration.
    try:
        simple_conn_pool = psycopg2.pool.SimpleConnectionPool(
            minconn=minconn,
            maxconn=maxconn,
            dbname=dbname,
            user=user,
            password=password,
            host=host,
            port=port,
        )
        # Check a connection out of the freshly created pool.
        return simple_conn_pool.getconn()
    except psycopg2.DatabaseError as e:
        print("could not connect to Greenplum server", e)
        # Make the failure result explicit for callers.
        return None
# Save rows to the Greenplum database
def saveGp(l1, pk, table, column_names):
    """Replace rows in a Greenplum table using delete-then-insert.

    Rows whose primary key appears in ``l1`` are deleted, then every row of
    ``l1`` is inserted, and the transaction is committed on the module-level
    ``pgconn`` connection.

    Args:
        l1: Iterable of dict rows. The values of each dict, in iteration
            order, are inserted positionally, so they must line up with
            ``column_names``. ``datetime`` values are replaced in place by
            ``"%Y-%m-%d %H:%M:%S"`` strings.
        pk: Name of the primary-key column; must be a key of every row.
        table: Mapping whose ``"name"`` entry is the target table name.
        column_names: Comma-separated column list for the INSERT.

    Raises:
        Any exception from the database calls is re-raised after the
        transaction has been rolled back.
    """
    from datetime import datetime  # local import keeps the block self-contained

    gp_list = []
    ids_list = []
    for x in l1:
        # Normalise date-time values to strings before sending them.
        for key, value in x.items():
            if isinstance(value, datetime):
                x[key] = value.strftime("%Y-%m-%d %H:%M:%S")
        gp_list.append(tuple(x.values()))
        # One-element tuples: execute_values expects a sequence of rows.
        ids_list.append((x[pk],))

    # NOTE(review): table and column identifiers are concatenated into the
    # SQL text, so they must come from trusted configuration.
    # Delete by primary key
    del_sql = 'delete from ' + table[
        'name'] + ' where ' + pk + ' in ( select ' + pk + ' from (values %s) as t1(' + pk + '))'
    # Insert
    pg_sql = 'insert into public.' + table["name"] + ' (' + column_names + ') values %s'

    try:
        with pgconn.cursor() as cur:
            # Feed only the key values to the delete; passing whole rows made
            # it match on whichever column came first in each row.
            psycopg2.extras.execute_values(cur, del_sql, ids_list)
            psycopg2.extras.execute_values(cur, pg_sql, gp_list)
        # Commit
        pgconn.commit()
    except Exception:
        # Leave the shared connection usable instead of stuck in an aborted
        # transaction.
        pgconn.rollback()
        raise
# Read a JSON configuration file
def readJson(sqlFileName):
    """Load and return the parsed content of ``./json/<sqlFileName>``.

    The file is opened as UTF-8 text relative to the current working
    directory and decoded with ``json.load``.
    """
    config_path = "./json/" + sqlFileName
    with open(config_path, mode='r', encoding='utf-8') as handle:
        return json.load(handle)