You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
63 lines
2.0 KiB
63 lines
2.0 KiB
import json
|
|
|
|
import psycopg2
|
|
import psycopg2.extras
|
|
import psycopg2.pool
|
|
|
|
|
|
# Get a connection from a Greenplum connection pool
|
|
def gp_connect(dbname="dsideal_db", user="root", password="DsideaL147258369",
               host="10.10.14.206", port=5432, minconn=1, maxconn=5):
    """Create a psycopg2 connection pool and return one connection from it.

    Called without arguments this connects with the same settings the
    function has always used; every setting can be overridden by keyword so
    the helper also works against other servers or accounts.

    NOTE(review): the default credentials are embedded in source code;
    consider passing them in from configuration instead.

    Args:
        dbname: Database name.
        user: Login role.
        password: Password for ``user``.
        host: Server address.
        port: Server port.
        minconn: Minimum pool size passed to ``SimpleConnectionPool``.
        maxconn: Maximum pool size passed to ``SimpleConnectionPool``.

    Returns:
        A connection checked out of the newly created pool, or ``None`` when
        ``psycopg2.DatabaseError`` is raised while connecting (the error is
        printed, not re-raised). Callers must handle the ``None`` case.
    """
    try:
        # A fresh pool is built on every call and only the connection is
        # returned, so the caller has no pool handle to give it back to.
        simple_conn_pool = psycopg2.pool.SimpleConnectionPool(
            minconn=minconn,
            maxconn=maxconn,
            dbname=dbname,
            user=user,
            password=password,
            host=host,
            port=port,
        )
        # Check a connection out of the pool
        return simple_conn_pool.getconn()
    except psycopg2.DatabaseError as e:
        print("could not connect to Greenplum server", e)
        return None
|
|
|
|
|
|
# Save rows to the Greenplum database
|
|
def saveGp(l1, pk, table, column_names):
    """Replace rows in a Greenplum table: delete by primary key, then insert.

    Uses the module-level name ``pgconn``, which this function does not
    define; it must be bound to an open psycopg2 connection before calling.

    Args:
        l1: Iterable of dict rows. Each dict is modified in place: values
            whose type is named ``datetime`` are replaced by
            ``"%Y-%m-%d %H:%M:%S"`` strings. The value order of each dict
            must match ``column_names`` because rows are inserted
            positionally.
        pk: Name of the primary-key column; must be a key of every row.
        table: Mapping whose ``"name"`` entry is the target table name.
        column_names: Comma-separated column list used in the INSERT.

    Raises:
        KeyError: If a row has no ``pk`` key or ``table`` has no ``"name"``.
        psycopg2.Error: Re-raised after the transaction is rolled back.
    """
    rows = []
    pk_values = []
    for record in l1:
        # Normalise date-time values to text before sending them to the server.
        # The check is by type name, so no datetime import is required here.
        for key in record:
            if type(record[key]).__name__ == 'datetime':
                record[key] = record[key].strftime("%Y-%m-%d %H:%M:%S")

        rows.append(tuple(record.values()))
        # One-column tuples: the delete below must see only the key value.
        pk_values.append((record[pk],))

    # NOTE(review): table, pk and column_names are concatenated into the SQL
    # text as identifiers; they must come from trusted configuration.
    # NOTE(review): the delete is unqualified while the insert targets
    # "public." - confirm both resolve to the same table under search_path.
    del_sql = ('delete from ' + table['name'] + ' where ' + pk
               + ' in ( select ' + pk + ' from (values %s) as t1(' + pk + '))')
    pg_sql = 'insert into public.' + table["name"] + ' (' + column_names + ') values %s'

    cur = pgconn.cursor()
    try:
        # Delete existing rows by primary key. Passing only the key values
        # (not whole rows) keeps this correct when pk is not the first column.
        psycopg2.extras.execute_values(cur, del_sql, pk_values)
        # Insert the new rows
        psycopg2.extras.execute_values(cur, pg_sql, rows)
        pgconn.commit()
    except psycopg2.Error:
        # Leave the shared connection usable instead of stuck in an aborted
        # transaction, then let the caller see the original error.
        pgconn.rollback()
        raise
    finally:
        cur.close()
|
|
|
|
|
|
# Read a JSON configuration file
|
|
def readJson(sqlFileName):
    """Load and return the parsed content of ``./json/<sqlFileName>``.

    The path is relative to the current working directory and the file is
    read as UTF-8.

    Args:
        sqlFileName: File name (including extension) inside ``./json/``.

    Returns:
        The object produced by ``json.load`` for that file.
    """
    config_path = "./json/" + sqlFileName
    with open(config_path, 'r', encoding='utf-8') as handle:
        return json.load(handle)
|