import asyncio import logging import os import sys import aiofiles # 添加aiofiles导入 from openai import AsyncOpenAI from Config.Config import * # 配置日志 logger = logging.getLogger(__name__) class LLMClient: """ 大语言模型客户端封装类,提供与LLM的交互功能 """ def __init__(self, api_key=None, base_url=None, model_name=None, system_prompt=""): """ 初始化LLM客户端 @param api_key: API密钥,默认为配置文件中的ALY_LLM_API_KEY @param base_url: API基础URL,默认为配置文件中的ALY_LLM_BASE_URL @param model_name: 模型名称,默认为配置文件中的ALY_LLM_MODEL_NAME @param system_prompt: 系统提示词,默认使用预设的学伴角色提示词 """ self.api_key = api_key or ALY_LLM_API_KEY self.base_url = base_url or ALY_LLM_BASE_URL self.model_name = model_name or ALY_LLM_MODEL_NAME self.system_prompt = system_prompt # 添加这一行来初始化system_prompt属性 async def get_response(self, query_text: str, knowledge_content: str = "", stream: bool = True): """ 异步获取大模型响应 @param query_text: 查询文本 @param knowledge_content: 可选的知识内容,将作为上下文提供给模型 @param stream: 是否使用流式输出 @return: 流式响应生成器或完整响应文本 """ try: # 创建AsyncOpenAI客户端 client = AsyncOpenAI( api_key=self.api_key, base_url=self.base_url, ) # 构建完整的查询文本 full_query = query_text if knowledge_content: full_query = f"选择作答的相应知识内容:{knowledge_content}\n下面是用户提的问题:{query_text}" # 创建请求 completion = await client.chat.completions.create( model=self.model_name, messages=[ {'role': 'system', 'content': self.system_prompt}, {'role': 'user', 'content': full_query} ], stream=stream ) if stream: # 流式输出模式,返回生成器 async for chunk in completion: # 确保 chunk.choices 存在且不为空 if chunk and chunk.choices and len(chunk.choices) > 0: # 确保 delta 存在 delta = chunk.choices[0].delta if delta: # 确保 content 存在且不为 None 或空字符串 content = delta.content if content is not None and content.strip(): print(content, end='', flush=True) yield content else: # 非流式处理 if completion and completion.choices and len(completion.choices) > 0: message = completion.choices[0].message if message: content = message.content if content is not None and content.strip(): yield content except Exception as e: print(f"大模型请求异常: {str(e)}", file=sys.stderr) yield f"处理请求时发生异常: {str(e)}" # 保留原有接口以保持兼容性 async def get_response_async(query_text: str, stream: bool = True): """ 异步获取学伴角色的大模型响应(兼容旧接口) @param query_text: 查询文本 @param stream: 是否使用流式输出 @return: 流式响应生成器或完整响应文本 """ # 创建LLM客户端实例 llm_client = LLMClient() # 打开文件读取知识内容 try: with open(r"D:\dsWork\dsProject\dsLightRag\static\YunXiao.txt", "r", encoding="utf-8") as f: zhishiContent = f.read() except Exception as e: print(f"读取知识文件失败: {str(e)}", file=sys.stderr) zhishiContent = "" # 调用封装的方法 async for chunk in llm_client.get_response(query_text, zhishiContent, stream): yield chunk # 生成万有引力导学案函数(修改为异步函数) async def generate_gravitation_lesson_plan(): """ 生成万有引力导学案 @return: 生成的导学案文本 """ # 创建LLM客户端实例,设置专门的系统提示词 system_prompt = """你是'国家中级教师资格'持证教研员,熟悉《中国义务教育物理课程标准(2022版)》,擅长设计'先学后教、以学定教'的导学案。请用'问题链'驱动学生自学,体现'学习目标—前置学习—问题探究—自主检测—疑问记录'五环节。""" llm_client = LLMClient(system_prompt=system_prompt) # 构建生成导学案的提示词 prompt = """请围绕'万有引力'一节课,输出一份适用于八年级下的导学案,供学生课前45分钟完成。要求: 1. 语言亲切,用'你'称呼学生; 2. 问题难度梯度:识记→理解→应用; 3. 至少2个生活化情境问题; 4. 留3行空白让学生写'我还有哪些困惑'。 【格式要求】 严格按以下Markdown层级输出: ### 1. 学习目标 ### 2. 前置学习(知识链接+阅读指引) ### 3. 问题探究(问题链) ### 4. 自主检测(5道客观题+1道开放题) ### 5. 疑问记录区(留空) 【变量】 教材版本:{人教版八年级下第十章第2节} 学生基础:{已学过重力、匀速圆周运动} 课时长度:{45分钟}""" # 修改为流式获取响应 # 确保这是一个异步生成器函数 async def get_lesson_plan(): async for chunk in llm_client.get_response(prompt, stream=True): yield chunk return get_lesson_plan() # 测试生成导学案的函数 async def test_generate_lesson_plan(): print("\n===== 测试生成万有引力导学案 =====") try: # 必须await生成器函数 lesson_plan_chunks = await generate_gravitation_lesson_plan() full_response = "" output_file = "万有引力导学案.md" # 使用aiofiles进行异步文件写入 async with aiofiles.open(output_file, 'w', encoding='utf-8') as f: print("\n生成的导学案:") async for chunk in lesson_plan_chunks: # 实时打印chunk并刷新缓冲区 print(chunk, end='', flush=True) full_response += chunk await f.write(chunk) await f.flush() # 刷新文件缓冲区 print(f"\n导学案已保存到:{os.path.abspath(output_file)}") except Exception as e: print(f"生成导学案时发生异常: {str(e)}", file=sys.stderr) # 修改主函数以包含导学案生成测试 async def main(): try: # 直接生成导学案,不进行交互式测试 await test_generate_lesson_plan() except KeyboardInterrupt: print("\n程序被用户中断") except Exception as e: print(f"测试过程中发生异常: {str(e)}", file=sys.stderr) finally: print("\n测试程序结束") if __name__ == '__main__': # 运行异步主函数 asyncio.run(main())