import decimal
import json
import uuid
from datetime import date, datetime

from asyncpg.pool import Pool
from sqlalchemy.dialects.postgresql import asyncpg

from Config import *


class PostgreSQLUtil:
    """Small async helper that runs queries on an asyncpg connection pool."""

    def __init__(self, pool: Pool):
        """Store the pool that every query acquires its connection from."""
        self.pool = pool

    async def execute_query(self, sql, params=None):
        """Run ``sql`` on a pooled connection and return the fetched rows.

        Args:
            sql: Query text using asyncpg's ``$1``, ``$2``, ... placeholders.
            params: Bind values. ``None`` means the query takes no arguments,
                a list or tuple supplies one value per placeholder, and any
                other object is sent as the single value for ``$1``. To bind
                an array to one placeholder, wrap it: ``([1, 2, 3],)``.

        Returns:
            The list of records returned by ``connection.fetch``.
        """
        # asyncpg takes bind values as *args; passing ``params`` through as a
        # single positional value sends a bogus ``None`` argument for queries
        # without placeholders and cannot serve multi-parameter queries.
        if params is None:
            args = ()
        elif isinstance(params, (list, tuple)):
            args = tuple(params)
        else:
            args = (params,)

        async with self.pool.acquire() as connection:
            return await connection.fetch(sql, *args)

    async def query_to_json(self, sql, params=None):
        """Run ``sql`` and return the rows as a JSON array of objects.

        Args:
            sql: Query text, forwarded to ``execute_query``.
            params: Bind values, forwarded to ``execute_query``.

        Returns:
            A JSON string holding one object per row, keyed by column name.

        Raises:
            TypeError: If a column value has a type ``json_serializer`` does
                not handle.
        """
        rows = await self.execute_query(sql, params)
        # Records are not list/tuple/dict instances, so ``json`` cannot encode
        # them directly; convert each row to a plain dict first.
        return json.dumps(
            [dict(row) for row in rows],
            default=self.json_serializer,
        )

    @staticmethod
    def json_serializer(obj):
        """Convert values that ``json`` cannot serialize natively.

        Dates and datetimes become ISO 8601 strings; ``Decimal`` and ``UUID``
        values become their string form (string keeps decimal precision).

        Raises:
            TypeError: If ``obj`` is of any other unsupported type.
        """
        if isinstance(obj, (date, datetime)):
            return obj.isoformat()
        if isinstance(obj, (decimal.Decimal, uuid.UUID)):
            return str(obj)
        raise TypeError(f"Type {type(obj)} not serializable")


async def create_pool():
    """Create an asyncpg pool (1 to 10 connections) from the ``PG_*`` settings.

    The ``PG_HOST``, ``PG_PORT``, ``PG_DATABASE``, ``PG_USER`` and
    ``PG_PASSWORD`` names are expected to come from the ``Config`` star import.
    """
    # The module-level name ``asyncpg`` is bound to SQLAlchemy's dialect
    # module, which has no ``create_pool``; import the real driver here.
    import asyncpg

    return await asyncpg.create_pool(
        host=PG_HOST,
        port=PG_PORT,
        database=PG_DATABASE,
        user=PG_USER,
        password=PG_PASSWORD,
        min_size=1,
        max_size=10,
    )