# -*- coding: utf-8 -*-
"""
Math problem analysis system v6.1 (strict-matching edition).

Features:
1. Analysis is performed purely by a large language model
2. Streaming response handling
3. Strict matching against nodes that already exist in the graph
4. Never creates new knowledge-point / literacy-point nodes
"""
import re
import json
import hashlib
from typing import Any, Dict, List, Optional

import requests
from py2neo import Graph

from Config import *  # noqa: F401,F403 - supplies NEO4J_* and MODEL_* settings


# ================== Configuration ==================
class Config:
    """Central settings; connection values are re-exported from the ``Config`` module."""

    # Neo4j connection
    NEO4J_URI = NEO4J_URI
    NEO4J_AUTH = NEO4J_AUTH

    # LLM endpoint (the example deployment targets Alibaba Cloud)
    MODEL_API_URL = MODEL_API_URL
    MODEL_API_KEY = MODEL_API_KEY
    MODEL_NAME = MODEL_NAME

    # Timeouts, in seconds.
    # NOTE: STREAM_TIMEOUT is handed to ``requests`` as ``timeout``; requests
    # applies that to connecting and to each wait for data, not to the whole
    # stream. CHUNK_TIMEOUT is not referenced anywhere in this file.
    STREAM_TIMEOUT = 30
    CHUNK_TIMEOUT = 5

    # Maximum number of characters kept by ProblemAnalyzer._preprocess
    MAX_CONTENT_LENGTH = 500


# ================== Streaming LLM client ==================
class StreamLLMClient:
    """Chat-completions client that consumes the answer as a ``data:`` line stream."""

    def __init__(self):
        self.base_url = Config.MODEL_API_URL
        self.headers = {
            "Authorization": f"Bearer {Config.MODEL_API_KEY}",
            "Content-Type": "application/json"
        }

    def analyze_problem(self, content: str) -> dict:
        """Ask the model to classify a problem and return its parsed JSON answer.

        Args:
            content: The problem text, embedded verbatim in the prompt.

        Returns:
            The JSON object found in the model output, or ``{}`` when the
            request fails or the output cannot be parsed. Errors are printed,
            never raised.
        """
        # Runtime prompt text; kept exactly as the original wording.
        prompt = (
            "请严格按JSON格式分析数学题目: "
            '{ "problem_types": ["题型列表"], '
            '"knowledge_points": ["知识点名称(必须与数据库完全一致)"], '
            '"literacy_points": ["素养点名称(必须与数据库完全一致)"] } '
            f"题目:{content}"
        )
        try:
            # The context manager releases the connection even when reading
            # the stream is interrupted by an exception.
            with requests.post(
                f"{self.base_url}/chat/completions",
                headers=self.headers,
                json={
                    "model": Config.MODEL_NAME,
                    "messages": [{"role": "user", "content": prompt}],
                    "temperature": 0.2,
                    "stream": True
                },
                timeout=Config.STREAM_TIMEOUT,
                stream=True
            ) as response:
                response.raise_for_status()
                return self._process_stream(response)
        except requests.exceptions.RequestException as e:
            print(f"🌐 网络错误: {str(e)}")
            return {}
        except Exception as e:
            # Top-level boundary: callers rely on getting a dict back.
            print(f"🔴 解析失败: {str(e)}")
            return {}

    def _process_stream(self, response) -> dict:
        """Concatenate the streamed content deltas and parse the JSON object in them.

        Args:
            response: A streaming ``requests`` response whose lines look like
                ``data: {...}``.

        Returns:
            The parsed object spanning the first ``{`` to the last ``}`` of
            the accumulated text, or ``{}`` if there is none or it is invalid.
        """
        pieces: List[str] = []
        for raw_line in response.iter_lines():
            if not raw_line:
                continue
            line = raw_line.decode('utf-8')
            if not line.startswith("data:"):
                continue
            payload = line[5:].strip()
            if payload == "[DONE]":
                # End-of-stream sentinel; it is not JSON.
                break
            try:
                event = json.loads(payload)
                delta_text = event['choices'][0]['delta'].get('content')
            except (ValueError, KeyError, IndexError, TypeError, AttributeError):
                # Malformed or differently shaped event: skip it.
                continue
            # ``content`` may be absent or null in some events.
            if isinstance(delta_text, str):
                pieces.append(delta_text)

        full_content = "".join(pieces)
        found = re.search(r'\{.*\}', full_content, re.DOTALL)
        if found is None:
            print("🔴 无法解析大模型输出")
            return {}
        try:
            return json.loads(found.group())
        except ValueError:
            print("🔴 无法解析大模型输出")
            return {}


