HuangHai 1 week ago
commit d9583daf28

@ -13,11 +13,11 @@ router = APIRouter(dependencies=[Depends(get_current_user)])
async def get_stage_subject_list():
# 先查询学段list
select_stage_sql: str = "select stage_id, stage_name from t_dm_stage where b_use = 1 order by sort_id;"
stage_list = await find_by_sql(select_stage_sql, ())
stage_list = await find_by_sql(select_stage_sql,())
for stage in stage_list:
# 再查询学科list
select_subject_sql: str = "select subject_id, subject_name from t_dm_subject where stage_id = %s order by sort_id;"
subject_list = await find_by_sql(select_subject_sql, (stage["stage_id"],))
select_subject_sql: str = "select subject_id, subject_name from t_dm_subject where stage_id = " + str(stage["stage_id"]) + " order by sort_id;"
subject_list = await find_by_sql(select_subject_sql,())
stage["subject_list"] = subject_list
return {"success": True, "message": "成功!", "data": stage_list}

@ -1,13 +1,132 @@
# routes/LoginController.py
import os
from fastapi import APIRouter, Request, Response, Depends
from fastapi import APIRouter, Request, Response, Depends, UploadFile, File
from auth.dependencies import get_current_user
from utils.PageUtil import *
from utils.ParseRequest import *
# 创建一个路由实例,需要依赖get_current_user,登录后才能访问
router = APIRouter(dependencies=[Depends(get_current_user)])
# 创建上传文件的目录
UPLOAD_DIR = "upload_file"
if not os.path.exists(UPLOAD_DIR):
os.makedirs(UPLOAD_DIR)
@router.get("/")
async def test(request: Request, response: Response):
return {"success": True, "message": "成功!"}
# 合法文件扩展名
supported_suffix_types = ['doc', 'docx', 'ppt', 'pptx', 'xls', 'xlsx']
# 【Document-1】文档管理列表
@router.get("/list")
async def list(request: Request):
# 获取参数
person_id = await get_request_str_param(request, "person_id", True, True)
stage_id = await get_request_num_param(request, "stage_id", False, True, -1)
subject_id = await get_request_num_param(request, "subject_id", False, True, -1)
document_suffix = await get_request_str_param(request, "document_suffix", False, True)
document_name = await get_request_str_param(request, "document_name", False, True)
page_number = await get_request_num_param(request, "page_number", False, True, 1)
page_size = await get_request_num_param(request, "page_size", False, True, 10)
print(person_id, stage_id, subject_id, document_suffix, document_name, page_number, page_size)
# 拼接查询SQL语句
select_document_sql: str = " SELECT * FROM t_ai_teaching_model_document WHERE is_deleted = 0 and person_id = '" + person_id + "'"
if stage_id != -1:
select_document_sql += " AND stage_id = " + str(stage_id)
if subject_id != -1:
select_document_sql += " AND subject_id = " + str(subject_id)
if document_suffix != "":
select_document_sql += " AND document_suffix = '" + document_suffix + "'"
if document_name != "":
select_document_sql += " AND document_name = '" + document_name + "'"
select_document_sql += " ORDER BY create_time DESC "
# 查询文档列表
page = await get_page_data_by_sql(select_document_sql, page_number, page_size)
for item in page["list"]:
theme_info = await find_by_id("t_ai_teaching_model_theme", "id", item["theme_id"])
item["theme_info"] = theme_info
return {"success": True, "message": "查询成功!", "data": page}
# 【Document-2】保存文档管理
@router.post("/save")
async def save(request: Request, file: UploadFile = File(...)):
# 获取参数
id = await get_request_num_param(request, "id", False, True, 0)
stage_id = await get_request_num_param(request, "stage_id", False, True, -1)
subject_id = await get_request_num_param(request, "subject_id", False, True, -1)
theme_id = await get_request_num_param(request, "theme_id", True, True, None)
person_id = await get_request_str_param(request, "person_id", True, True)
bureau_id = await get_request_str_param(request, "bureau_id", True, True)
# 先获取theme主题信息
theme_object = await find_by_id("t_ai_teaching_model_theme", "id", theme_id)
if theme_object is None:
return {"success": False, "message": "主题不存在!"}
# 获取文件名
document_name = file.filename
# 检查文件名在该主题下是否重复
select_theme_document_sql: str = "SELECT * FROM t_ai_teaching_model_document WHERE is_deleted = 0 and document_name = '" + document_name + "'"
if id != 0:
select_theme_document_sql += " AND id <> " + id
theme_document = await find_by_sql(select_theme_document_sql, ())
if theme_document is not None:
return {"success": False, "message": "该主题下文档名称重复!"}
# 获取文件扩展名
document_suffix = file.filename.split(".")[-1]
# 检查文件扩展名
if document_suffix not in supported_suffix_types:
return {"success": False, "message": "不支持的文件类型!"}
# 构造文件保存路径
document_dir = UPLOAD_DIR + os.sep + str(theme_object["short_name"]) + "_" + str(theme_object["id"]) + os.sep
if not os.path.exists(document_dir):
os.makedirs(document_dir)
document_path = os.path.join(document_dir, file.filename)
# 保存文件
try:
with open(document_path, "wb") as buffer:
buffer.write(await file.read())
except Exception as e:
return {"success": False, "message": f"文件保存失败!{e}"}
# 构造保存文档SQL语句
param = {"stage_id": stage_id, "subject_id": subject_id, "document_name": document_name, "theme_id": theme_id, "document_path": document_path, "document_suffix": document_suffix, "person_id": person_id, "bureau_id": bureau_id}
# 保存数据
if id == 0:
param["train_flag"] = 0
# 插入数据
id = await insert("t_ai_teaching_model_document", param, False)
return {"success": True, "message": "保存成功!", "data": {"insert_id" : id}}
else:
# 更新数据
await update("t_ai_teaching_model_document", param, "id", id)
return {"success": True, "message": "更新成功!", "data": {"update_id" : id}}
# 【Document-3】获取文档信息
@router.get("/get")
async def get(request: Request):
# 获取参数
id = await get_request_num_param(request, "id", True, True, None)
# 查询数据
document_object = await find_by_id("t_ai_teaching_model_document", "id", id)
if document_object is None:
return {"success": False, "message": "未查询到该文档信息!"}
theme_info = await find_by_id("t_ai_teaching_model_theme", "id", document_object["theme_id"])
document_object["theme_info"] = theme_info
return {"success": True, "message": "查询成功!", "data": {"document": document_object}}
@router.post("/delete")
async def delete(request: Request):
# 获取参数
id = await get_request_num_param(request, "id", True, True, None)
result = await delete_by_id("t_ai_teaching_model_document", "id", id)
if not result:
return {"success": False, "message": "删除失败!"}
return {"success": True, "message": "删除成功!"}

