You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

235 lines
8.2 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# -*- coding: utf-8 -*-
"""
数学题目分析系统 v6.1(严格匹配版)
功能特性:
1. 纯大模型分析
2. 流式响应处理
3. 严格匹配已有节点
4. 不创建新知识点/素养点
"""
import re
import json
import hashlib
from typing import Dict, List
import requests
from py2neo import Graph
from Config import *
# ================== 配置区 ==================
class Config:
    """Single namespace for the settings used by the classes in this file.

    The right-hand-side names are not defined in this file; they are expected
    to be supplied by the ``from Config import *`` statement at the top.
    """

    # --- Neo4j connection -------------------------------------------------
    NEO4J_URI = NEO4J_URI
    NEO4J_AUTH = NEO4J_AUTH

    # --- LLM endpoint (the original note says: example is an Alibaba Cloud setup)
    MODEL_API_URL = MODEL_API_URL
    MODEL_API_KEY = MODEL_API_KEY
    MODEL_NAME = MODEL_NAME

    # --- Timeouts ---------------------------------------------------------
    # Passed as the ``timeout`` argument of the streaming HTTP request.
    STREAM_TIMEOUT = 30
    # Intended wait limit for a single streamed chunk; not referenced in the
    # visible part of this file.
    CHUNK_TIMEOUT = 5

    # --- System parameters ------------------------------------------------
    # Maximum number of characters kept from a pre-processed question text.
    MAX_CONTENT_LENGTH = 500
# ================== 流式大模型客户端 ==================
class StreamLLMClient:
    """Chat-completions client that consumes the answer as a streamed response.

    The client posts a single user message to ``{MODEL_API_URL}/chat/completions``
    with ``stream=True`` and reassembles the streamed ``delta.content``
    fragments into one text, from which the JSON object is extracted.
    """

    def __init__(self):
        self.base_url = Config.MODEL_API_URL
        self.headers = {
            "Authorization": f"Bearer {Config.MODEL_API_KEY}",
            "Content-Type": "application/json"
        }

    def analyze_problem(self, content: str) -> dict:
        """Ask the model to classify a math problem and return its JSON answer.

        Args:
            content: The problem text that is embedded into the prompt.

        Returns:
            The JSON object parsed from the model output, or ``{}`` when the
            request fails or the output cannot be parsed. Errors are reported
            on stdout and never raised to the caller.
        """
        # The prompt text is sent verbatim to the model; do not reformat it.
        prompt = f"""请严格按JSON格式分析数学题目
{{
"problem_types": ["题型列表"],
"knowledge_points": ["知识点名称(必须与数据库完全一致)"],
"literacy_points": ["素养点名称(必须与数据库完全一致)"]
}}
题目:{content}"""
        request_body = {
            "model": Config.MODEL_NAME,
            "messages": [{
                "role": "user",
                "content": prompt
            }],
            "temperature": 0.2,
            "stream": True
        }
        try:
            # NOTE: requests applies ``timeout`` to the connect phase and to
            # each wait for data on the socket, not to the whole download, so
            # STREAM_TIMEOUT does not cap the total streaming time.
            # The ``with`` block closes the streamed connection even when
            # parsing fails; with stream=True it is otherwise left open.
            with requests.post(
                f"{self.base_url}/chat/completions",
                headers=self.headers,
                json=request_body,
                timeout=Config.STREAM_TIMEOUT,
                stream=True
            ) as response:
                response.raise_for_status()
                return self._process_stream(response)
        except requests.exceptions.RequestException as e:
            print(f"🌐 网络错误: {str(e)}")
            return {}
        except Exception as e:
            # Top-level boundary: any other failure degrades to "no result".
            print(f"🔴 解析失败: {str(e)}")
            return {}

    @staticmethod
    def _delta_text(payload: str) -> str:
        """Return the text fragment carried by one ``data:`` payload.

        Payloads that are not JSON (for example a terminator line), that lack
        ``choices[0].delta``, or whose ``content`` is not a string contribute
        an empty fragment instead of aborting the stream.
        """
        try:
            fragment = json.loads(payload)['choices'][0]['delta'].get('content', '')
        except (ValueError, LookupError, TypeError, AttributeError):
            # ValueError: not JSON; LookupError: missing key / empty choices;
            # TypeError / AttributeError: unexpected JSON shape.
            return ""
        return fragment if isinstance(fragment, str) else ""

    def _process_stream(self, response) -> dict:
        """Reassemble a streamed response and parse the JSON object inside it.

        Args:
            response: Object exposing ``iter_lines()`` that yields ``bytes``
                lines (a streamed ``requests`` response).

        Returns:
            The parsed JSON object spanning the first ``{`` to the last ``}``
            of the concatenated text, or ``{}`` if none can be parsed.
        """
        fragments = []
        for raw_line in response.iter_lines():
            if not raw_line:
                continue  # blank separator / keep-alive line
            line = raw_line.decode('utf-8')
            if line.startswith("data:"):
                fragments.append(self._delta_text(line[5:]))
        full_content = "".join(fragments)

        # Greedy match: the model may wrap the JSON object in extra prose.
        match = re.search(r'\{.*\}', full_content, re.DOTALL)
        if match is not None:
            try:
                return json.loads(match.group())
            except json.JSONDecodeError:
                pass  # reported below together with the "no JSON found" case
        print("🔴 无法解析大模型输出")
        return {}
