You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

241 lines
7.4 KiB

# Database.py
import datetime
import logging
import asyncpg
from Config.Config import *
# Module-wide asyncpg connection pool; stays None until create_pool() has run.
pool = None


async def create_pool():
    """Create the asyncpg pool and store it in the module-level ``pool``.

    Connection settings are read from the ``POSTGRES_*`` names (presumably
    supplied by the wildcard import from ``Config.Config``).
    """
    global pool
    settings = {
        "host": POSTGRES_HOST,
        "port": POSTGRES_PORT,
        "user": POSTGRES_USER,
        "password": POSTGRES_PASSWORD,
        "database": POSTGRES_DATABASE,
        "min_size": 1,  # minimum number of pooled connections
        "max_size": 10,  # maximum number of pooled connections
    }
    pool = await asyncpg.create_pool(**settings)
async def get_connection():
    """Acquire a connection from the shared pool and hand it to the caller.

    The connection stays checked out until the caller gives it back with
    ``await pool.release(conn)``. When the connection is only needed inside
    one scope, prefer ``async with pool.acquire() as conn:`` instead.

    Returns:
        The acquired asyncpg connection.

    Raises:
        Exception: If the pool has not been created yet.
    """
    if pool is None:
        raise Exception("连接池未初始化")
    # Deliberately not ``async with pool.acquire()``: leaving that block
    # releases the connection, so returning from inside it would give the
    # caller a handle that has already gone back to the pool.
    return await pool.acquire()
async def close_pool():
    """Close the shared pool, if one exists, and reset the module-level ``pool``."""
    global pool
    if pool is None:
        return
    # Clear the global before closing so the ``pool is None`` guards in this
    # module report "not initialised" instead of touching a closed pool, and
    # so a later create_pool() starts from a clean state.
    closing, pool = pool, None
    await closing.close()
async def init_database():
    """Initialise the database layer by creating the shared connection pool."""
    await create_pool()
async def shutdown_database():
    """Shut the database layer down by closing the shared connection pool."""
    await close_pool()
async def find_by_sql(sql: str, params: tuple = ()):
    """Run a query and return its rows as a list of dicts.

    Args:
        sql: SQL text using asyncpg-style ``$1, $2, ...`` placeholders.
        params: Positional values for the placeholders. May be omitted (or
            ``None``) for a query that has no placeholders.

    Returns:
        A non-empty list of ``dict`` rows, or ``None`` when the pool has not
        been created, the query produced no rows, or the query failed. A
        failure is logged rather than raised.
    """
    if pool is None:
        logging.error("数据库连接池未创建")
        return None
    try:
        async with pool.acquire() as conn:
            records = await conn.fetch(sql, *(params or ()))
            # Convert each asyncpg.Record to a plain dict; an empty result is
            # reported to callers as None.
            return [dict(record) for record in records] or None
    except Exception as e:
        logging.error(f"数据库查询错误: {e}")
        return None
async def insert(tableName, param, onlyForParam=False):
    """Insert one row into ``tableName`` and return the new row's ``id``.

    Args:
        tableName: Target table. It is interpolated into the SQL text as-is,
            so it must come from trusted code, never from user input.
        param: Mapping of column name -> value. Entries whose value is
            ``None`` are left out of the statement. Column names are
            interpolated as-is as well.
        onlyForParam: When false (the default), ``is_deleted`` (0),
            ``create_time`` and ``update_time`` (current local time) are
            added unless ``param`` already contains those keys.

    Returns:
        The ``id`` returned by the statement, or ``None`` if no row came back.

    Raises:
        Exception: If the statement fails; the underlying error is chained.
    """
    current_time = datetime.datetime.now()

    # Bind every non-None value as a query argument regardless of its type.
    # Emitting a literal NULL while still appending to the argument list
    # would shift the $n numbering and send more arguments than the statement
    # has placeholders, which makes the whole insert fail.
    row = {key: value for key, value in param.items() if value is not None}
    if not onlyForParam:
        if 'is_deleted' not in param:
            row["is_deleted"] = 0
        if 'create_time' not in param:
            row["create_time"] = current_time
        if 'update_time' not in param:
            row["update_time"] = current_time

    values = list(row.values())
    if row:
        column_names = ", ".join(row)
        placeholder_names = ", ".join(f"${i}" for i in range(1, len(values) + 1))
        sql = f"INSERT INTO {tableName} ({column_names}) VALUES ({placeholder_names}) RETURNING id"
    else:
        # "INSERT INTO t () VALUES ()" is not valid SQL; let the table
        # defaults fill the row instead.
        sql = f"INSERT INTO {tableName} DEFAULT VALUES RETURNING id"

    try:
        async with pool.acquire() as conn:
            result = await conn.fetchrow(sql, *values)
            if result:
                return result['id']
            logging.error("插入数据失败: 未返回ID")
            return None
    except Exception as e:
        logging.error(f"数据库查询错误: {e}")
        logging.error(f"执行的SQL语句: {sql}")
        logging.error(f"参数: {values}")
        raise Exception(f"为表[{tableName}]插入数据失败: {e}") from e