@ -13,7 +13,7 @@ from utils.CookieUtil import *
from utils.Database import *
from utils.JwtUtil import *
from utils.ParseRequest import *
from config.Config import *
from Config.Config import *
# 创建一个路由实例
router = APIRouter()
@ -108,8 +108,9 @@ async def login(request: Request, response: Response):
return {"success": False, "message": "用户名和密码不能为空"}
password = md5_encrypt(password)
select_user_sql: str = "SELECT person_id, person_name, identity_id, login_name, xb, bureau_id, org_id, pwdmd5 FROM t_sys_loginperson WHERE login_name = %s AND b_use = 1"
user = await find_one_by_sql(select_user_sql, (username,))
select_user_sql: str = "SELECT person_id, person_name, identity_id, login_name, xb, bureau_id, org_id, pwdmd5 FROM t_sys_loginperson WHERE login_name = '" + username + "' AND b_use = 1"
userlist = await find_by_sql(select_user_sql,())
user = userlist[0] if userlist else None
logging.info(f"查询结果: {user}")
if user and user['pwdmd5'] == password: # 验证的cas用户密码md5加密的版本
token = create_access_token({"user_id": user['person_id'], "identity_id": user['identity_id']})

@ -3,12 +3,15 @@
from fastapi import APIRouter, Depends
from utils.ParseRequest import *
from auth.dependencies import *
from utils.Database import *
from utils.PageUtil import *
# 创建一个路由实例,需要依赖get_current_user,登录后才能访问
router = APIRouter(dependencies=[Depends(get_current_user)])
# 功能【Theme-1】主题管理列表
# 作者Kalman.CHENG ☆
# 时间2025-07-14
# 备注:
@router.get("/list")
async def list(request: Request):
# 获取参数
@ -24,9 +27,9 @@ async def list(request: Request):
# 拼接查询SQL语句
select_theme_sql: str = " SELECT * FROM t_ai_teaching_model_theme WHERE is_deleted = 0 and person_id = '" + person_id + "'"
if stage_id != -1:
select_theme_sql += " and stage_id = " + stage_id
select_theme_sql += " and stage_id = " + str(stage_id)
if subject_id != -1:
select_theme_sql += " and subject_id = " + subject_id
select_theme_sql += " and subject_id = " + str(subject_id)
if theme_name != "":
select_theme_sql += " and theme_name = '" + theme_name + "'"
select_theme_sql += " ORDER BY create_time DESC"
@ -37,16 +40,76 @@ async def list(request: Request):
return {"success": True, "message": "查询成功!", "data": page}
# 功能【Theme-2】保存主题管理
# 作者Kalman.CHENG ☆
# 时间2025-07-14
# 备注:
@router.post("/save")
async def save(request: Request):
# 获取参数
id = await get_request_num_param(request, "id", False, True, 0)
theme_name = await get_request_str_param(request, "theme_name", True, True)
short_name = await get_request_str_param(request, "short_name", True, True)
theme_icon = await get_request_str_param(request, "theme_icon", False, True)
stage_id = await get_request_num_param(request, "stage_id", True, True, None)
subject_id = await get_request_num_param(request, "subject_id", True, True, None)
person_id = await get_request_str_param(request, "person_id", True, True)
bureau_id = await get_request_str_param(request, "bureau_id", True, True)
# 业务逻辑处理
# 校验参数
check_theme_sql = "SELECT theme_name FROM t_ai_teaching_model_theme WHERE is_deleted = 0 and bureau_id = '" + bureau_id + "' and theme_name = '" + theme_name + "'"
if id != 0:
check_theme_sql += " and id <> " + id
print(check_theme_sql)
check_theme_result = await find_by_sql(check_theme_sql,())
if check_theme_result:
return {"success": False, "message": "该主题名称已存在!"}
check_short_name_sql = "SELECT short_name FROM t_ai_teaching_model_theme WHERE is_deleted = 0 and bureau_id = '" + bureau_id + "' and short_name = '" + short_name + "'"
if id != 0:
check_short_name_sql += " and id <> " + id
print(check_short_name_sql)
check_short_name_result = await find_by_sql(check_short_name_sql,())
if check_short_name_result:
return {"success": False, "message": "该主题英文简称已存在!"}
# 组装参数
param = {"theme_name": theme_name,"short_name": short_name,"theme_icon": theme_icon,"stage_id": stage_id,"subject_id": subject_id,"person_id": person_id,"bureau_id": bureau_id}
# 保存数据
if id == 0:
param["search_flag"] = 0
param["train_flag"] = 0
# 插入数据
id = await insert("t_ai_teaching_model_theme", param, False)
return {"success": True, "message": "保存成功!", "data": {"insert_id" : id}}
else:
# 更新数据
await update("t_ai_teaching_model_theme", param, "id", id, False)
return {"success": True, "message": "更新成功!", "data": {"update_id" : id}}
# 功能【Theme-3】获取主题信息
# 作者Kalman.CHENG ☆
# 时间2025-07-14
# 备注:
@router.get("/get")
async def get(request: Request):
# 获取参数
id = await get_request_num_param(request, "id", True, True, None)
theme_obj = await find_by_id("t_ai_teaching_model_theme", "id", id)
if theme_obj is None:
return {"success": False, "message": "未查询到该主题信息!"}
return {"success": True, "message": "查询成功!", "data": {"theme": theme_obj}}
@router.post("/delete")
async def delete(request: Request):
# 获取参数
id = await get_request_num_param(request, "id", True, True, None)
result = await delete_by_id("t_ai_teaching_model_theme", "id", id)
if not result:
return {"success": False, "message": "删除失败!"}
return {"success": True, "message": "删除成功!"}

