|
|
import datetime
|
|
|
from typing import Union, List
|
|
|
|
|
|
import psycopg2
|
|
|
import psycopg2.extras
|
|
|
from dbutils.pooled_db import PooledDB
|
|
|
|
|
|
from Config.Config import *
|
|
|
|
|
|
# Module-wide connection pool shared by every Db helper below.
POOL = PooledDB(
    # DB-API module the pool uses to open connections.
    creator=psycopg2,

    # --- pool sizing ---
    # Idle connections opened when the pool is created; 0 creates none.
    mincached=2,
    # Upper bound on idle connections kept in the pool; 0 or None = no limit.
    maxcached=5,
    # Upper bound on shared connections.
    # NOTE(review): the original remark said this setting has no effect
    # because pymysql/MySQLdb report threadsafety 1, and that 0/None means
    # "share everything". The creator here is psycopg2, and the DBUtils
    # documentation describes 0/None as "all connections dedicated" --
    # confirm the intended behaviour.
    maxshared=3,
    # Upper bound on connections handed out at once; 0 or None = no limit.
    maxconnections=6,

    # --- pool behaviour ---
    # When no connection is free: True waits for one, False raises an error.
    blocking=True,
    # How many times one connection may be reused; None = unlimited.
    maxusage=None,
    # Commands run at the start of each session,
    # e.g. ["set datestyle to ...", "set time zone ..."].
    setsession=[],
    # NOTE(review): DBUtils documents 0 as "never ping the connection" --
    # confirm that skipping liveness checks is intended.
    ping=0,

    # --- connection settings (names come from Config.Config) ---
    host=host,
    port=port,
    user=user,
    password=password,
    database=dbname
)
|
|
|
|
|
|
|
|
|
class Db(object):
    """Static helpers that run SQL on connections borrowed from ``POOL``.

    Every public method borrows one connection for the duration of the call
    and always releases it, including when the statement raises.

    ``sql`` text and identifiers (table / column names) are placed into the
    statement verbatim and must therefore come from trusted code. Values
    should be supplied through ``params`` (or the ``data`` dict of
    ``save`` / ``update``) so the driver quotes them.
    """

    # strftime pattern used to render datetime values found in result rows.
    _DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'

    @staticmethod
    def _run(work, commit=False):
        """Call ``work(cursor)`` on a pooled connection and return its result.

        Commits when ``commit`` is true and ``work`` succeeded. On any
        exception the transaction is rolled back and the exception re-raised.
        The cursor and the connection are released in every case, so a failing
        statement can no longer leak a pooled connection.
        """
        # NOTE(review): POOL is created with maxshared=3. If the pool really
        # shares connections between threads, the commit/rollback below also
        # affects other users of the same connection -- confirm whether
        # POOL.connection(shareable=False) is required here.
        conn = POOL.connection()
        try:
            cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
            try:
                result = work(cursor)
                if commit:
                    conn.commit()
                return result
            except Exception:
                conn.rollback()
                raise
            finally:
                cursor.close()
        finally:
            conn.close()

    @staticmethod
    def _row_to_dict(row, columns):
        """Copy ``row`` into a plain dict keyed by ``columns``.

        ``datetime.datetime`` values are replaced by their
        ``'%Y-%m-%d %H:%M:%S'`` text form; all other values are copied as-is.
        """
        record = {}
        for name in columns:
            value = row[name]
            if isinstance(value, datetime.datetime):
                value = value.strftime(Db._DATETIME_FORMAT)
            record[name] = value
        return record

    @staticmethod
    def _fetch_dicts(cursor):
        """Return every remaining row of ``cursor`` as a list of plain dicts."""
        # Use the column names directly. Joining them with "," and splitting
        # again (as the code used to) breaks on names that contain a comma.
        columns = [column[0] for column in cursor.description]
        return [Db._row_to_dict(row, columns) for row in cursor.fetchall()]

    @staticmethod
    def _adapt(value):
        """Return ``value`` in a form the driver can bind as a parameter."""
        # psycopg2 cannot adapt a plain dict. The previous update() turned
        # dicts and lists into JSON-like text (str() plus a quote swap), so
        # bind them as real JSON instead.
        if isinstance(value, (dict, list)):
            return psycopg2.extras.Json(value)
        return value

    @staticmethod
    def _build_insert(table, data):
        """Return ``(sql, params)`` for inserting ``data`` into ``table``.

        Raises:
            ValueError: if ``data`` is empty.
        """
        if not data:
            raise ValueError("save() needs at least one column in data")
        keys = ', '.join(data.keys())
        values = ', '.join(['%s'] * len(data))
        sql = 'INSERT INTO {table}({keys}) VALUES ({values})'.format(
            table=table, keys=keys, values=values)
        return sql, [Db._adapt(value) for value in data.values()]

    @staticmethod
    def _build_update(table, primaryKey, data):
        """Return ``(sql, params)`` for updating one row of ``table``.

        Raises:
            ValueError: if ``primaryKey`` is missing from ``data`` or ``data``
                holds no other column.
        """
        if primaryKey not in data:
            # Previously this degraded silently to "where <pk>=''".
            raise ValueError(
                "update() data must contain the primary key %r" % (primaryKey,))
        assignments = []
        params = []
        for key in data:
            if key != primaryKey:
                assignments.append(key + "=%s")
                params.append(Db._adapt(data[key]))
        if not assignments:
            raise ValueError("update() needs at least one column besides the primary key")
        sql = ("update {table} set ".format(table=table)
               + ",".join(assignments)
               + " where " + primaryKey + "=%s;")
        # The key is bound as text, mirroring the quoted literal the statement
        # used before, so the comparison is still typed by the column.
        params.append(str(data[primaryKey]))
        return sql, params

    @staticmethod
    def fetchOne(sql, _id, params=None):
        """Execute ``sql``, commit, and return column ``_id`` of the first row.

        Args:
            sql: statement to run (for example an ``INSERT ... RETURNING``).
            _id: column name or index to read from the first row.
            params: optional parameters bound by the driver.

        Raises:
            TypeError: if the statement returns no row.
        """
        def work(cursor):
            cursor.execute(sql, params)
            return cursor.fetchone()[_id]

        return Db._run(work, commit=True)

    @staticmethod
    def find(sql, params=None):
        """Execute ``sql`` and return all rows as a list of dicts.

        Datetime values are returned as ``'%Y-%m-%d %H:%M:%S'`` strings.
        Nothing is committed.
        """
        def work(cursor):
            cursor.execute(sql, params)
            return Db._fetch_dicts(cursor)

        return Db._run(work)

    @staticmethod
    def findFirst(sql, params=None):
        """Return the first row of ``sql`` as a dict.

        Raises:
            IndexError: if the query returns no rows.
        """
        return Db.find(sql, params)[0]

    @staticmethod
    def findFirstValue(sql, params=None):
        """Return the first column of the first row of ``sql``.

        Raises:
            IndexError: if the query returns no rows.
        """
        first_row = Db.find(sql, params)[0]
        return next(iter(first_row.values()), None)

    @staticmethod
    def paginate(sql, page, limit, params=None):
        """Return ``(rows, total)`` for one page of ``sql``.

        Args:
            sql: the unpaginated query; it is wrapped as a sub-select.
            page: 1-based page number.
            limit: page size.
            params: optional parameters for ``sql``; bound for both the page
                query and the count query.

        Returns:
            A tuple of the page's rows (list of dicts, datetimes as strings)
            and the total row count of ``sql``.
        """
        # Coerce to int before building the SQL text: a string limit used to
        # produce a wrong offset (2 * "10" == "1010") or injected SQL. The
        # numbers stay inline rather than bound so that a caller's literal
        # '%' in ``sql`` keeps working when no params are given.
        limit = int(limit)
        offset = (int(page) - 1) * limit
        page_sql = ("select * from (" + sql + ") as t100 limit " + str(limit)
                    + " offset " + str(offset))
        count_sql = "select count(1) as c from (" + sql + ") as t100"

        def work(cursor):
            cursor.execute(page_sql, params)
            rows = Db._fetch_dicts(cursor)
            # Total number of rows across all pages.
            cursor.execute(count_sql, params)
            return rows, cursor.fetchone()["c"]

        return Db._run(work)

    @staticmethod
    def execute(sql, params=None):
        """Execute ``sql`` and commit.

        Returns whatever the driver's ``cursor.execute`` returns (psycopg2
        documents this as ``None``).
        """
        return Db._run(lambda cursor: cursor.execute(sql, params), commit=True)

    # ORM-like insert.
    @staticmethod
    def save(table, data):
        """Insert one row into ``table`` and commit.

        Args:
            table: table name, used verbatim.
            data: mapping of column name to value. Values are bound as query
                parameters (not pasted into the SQL text), so pass plain
                Python values without SQL quoting.

        Raises:
            ValueError: if ``data`` is empty.
        """
        sql, params = Db._build_insert(table, data)
        Db._run(lambda cursor: cursor.execute(sql, params), commit=True)

    # ORM-like update (currently supports a single-column primary key only).
    @staticmethod
    def update(table, primaryKey, data):
        """Update the row of ``table`` identified by ``primaryKey`` and commit.

        Args:
            table: table name, used verbatim.
            primaryKey: name of the key column; its value is read from
                ``data[primaryKey]`` and used in the WHERE clause.
            data: mapping of column name to new value, including the primary
                key. Values are bound as query parameters, so apostrophes are
                stored unchanged and ``None`` becomes NULL.

        Raises:
            ValueError: if ``primaryKey`` is missing from ``data`` or there is
                no other column to set.
        """
        sql, params = Db._build_update(table, primaryKey, data)
        Db._run(lambda cursor: cursor.execute(sql, params), commit=True)

    @staticmethod
    def execute_by_execute_values(sql: str, data: Union[list, set], template: str = None, page_size: int = 100,
                                  fetch: bool = False):
        """Bulk insert/update through ``psycopg2.extras.execute_values``.

        The original author notes this is more efficient than
        ``execute_batch`` and ``executemany``.

        Returns:
            The value returned by ``execute_values`` (the fetched rows when
            ``fetch`` is true). Previously this result was discarded.

        Raises:
            Exception: any driver error, re-raised after a rollback.
        """
        def work(cursor):
            return psycopg2.extras.execute_values(
                cursor, sql, data, template=template, page_size=page_size, fetch=fetch)

        return Db._run(work, commit=True)

    @staticmethod
    def execute_by_execute_batch(sql: str, data: List[dict], page_size: int = 100):
        """Run ``sql`` once per item of ``data`` via ``execute_batch`` and commit.

        Raises:
            Exception: any driver error, re-raised after a rollback.
        """
        def work(cursor):
            psycopg2.extras.execute_batch(cursor, sql, data, page_size=page_size)

        Db._run(work, commit=True)
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Running this file directly performs no action.
    pass
|