# ================== 知识图谱管理 ==================
class KnowledgeManager:
    """Neo4j access layer that links questions only to already-existing nodes.

    On construction the ``KnowledgePoint`` and ``LiteracyNode`` nodes are
    loaded into in-memory maps; ``store_analysis`` then links a question to
    the nodes whose names match exactly and never creates new nodes of those
    two labels.
    """

    def __init__(self):
        self.graph = Graph(Config.NEO4J_URI, auth=Config.NEO4J_AUTH)
        self.knowledge_map = self._load_knowledge()
        self.literacy_map = self._load_literacy()

    def _load_knowledge(self) -> Dict[str, str]:
        """Load the knowledge-point map ``id -> name`` from the graph."""
        rows = self.graph.run("MATCH (n:KnowledgePoint) RETURN n.id, n.name")
        return {row['n.id']: row['n.name'] for row in rows}

    def _load_literacy(self) -> Dict[str, str]:
        """Load the literacy-point map ``value -> title`` from the graph."""
        rows = self.graph.run("MATCH (n:LiteracyNode) RETURN n.value, n.title")
        return {row['n.value']: row['n.title'] for row in rows}

    @staticmethod
    def _find_key(mapping, name):
        """Return the first key of ``mapping`` whose value equals ``name``.

        Returns ``None`` when nothing matches or when ``name`` is not a
        string, so that e.g. a ``null`` emitted by the model can never match
        a node that has no name.
        """
        if not isinstance(name, str):
            return None
        for key, value in mapping.items():
            if value == name:
                return key
        return None

    def store_analysis(self, question_id: str, content: str,
                       knowledge: List[str], literacy: List[str]):
        """Upsert a question node and link it to matching existing nodes.

        All statements run in one transaction with parameterized Cypher, so
        the question text needs no escaping. Names without an exact match are
        skipped silently. On any error the transaction is rolled back and the
        failure is printed instead of raised.

        Args:
            question_id: Value of the ``id`` property of the Question node.
            content: Text stored in the node's ``content`` property.
            knowledge: Knowledge-point names to link; ``None`` means none.
            literacy: Literacy-point names to link; ``None`` means none.
        """
        tx = self.graph.begin()
        try:
            tx.run(
                "MERGE (q:Question {id: $question_id}) "
                "SET q.content = $content",
                {
                    "question_id": question_id,
                    "content": content
                }
            )
            # Link knowledge points. The match is tested with ``is not None``
            # rather than truthiness so that ids such as 0 or "" still link.
            for kp_name in knowledge or ():
                kp_id = self._find_key(self.knowledge_map, kp_name)
                if kp_id is not None:
                    tx.run(
                        "MATCH (kp:KnowledgePoint {id: $kp_id}) "
                        "MATCH (q:Question {id: $qid}) "
                        "MERGE (q)-[:REQUIRES_KNOWLEDGE]->(kp)",
                        {"kp_id": kp_id, "qid": question_id}
                    )
            # Link literacy points (same matching rules as above).
            for lit_name in literacy or ():
                lit_id = self._find_key(self.literacy_map, lit_name)
                if lit_id is not None:
                    tx.run(
                        "MATCH (lp:LiteracyNode {value: $lit_id}) "
                        "MATCH (q:Question {id: $qid}) "
                        "MERGE (q)-[:DEVELOPS_LITERACY]->(lp)",
                        {"lit_id": lit_id, "qid": question_id}
                    )
            self.graph.commit(tx)
            print("✅ 数据存储成功")
        except Exception as e:
            # Best-effort persistence: undo partial writes and report.
            self.graph.rollback(tx)
            print(f"❌ 存储失败: {str(e)}")
