# Database.py
"""Async convenience helpers for PostgreSQL built on the shared connection pool.

Every helper obtains the pool through ``init_postgres_pool()`` and uses
``$n`` positional placeholders for values.  Table and column names are
interpolated into the SQL text as-is, so they must come from trusted code,
never from user input.
"""
import datetime
import logging

from utils.PostgreSQLUtil import init_postgres_pool


def _rows_affected(status):
    """Return the row count at the end of a command status such as ``"UPDATE 3"``.

    Returns 0 when *status* is not a string ending in an integer.
    """
    try:
        return int(status.rsplit(" ", 1)[-1])
    except (AttributeError, ValueError):
        return 0


def _bind_fragments(param, values, null_form):
    """Render one SQL fragment per entry of *param*.

    Non-None values are appended to *values* and rendered as ``key = $n``,
    where ``n`` is the value's 1-based position in *values*.  None values are
    rendered as ``key <null_form>`` (``= NULL`` in a SET list, ``IS NULL`` in a
    WHERE clause) and bind nothing, so placeholder numbers always stay in step
    with the argument list.
    """
    fragments = []
    for key, value in param.items():
        if value is None:
            fragments.append(f"{key} {null_form}")
        else:
            values.append(value)
            fragments.append(f"{key} = ${len(values)}")
    return fragments


# Query rows with an arbitrary SQL statement
async def find_by_sql(sql: str, params: tuple):
    """Run *sql* with positional *params* and return the rows as dicts.

    Returns:
        A non-empty list of ``dict`` rows, or ``None`` when the query yields
        no rows or fails.  Failures are logged, not raised.
    """
    try:
        pg_pool = await init_postgres_pool()
        async with pg_pool.acquire() as conn:
            records = await conn.fetch(sql, *params)
            # Convert each record into a plain dict
            rows = [dict(record) for record in records]
    except Exception as e:
        logging.error(f"数据库查询错误: {e}")
        return None
    return rows or None


# Insert a row
async def insert(tableName, param, onlyForParam=False):
    """Insert one row into *tableName* and return the generated ``id``.

    Args:
        tableName: Target table (interpolated into the SQL text).
        param: Mapping of column name to value.  ``None`` values are written
            as a literal ``NULL``; every other value is sent as a bound
            parameter.
        onlyForParam: When false, ``is_deleted`` (0), ``create_time`` and
            ``update_time`` (current local time) are added for any of those
            keys that *param* does not contain.

    Returns:
        The ``id`` returned by ``RETURNING id``, or ``None`` if no row came back.

    Raises:
        Exception: If acquiring the connection or executing the statement fails.
    """
    current_time = datetime.datetime.now()
    columns = []
    values = []
    placeholders = []

    def bind(column, value):
        # The placeholder number is the value's 1-based position in `values`.
        columns.append(column)
        values.append(value)
        placeholders.append(f"${len(values)}")

    for key, value in param.items():
        if value is None:
            # Literal NULL consumes no argument, so nothing goes into `values`.
            columns.append(key)
            placeholders.append("NULL")
        else:
            bind(key, value)

    if not onlyForParam:
        if 'is_deleted' not in param:
            bind("is_deleted", 0)
        if 'create_time' not in param:
            bind("create_time", current_time)
        if 'update_time' not in param:
            bind("update_time", current_time)

    # Build the SQL statement
    column_names = ", ".join(columns)
    placeholder_names = ", ".join(placeholders)
    sql = f"INSERT INTO {tableName} ({column_names}) VALUES ({placeholder_names}) RETURNING id"

    try:
        pg_pool = await init_postgres_pool()
        async with pg_pool.acquire() as conn:
            result = await conn.fetchrow(sql, *values)
            if result:
                return result['id']
            logging.error("插入数据失败: 未返回ID")
            return None
    except Exception as e:
        logging.error(f"数据库查询错误: {e}")
        logging.error(f"执行的SQL语句: {sql}")
        logging.error(f"参数: {values}")
        raise Exception(f"为表[{tableName}]插入数据失败: {e}") from e


# Update rows
async def update(table_name, param, property_name, property_value, only_for_param=False):
    """Update rows of *table_name* where ``property_name = property_value``.

    Args:
        table_name: Target table (interpolated into the SQL text).
        param: Mapping of column name to new value.  ``None`` sets the column
            to ``NULL``; every other value is sent as a bound parameter.
        property_name: Column used in the WHERE clause.
        property_value: Value the WHERE column must equal.
        only_for_param: When false and *param* has no ``update_time`` key,
            ``update_time`` is set to the current local time.

    Returns:
        The ``id`` of the first row returned by ``RETURNING id``, or ``None``
        when no row matched.

    Raises:
        Exception: If acquiring the connection or executing the statement fails.
    """
    values = []

    # Columns to update
    set_clauses = _bind_fragments(param, values, "= NULL")
    if not only_for_param and 'update_time' not in param:
        values.append(datetime.datetime.now())
        set_clauses.append(f"update_time = ${len(values)}")

    # The WHERE value is always the last bound argument
    values.append(property_value)

    # Build the SQL statement
    set_clause = ", ".join(set_clauses)
    sql = f"UPDATE {table_name} SET {set_clause} WHERE {property_name} = ${len(values)} RETURNING id"
    logging.debug(sql)

    try:
        pg_pool = await init_postgres_pool()
        async with pg_pool.acquire() as conn:
            result = await conn.fetchrow(sql, *values)
            if result:
                return result['id']
            logging.error("更新数据失败: 未返回ID")
            return None
    except Exception as e:
        logging.error(f"数据库查询错误: {e}")
        logging.error(f"执行的SQL语句: {sql}")
        logging.error(f"参数: {values}")
        raise Exception(f"为表[{table_name}]更新数据失败: {e}") from e


