import asyncio
import json
import logging

import fastapi
import uvicorn
from fastapi import FastAPI
from lightrag import LightRAG
from lightrag.kg.shared_storage import initialize_pipeline_status
from raganything import RAGAnything
from sse_starlette import EventSourceResponse
from starlette.staticfiles import StaticFiles

from Util.RagUtil import create_llm_model_func, create_vision_model_func, create_embedding_func

# Process-wide logging configuration.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Dedicated 'lightrag' logger so library output is visible on the console.
logger = logging.getLogger('lightrag')
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
logger.addHandler(handler)


async def lifespan(app: FastAPI):
    """Application lifespan hook; no startup/shutdown work is needed yet."""
    yield


async def print_stream(stream):
    """Debug helper: print an async chunk stream to stdout as it arrives."""
    async for chunk in stream:
        if chunk:
            print(chunk, end="", flush=True)


app = FastAPI(lifespan=lifespan)

# Serve static assets from ./Static under the /static URL prefix.
app.mount("/static", StaticFiles(directory="Static"), name="static")


@app.post("/api/rag")
async def rag(request: fastapi.Request):
    """Answer a knowledge-base question over SSE.

    Expects a JSON body with:
      - topic: knowledge-base name (e.g. "Chinese", "Math"); selects the
        working directory ./Topic/<topic>
      - query: the user's question

    Returns an SSE stream whose events carry JSON payloads of the form
    {"reply": <text chunk>} (5 characters at a time, ~10 chunks/sec) or
    {"error": <message>} on failure.
    """
    data = await request.json()
    topic = data.get("topic")  # e.g. "Chinese", "Math"

    # The topic is joined into a filesystem path below, so reject missing
    # values and any path-traversal characters from this untrusted input.
    if not topic or any(token in topic for token in ("/", "\\", "..")):
        return fastapi.responses.JSONResponse(
            status_code=400, content={"error": "invalid topic"}
        )

    # Per-topic working directory for the LightRAG storages.
    working_path = "./Topic/" + topic

    # The question to answer.
    query = data.get("query")

    # Extra instructions intended for the model (suppress references; admit
    # when the question is outside the knowledge base).
    # NOTE(review): user_prompt is built but never passed to aquery() below —
    # the original code had the same gap. Confirm whether the RAGAnything
    # query API accepts it before forwarding.
    user_prompt = "\n 1、不要输出参考资料 或者 References !"
    user_prompt = user_prompt + "\n 2、如果问题与提供的知识库内容不符,则明确告诉未在知识库范围内提到!"

    async def generate_response_stream(query: str):
        # Hold the storage-owning instance locally so the finally block can
        # clean up even when initialization fails partway through (the
        # original read app.state.rag there and would raise AttributeError
        # if the failure happened before that attribute was assigned).
        lightrag_instance = None
        try:
            # Build the per-request RAG components.
            llm_model_func = create_llm_model_func()
            vision_model_func = create_vision_model_func(llm_model_func)
            embedding_func = create_embedding_func()

            lightrag_instance = LightRAG(
                working_dir=working_path,
                llm_model_func=llm_model_func,
                embedding_func=embedding_func
            )
            await lightrag_instance.initialize_storages()
            await initialize_pipeline_status()

            rag_instance = RAGAnything(
                lightrag=lightrag_instance,
                vision_model_func=vision_model_func,
            )
            # Kept for backward compatibility with anything reading
            # app.state.rag. NOTE(review): mutating shared app.state per
            # request is racy under concurrent requests; all work below
            # uses the local rag_instance instead.
            app.state.rag = rag_instance

            # Fetch the complete answer in one non-streaming call ...
            resp = await rag_instance.aquery(
                query=query,
                mode="hybrid",
                stream=False
            )

            # ... then emit it in 5-character chunks to simulate typing,
            # throttled to roughly 10 chunks per second.
            for i in range(0, len(resp), 5):
                chunk = resp[i:i + 5]
                yield f"data: {json.dumps({'reply': chunk})}\n\n"
                await asyncio.sleep(0.1)
        except Exception as e:
            # Surface the failure to the client as an error event, then log it.
            yield f"data: {json.dumps({'error': str(e)})}\n\n"
            logger.error(f"处理查询时出错: {query}. 错误: {str(e)}")
        finally:
            # Only finalize storages that were actually created.
            if lightrag_instance is not None:
                await lightrag_instance.finalize_storages()

    return EventSourceResponse(generate_response_stream(query=query))


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)