You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

170 lines
6.0 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import datetime
from typing import Union, List
import psycopg2
import psycopg2.extras
from dbutils.pooled_db import PooledDB
from Config.Config import *
# Module-level connection pool used by every Db helper below.
# host / port / user / password / dbname are expected to come from the
# star-import of Config.Config.
POOL = PooledDB(
    # Module used to open the database connections.
    creator=psycopg2,
    # --- connection target ---
    host=host,
    port=port,
    user=user,
    password=password,
    database=dbname,
    # --- pool sizing ---
    # Maximum number of connections the pool allows; 0 or None means unlimited.
    maxconnections=6,
    # Idle connections created when the pool is initialised; 0 creates none.
    mincached=2,
    # Maximum number of idle connections kept in the pool; 0 or None means unlimited.
    maxcached=5,
    # Maximum number of shared connections.
    # NOTE(review): the original note claimed this setting has no effect because
    # pymysql / MySQLdb report threadsafety 1; the creator here is psycopg2, so
    # confirm whether connection sharing is actually active for this pool.
    maxshared=3,
    # --- behaviour ---
    # When no connection is available: True blocks and waits, False raises an error.
    blocking=True,
    # Maximum number of times a single connection is reused; None means unlimited.
    maxusage=None,
    # Commands executed before a session starts,
    # e.g. ["set datestyle to ...", "set time zone ..."].
    setsession=[],
    ping=0,
)
class Db(object):
    """Static helpers that run SQL through connections obtained from ``POOL``.

    Every public method obtains a connection with ``POOL.connection()``, opens a
    ``psycopg2.extras.DictCursor`` on it, does its work and closes the
    connection again.  If the work raises, the transaction is rolled back and
    the connection is still closed before the exception propagates.
    """

    # Format applied to datetime values when result rows are turned into dicts.
    _DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'

    @staticmethod
    def _with_cursor(work, commit=False):
        """Run ``work(cursor)`` on a fresh connection and return its result.

        :param work: callable taking the DictCursor and returning any value.
        :param commit: commit after ``work`` returned successfully.
        :raises Exception: whatever ``work`` (or the commit) raises; the
            transaction is rolled back first.
        """
        conn = POOL.connection()
        try:
            cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
            result = work(cursor)
            if commit:
                conn.commit()
            return result
        except Exception:
            conn.rollback()
            raise
        finally:
            # Always release the connection, also on failure; otherwise every
            # failing statement would keep one connection checked out.
            conn.close()

    @staticmethod
    def _rows_to_dicts(columns, rows):
        """Convert rows to plain dicts keyed by column name.

        ``datetime.datetime`` values are rendered as ``'YYYY-MM-DD HH:MM:SS'``
        strings; every other value is copied unchanged.

        :param columns: column names, in result order.
        :param rows: rows that support lookup by column name.
        :return: list with one dict per row.
        """
        converted = []
        for row in rows:
            record = {}
            for name in columns:
                value = row[name]
                if isinstance(value, datetime.datetime):
                    value = value.strftime(Db._DATETIME_FORMAT)
                record[name] = value
            converted.append(record)
        return converted

    @staticmethod
    def _build_update(table, primaryKey, data):
        """Build the UPDATE statement and its bound parameters for ``update``.

        :return: ``(sql, params)``; the primary-key value is the last parameter.
        """
        assignments = []
        params = []
        key_value = ''
        for column, value in data.items():
            if column == primaryKey:
                key_value = str(value)
            else:
                assignments.append(column + "=%s")
                # NOTE(review): values are stringified and single quotes are
                # turned into double quotes, exactly as before, so stored
                # values do not change.  With bound parameters the replacement
                # is no longer needed for safety - confirm whether callers
                # rely on it before dropping it.
                params.append(str(value).replace("'", '"'))
        params.append(key_value)
        sql = ("update {table} set ".format(table=table)
               + ",".join(assignments)
               + " where " + primaryKey + "=%s;")
        return sql, params

    @staticmethod
    def fetchOne(sql, _id):
        """Execute ``sql``, commit, and return column ``_id`` of the first row.

        :param sql: complete SQL statement.
        :param _id: column name (or index) to read from the first row.
        :raises TypeError: if the statement produced no row.
        """
        def work(cursor):
            cursor.execute(sql)
            return cursor.fetchone()[_id]

        return Db._with_cursor(work, commit=True)

    @staticmethod
    def find(sql):
        """Execute a query and return all rows as a list of dicts.

        Datetime values are returned as ``'YYYY-MM-DD HH:MM:SS'`` strings.
        """
        def work(cursor):
            cursor.execute(sql)
            # Read the names directly from the description; joining them with
            # ',' and splitting again would break names containing a comma.
            columns = [col[0] for col in cursor.description]
            return Db._rows_to_dicts(columns, cursor.fetchall())

        return Db._with_cursor(work)

    @staticmethod
    def findFirst(sql):
        """Return the first row of the query as a dict.

        :raises IndexError: if the query returned no rows.
        """
        return Db.find(sql)[0]

    @staticmethod
    def findFirstValue(sql):
        """Return the value of the first column of the first row.

        Returns ``None`` when the first row has no columns.

        :raises IndexError: if the query returned no rows.
        """
        row = Db.find(sql)[0]
        return next(iter(row.values()), None)

    @staticmethod
    def paginate(sql, page, limit):
        """Return one page of ``sql`` together with the total row count.

        :param sql: query to paginate; it is wrapped as a sub-select.
        :param page: 1-based page number.
        :param limit: rows per page.
        :return: ``(rows, total)`` - rows as dicts (datetimes formatted as
            strings) and the count of all rows produced by ``sql``.
        """
        # Coerce to int so the offset arithmetic is numeric and only digits
        # are ever concatenated into the statement.
        limit = int(limit)
        offset = (int(page) - 1) * limit
        page_sql = ("select * from (" + sql + ") as t100 limit " + str(limit)
                    + " offset " + str(offset))
        # Total number of rows.
        count_sql = "select count(1) as c from (" + sql + ") as t100"

        def work(cursor):
            cursor.execute(page_sql)
            columns = [col[0] for col in cursor.description]
            records = Db._rows_to_dicts(columns, cursor.fetchall())
            cursor.execute(count_sql)
            return records, cursor.fetchone()["c"]

        return Db._with_cursor(work)

    @staticmethod
    def execute(sql):
        """Execute a statement and commit.

        :return: whatever ``cursor.execute`` returns.
        """
        return Db._with_cursor(lambda cursor: cursor.execute(sql), commit=True)

    # ORM-style insert
    @staticmethod
    def save(table, data):
        """Insert one row into ``table`` and commit.

        :param table: table name, used verbatim.
        :param data: mapping of column name to value.

        NOTE(review): the values are substituted into the SQL text with plain
        ``%`` string formatting, without any quoting or escaping, so each value
        must already be a valid SQL literal.  This is kept unchanged because
        binding the values as parameters would alter what existing callers
        store; never pass untrusted input here.
        """
        keys = ', '.join(data.keys())
        values = ', '.join(['%s'] * len(data))
        sql = 'INSERT INTO {table}({keys}) VALUES ({values})'.format(table=table, keys=keys, values=values)
        sql = sql % tuple(data.values())
        Db._with_cursor(lambda cursor: cursor.execute(sql), commit=True)

    # ORM-style update (currently only a single-column primary key is supported)
    @staticmethod
    def update(table, primaryKey, data):
        """Update the row of ``table`` identified by ``primaryKey`` and commit.

        :param table: table name, used verbatim.
        :param primaryKey: name of the primary-key column; its value is read
            from ``data`` and used in the WHERE clause.
        :param data: mapping of column name to new value, including the
            primary-key entry.

        Values are sent as bound parameters, so a quote in the primary-key
        value can no longer break or alter the statement.
        """
        sql, params = Db._build_update(table, primaryKey, data)
        Db._with_cursor(lambda cursor: cursor.execute(sql, params), commit=True)

    @staticmethod
    def execute_by_execute_values(sql: str, data: Union[list, set], template: str = None, page_size: int = 100,
                                  fetch: bool = False):
        """
        Batch insert/update through ``psycopg2.extras.execute_values``, which is
        more efficient than ``execute_batch`` and ``executemany``.

        :param sql: statement passed to ``execute_values``.
        :param data: sequence of parameter rows.
        :param template: forwarded to ``execute_values``.
        :param page_size: forwarded to ``execute_values``.
        :param fetch: forwarded to ``execute_values``.
        :return: the value returned by ``execute_values`` (previously it was
            discarded, which made ``fetch=True`` useless).
        """
        def work(cursor):
            return psycopg2.extras.execute_values(
                cursor, sql, data, template=template, page_size=page_size, fetch=fetch)

        return Db._with_cursor(work, commit=True)

    @staticmethod
    def execute_by_execute_batch(sql: str, data: List[dict], page_size: int = 100):
        """Run ``sql`` for every item of ``data`` via ``psycopg2.extras.execute_batch`` and commit.

        :param sql: statement passed to ``execute_batch``.
        :param data: list of parameter dicts.
        :param page_size: forwarded to ``execute_batch``.
        """
        def work(cursor):
            psycopg2.extras.execute_batch(cursor, sql, data, page_size=page_size)

        Db._with_cursor(work, commit=True)
# Script entry point: intentionally does nothing when the module is run directly.
if __name__ == '__main__':
    pass