import json from datetime import date, datetime import psycopg2 from psycopg2 import pool from psycopg2.extras import RealDictCursor from Config import * # 创建连接池 postgresql_pool = psycopg2.pool.SimpleConnectionPool( minconn=1, maxconn=10, host=PG_HOST, port=PG_PORT, dbname=PG_DATABASE, user=PG_USER, password=PG_PASSWORD ) class PostgreSQLUtil: def __init__(self, connection): self.connection = connection def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.connection.commit() postgresql_pool.putconn(self.connection) def execute_query(self, sql, params=None, return_dict=True): """执行查询并返回结果""" try: with self.connection.cursor( cursor_factory=RealDictCursor if return_dict else None ) as cursor: cursor.execute(sql, params) if cursor.description: columns = [desc[0] for desc in cursor.description] results = cursor.fetchall() # 转换字典格式 if return_dict: return results else: return [dict(zip(columns, row)) for row in results] else: return {"rowcount": cursor.rowcount} except Exception as e: print(f"执行SQL出错: {e}") self.connection.rollback() raise def get_db(): connection = postgresql_pool.getconn() try: yield PostgreSQLUtil(connection) finally: postgresql_pool.putconn(connection) # 使用示例 if __name__ == "__main__": ''' db_gen = get_db():调用生成器函数,返回生成器对象。 db = next(db_gen):从生成器中获取 PostgreSQLUtil 实例。 生成器函数确保连接在使用后正确归还到连接池。 ''' # 从生成器中获取数据库实例 db_gen = get_db() db = next(db_gen) try: # 示例查询 result = db.execute_query("SELECT version()") print("数据库版本:", result) # 返回JSON json_data = db.query_to_json("SELECT * FROM t_base_class LIMIT 2") print("JSON结果:", json_data) finally: # 手动关闭生成器 db_gen.close()