You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
87 lines
2.4 KiB
87 lines
2.4 KiB
import json
|
|
from datetime import date, datetime
|
|
|
|
import psycopg2
|
|
from psycopg2 import pool
|
|
from psycopg2.extras import RealDictCursor
|
|
from Config import *
|
|
|
|
# Create the module-wide connection pool.
# The first two positional arguments are the minimum and maximum number of
# connections the pool keeps; the remaining keyword arguments are the
# connection settings, taken from the names star-imported from Config.
postgresql_pool = psycopg2.pool.SimpleConnectionPool(
    1,
    10,
    host=PG_HOST,
    port=PG_PORT,
    dbname=PG_DATABASE,
    user=PG_USER,
    password=PG_PASSWORD,
)
|
|
|
|
|
|
class PostgreSQLUtil:
    """Small helper that runs SQL statements on one pooled connection.

    The instance can be used as a context manager: leaving the ``with``
    block ends the transaction and hands the connection back to the
    module-level ``postgresql_pool``.
    """

    def __init__(self, connection):
        """Store *connection*; every statement issued by this object uses it."""
        self.connection = connection

    def __enter__(self):
        """Return the helper itself so it can be bound with ``with ... as db``."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Finish the transaction and return the connection to the pool.

        Commits when the ``with`` body completed normally and rolls back when
        it raised, so a failed block never persists half-finished work.  The
        exception, if any, is not suppressed.
        """
        try:
            if exc_type is None:
                self.connection.commit()
            else:
                self.connection.rollback()
        finally:
            # Always give the connection back, even if commit/rollback itself
            # failed; otherwise the pool would slowly run out of connections.
            postgresql_pool.putconn(self.connection)

    def execute_query(self, sql, params=None, return_dict=True):
        """Execute *sql* and return its result.

        Args:
            sql: The statement to run.
            params: Optional parameters passed through to ``cursor.execute``.
            return_dict: When true, a ``RealDictCursor`` is used and its rows
                are returned as fetched.  When false, a default cursor is used
                and each row is converted to a plain ``dict`` keyed by column
                name.

        Returns:
            A list of mapping rows when the cursor reports a result set
            (``cursor.description`` is set), otherwise
            ``{"rowcount": <cursor.rowcount>}``.

        Raises:
            Exception: Any error raised while executing is printed, the
                transaction is rolled back, and the error is re-raised.
        """
        try:
            with self.connection.cursor(
                cursor_factory=RealDictCursor if return_dict else None
            ) as cursor:
                cursor.execute(sql, params)

                # No description means there are no rows to fetch; report
                # only the cursor's row count.
                if not cursor.description:
                    return {"rowcount": cursor.rowcount}

                rows = cursor.fetchall()
                if return_dict:
                    return rows

                # Default cursors yield sequences; pair them with column names.
                columns = [desc[0] for desc in cursor.description]
                return [dict(zip(columns, row)) for row in rows]
        except Exception as e:
            print(f"执行SQL出错: {e}")
            self.connection.rollback()
            raise

    def query_to_json(self, sql, params=None):
        """Execute *sql* and return its result serialized as a JSON string.

        Rows are fetched through ``execute_query`` as plain dicts.  ``date``
        and ``datetime`` values are written in ISO 8601 form; non-ASCII text
        is kept as-is instead of being escaped.

        Raises:
            TypeError: If a value is of a type JSON cannot represent and that
                is not a ``date``/``datetime``.
        """
        rows = self.execute_query(sql, params, return_dict=False)
        return json.dumps(rows, ensure_ascii=False, default=self._json_default)

    @staticmethod
    def _json_default(value):
        """Serialize values that ``json`` does not handle natively."""
        if isinstance(value, (datetime, date)):
            return value.isoformat()
        raise TypeError(
            f"Object of type {type(value).__name__} is not JSON serializable"
        )
|
|
|
|
|
|
def get_db():
    """Yield a ``PostgreSQLUtil`` bound to a connection borrowed from the pool.

    This is a generator: the caller receives the helper from the first
    ``next()`` and the code after the ``yield`` runs when the generator is
    resumed or closed.

    When the generator is resumed normally, the transaction is committed
    before the connection is returned.  NOTE(review): psycopg2's pool appears
    to roll back a connection that is still inside a transaction when it is
    put back, which would silently discard writes without this commit —
    confirm against the installed psycopg2 version.

    When the consumer raises into the generator or closes it early
    (``gen.close()``), the commit is skipped and the connection is simply
    returned to the pool.
    """
    connection = postgresql_pool.getconn()
    try:
        yield PostgreSQLUtil(connection)
        # Only reached on normal resumption; skip if the connection was
        # already closed so cleanup does not raise a secondary error.
        if not connection.closed:
            connection.commit()
    finally:
        postgresql_pool.putconn(connection)
|
|
|
|
|
|
# Usage example
if __name__ == "__main__":
    # db_gen = get_db(): calling the generator function returns a generator object.
    # db = next(db_gen): fetches the PostgreSQLUtil instance from the generator.
    # Closing the generator runs get_db()'s cleanup, which returns the
    # connection to the pool.

    # Obtain the database helper from the generator.
    db_gen = get_db()
    db = next(db_gen)
    try:
        # Sample query.
        result = db.execute_query("SELECT version()")
        print("数据库版本:", result)

        # Return the rows as JSON.  default=str is a deliberate choice for
        # this demo so values json cannot encode natively (e.g. dates) are
        # shown via their string form instead of raising.
        rows = db.execute_query("SELECT * FROM t_base_class LIMIT 2")
        json_data = json.dumps(rows, ensure_ascii=False, default=str)
        print("JSON结果:", json_data)
    finally:
        # Close the generator explicitly so its cleanup runs.
        db_gen.close()
|