You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

216 lines
8.1 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# -*- coding: utf-8 -*-
import re
import time
import hashlib
from typing import Iterator, Tuple, Dict
from py2neo import Graph
from openai import OpenAI
from openai.types.chat import ChatCompletionChunk
from Config import *
from K2_Neo4jExecutor import *
class KnowledgeGraph:
    """Generate and sanitize Cypher that links one question to existing graph nodes.

    Constructing an instance hashes the question text into an id, connects to
    Neo4j, loads the existing ``KnowledgePoint`` and ``AbilityPoint`` nodes and
    creates the chat-completion client. :meth:`run` then streams a model
    response and keeps only the Cypher lines that pass the safety checks.
    """

    # Write/destructive keywords that model-generated Cypher must not contain.
    _FORBIDDEN_OPS = re.compile(
        r'\b(?:CREATE|DELETE|DETACH|DROP|REMOVE)\b', re.IGNORECASE
    )

    def __init__(self, content: str):
        """Store the question text and set up graph and model clients.

        Args:
            content: The question text sent to the model as the user message.

        Raises:
            ConnectionError: If the Neo4j connection cannot be established.
        """
        self.content = content
        self.question_id = self._generate_question_id()
        self.graph = self._init_graph_connection()
        self.existing_knowledge = self._fetch_existing_nodes("KnowledgePoint")
        self.existing_ability = self._fetch_existing_nodes("AbilityPoint")
        self.client = OpenAI(api_key=MODEL_API_KEY, base_url=MODEL_API_URL)

    def _init_graph_connection(self) -> "Graph":
        """Open the Neo4j connection and verify it with a trivial query.

        Returns:
            The connected ``Graph`` handle.

        Raises:
            ConnectionError: If connecting or the probe query fails; the
                original exception is chained as the cause.
        """
        try:
            graph = Graph(NEO4J_URI, auth=NEO4J_AUTH)
            graph.run("RETURN 1").data()
            print("✅ Neo4j连接成功")
            return graph
        except Exception as e:
            # Chain the cause so the underlying driver error stays visible.
            raise ConnectionError(f"❌ 数据库连接失败: {str(e)}") from e

    def _generate_question_id(self) -> str:
        """Return the first 8 hex characters of the MD5 of the question text."""
        return hashlib.md5(self.content.encode()).hexdigest()[:8]

    def _fetch_existing_nodes(self, label: str) -> Dict[str, str]:
        """Load ``id -> name`` for every node carrying ``label``.

        Args:
            label: Node label, interpolated directly into the query text.

        Returns:
            Mapping of node id to node name; empty if the query fails.
        """
        try:
            cypher = f"MATCH (n:{label}) RETURN n.id as id, n.name as name"
            result = self.graph.run(cypher).data()
            return {item['id']: item['name'] for item in result}
        except Exception as e:
            # Best effort: report the failure and let run() detect the empty map.
            print(f"❌ 节点查询失败: {str(e)}")
            return {}

    def _generate_stream(self) -> "Iterator[ChatCompletionChunk]":
        """Request a streamed chat completion for the question.

        The system prompt is a Cypher example that embeds this question's id;
        the user message is the raw question text.

        Returns:
            The streaming response object returned by the client.
        """
        # Cypher example section of the prompt (kept flush-left: it is runtime text).
        system_prompt = f'''
// 修改后的正确示例
MERGE (q:Question {{id: "{self.question_id}"}})
SET q.content = "题目内容摘要"
// 必须添加WITH子句
WITH q
MATCH (kp:KnowledgePoint {{id: "KP_xxxxxx"}})
MERGE (q)-[:TESTS_KNOWLEDGE {{weight: 0.8}}]->(kp)
WITH q // 多个关系需要继续传递
MATCH (ab:AbilityPoint {{id: "AB_xxxxxx"}})
MERGE (q)-[:REQUIRES_ABILITY {{weight: 0.7}}]->(ab)
'''
        return self.client.chat.completions.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": self.content}
            ],
            stream=True,
            timeout=300
        )

    def _format_node_list(self, nodes: Dict[str, str]) -> str:
        """Render up to five ``id: name`` entries, one per line.

        Args:
            nodes: Mapping of node id to node name.

        Returns:
            A placeholder line when ``nodes`` is empty; otherwise the first
            five entries, followed by a summary line when more exist.
        """
        if not nodes:
            return " (无相关节点)"
        sample = []
        for i, (k, v) in enumerate(nodes.items()):
            if i >= 5:
                sample.append(f" ...(共{len(nodes)}仅显示前5个")
                break
            sample.append(f" - {k}: {v}")
        return '\n'.join(sample)

    def _extract_cypher(self, content: str) -> str:
        """Extract fenced code blocks from ``content`` and sanitize each one.

        Args:
            content: Full model output, possibly containing ``` fences.

        Returns:
            The surviving blocks joined by ``";\\n\\n"``, or ``""`` when no
            block yields any valid line.
        """
        safe_blocks = []
        for block in re.findall(r"```(?:cypher)?\n(.*?)```", content, re.DOTALL):
            cleaned = self._sanitize_cypher(block)
            if cleaned:
                safe_blocks.append(cleaned)
        return ';\n\n'.join(safe_blocks) if safe_blocks else ""

    def _sanitize_cypher(self, cypher: str) -> str:
        """Filter a Cypher block line by line, dropping unsafe or invalid lines.

        A line is dropped when it is empty after removing a ``//`` comment,
        contains a forbidden keyword (CREATE, DELETE, DETACH, DROP, REMOVE),
        references an unknown node id, or carries a weight outside [0, 1].

        Args:
            cypher: Raw text of one code block.

        Returns:
            The surviving lines joined by newlines, or ``""`` if none survive.
        """
        valid_lines = []
        for line in cypher.split('\n'):
            # Everything after the first '//' is treated as a comment.
            line = line.split('//')[0].strip()
            if not line:
                continue
            # Reject node creation and destructive operations.
            if self._FORBIDDEN_OPS.search(line):
                continue
            # Every referenced node id must already exist.
            if not self._validate_ids(line):
                continue
            # Relationship weights must lie within range.
            if not self._validate_weight(line):
                continue
            valid_lines.append(line)
        return '\n'.join(valid_lines) if valid_lines else ''

    def _validate_ids(self, line: str) -> bool:
        """Check that every ``KP_``/``AB_`` id on the line is a known node.

        An id is accepted when it matches a stored id exactly or after
        upper-casing, so stored ids containing lowercase hex digits are not
        rejected.

        Args:
            line: One line of Cypher.

        Returns:
            ``True`` when all referenced ids exist (or none are referenced).
        """
        kp_ids = set(re.findall(r'kp_[\da-f]{6}', line, re.IGNORECASE))
        ab_ids = set(re.findall(r'ab_[\da-f]{6}', line, re.IGNORECASE))

        def known(node_id: str, existing: Dict[str, str]) -> bool:
            return node_id in existing or node_id.upper() in existing

        valid_kp = all(known(kp, self.existing_knowledge) for kp in kp_ids)
        valid_ab = all(known(ab, self.existing_ability) for ab in ab_ids)
        return valid_kp and valid_ab

    def _validate_weight(self, line: str) -> bool:
        """Check that every ``weight: <number>`` on the line lies in [0, 1].

        Signed values are captured so that negative weights are rejected
        instead of being skipped by the pattern.

        Args:
            line: One line of Cypher.

        Returns:
            ``False`` if any weight is malformed or out of range; ``True``
            otherwise, including when the line has no weight property.
        """
        for weight_match in re.finditer(r"weight\s*:\s*([-+]?[0-9.]+)", line):
            try:
                weight = float(weight_match.group(1))
            except ValueError:
                return False
            if not 0.0 <= weight <= 1.0:
                return False
        return True  # no weight property counts as valid

    def _validate_cypher_structure(self, cypher: str) -> bool:
        """Check that a ``MERGE (q:Question`` is accompanied by a ``WITH q``.

        Args:
            cypher: Cypher text to inspect.

        Returns:
            ``True`` when there is no such MERGE, or when ``WITH q`` also
            appears somewhere in the text; ``False`` otherwise.
        """
        has_merge = re.search(r'\bMERGE\s*\(q:Question\b', cypher, re.IGNORECASE)
        if not has_merge:
            return True
        return re.search(r'\bWITH\s+q\b', cypher, re.IGNORECASE) is not None

    def run(self) -> Tuple[bool, str, str]:
        """Stream the model output and extract the sanitized Cypher.

        Returns:
            ``(success, text, cypher)``. On success ``text`` is the full model
            output and ``cypher`` the sanitized script (possibly empty). On
            failure ``text`` is an error description and ``cypher`` is ``""``.
        """
        if not self.existing_knowledge or not self.existing_ability:
            print("❌ 知识库或能力点为空,请检查数据库")
            return False, "节点数据为空", ""
        start_time = time.time()
        # NOTE(review): the frames are empty strings in this copy of the file;
        # the original glyphs may have been lost - confirm against the source.
        spinner = ['', '', '', '', '', '', '', '', '', '']
        content_buffer = []
        try:
            print(f"🚀 开始生成(知识点:{len(self.existing_knowledge)}个,能力点:{len(self.existing_ability)}个)")
            stream = self._generate_stream()
            for idx, chunk in enumerate(stream):
                print(f"\r{spinner[idx % len(spinner)]} 生成中({int(time.time() - start_time)}秒)", end="")
                if chunk.choices and chunk.choices[0].delta.content:
                    content_chunk = chunk.choices[0].delta.content
                    content_buffer.append(content_chunk)
                    if len(content_buffer) == 1:
                        print("\n\n📝 内容生成开始:")
                    print(content_chunk, end="", flush=True)
            if content_buffer:
                full_content = ''.join(content_buffer)
                cypher_script = self._extract_cypher(full_content)
                print(f"\n\n✅ 生成完成!耗时 {int(time.time() - start_time)}")
                print("\n================ 安全Cypher ===============")
                print(cypher_script if cypher_script else "未通过安全检查")
                print("==========================================")
                return True, full_content, cypher_script
            print("\n⚠️ 生成完成但未获取到有效内容")
            return False, "空内容", ""
        except Exception as e:
            # Top-level boundary: report the failure to the caller as a result tuple.
            error_msg = str(e)
            print(f"\n\n❌ 生成失败:{error_msg}")
            return False, error_msg, ""
if __name__ == '__main__':
    # Script entry point: generate Cypher for one sample question and, when the
    # sanitized script is non-empty, execute it through the Neo4j executor.
    executor = K2_Neo4jExecutor(
        uri=NEO4J_URI,
        auth=NEO4J_AUTH
    )
    # Sample question (kept flush-left: the text is sent to the model verbatim).
    test_content = '''
题目一个长方体的长是8厘米宽是5厘米高是3厘米求它的表面积是多少平方厘米
'''
    try:
        kg = KnowledgeGraph(test_content)
        success, result, cypher = kg.run()
        if success and cypher:
            res = executor.execute_cypher_text(cypher)
            # f-string instead of "+": also works if the executor returns a non-str result.
            print(f"\n\n执行结果:{res}")
    except Exception as e:
        print(f"程序初始化失败: {str(e)}")