You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

211 lines
7.7 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# -*- coding: utf-8 -*-
import re
import time
import hashlib
from typing import Iterator, Tuple, Dict
from py2neo import Graph
from openai import OpenAI
from openai.types.chat import ChatCompletionChunk
from Config import *
from K2_Neo4jExecutor import *
class KnowledgeGraph:
    """Turn a question's text into sanitized Cypher that links it to registered nodes.

    The instance loads the ``KnowledgePoint`` and ``AbilityPoint`` nodes that
    already exist in Neo4j, asks a chat model (streaming) to produce Cypher that
    only references those nodes, and then filters the model output line by line
    before handing it back to the caller.
    """

    # Frames of the progress spinner printed while the model is streaming.
    _SPINNER_FRAMES = ("⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏")

    # Fenced code block, optionally tagged "cypher"; tolerates trailing blanks
    # after the fence and CRLF line endings.
    _CODE_FENCE_RE = re.compile(
        r"```(?:cypher)?[ \t]*\r?\n(.*?)```", re.DOTALL | re.IGNORECASE
    )
    # Clauses that create or destroy data; any line containing one is dropped.
    _FORBIDDEN_RE = re.compile(
        r"\b(?:CREATE|DELETE|DETACH|REMOVE|DROP)\b", re.IGNORECASE
    )
    _KP_ID_RE = re.compile(r"kp_[\da-f]{6}", re.IGNORECASE)
    _AB_ID_RE = re.compile(r"ab_[\da-f]{6}", re.IGNORECASE)
    # The optional sign is captured so that negative weights are range-checked
    # instead of slipping through as "no weight present".
    _WEIGHT_RE = re.compile(r"weight\s*:\s*([-+]?[0-9.]+)")

    def __init__(self, content: str):
        """Connect to Neo4j, load the node registries and create the model client.

        Args:
            content: The question text. It is hashed into the question id and
                sent to the model as the user message.

        Raises:
            ConnectionError: If the Neo4j connection test fails.
        """
        self.content = content
        self.question_id = self._generate_question_id()
        self.graph = self._init_graph_connection()
        self.existing_knowledge = self._fetch_existing_nodes("KnowledgePoint")
        self.existing_ability = self._fetch_existing_nodes("AbilityPoint")
        # MODEL_API_KEY / MODEL_API_URL presumably come from the Config star-import.
        self.client = OpenAI(api_key=MODEL_API_KEY, base_url=MODEL_API_URL)

    def _init_graph_connection(self) -> "Graph":
        """Open the Neo4j connection and verify it with a trivial query.

        Returns:
            The connected ``Graph`` object.

        Raises:
            ConnectionError: If connecting or the ``RETURN 1`` probe fails; the
                original exception is chained as the cause.
        """
        try:
            graph = Graph(NEO4J_URI, auth=NEO4J_AUTH)
            graph.run("RETURN 1").data()
            print("✅ Neo4j连接成功")
            return graph
        except Exception as e:
            # Chain the cause so the underlying driver error is not lost.
            raise ConnectionError(f"❌ 数据库连接失败: {str(e)}") from e

    def _generate_question_id(self) -> str:
        """Return the first 8 hex digits of the MD5 of the question text."""
        return hashlib.md5(self.content.encode()).hexdigest()[:8]

    def _fetch_existing_nodes(self, label: str) -> Dict[str, str]:
        """Load ``id -> name`` for every node carrying ``label``.

        Args:
            label: Node label, interpolated directly into the Cypher text.

        Returns:
            Mapping of node id to node name; an empty dict if the query fails
            (the failure is printed, not raised).
        """
        try:
            cypher = f"MATCH (n:{label}) RETURN n.id as id, n.name as name"
            result = self.graph.run(cypher).data()
            return {item['id']: item['name'] for item in result}
        except Exception as e:
            print(f"❌ 节点查询失败: {str(e)}")
            return {}

    def _build_system_prompt(self) -> str:
        """Build the system prompt listing the allowed nodes and the Cypher template."""
        return f'''# 严格生成规则
1. 仅允许使用以下预注册节点:
- 知识点列表(共{len(self.existing_knowledge)}个):
{self._format_node_list(self.existing_knowledge)}
- 能力点列表(共{len(self.existing_ability)}个):
{self._format_node_list(self.existing_ability)}
2. 必须遵守的Cypher模式
MERGE (q:Question {{id: "{self.question_id}"}})
SET q.content = "题目内容摘要"
WITH q
MATCH (kp:KnowledgePoint {{id: "KP_xxxxxx"}})
MERGE (q)-[:TESTS_KNOWLEDGE {{weight: 0.8}}]->(kp)
MATCH (ab:AbilityPoint {{id: "AB_xxxxxx"}})
MERGE (q)-[:REQUIRES_ABILITY {{weight: 0.7}}]->(ab)
3. 绝对禁止:
- 使用CREATE创建新节点
- 修改已有节点属性
- 使用未注册的ID'''

    def _generate_stream(self) -> "Iterator[ChatCompletionChunk]":
        """Start a streaming chat completion for the question.

        Returns:
            The chunk stream returned by the client; the system message is the
            restrictive prompt and the user message is the question text.
        """
        return self.client.chat.completions.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": self._build_system_prompt()},
                {"role": "user", "content": self.content},
            ],
            stream=True,
            timeout=300,
        )

    def _format_node_list(self, nodes: Dict[str, str]) -> str:
        """Render at most the first five nodes as a bullet list for the prompt.

        Args:
            nodes: Mapping of node id to node name.

        Returns:
            One ``- id: name`` line per shown node, followed by a truncation
            notice when more than five nodes exist; a placeholder line when
            ``nodes`` is empty.
        """
        if not nodes:
            return " (无相关节点)"
        entries = list(nodes.items())
        lines = [f" - {k}: {v}" for k, v in entries[:5]]
        if len(entries) > 5:
            lines.append(f" ...(共{len(nodes)}仅显示前5个")
        return '\n'.join(lines)

    def _extract_cypher(self, content: str) -> str:
        """Extract and sanitize every fenced code block in the model output.

        Args:
            content: Full text produced by the model.

        Returns:
            The sanitized blocks joined by ``;`` and a blank line, or an empty
            string when no block survives sanitizing.
        """
        safe_blocks = []
        for block in self._CODE_FENCE_RE.findall(content):
            cleaned = self._sanitize_cypher(block)
            if cleaned:
                safe_blocks.append(cleaned)
        return ';\n\n'.join(safe_blocks) if safe_blocks else ""

    def _sanitize_cypher(self, cypher: str) -> str:
        """Drop every line of ``cypher`` that fails one of the safety checks.

        A line is removed when it is empty after stripping a ``//`` comment,
        contains a forbidden clause, references an unregistered node id, or
        carries a weight outside ``[0, 1]``.

        Returns:
            The surviving lines joined by newlines, or ``''`` if none survive.
        """
        # NOTE(review): lines are filtered independently, so dropping one line
        # can leave later lines referring to a variable that is no longer bound.
        valid_lines = []
        for raw_line in cypher.split('\n'):
            # Everything after "//" is treated as a comment.
            line = raw_line.split('//')[0].strip()
            if not line:
                continue
            # The model output is executed against the database later, so
            # destructive clauses are rejected alongside CREATE.
            if self._FORBIDDEN_RE.search(line):
                continue
            if not self._validate_ids(line):
                continue
            if not self._validate_weight(line):
                continue
            valid_lines.append(line)
        return '\n'.join(valid_lines) if valid_lines else ''

    def _validate_ids(self, line: str) -> bool:
        """Check that every ``KP_``/``AB_`` id on the line is a registered node.

        Ids are recognised as the prefix followed by six hex digits (any case)
        and are upper-cased before the registry lookup.

        Returns:
            ``True`` when all recognised ids are registered (also when the line
            contains none), otherwise ``False``.
        """
        # NOTE(review): tokens that do not have the six-hex-digit shape are not
        # recognised as ids here and therefore are not checked.
        kp_ids = {id_.upper() for id_ in self._KP_ID_RE.findall(line)}
        ab_ids = {id_.upper() for id_ in self._AB_ID_RE.findall(line)}
        valid_kp = all(kp in self.existing_knowledge for kp in kp_ids)
        valid_ab = all(ab in self.existing_ability for ab in ab_ids)
        return valid_kp and valid_ab

    def _validate_weight(self, line: str) -> bool:
        """Check that every ``weight: <number>`` on the line lies in ``[0, 1]``.

        Returns:
            ``False`` if any weight is out of range (including negative values)
            or is not a parseable number; ``True`` otherwise, including when the
            line has no weight property.
        """
        for match in self._WEIGHT_RE.finditer(line):
            try:
                weight = float(match.group(1))
            except ValueError:
                return False
            if not 0.0 <= weight <= 1.0:
                return False
        return True

    def run(self) -> Tuple[bool, str, str]:
        """Stream the model answer, print progress, and extract the safe Cypher.

        Returns:
            ``(success, text, cypher)``. On success ``text`` is the full model
            output and ``cypher`` the sanitized script (possibly empty). On
            failure ``text`` is a short reason or the exception message and
            ``cypher`` is ``""``.
        """
        if not self.existing_knowledge or not self.existing_ability:
            print("❌ 知识库或能力点为空,请检查数据库")
            return False, "节点数据为空", ""
        start_time = time.time()
        frames = self._SPINNER_FRAMES
        content_buffer = []
        try:
            print(f"🚀 开始生成(知识点:{len(self.existing_knowledge)}个,能力点:{len(self.existing_ability)}个)")
            stream = self._generate_stream()
            for idx, chunk in enumerate(stream):
                print(f"\r{frames[idx % len(frames)]} 生成中({int(time.time() - start_time)}秒)", end="")
                if chunk.choices and chunk.choices[0].delta.content:
                    content_chunk = chunk.choices[0].delta.content
                    content_buffer.append(content_chunk)
                    # Print the header once, right before the first piece of text.
                    if len(content_buffer) == 1:
                        print("\n\n📝 内容生成开始:")
                    print(content_chunk, end="", flush=True)
            if content_buffer:
                full_content = ''.join(content_buffer)
                cypher_script = self._extract_cypher(full_content)
                print(f"\n\n✅ 生成完成!耗时 {int(time.time() - start_time)}")
                print("\n================ 安全Cypher ===============")
                print(cypher_script if cypher_script else "未通过安全检查")
                print("==========================================")
                return True, full_content, cypher_script
            print("\n⚠️ 生成完成但未获取到有效内容")
            return False, "空内容", ""
        except Exception as e:
            # Top-level boundary: report the failure to the caller instead of raising.
            print(f"\n\n❌ 生成失败:{str(e)}")
            return False, str(e), ""
if __name__ == '__main__':
    # Executor used to apply the sanitized Cypher to Neo4j.
    executor = K2_Neo4jExecutor(
        uri=NEO4J_URI,
        auth=NEO4J_AUTH,
    )
    # Sample question used as a manual smoke test.
    test_content = '''
题目一个长方体的长是8厘米宽是5厘米高是3厘米求它的表面积是多少平方厘米
'''
    try:
        kg = KnowledgeGraph(test_content)
        success, result, cypher = kg.run()
        # Only touch the database when generation succeeded and some Cypher
        # survived sanitizing.
        if success and cypher:
            res = executor.execute_cypher_text(cypher)
            # Format instead of concatenating so a non-string result does not
            # raise TypeError and get reported as an initialisation failure.
            print(f"\n\n执行结果:{res}")
    except Exception as e:
        print(f"程序初始化失败: {str(e)}")