# -*- coding: utf-8 -*-
"""
Math Problem Analysis System v6.3 (stable streaming edition)
"""
import re
import json
import hashlib
import requests
from py2neo import Graph
from typing import Dict, List
from Config import NEO4J_URI, NEO4J_AUTH, MODEL_API_URL, MODEL_API_KEY, MODEL_NAME
# ================== Configuration ==================
class Config:
    """Centralizes constants imported from the Config.py module."""
    NEO4J_URI = NEO4J_URI
    NEO4J_AUTH = NEO4J_AUTH
    MODEL_API_URL = MODEL_API_URL
    MODEL_API_KEY = MODEL_API_KEY
    MODEL_NAME = MODEL_NAME
    MAX_CONTENT_LENGTH = 500  # max characters kept after preprocessing
    STREAM_TIMEOUT = 30       # seconds to wait on the streaming API
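
# A minimal Config.py sketch (assumed layout; all values below are
# placeholders, not the project's real settings):
#
#   NEO4J_URI = "bolt://localhost:7687"
#   NEO4J_AUTH = ("neo4j", "password")
#   MODEL_API_URL = "https://api.example.com/v1"
#   MODEL_API_KEY = "sk-..."
#   MODEL_NAME = "model-name"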

# ================== Knowledge Graph Management ==================
class KnowledgeManager:
    def __init__(self):
        self.graph = Graph(Config.NEO4J_URI, auth=Config.NEO4J_AUTH)
        self._verify_connection()
        self.knowledge_map = self._load_knowledge()
        self.literacy_map = self._load_literacy()
        print("✅ Knowledge base loaded")
        print(f"Valid knowledge points ({len(self.knowledge_map)}): {list(self.knowledge_map.keys())[:3]}...")
        print(f"Valid literacy points ({len(self.literacy_map)}): {list(self.literacy_map.keys())[:3]}...")

    def _verify_connection(self):
        """Fail fast if Neo4j is unreachable."""
        try:
            self.graph.run("RETURN 1 AS test")
            print("✅ Neo4j connection verified")
        except Exception as e:
            print(f"❌ Neo4j connection failed: {str(e)}")
            raise

    def _load_knowledge(self) -> Dict[str, str]:
        """Load knowledge point names, skipping nodes with blank names."""
        records = self.graph.run(
            "MATCH (n:KnowledgePoint) WHERE n.name IS NOT NULL RETURN n.id, n.name"
        ).data()
        return {rec['n.name'].strip(): rec['n.id']
                for rec in records if rec['n.name'] and rec['n.name'].strip()}

    def _load_literacy(self) -> Dict[str, str]:
        """Load literacy point titles, skipping nodes with blank titles."""
        records = self.graph.run(
            "MATCH (n:LiteracyNode) WHERE n.title IS NOT NULL RETURN n.value, n.title"
        ).data()
        return {rec['n.title'].strip(): rec['n.value']
                for rec in records if rec['n.title'] and rec['n.title'].strip()}
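
    # For an empty database, nodes in the exact shape the two loaders above
    # expect can be created with Cypher like this (sample ids/names are
    # illustrative, not real project data):
    #
    #   MERGE (:KnowledgePoint {id: 'kp001', name: 'Arithmetic operations'})
    #   MERGE (:LiteracyNode {value: 'lit001', title: 'Mathematical reasoning'})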

    def store_analysis(self, question_id: str, content: str,
                       knowledge: List[str], literacy: List[str]):
        """Store an analysis result; all writes share one transaction."""
        tx = self.graph.begin()
        try:
            # Create or update the question node
            tx.run("""
                MERGE (q:Question {id: $id})
                SET q.content = $content, q.updateTime = timestamp()
            """, {"id": question_id, "content": content})
            # Link knowledge points; names absent from the graph are dropped
            valid_kp = []
            for name in knowledge:
                clean_name = name.strip()
                if kp_id := self.knowledge_map.get(clean_name):
                    tx.run("""
                        MATCH (q:Question {id: $qid}), (kp:KnowledgePoint {id: $kpid})
                        MERGE (q)-[r:REQUIRES_KNOWLEDGE]->(kp)
                        SET r.lastUsed = timestamp()
                    """, {"qid": question_id, "kpid": kp_id})
                    valid_kp.append(clean_name)
            # Link literacy points; titles absent from the graph are dropped
            valid_lp = []
            for title in literacy:
                clean_title = title.strip()
                if lit_id := self.literacy_map.get(clean_title):
                    tx.run("""
                        MATCH (q:Question {id: $qid}), (lp:LiteracyNode {value: $lpid})
                        MERGE (q)-[r:DEVELOPS_LITERACY]->(lp)
                        SET r.lastUsed = timestamp()
                    """, {"qid": question_id, "lpid": lit_id})
                    valid_lp.append(clean_title)
            self.graph.commit(tx)
            print(f"✅ Stored - knowledge points: {valid_kp}, literacy points: {valid_lp}")
        except Exception as e:
            self.graph.rollback(tx)
            print(f"❌ Storage failed: {str(e)}")

# ================== LLM Client ==================
class StreamLLMClient:
    def __init__(self, kg: KnowledgeManager):
        self.kg = kg
        self.base_url = Config.MODEL_API_URL
        self.headers = {
            "Authorization": f"Bearer {Config.MODEL_API_KEY}",
            "Content-Type": "application/json",
            "Accept": "application/json"
        }

    def analyze_problem(self, content: str) -> dict:
        """Run the full streaming analysis; returns {} on any failure."""
        try:
            prompt = self._build_prompt(content)
            response = self._send_request(prompt)
            return self._process_stream(response)
        except Exception as e:
            print(f"🔴 Analysis failed: {str(e)}")
            return {}

    def _build_prompt(self, content: str) -> str:
        """Build a prompt that pins the model to labels known to the graph."""
        # Join the names explicitly: interpolating dict.keys() directly would
        # leak the "dict_keys([...])" repr into the prompt.
        knowledge_names = ", ".join(self.kg.knowledge_map.keys())
        literacy_names = ", ".join(self.kg.literacy_map.keys())
        return f"""Analyze the math problem strictly as follows:
1. Knowledge points must exactly match names from this list (do not add or alter any):
{knowledge_names}
2. Literacy points must exactly match names from this list:
{literacy_names}
3. Return strict JSON:
{{
    "problem_types": ["problem type"],
    "knowledge_points": ["matched knowledge points"],
    "literacy_points": ["matched literacy points"]
}}
Problem: {content}"""

    def _send_request(self, prompt: str):
        """POST to the chat completions endpoint with streaming enabled."""
        return requests.post(
            f"{self.base_url}/chat/completions",
            headers=self.headers,
            json={
                "model": Config.MODEL_NAME,
                "messages": [{"role": "user", "content": prompt}],
                "temperature": 0.1,
                "stream": True
            },
            timeout=Config.STREAM_TIMEOUT,
            stream=True
        )

    def _process_stream(self, response) -> dict:
        """Accumulate streamed deltas, then extract the JSON object."""
        full_content = ""
        try:
            for chunk in response.iter_lines():
                if not chunk:
                    continue
                decoded = chunk.decode('utf-8').strip()
                if decoded.startswith('data:'):
                    json_str = decoded[5:].strip()  # strip the "data:" SSE prefix
                    if json_str == "[DONE]":
                        break
                    try:
                        data = json.loads(json_str)
                        if content := data['choices'][0]['delta'].get('content'):
                            full_content += content
                    except (json.JSONDecodeError, KeyError, IndexError):
                        continue
            # Debug log
            print(f"Raw response content:\n{full_content}")
            # Extract the first JSON object embedded in the accumulated text
            match = re.search(r'\{[\s\S]*\}', full_content)
            if not match:
                print(f"⚠️ No JSON object found in response: {full_content}")
                return {}
            return json.loads(match.group())
        except json.JSONDecodeError:
            print(f"⚠️ JSON parse failed, raw content: {full_content}")
            return {}
        except Exception as e:
            print(f"Stream processing error: {str(e)}")
            return {}
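
    # The parser above assumes OpenAI-style server-sent events, i.e. lines
    # like these (payload shown is illustrative):
    #
    #   data: {"choices": [{"delta": {"content": "partial text"}}]}
    #   data: [DONE]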

# ================== Core Engine ==================
class ProblemAnalyzer:
    def __init__(self, content: str):
        self.original = content
        self.content = self._preprocess(content)
        self.question_id = hashlib.sha256(content.encode()).hexdigest()[:12]
        self.kg = KnowledgeManager()
        self.llm = StreamLLMClient(self.kg)

    def _preprocess(self, text: str) -> str:
        """Keep only word characters and CJK, then truncate."""
        return re.sub(r'[^\w\u4e00-\u9fa5]', '', text)[:Config.MAX_CONTENT_LENGTH]
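
    # Worked example (hypothetical input): spaces and punctuation are removed,
    # word characters and CJK survive:
    #   _preprocess("Xiao Ming spent 50 yuan!")  ->  "XiaoMingspent50yuan"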

    def execute(self):
        """Run analysis and persist the result."""
        print(f"\n🔍 Analyzing problem: {self.original[:50]}...")
        analysis = self.llm.analyze_problem(self.original)
        if not analysis:
            print("⚠️ LLM analysis failed")
            return
        print("\n📊 Analysis result:")
        print(f"  Problem types: {analysis.get('problem_types', [])}")
        print(f"  Knowledge points: {analysis.get('knowledge_points', [])}")
        print(f"  Literacy points: {analysis.get('literacy_points', [])}")
        self.kg.store_analysis(
            question_id=self.question_id,
            content=self.content,
            knowledge=analysis.get('knowledge_points', []),
            literacy=analysis.get('literacy_points', [])
        )

# ================== Query Interface ==================
def query_question(question_id: str):
    try:
        graph = Graph(Config.NEO4J_URI, auth=Config.NEO4J_AUTH)
        # DISTINCT avoids duplicated names produced by the cross product of
        # the two OPTIONAL MATCH clauses
        result = graph.run("""
            MATCH (q:Question {id: $id})
            OPTIONAL MATCH (q)-[:REQUIRES_KNOWLEDGE]->(kp)
            OPTIONAL MATCH (q)-[:DEVELOPS_LITERACY]->(lp)
            RETURN q.content AS content,
                   collect(DISTINCT kp.name) AS knowledge,
                   collect(DISTINCT lp.title) AS literacy
        """, id=question_id).data()
        if result:
            data = result[0]
            print(f"\n🔍 Query result for ID: {question_id}")
            print(f"Content: {data['content']}")
            print(f"Knowledge points: {data['knowledge']}")
            print(f"Literacy points: {data['literacy']}")
        else:
            print("No matching question found")
    except Exception as e:
        print(f"Query error: {str(e)}")

# ================== Test Run ==================
if __name__ == '__main__':
    # Test strings are kept in Chinese because the knowledge graph labels and
    # the hardcoded question id below depend on them; English glosses:
    test_cases = [
        # "Xiao Ming paid 50 yuan for 3 notebooks at 8 yuan each; how much is left?"
        "小明用50元买了3本笔记本每本8元还剩多少钱",
        # "Two cars 300 km apart travel toward each other at 60 km/h and 40 km/h; after how many hours do they meet?"
        "甲乙两车相距300公里甲车速度60km/h乙车40km/h几小时后相遇"
    ]
    for q in test_cases:
        print("\n" + "=" * 60)
        analyzer = ProblemAnalyzer(q)
        analyzer.execute()
    # Look up a previously stored question by its 12-character id
    query_question('6fff79108736')