@ -1,13 +1,18 @@
# 大模型 【DeepSeek深度求索官方】
#LLM_API_KEY = "sk-44ae895eeb614aa1a9c6460579e322f1"
#LLM_BASE_URL = "https://api.deepseek.com"
#LLM_MODEL_NAME = "deepseek-chat"
# 阿里云的配置信息
ALY_AK = 'LTAI5tE4tgpGcKWhbZg6C4bh'
ALY_SK = 'oizcTOZ8izbGUouboC00RcmGE8vBQ1'
# 阿里云提供的大模型服务
LLM_API_KEY="sk-f6da0c787eff4b0389e4ad03a35a911f"
# 大模型 【DeepSeek深度求索官方】训练时用这个
# LLM_API_KEY = "sk-44ae895eeb614aa1a9c6460579e322f1"
# LLM_BASE_URL = "https://api.deepseek.com"
# LLM_MODEL_NAME = "deepseek-chat"
# 阿里云提供的大模型服务 【阿里云在处理文字材料时,容易引发绿网拦截,导致数据上报异常】
LLM_API_KEY = "sk-f6da0c787eff4b0389e4ad03a35a911f"
LLM_BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"
#LLM_MODEL_NAME = "qwen-plus" # 不要使用通义千问,会导致化学方程式不正确!
LLM_MODEL_NAME = "deepseek-v3"
#LLM_MODEL_NAME = "deepseek-r1" # 使用更牛B的r1模型
EMBED_MODEL_NAME = "BAAI/bge-m3"
EMBED_API_KEY = "sk-pbqibyjwhrgmnlsmdygplahextfaclgnedetybccknxojlyl"
@ -15,21 +20,20 @@ EMBED_BASE_URL = "https://api.siliconflow.cn/v1"
EMBED_DIM = 1024
EMBED_MAX_TOKEN_SIZE = 8192
NEO4J_URI = "bolt://localhost:7687"
NEO4J_USERNAME = "neo4j"
NEO4J_PASSWORD = "DsideaL147258369"
NEO4J_AUTH = (NEO4J_USERNAME, NEO4J_PASSWORD)
# MYSQL配置信息
MYSQL_HOST = "127.0.0.1"
MYSQL_PORT = 22066
MYSQL_USER = "root"
MYSQL_PASSWORD = "DsideaL147258369"
MYSQL_DB_NAME = "base_db"
MYSQL_POOL_SIZE = 200
# POSTGRESQL配置信息
AGE_GRAPH_NAME = "dickens"
POSTGRES_HOST = "10.10.14.208"
POSTGRES_PORT = 5432
POSTGRES_USER = "postgres"
POSTGRES_PASSWORD = "postgres"
POSTGRES_DATABASE = "rag"
# JWT配置信息
JWT_SECRET_KEY = "ZXZnZWVr5b+r5LmQ5L2g55qE5Ye66KGM"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 300000 # 访问令牌过期时间(分钟)
ACCESS_TOKEN_EXPIRE_MINUTES = 300000 # 访问令牌过期时间(分钟)