async def update(table_name, param, property_name, property_value, only_for_param=False):
    """Update the rows where ``property_name`` equals ``property_value``.

    Args:
        table_name: Target table. It is interpolated into the SQL text as-is,
            so it must come from trusted code.
        param: Mapping of column name -> new value. Entries whose value is
            ``None`` are skipped, so this function cannot set a column to
            NULL. Column names are interpolated as-is.
        property_name: Column used in the WHERE clause (interpolated as-is).
        property_value: Value the WHERE column must equal (bound as an
            argument).
        only_for_param: When false (the default), ``update_time`` is set to
            the current local time unless ``param`` already has that key.

    Returns:
        The ``id`` of the first row returned by the statement, or ``None``
        when no row matched.

    Raises:
        Exception: If the statement fails; the underlying error is chained.
    """
    current_time = datetime.datetime.now()

    # Bind every non-None value as a query argument regardless of its type.
    # Mixing a literal NULL into the SET list while still appending to the
    # argument list would misalign the $n numbering and break the statement.
    assignments = {key: value for key, value in param.items() if value is not None}
    if not only_for_param and 'update_time' not in param:
        assignments["update_time"] = current_time

    set_clause = ", ".join(
        f"{column} = ${index}" for index, column in enumerate(assignments, start=1)
    )
    # The WHERE value is the last bound argument.
    values = [*assignments.values(), property_value]
    sql = f"UPDATE {table_name} SET {set_clause} WHERE {property_name} = ${len(values)} RETURNING id"
    logging.debug(sql)

    try:
        async with pool.acquire() as conn:
            result = await conn.fetchrow(sql, *values)
            if result:
                return result['id']
            logging.error("更新数据失败: 未返回ID")
            return None
    except Exception as e:
        logging.error(f"数据库查询错误: {e}")
        logging.error(f"执行的SQL语句: {sql}")
        logging.error(f"参数: {values}")
        raise Exception(f"为表[{table_name}]更新数据失败: {e}") from e
async def find_by_id(table_name, property_name, property_value):
    """Fetch one non-deleted row by key.

    Selects from ``table_name`` where ``is_deleted = 0`` and ``property_name``
    equals ``property_value``. The table and column names are interpolated
    into the SQL text; the value is bound as ``$1``.

    Returns:
        The first matching row as returned by ``find_by_sql``, or ``None``
        when an argument is missing or nothing was found.
    """
    if not (table_name and property_name) or property_value is None:
        logging.error("参数不全")
        return None

    sql = f"SELECT * FROM {table_name} WHERE is_deleted = 0 AND {property_name} = $1"
    logging.debug(sql)

    rows = await find_by_sql(sql, (property_value,))
    if rows:
        # Only the first row is handed back.
        return rows[0]
    logging.error("查询失败: 未找到数据")
    return None
async def delete_by_id(table_name, property_name, property_value):
    """Soft-delete the rows where ``property_name`` equals ``property_value``.

    Sets ``is_deleted = 1`` and ``update_time = now()`` on rows that are not
    already deleted. The table and column names are interpolated into the SQL
    text; the value is bound as ``$1``.

    Returns:
        ``True`` if at least one row was updated; ``False`` if an argument is
        missing or no row matched.

    Raises:
        Exception: If the statement fails; the underlying error is chained.
    """
    if not (table_name and property_name) or property_value is None:
        logging.error("参数不全")
        return False

    sql = f"UPDATE {table_name} SET is_deleted = 1, update_time = now() WHERE {property_name} = $1 and is_deleted = 0"
    logging.debug(sql)

    try:
        async with pool.acquire() as conn:
            status = await conn.execute(sql, property_value)
    except Exception as e:
        logging.error(f"数据库查询错误: {e}")
        logging.error(f"执行的SQL语句: {sql}")
        logging.error(f"参数: {property_value}")
        raise Exception(f"为表[{table_name}]删除数据失败: {e}") from e

    # execute() returns the command status text (e.g. "UPDATE 0"), which is
    # always truthy; the trailing number is the affected-row count and is what
    # actually tells us whether anything was deleted.
    count_text = status.rsplit(" ", 1)[-1] if status else ""
    if count_text.isdigit() and int(count_text) > 0:
        return True
    logging.error("删除失败: 未找到数据")
    return False
async def execute_sql(sql, params=()):
    """Execute a SQL statement for its effect; no rows are returned.

    Args:
        sql: SQL text to run. With no ``params`` it may contain several
            statements.
        params: Optional positional values for ``$1, $2, ...`` placeholders.
            Prefer these over formatting values into ``sql``.

    Raises:
        Exception: If execution fails; the underlying error is chained.
    """
    logging.debug(sql)
    try:
        async with pool.acquire() as conn:
            # execute() rather than fetch(): the result set was never used.
            await conn.execute(sql, *(params or ()))
    except Exception as e:
        logging.error(f"数据库查询错误: {e}")
        logging.error(f"执行的SQL语句: {sql}")
        raise Exception(f"执行SQL失败: {e}") from e
async def find_person_name_by_id(person_id):
    """Return ``person_name`` from ``t_sys_loginperson`` for ``person_id``.

    Only rows with ``b_use = 1`` are considered (presumably the "in use"
    flag; confirm against the schema). Returns ``None`` when no row is found.
    """
    sql = "SELECT person_name FROM t_sys_loginperson WHERE person_id = $1 and b_use = 1"
    logging.debug(sql)
    rows = await find_by_sql(sql, (person_id,))
    return rows[0]['person_name'] if rows else None
async def find_bureau_name_by_id(org_id):
    """Return ``org_name`` from ``t_base_organization`` for ``org_id``.

    Only rows with ``b_use = 1`` are considered (presumably the "in use"
    flag; confirm against the schema). Returns ``None`` when no row is found.
    """
    sql = "SELECT org_name FROM t_base_organization WHERE org_id = $1 and b_use = 1"
    logging.debug(sql)
    rows = await find_by_sql(sql, (org_id,))
    return rows[0]['org_name'] if rows else None