""" pip install asyncpg """ import asyncpg from Config.Config import * # PostgreSQL 配置 POSTGRES_CONFIG = { "host": POSTGRES_HOST, "port": POSTGRES_PORT, "user": POSTGRES_USER, "password": POSTGRES_PASSWORD, "database": POSTGRES_DATABASE, "min_size": 1, # 设置为0表示不保留空闲连接 "max_size": 20, "command_timeout": 60 } # 初始化 PostgreSQL 连接池 async def init_postgres_pool(): return await asyncpg.create_pool(**POSTGRES_CONFIG) # 查询示例 async def query_example(pool): async with pool.acquire() as conn: # 执行查询 rows = await conn.fetch('SELECT * FROM your_table LIMIT 10') # 处理结果 for row in rows: print(dict(row)) return rows # 插入示例 async def insert_example(pool, data): async with pool.acquire() as conn: # 执行插入 stmt = """ INSERT INTO your_table (column1, column2) VALUES ($1, $2) """ # 使用参数化查询防止SQL注入 inserted_id = await conn.fetchval(stmt, data['value1'], data['value2']) print(f"插入成功!") return inserted_id # 使用示例 async def main(): pool = await init_postgres_pool() # 查询示例 await query_example(pool) # 插入示例 #data_to_insert = {'value1': 'test1', 'value2': 'test2'} #await insert_example(pool, data_to_insert) await pool.close() if __name__ == '__main__': import asyncio asyncio.run(main())