@ -1,25 +1,23 @@
# Database.py
import datetime
import logging
import math
import asyncpg
import aiomysql
import asyncio
from config.Config import *
from Config.Config import *
# 创建一个全局的连接池
pool = None
async def create_pool(loop):
async def create_pool():
global pool
pool = await aiomysql.create_pool(
host=MYSQL_HOST,
port=MYSQL_PORT,
user=MYSQL_USER,
password=MYSQL_PASSWORD,
db=MYSQL_DB_NAME,
minsize=1, # 设置连接池最小连接数
maxsize=MYSQL_POOL_SIZE, # 设置连接池最大连接数
cursorclass=aiomysql.DictCursor # 指定游标为字典模式
pool = await asyncpg.create_pool(
host=POSTGRES_HOST,
port=POSTGRES_PORT,
user=POSTGRES_USER,
password=POSTGRES_PASSWORD,
database=POSTGRES_DATABASE,
min_size=1, # 设置连接池最小连接数
max_size=100 # 设置连接池最大连接数
)
async def get_connection():
@ -30,18 +28,17 @@ async def get_connection():
async def close_pool():
if pool is not None:
pool.close()
await pool.wait_closed()
await pool.close()
# 初始化连接池的函数
async def init_database():
loop = asyncio.get_event_loop()
await create_pool(loop)
await create_pool()
# 关闭连接池的函数
async def shutdown_database():
await close_pool()
# 根据sql语句查询数据
async def find_by_sql(sql: str, params: tuple):
if pool is None:
@ -49,79 +46,162 @@ async def find_by_sql(sql: str, params: tuple):
return None
try:
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(sql, params)
result = await cur.fetchall()
if result:
return result
else:
return None
result = await conn.fetch(sql, *params)
# 将 asyncpg.Record 转换为字典
result_dict = [dict(record) for record in result]
if result_dict:
return result_dict
else:
return None
except Exception as e:
logging.error(f"数据库查询错误: {e}")
return None
# 插入数据
async def insert(tableName, param, onlyForParam=False):
current_time = datetime.datetime.now()
columns = []
values = []
placeholders = []
for key, value in param.items():
if value is not None:
if isinstance(value, (int, float)):
columns.append(key)
values.append(value)
placeholders.append(f"${len(values)}")
elif isinstance(value, str):
columns.append(key)
values.append(value)
placeholders.append(f"${len(values)}")
else:
columns.append(key)
values.append(None)
placeholders.append("NULL")
if not onlyForParam:
if 'is_deleted' not in param:
columns.append("is_deleted")
values.append(0)
placeholders.append(f"${len(values)}")
if 'create_time' not in param:
columns.append("create_time")
values.append(current_time)
placeholders.append(f"${len(values)}")
if 'update_time' not in param:
columns.append("update_time")
values.append(current_time)
placeholders.append(f"${len(values)}")
# 构造 SQL 语句
column_names = ", ".join(columns)
placeholder_names = ", ".join(placeholders)
sql = f"INSERT INTO {tableName} ({column_names}) VALUES ({placeholder_names}) RETURNING id"
# 根据sql语句查询数据
async def find_one_by_sql(sql: str, params: tuple):
if pool is None:
logging.error("数据库连接池未创建")
return None
try:
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(sql, params)
result = await cur.fetchone()
result = await conn.fetchrow(sql, *values)
if result:
return result
return result['id']
else:
logging.error("插入数据失败: 未返回ID")
return None
except Exception as e:
logging.error(f"数据库查询错误: {e}")
return None
logging.error(f"执行的SQL语句: {sql}")
logging.error(f"参数: {values}")
raise Exception(f"为表[{tableName}]插入数据失败: {e}")
# 更新数据
async def update(table_name, param, property_name, property_value, only_for_param=False):
current_time = datetime.datetime.now()
set_clauses = []
values = []
# 处理要更新的参数
for key, value in param.items():
if value is not None:
if isinstance(value, (int, float)):
set_clauses.append(f"{key} = ${len(values) + 1}")
values.append(value)
elif isinstance(value, str):
set_clauses.append(f"{key} = ${len(values) + 1}")
values.append(value)
else:
set_clauses.append(f"{key} = NULL")
values.append(None)
if not only_for_param:
if 'update_time' not in param:
set_clauses.append(f"update_time = ${len(values) + 1}")
values.append(current_time)
# 查询数据条数
async def get_total_data_count(total_data_sql):
total_data_count = 0
total_data_count_sql = "select count(1) as count from (" + total_data_sql + ") as temp_table"
result = await find_one_by_sql(total_data_count_sql, ())
if result:
total_data_count = result.get("count")
return total_data_count
# 构造 SQL 语句
set_clause = ", ".join(set_clauses)
sql = f"UPDATE {table_name} SET {set_clause} WHERE {property_name} = ${len(values) + 1} RETURNING id"
print(sql)
# 添加条件参数
values.append(property_value)
def get_page_by_total_row(total_data_count, page_number, page_size):
total_page = (page_size != 0) and math.floor((total_data_count + page_size - 1) / page_size) or 0
if page_number <= 0:
page_number = 1
if 0 < total_page < page_number:
page_number = total_page
offset = page_size * page_number - page_size
limit = page_size
return total_data_count, total_page, offset, limit
try:
async with pool.acquire() as conn:
result = await conn.fetchrow(sql, *values)
if result:
return result['id']
else:
logging.error("更新数据失败: 未返回ID")
return None
except Exception as e:
logging.error(f"数据库查询错误: {e}")
logging.error(f"执行的SQL语句: {sql}")
logging.error(f"参数: {values}")
raise Exception(f"为表[{table_name}]更新数据失败: {e}")
async def get_page_data_by_sql(total_data_sql: str, page_number: int, page_size: int):
if pool is None:
logging.error("数据库连接池未创建")
return None
total_row: int = 0
total_page: int = 0
total_data_sql = total_data_sql.replace(";", "")
total_data_sql = total_data_sql.replace(" FROM ", " from ")
# 查询总数
total_data_count = await get_total_data_count(total_data_sql)
if total_data_count == 0:
return {"page_number": page_number, "page_size": page_size, "total_row": 0, "total_page": 0, "list": []}
# 获取Bean
# 通过主键查询
async def find_by_id(table_name, property_name, property_value):
if table_name and property_name and property_value is not None:
# 构造 SQL 语句
sql = f"SELECT * FROM {table_name} WHERE is_deleted = 0 AND {property_name} = $1"
logging.debug(sql)
# 执行查询
result = await find_by_sql(sql, (property_value,))
if not result:
logging.error("查询失败: 未找到数据")
return None
# 返回第一条数据
return result[0]
else:
total_row, total_page, offset, limit = get_page_by_total_row(total_data_count, page_number, page_size)
# 构造执行分页查询的sql语句
page_data_sql = total_data_sql + " LIMIT %d, %d " % (offset, limit)
print(page_data_sql)
# 执行分页查询
page_data = await find_by_sql(page_data_sql, ())
if page_data:
return {"page_number": page_number, "page_size": page_size, "total_row": total_row, "total_page": total_page, "list": page_data}
logging.error("参数不全")
return None
# 通过主键删除
# 逻辑删除
async def delete_by_id(table_name, property_name, property_value):
if table_name and property_name and property_value is not None:
sql = f"UPDATE {table_name} SET is_deleted = 1, update_time = now() WHERE {property_name} = $1 and is_deleted = 0"
logging.debug(sql)
# 执行删除
try:
async with pool.acquire() as conn:
result = await conn.execute(sql, property_value)
if result:
return True
else:
logging.error("删除失败: 未找到数据")
return False
except Exception as e:
logging.error(f"数据库查询错误: {e}")
logging.error(f"执行的SQL语句: {sql}")
logging.error(f"参数: {property_value}")
raise Exception(f"为表[{table_name}]删除数据失败: {e}")
else:
return {"page_number": page_number, "page_size": page_size, "total_row": 0, "total_page": 0, "list": []}
logging.error("参数不全")
return False

@ -2,7 +2,7 @@
from datetime import datetime, timedelta
from jose import JWTError, jwt
from config.Config import *
from Config.Config import *
def create_access_token(data: dict):

@ -8,7 +8,7 @@ from lightrag import LightRAG
from lightrag.kg.shared_storage import initialize_pipeline_status
from lightrag.llm.openai import openai_complete_if_cache, openai_embed
from lightrag.utils import EmbeddingFunc, logger, set_verbose_debug
from config.Config import *
from Config.Config import *
async def print_stream(stream):

@ -0,0 +1,48 @@
import math
from utils.Database import *
# 查询数据条数
async def get_total_data_count(total_data_sql):
total_data_count = 0
total_data_count_sql = "select count(*) as num from (" + total_data_sql + ") as temp_table"
result = await find_by_sql(total_data_count_sql,())
row = result[0] if result else None
if row:
total_data_count = row.get("num")
return total_data_count
def get_page_by_total_row(total_data_count, page_number, page_size):
total_page = (page_size != 0) and math.floor((total_data_count + page_size - 1) / page_size) or 0
if page_number <= 0:
page_number = 1
if 0 < total_page < page_number:
page_number = total_page
offset = page_size * page_number - page_size
limit = page_size
return total_data_count, total_page, offset, limit
async def get_page_data_by_sql(total_data_sql: str, page_number: int, page_size: int):
total_row: int = 0
total_page: int = 0
total_data_sql = total_data_sql.replace(";", "")
total_data_sql = total_data_sql.replace(" FROM ", " from ")
# 查询总数
total_data_count = await get_total_data_count(total_data_sql)
if total_data_count == 0:
return {"page_number": page_number, "page_size": page_size, "total_row": 0, "total_page": 0, "list": []}
else:
total_row, total_page, offset, limit = get_page_by_total_row(total_data_count, page_number, page_size)
# 构造执行分页查询的sql语句
page_data_sql = total_data_sql + " LIMIT %d offset %d " % (limit, offset)
print(page_data_sql)
# 执行分页查询
page_data = await find_by_sql(page_data_sql, ())
if page_data:
return {"page_number": page_number, "page_size": page_size, "total_row": total_row, "total_page": total_page, "list": page_data}
else:
return {"page_number": page_number, "page_size": page_size, "total_row": 0, "total_page": 0, "list": []}
Loading…
Cancel
Save