# -*- coding: utf-8 -*- """ 数学题目分析系统 v6.3(稳定流式处理版) """ import re import json import hashlib import requests from py2neo import Graph from typing import Dict, List from Config.Config import NEO4J_URI, NEO4J_AUTH, MODEL_API_URL, MODEL_API_KEY, MODEL_NAME # ================== 配置类 ================== class Config: NEO4J_URI = NEO4J_URI NEO4J_AUTH = NEO4J_AUTH MODEL_API_URL = MODEL_API_URL MODEL_API_KEY = MODEL_API_KEY MODEL_NAME = MODEL_NAME MAX_CONTENT_LENGTH = 500 STREAM_TIMEOUT = 30 # ================== 知识图谱管理 ================== class KnowledgeManager: def __init__(self): self.graph = Graph(Config.NEO4J_URI, auth=Config.NEO4J_AUTH) self._verify_connection() self.knowledge_map = self._load_knowledge() self.literacy_map = self._load_literacy() print("✅ 知识库加载完成") print(f"有效知识点({len(self.knowledge_map)}个): {list(self.knowledge_map.keys())[:3]}...") print(f"有效素养点({len(self.literacy_map)}个): {list(self.literacy_map.keys())[:3]}...") def _verify_connection(self): """安全连接验证""" try: self.graph.run("RETURN 1 AS test") print("✅ Neo4j连接验证成功") except Exception as e: print(f"❌ Neo4j连接失败: {str(e)}") raise def _load_knowledge(self) -> Dict[str, str]: """安全加载知识点""" records = self.graph.run("MATCH (n:KnowledgePoint) WHERE n.name IS NOT NULL RETURN n.id, n.name").data() return {rec['n.name'].strip(): rec['n.id'] for rec in records if rec['n.name'] and rec['n.name'].strip()} def _load_literacy(self) -> Dict[str, str]: """安全加载素养点""" records = self.graph.run("MATCH (n:LiteracyNode) WHERE n.title IS NOT NULL RETURN n.value, n.title").data() return {rec['n.title'].strip(): rec['n.value'] for rec in records if rec['n.title'] and rec['n.title'].strip()} def store_analysis(self, question_id: str, content: str, knowledge: List[str], literacy: List[str]): """增强存储方法""" tx = self.graph.begin() try: # 创建/更新题目节点 tx.run(""" MERGE (q:Question {id: $id}) SET q.content = $content, q.updateTime = timestamp() """, {"id": question_id, "content": content}) # 关联知识点 valid_kp = [] for name in knowledge: clean_name = name.strip() if kp_id := self.knowledge_map.get(clean_name): tx.run(""" MATCH (q:Question {id: $qid}), (kp:KnowledgePoint {id: $kpid}) MERGE (q)-[r:REQUIRES_KNOWLEDGE]->(kp) SET r.lastUsed = timestamp() """, {"qid": question_id, "kpid": kp_id}) valid_kp.append(clean_name) # 关联素养点 valid_lp = [] for title in literacy: clean_title = title.strip() if lit_id := self.literacy_map.get(clean_title): tx.run(""" MATCH (q:Question {id: $qid}), (lp:LiteracyNode {value: $lpid}) MERGE (q)-[r:DEVELOPS_LITERACY]->(lp) SET r.lastUsed = timestamp() """, {"qid": question_id, "lpid": lit_id}) valid_lp.append(clean_title) self.graph.commit(tx) print(f"✅ 存储成功 - 知识点: {valid_kp}, 素养点: {valid_lp}") except Exception as e: self.graph.rollback(tx) print(f"❌ 存储失败: {str(e)}") # ================== 大模型客户端 ================== class StreamLLMClient: def __init__(self, kg: KnowledgeManager): self.kg = kg self.base_url = Config.MODEL_API_URL self.headers = { "Authorization": f"Bearer {Config.MODEL_API_KEY}", "Content-Type": "application/json", "Accept": "application/json" } def analyze_problem(self, content: str) -> dict: """增强的流式分析""" try: prompt = self._build_prompt(content) response = self._send_request(prompt) return self._process_stream(response) except Exception as e: print(f"🔴 分析失败: {str(e)}") return {} def _build_prompt(self, content: str) -> str: """构建精准提示词""" return f"""请严格按以下要求分析数学题目: 1. 知识点必须完全匹配以下列表中的名称(不要新增或修改): {self.kg.knowledge_map.keys()} 2. 素养点必须完全匹配以下列表中的名称: {self.kg.literacy_map.keys()} 3. 返回严格JSON格式: {{ "problem_types": ["题型"], "knowledge_points": ["匹配的知识点"], "literacy_points": ["匹配的素养点"] }} 题目内容:{content}""" def _send_request(self, prompt: str): """发送API请求""" return requests.post( f"{self.base_url}/chat/completions", headers=self.headers, json={ "model": Config.MODEL_NAME, "messages": [{"role": "user", "content": prompt}], "temperature": 0.1, "stream": True }, timeout=Config.STREAM_TIMEOUT, stream=True ) def _process_stream(self, response) -> dict: """可靠的流式处理""" full_content = "" try: for chunk in response.iter_lines(): if chunk: decoded = chunk.decode('utf-8').strip() if decoded.startswith('data:'): json_str = decoded[5:].strip() if json_str == "[DONE]": break try: data = json.loads(json_str) if content := data['choices'][0]['delta'].get('content'): full_content += content except: continue # 调试日志 print(f"原始响应内容:\n{full_content}") # 提取有效JSON json_str = re.search(r'\{[\s\S]*\}', full_content).group() return json.loads(json_str) except json.JSONDecodeError: print(f"⚠️ JSON解析失败,原始内容:{full_content}") return {} except Exception as e: print(f"流处理异常:{str(e)}") return {} # ================== 核心引擎 ================== class ProblemAnalyzer: def __init__(self, content: str): self.original = content self.content = self._preprocess(content) self.question_id = hashlib.sha256(content.encode()).hexdigest()[:12] self.kg = KnowledgeManager() self.llm = StreamLLMClient(self.kg) def _preprocess(self, text: str) -> str: """文本预处理""" return re.sub(r'[^\w\u4e00-\u9fa5]', '', text)[:Config.MAX_CONTENT_LENGTH] def execute(self): """执行分析流程""" print(f"\n🔍 分析题目: {self.original[:50]}...") analysis = self.llm.analyze_problem(self.original) if not analysis: print("⚠️ 大模型分析失败") return print("\n📊 分析结果:") print(f" 题型: {analysis.get('problem_types', [])}") print(f" 知识点: {analysis.get('knowledge_points', [])}") print(f" 素养点: {analysis.get('literacy_points', [])}") self.kg.store_analysis( question_id=self.question_id, content=self.content, knowledge=analysis.get('knowledge_points', []), literacy=analysis.get('literacy_points', []) ) # ================== 查询接口 ================== def query_question(question_id: str): try: graph = Graph(Config.NEO4J_URI, auth=Config.NEO4J_AUTH) result = graph.run(""" MATCH (q:Question {id: $id}) OPTIONAL MATCH (q)-[:REQUIRES_KNOWLEDGE]->(kp) OPTIONAL MATCH (q)-[:DEVELOPS_LITERACY]->(lp) RETURN q.content AS content, collect(kp.name) AS knowledge, collect(lp.title) AS literacy """, id=question_id).data() if result: data = result[0] print(f"\n🔍 查询结果(ID: {question_id})") print(f"内容: {data['content']}") print(f"知识点: {data['knowledge']}") print(f"素养点: {data['literacy']}") else: print("未找到相关题目") except Exception as e: print(f"查询错误: {str(e)}") # ================== 测试执行 ================== if __name__ == '__main__': test_cases = [ # 基础运算类 "小明用50元买了3本笔记本,每本8元,还剩多少钱?", # 知识点:四则运算应用 素养点:数学运算能力 # 几何问题 "一个长方形花坛长5米,宽3米,四周铺1米宽的小路,求小路面积?", # 知识点:面积计算 素养点:空间观念 # 统计概率 "某班30人,数学成绩90分以上占1/5,80-89分占1/3,求80分以下人数?", # 知识点:分数运算 素养点:数据分析 # 典型应用题 "甲乙两车相距300公里,甲车速度60km/h,乙车40km/h,几小时后相遇?", # 知识点:相遇问题 素养点:数学建模 # 方程应用 "鸡兔同笼,头共10个,脚共28只,问鸡兔各多少?", # 知识点:二元一次方程 素养点:代数思维 # 比例问题 "配制农药药液,药粉和水的比例是3:100,现有水500kg,需要多少药粉?", # 知识点:比例应用 素养点:量化分析 # 函数图像 "某物体做匀加速运动,初速度2m/s,加速度0.5m/s²,写出速度v与时间t的关系式", # 知识点:一次函数 素养点:数形结合 # 立体几何 "圆柱形水桶底面半径0.4米,高1.2米,求它的容积(π取3.14)", # 知识点:圆柱体积 素养点:空间想象 # 概率统计 "掷两个骰子,点数和为7的概率是多少?", # 知识点:概率计算 素养点:数据预测 # 工程问题 "甲队单独完成工程需20天,乙队需30天,两队合作需要多少天?", # 知识点:工程问题 素养点:模型构建 # 最优化问题 "用20米篱笆围矩形菜地,怎样围面积最大?最大面积是多少?", # 知识点:二次函数 素养点:优化思想 # 利率问题 "本金10000元,年利率3.5%,存3年到期本息合计多少?", # 知识点:单利计算 素养点:金融素养 # 数列问题 "等差数列首项5,公差3,求第10项的值", # 知识点:等差数列 素养点:模式识别 # 测量问题 "在比例尺1:5000的地图上,2cm线段代表实际距离多少米?", # 知识点:比例尺应用 素养点:单位换算 # 逻辑推理 "A说B在说谎,B说C在说谎,C说AB都在说谎,问谁在说真话?" # 知识点:逻辑推理 素养点:批判性思维 ] for q in test_cases: print("\n" + "=" * 60) analyzer = ProblemAnalyzer(q) analyzer.execute()