You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

296 lines
12 KiB

3 weeks ago
# -*- coding: utf-8 -*-
"""
数学题目分析系统 v6.3稳定流式处理版
"""
import re
import json
import hashlib
import requests
from py2neo import Graph
from typing import Dict, List
from Config import NEO4J_URI, NEO4J_AUTH, MODEL_API_URL, MODEL_API_KEY, MODEL_NAME
# ================== 配置类 ==================
class Config:
NEO4J_URI = NEO4J_URI
NEO4J_AUTH = NEO4J_AUTH
MODEL_API_URL = MODEL_API_URL
MODEL_API_KEY = MODEL_API_KEY
MODEL_NAME = MODEL_NAME
MAX_CONTENT_LENGTH = 500
STREAM_TIMEOUT = 30
# ================== 知识图谱管理 ==================
class KnowledgeManager:
def __init__(self):
self.graph = Graph(Config.NEO4J_URI, auth=Config.NEO4J_AUTH)
self._verify_connection()
self.knowledge_map = self._load_knowledge()
self.literacy_map = self._load_literacy()
print("✅ 知识库加载完成")
print(f"有效知识点({len(self.knowledge_map)}个): {list(self.knowledge_map.keys())[:3]}...")
print(f"有效素养点({len(self.literacy_map)}个): {list(self.literacy_map.keys())[:3]}...")
def _verify_connection(self):
"""安全连接验证"""
try:
self.graph.run("RETURN 1 AS test")
print("✅ Neo4j连接验证成功")
except Exception as e:
print(f"❌ Neo4j连接失败: {str(e)}")
raise
def _load_knowledge(self) -> Dict[str, str]:
"""安全加载知识点"""
records = self.graph.run("MATCH (n:KnowledgePoint) WHERE n.name IS NOT NULL RETURN n.id, n.name").data()
return {rec['n.name'].strip(): rec['n.id'] for rec in records if rec['n.name'] and rec['n.name'].strip()}
def _load_literacy(self) -> Dict[str, str]:
"""安全加载素养点"""
records = self.graph.run("MATCH (n:LiteracyNode) WHERE n.title IS NOT NULL RETURN n.value, n.title").data()
return {rec['n.title'].strip(): rec['n.value'] for rec in records if rec['n.title'] and rec['n.title'].strip()}
def store_analysis(self, question_id: str, content: str,
knowledge: List[str], literacy: List[str]):
"""增强存储方法"""
tx = self.graph.begin()
try:
# 创建/更新题目节点
tx.run("""
MERGE (q:Question {id: $id})
SET q.content = $content, q.updateTime = timestamp()
""", {"id": question_id, "content": content})
# 关联知识点
valid_kp = []
for name in knowledge:
clean_name = name.strip()
if kp_id := self.knowledge_map.get(clean_name):
tx.run("""
MATCH (q:Question {id: $qid}), (kp:KnowledgePoint {id: $kpid})
MERGE (q)-[r:REQUIRES_KNOWLEDGE]->(kp)
SET r.lastUsed = timestamp()
""", {"qid": question_id, "kpid": kp_id})
valid_kp.append(clean_name)
# 关联素养点
valid_lp = []
for title in literacy:
clean_title = title.strip()
if lit_id := self.literacy_map.get(clean_title):
tx.run("""
MATCH (q:Question {id: $qid}), (lp:LiteracyNode {value: $lpid})
MERGE (q)-[r:DEVELOPS_LITERACY]->(lp)
SET r.lastUsed = timestamp()
""", {"qid": question_id, "lpid": lit_id})
valid_lp.append(clean_title)
self.graph.commit(tx)
print(f"✅ 存储成功 - 知识点: {valid_kp}, 素养点: {valid_lp}")
except Exception as e:
self.graph.rollback(tx)
print(f"❌ 存储失败: {str(e)}")
# ================== 大模型客户端 ==================
class StreamLLMClient:
def __init__(self, kg: KnowledgeManager):
self.kg = kg
self.base_url = Config.MODEL_API_URL
self.headers = {
"Authorization": f"Bearer {Config.MODEL_API_KEY}",
"Content-Type": "application/json",
"Accept": "application/json"
}
def analyze_problem(self, content: str) -> dict:
"""增强的流式分析"""
try:
prompt = self._build_prompt(content)
response = self._send_request(prompt)
return self._process_stream(response)
except Exception as e:
print(f"🔴 分析失败: {str(e)}")
return {}
def _build_prompt(self, content: str) -> str:
"""构建精准提示词"""
return f"""请严格按以下要求分析数学题目:
1. 知识点必须完全匹配以下列表中的名称不要新增或修改
{self.kg.knowledge_map.keys()}
2. 素养点必须完全匹配以下列表中的名称
{self.kg.literacy_map.keys()}
3. 返回严格JSON格式
{{
"problem_types": ["题型"],
"knowledge_points": ["匹配的知识点"],
"literacy_points": ["匹配的素养点"]
}}
题目内容{content}"""
def _send_request(self, prompt: str):
"""发送API请求"""
return requests.post(
f"{self.base_url}/chat/completions",
headers=self.headers,
json={
"model": Config.MODEL_NAME,
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.1,
"stream": True
},
timeout=Config.STREAM_TIMEOUT,
stream=True
)
def _process_stream(self, response) -> dict:
"""可靠的流式处理"""
full_content = ""
try:
for chunk in response.iter_lines():
if chunk:
decoded = chunk.decode('utf-8').strip()
if decoded.startswith('data:'):
json_str = decoded[5:].strip()
if json_str == "[DONE]":
break
try:
data = json.loads(json_str)
if content := data['choices'][0]['delta'].get('content'):
full_content += content
except:
continue
# 调试日志
print(f"原始响应内容:\n{full_content}")
# 提取有效JSON
json_str = re.search(r'\{[\s\S]*\}', full_content).group()
return json.loads(json_str)
except json.JSONDecodeError:
print(f"⚠️ JSON解析失败原始内容{full_content}")
return {}
except Exception as e:
print(f"流处理异常:{str(e)}")
return {}
# ================== 核心引擎 ==================
class ProblemAnalyzer:
def __init__(self, content: str):
self.original = content
self.content = self._preprocess(content)
self.question_id = hashlib.sha256(content.encode()).hexdigest()[:12]
self.kg = KnowledgeManager()
self.llm = StreamLLMClient(self.kg)
def _preprocess(self, text: str) -> str:
"""文本预处理"""
return re.sub(r'[^\w\u4e00-\u9fa5]', '', text)[:Config.MAX_CONTENT_LENGTH]
def execute(self):
"""执行分析流程"""
print(f"\n🔍 分析题目: {self.original[:50]}...")
analysis = self.llm.analyze_problem(self.original)
if not analysis:
print("⚠️ 大模型分析失败")
return
print("\n📊 分析结果:")
print(f" 题型: {analysis.get('problem_types', [])}")
print(f" 知识点: {analysis.get('knowledge_points', [])}")
print(f" 素养点: {analysis.get('literacy_points', [])}")
self.kg.store_analysis(
question_id=self.question_id,
content=self.content,
knowledge=analysis.get('knowledge_points', []),
literacy=analysis.get('literacy_points', [])
)
# ================== 查询接口 ==================
def query_question(question_id: str):
try:
graph = Graph(Config.NEO4J_URI, auth=Config.NEO4J_AUTH)
result = graph.run("""
MATCH (q:Question {id: $id})
OPTIONAL MATCH (q)-[:REQUIRES_KNOWLEDGE]->(kp)
OPTIONAL MATCH (q)-[:DEVELOPS_LITERACY]->(lp)
RETURN q.content AS content,
collect(kp.name) AS knowledge,
collect(lp.title) AS literacy
""", id=question_id).data()
if result:
data = result[0]
print(f"\n🔍 查询结果ID: {question_id}")
print(f"内容: {data['content']}")
print(f"知识点: {data['knowledge']}")
print(f"素养点: {data['literacy']}")
else:
print("未找到相关题目")
except Exception as e:
print(f"查询错误: {str(e)}")
# ================== 测试执行 ==================
if __name__ == '__main__':
test_cases = [
# 基础运算类
"小明用50元买了3本笔记本每本8元还剩多少钱", # 知识点:四则运算应用 素养点:数学运算能力
# 几何问题
"一个长方形花坛长5米宽3米四周铺1米宽的小路求小路面积", # 知识点:面积计算 素养点:空间观念
# 统计概率
"某班30人数学成绩90分以上占1/580-89分占1/3求80分以下人数", # 知识点:分数运算 素养点:数据分析
# 典型应用题
"甲乙两车相距300公里甲车速度60km/h乙车40km/h几小时后相遇", # 知识点:相遇问题 素养点:数学建模
# 方程应用
"鸡兔同笼头共10个脚共28只问鸡兔各多少", # 知识点:二元一次方程 素养点:代数思维
# 比例问题
"配制农药药液药粉和水的比例是3:100现有水500kg需要多少药粉", # 知识点:比例应用 素养点:量化分析
# 函数图像
"某物体做匀加速运动初速度2m/s加速度0.5m/s²写出速度v与时间t的关系式", # 知识点:一次函数 素养点:数形结合
# 立体几何
"圆柱形水桶底面半径0.4米高1.2米求它的容积π取3.14", # 知识点:圆柱体积 素养点:空间想象
# 概率统计
"掷两个骰子点数和为7的概率是多少", # 知识点:概率计算 素养点:数据预测
# 工程问题
"甲队单独完成工程需20天乙队需30天两队合作需要多少天", # 知识点:工程问题 素养点:模型构建
# 最优化问题
"用20米篱笆围矩形菜地怎样围面积最大最大面积是多少", # 知识点:二次函数 素养点:优化思想
# 利率问题
"本金10000元年利率3.5%存3年到期本息合计多少", # 知识点:单利计算 素养点:金融素养
# 数列问题
"等差数列首项5公差3求第10项的值", # 知识点:等差数列 素养点:模式识别
# 测量问题
"在比例尺1:5000的地图上2cm线段代表实际距离多少米", # 知识点:比例尺应用 素养点:单位换算
# 逻辑推理
"A说B在说谎B说C在说谎C说AB都在说谎问谁在说真话" # 知识点:逻辑推理 素养点:批判性思维
]
for q in test_cases:
print("\n" + "=" * 60)
analyzer = ProblemAnalyzer(q)
analyzer.execute()