# ================== Knowledge graph management ==================
class KnowledgeManager:
    """Knowledge-graph manager that only links to nodes already in the graph."""

    def __init__(self):
        self.graph = Graph(Config.NEO4J_URI, auth=Config.NEO4J_AUTH)
        self.knowledge_map = self._load_knowledge()
        self.literacy_map = self._load_literacy()

    def _load_knowledge(self) -> Dict[str, str]:
        """Load the knowledge-point mapping (id -> name)."""
        return {rec['n.id']: rec['n.name'] for rec in
                self.graph.run("MATCH (n:KnowledgePoint) RETURN n.id, n.name")}

    def _load_literacy(self) -> Dict[str, str]:
        """Load the literacy-point mapping (value -> title)."""
        return {rec['n.value']: rec['n.title'] for rec in
                self.graph.run("MATCH (n:LiteracyNode) RETURN n.value, n.title")}

    @staticmethod
    def _find_key(mapping: Dict[Any, Any], name: Any) -> Optional[Any]:
        """Return the first key in *mapping* whose value equals *name*, else ``None``."""
        return next((key for key, value in mapping.items() if value == name), None)

    def store_analysis(self, question_id: str, content: str,
                       knowledge: List[str], literacy: List[str]):
        """Upsert the question node and link it to matching existing nodes.

        All statements use query parameters, so no manual escaping is needed.
        Names without an exact match in the loaded maps are ignored. Failures
        roll the transaction back and are printed, not raised.

        Args:
            question_id: Value of the ``id`` property of the Question node.
            content: Text stored in the node's ``content`` property.
            knowledge: Knowledge-point names to link.
            literacy: Literacy-point titles to link.
        """
        tx = self.graph.begin()
        try:
            tx.run(
                "MERGE (q:Question {id: $question_id}) "
                "SET q.content = $content",
                {
                    "question_id": question_id,
                    "content": content
                }
            )

            # Link knowledge points. ``is not None`` (rather than truthiness)
            # keeps matches whose id is falsy, such as 0 or "".
            for kp_name in knowledge or []:
                kp_id = self._find_key(self.knowledge_map, kp_name)
                if kp_id is not None:
                    tx.run(
                        "MATCH (kp:KnowledgePoint {id: $kp_id}) "
                        "MATCH (q:Question {id: $qid}) "
                        "MERGE (q)-[:REQUIRES_KNOWLEDGE]->(kp)",
                        {"kp_id": kp_id, "qid": question_id}
                    )

            # Link literacy points, same rule.
            for lit_name in literacy or []:
                lit_id = self._find_key(self.literacy_map, lit_name)
                if lit_id is not None:
                    tx.run(
                        "MATCH (lp:LiteracyNode {value: $lit_id}) "
                        "MATCH (q:Question {id: $qid}) "
                        "MERGE (q)-[:DEVELOPS_LITERACY]->(lp)",
                        {"lit_id": lit_id, "qid": question_id}
                    )

            self.graph.commit(tx)
            print("✅ 数据存储成功")
        except Exception as e:
            self.graph.rollback(tx)
            print(f"❌ 存储失败: {str(e)}")


# ================== Core analysis engine ==================
class ProblemAnalyzer:
    """Analysis engine that relies on the LLM alone."""

    def __init__(self, content: str,
                 kg: Optional[KnowledgeManager] = None,
                 llm: Optional[StreamLLMClient] = None):
        """Prepare one problem for analysis.

        Args:
            content: Raw problem text.
            kg: Knowledge manager to reuse; a new one (new connection, maps
                reloaded) is created when omitted.
            llm: LLM client to reuse; a new one is created when omitted.
        """
        self.original = content
        self.content = self._preprocess(content)
        # First 12 hex digits of the SHA-256 of the raw text identify the question.
        self.question_id = hashlib.sha256(content.encode()).hexdigest()[:12]
        self.kg = kg if kg is not None else KnowledgeManager()
        self.llm = llm if llm is not None else StreamLLMClient()

    def _preprocess(self, text: str) -> str:
        """Drop every non-word character and truncate to MAX_CONTENT_LENGTH.

        Whitespace, punctuation, operators and decimal points are all removed.
        """
        return re.sub(r'[^\w\u4e00-\u9fa5]', '', text)[:Config.MAX_CONTENT_LENGTH]

    @staticmethod
    def _as_list(value: Any, limit: int) -> list:
        """Coerce a model-supplied field to a list of at most *limit* items.

        A bare string becomes a one-element list; anything that is not a
        string, list or tuple (including ``None``) becomes ``[]``.
        """
        if isinstance(value, str):
            return [value][:limit]
        if isinstance(value, (list, tuple)):
            return list(value)[:limit]
        return []

    def analyze(self) -> dict:
        """Run the LLM on the raw text and return the trimmed result.

        Returns:
            A dict with ``problem_id``, up to 3 ``problem_types``, up to 2
            ``knowledge_points`` and up to 2 ``literacy_points``.
        """
        result = self.llm.analyze_problem(self.original)
        return {
            "problem_id": self.question_id,
            "problem_types": self._as_list(result.get('problem_types'), 3),
            "knowledge_points": self._as_list(result.get('knowledge_points'), 2),
            "literacy_points": self._as_list(result.get('literacy_points'), 2)
        }

    def execute(self):
        """Analyze the problem, print a report and store the result in the graph."""
        print(f"\n🔍 开始分析题目:{self.original[:50]}...")

        analysis = self.analyze()

        print("\n📊 大模型分析报告:")
        print(f" 题型识别:{analysis['problem_types']}")
        print(f" 推荐知识点:{analysis['knowledge_points']}")
        print(f" 关联素养点:{analysis['literacy_points']}")

        # Persist to the knowledge graph (the preprocessed text is what gets stored).
        self.kg.store_analysis(
            question_id=analysis['problem_id'],
            content=self.content,
            knowledge=analysis['knowledge_points'],
            literacy=analysis['literacy_points']
        )


# ================== Sample run ==================
if __name__ == '__main__':
    test_cases = [
        "小明用50元买了3本笔记本,每本8元,还剩多少钱?",
        "甲乙两车相距300公里,甲车速度60km/h,乙车40km/h,几小时后相遇?"
    ]

    # One graph connection and one client serve every question.
    shared_kg = KnowledgeManager()
    shared_llm = StreamLLMClient()

    for question in test_cases:
        print("\n" + "=" * 80)
        print(f"📚 处理题目:{question}")
        analyzer = ProblemAnalyzer(question, kg=shared_kg, llm=shared_llm)
        analyzer.execute()