# ================== 核心分析引擎 ==================
class ProblemAnalyzer:
    """Analysis pipeline: LLM classification followed by graph storage."""

    # Maximum number of entries kept from each field of the model answer.
    MAX_PROBLEM_TYPES = 3
    MAX_KNOWLEDGE_POINTS = 2
    MAX_LITERACY_POINTS = 2

    def __init__(self, content: str, kg=None, llm=None):
        """Prepare the analysis of one problem.

        Args:
            content: Raw problem text.
            kg: Optional graph manager to reuse; a new ``KnowledgeManager``
                is created when omitted.
            llm: Optional model client to reuse; a new ``StreamLLMClient``
                is created when omitted.
        """
        self.original = content
        self.content = self._preprocess(content)
        # Short stable id: first 12 hex digits of the SHA-256 of the raw text.
        self.question_id = hashlib.sha256(content.encode()).hexdigest()[:12]
        self.kg = kg if kg is not None else KnowledgeManager()
        self.llm = llm if llm is not None else StreamLLMClient()

    def _preprocess(self, text: str) -> str:
        """Strip non-word characters and truncate to ``MAX_CONTENT_LENGTH``.

        NOTE(review): the pattern removes whitespace, punctuation and symbols
        such as ``/`` ``+`` ``.``, so the stored text loses operators and
        decimal points. Confirm this is intended.
        """
        return re.sub(r'[^\w\u4e00-\u9fa5]', '', text)[:Config.MAX_CONTENT_LENGTH]

    @staticmethod
    def _clean_names(value, limit: int) -> list:
        """Coerce one field of the model answer into at most ``limit`` names.

        A bare string is treated as a single name, ``None`` or any other
        non-sequence becomes an empty list, and non-string items are dropped.
        This keeps a malformed answer from raising or being sliced into
        individual characters.
        """
        if isinstance(value, str):
            value = [value]
        elif not isinstance(value, (list, tuple)):
            return []
        return [item for item in value if isinstance(item, str)][:limit]

    def analyze(self) -> dict:
        """Query the model with the raw text and return the trimmed result.

        Returns:
            Dict with ``problem_id`` plus the lists ``problem_types``,
            ``knowledge_points`` and ``literacy_points``; each list is empty
            when the model supplied nothing usable for it.
        """
        result = self.llm.analyze_problem(self.original)
        if not isinstance(result, dict):
            result = {}
        return {
            "problem_id": self.question_id,
            "problem_types": self._clean_names(
                result.get('problem_types'), self.MAX_PROBLEM_TYPES),
            "knowledge_points": self._clean_names(
                result.get('knowledge_points'), self.MAX_KNOWLEDGE_POINTS),
            "literacy_points": self._clean_names(
                result.get('literacy_points'), self.MAX_LITERACY_POINTS)
        }

    def execute(self):
        """Run the full flow: analyze, print the report, store the links."""
        print(f"\n🔍 开始分析题目:{self.original[:50]}...")
        analysis = self.analyze()
        print("\n📊 大模型分析报告:")
        print(f" 题型识别:{analysis.get('problem_types', [])}")
        print(f" 推荐知识点:{analysis.get('knowledge_points', [])}")
        print(f" 关联素养点:{analysis.get('literacy_points', [])}")
        # Persist to the knowledge graph; the pre-processed text is stored.
        self.kg.store_analysis(
            question_id=analysis['problem_id'],
            content=self.content,
            knowledge=analysis.get('knowledge_points', []),
            literacy=analysis.get('literacy_points', [])
        )
# ================== 测试用例 ==================
if __name__ == '__main__':
    # Smoke run: push two sample word problems through the whole pipeline.
    test_cases = [
        "小明用50元买了3本笔记本每本8元还剩多少钱",
        "甲乙两车相距300公里甲车速度60km/h乙车40km/h几小时后相遇"
    ]
    for question in test_cases:
        # Blank line plus an 80-character rule separates the runs on stdout.
        print("\n" + "=" * 80)
        print(f"📚 处理题目:{question}")
        analyzer = ProblemAnalyzer(question)
        analyzer.execute()
'''
// 查询题目关联信息
MATCH (q:Question {id: '6fff79108736'})
OPTIONAL MATCH (q)-[:REQUIRES_KNOWLEDGE]->(kp:KnowledgePoint)
OPTIONAL MATCH (q)-[:DEVELOPS_LITERACY]->(lp:LiteracyNode)
RETURN
q.content AS 题目内容,
COLLECT(DISTINCT kp.name) AS 关联知识点,
COLLECT(DISTINCT lp.title) AS 关联素养点
'''