# Fetch a single record
# Look up by key column
async def find_by_id(table_name, property_name, property_value):
    """Return the first non-deleted row where ``property_name = property_value``.

    Returns:
        The row as a ``dict``, or ``None`` when an argument is missing, no row
        matches, or the query fails.
    """
    if not (table_name and property_name and property_value is not None):
        logging.error("参数不全")
        return None

    # Build the SQL statement
    sql = f"SELECT * FROM {table_name} WHERE is_deleted = 0 AND {property_name} = $1"
    logging.debug(sql)

    # Run the query
    result = await find_by_sql(sql, (property_value,))
    if not result:
        logging.error("查询失败: 未找到数据")
        return None
    # Return the first row
    return result[0]


# Delete by key column
# Soft delete
async def delete_by_id(table_name, property_name, property_value):
    """Soft-delete rows by setting ``is_deleted = 1`` and ``update_time = now()``.

    Only rows that are not already deleted are touched.

    Returns:
        ``True`` if at least one row was updated, ``False`` when an argument is
        missing or no row matched.

    Raises:
        Exception: If acquiring the connection or executing the statement fails.
    """
    if not (table_name and property_name and property_value is not None):
        logging.error("参数不全")
        return False

    sql = f"UPDATE {table_name} SET is_deleted = 1, update_time = now() WHERE {property_name} = $1 and is_deleted = 0"
    logging.debug(sql)

    # Run the soft delete
    try:
        pg_pool = await init_postgres_pool()
        async with pg_pool.acquire() as conn:
            status = await conn.execute(sql, property_value)
            # The status string (e.g. "UPDATE 0") is always truthy, so the
            # row count has to be parsed to detect "nothing matched".
            if _rows_affected(status) > 0:
                return True
            logging.error("删除失败: 未找到数据")
            return False
    except Exception as e:
        logging.error(f"数据库查询错误: {e}")
        logging.error(f"执行的SQL语句: {sql}")
        logging.error(f"参数: {property_value}")
        raise Exception(f"为表[{table_name}]删除数据失败: {e}") from e


# Execute a single SQL statement
async def execute_sql(sql, params):
    """Execute *sql* with positional *params*, discarding any result rows.

    Raises:
        Exception: If acquiring the connection or executing the statement fails.
    """
    try:
        pg_pool = await init_postgres_pool()
        async with pg_pool.acquire() as conn:
            await conn.fetch(sql, *params)
    except Exception as e:
        logging.error(f"数据库查询错误: {e}")
        logging.error(f"执行的SQL语句: {sql}")
        raise Exception(f"执行SQL失败: {e}") from e


async def find_person_name_by_id(person_id):
    """Return ``person_name`` of the ``t_sys_loginperson`` row with ``b_use = 1``, or ``None``."""
    sql = "SELECT person_name FROM t_sys_loginperson WHERE person_id = $1 and b_use = 1"
    logging.debug(sql)
    person_list = await find_by_sql(sql, (person_id,))
    if not person_list:
        return None
    return person_list[0]['person_name']


async def find_bureau_name_by_id(org_id):
    """Return ``org_name`` of the ``t_base_organization`` row with ``b_use = 1``, or ``None``."""
    sql = "SELECT org_name FROM t_base_organization WHERE org_id = $1 and b_use = 1"
    logging.debug(sql)
    org_list = await find_by_sql(sql, (org_id,))
    if not org_list:
        return None
    return org_list[0]['org_name']


async def update_batch_property(table_name, update_param, property_param, only_for_param=False):
    """Update every row of *table_name* that matches all of *property_param*.

    Args:
        table_name: Target table (interpolated into the SQL text).
        update_param: Mapping of column name to new value.  ``None`` sets the
            column to ``NULL``; every other value is sent as a bound parameter.
        property_param: Mapping of column name to required value, joined with
            ``AND``.  ``None`` is rendered as ``IS NULL``.
        only_for_param: When false and *update_param* has no ``update_time``
            key, ``update_time`` is set to the current local time.

    Returns:
        The number of rows updated, parsed from the command status string.

    Raises:
        Exception: If acquiring the connection or executing the statement fails.
    """
    values = []

    # Columns to update
    set_clauses = _bind_fragments(update_param, values, "= NULL")
    if not only_for_param and 'update_time' not in update_param:
        values.append(datetime.datetime.now())
        set_clauses.append(f"update_time = ${len(values)}")

    # Build the WHERE clause; numbering continues after the SET arguments
    property_clauses = _bind_fragments(property_param, values, "IS NULL")

    # Build the SQL statement
    set_clause = ", ".join(set_clauses)
    property_clause = " AND ".join(property_clauses)
    sql = f"UPDATE {table_name} SET {set_clause} WHERE {property_clause}"
    logging.debug(sql)

    try:
        pg_pool = await init_postgres_pool()
        async with pg_pool.acquire() as conn:
            status = await conn.execute(sql, *values)
            # The connection exposes no row-count attribute; the count is the
            # trailing integer of the status string (e.g. "UPDATE 3").
            return _rows_affected(status)
    except Exception as e:
        logging.error(f"数据库查询错误: {e}")
        logging.error(f"执行的SQL语句: {sql}")
        logging.error(f"参数: {values}")
        raise Exception(f"为表[{table_name}]批量更新数据失败: {e}") from e