main
HuangHai 2 weeks ago
parent beedccf163
commit 09fd16e4b2

@ -25,6 +25,7 @@ VISION_MODEL_NAME = "GLM-4.1V-9B-Thinking"
NEO4J_URI = "bolt://localhost:7687"
NEO4J_USERNAME = "neo4j"
NEO4J_PASSWORD = "DsideaL147258369"
NEO4J_AUTH = (NEO4J_USERNAME, NEO4J_PASSWORD)
# 写入Neo4j的批次大小
BATCH_SIZE_NODES = 100

@ -4,10 +4,10 @@ import json # 用于JSON数据处理
import xml.etree.ElementTree as ET # 用于解析XML文件
from neo4j import GraphDatabase # Neo4j数据库驱动
from Config.Config import * # 导入配置文件
from Util.Neo4jExecutor import Neo4jExecutor
# 动画学院
WORKING_DIR = "./Topic/DongHua"
#TXT_FILE = "sushi.txt"
def xml_to_json(xml_file):
"""
@ -113,7 +113,13 @@ def process_in_batches(tx, query, data, batch_size):
tx.run(query, {"nodes": batch} if "nodes" in query else {"edges": batch})
def main():
if __name__ == "__main__":
# 创建Neo4jExecutor实例
executor = Neo4jExecutor.create_default()
executor.graph.run("MATCH (n) DETACH DELETE n")
print("清库成功")
# 文件路径设置
xml_file = os.path.join(WORKING_DIR, "graph_chunk_entity_relation.graphml")
json_file = os.path.join(WORKING_DIR, "graph_data.json")
@ -121,7 +127,7 @@ def main():
# 将XML转换为JSON
json_data = convert_xml_to_json(xml_file, json_file)
if json_data is None:
return
exit(0)
# 加载节点和边数据
nodes = json_data.get("nodes", [])
@ -196,7 +202,3 @@ def main():
finally:
# 关闭数据库连接
driver.close()
if __name__ == "__main__":
main()

@ -0,0 +1,109 @@
import re
from typing import List, Tuple
from py2neo import Graph, Node, Relationship, Subgraph
from Config import Config
def clear(db):
# 清空数据
db.run("MATCH (n) DETACH DELETE n")
# 分步删除约束和索引
try:
# 删除约束
constraints = db.run("SHOW CONSTRAINTS YIELD name").data()
for constr in constraints:
db.run(f"DROP CONSTRAINT `{constr['name']}`")
# 删除索引
indexes = db.run("SHOW INDEXES YIELD name, type WHERE type <> 'LOOKUP'").data()
for idx in indexes:
db.run(f"DROP INDEX `{idx['name']}`")
except Exception as e:
print(f"删除操作失败: {e}")
def create_subgraph(db: Graph, nodes: List[Node], relations: List[Tuple[Node, str, Node]]) -> None:
"""统一创建子图"""
subgraph = Subgraph(
nodes=nodes,
relationships=[Relationship(start, rel_type, end) for start, rel_type, end in relations]
)
db.create(subgraph)
def tx_create(db: Graph, nodes: List[Node], relations: List[Tuple[Node, str, Node]]) -> None:
"""事务方式创建数据"""
try:
tx = db.begin()
subgraph = Subgraph(
nodes=nodes,
relationships=[Relationship(start, rel_type, end) for start, rel_type, end in relations]
)
tx.create(subgraph)
db.commit(tx)
except Exception as e:
db.rollback(tx)
print(f"事务操作失败: {str(e)}")
raise
class Neo4jExecutor:
# 添加类变量存储连接配置
NEO4J_URI = Config.NEO4J_URI
NEO4J_AUTH = Config.NEO4J_AUTH
def __init__(self, uri=None, auth=None):
# 使用默认配置或传入参数
self.graph = Graph(uri or self.NEO4J_URI,
auth=auth or self.NEO4J_AUTH)
@classmethod
def create_default(cls):
"""使用默认配置创建执行器"""
return cls(cls.NEO4J_URI, cls.NEO4J_AUTH)
# 新增文本执行方法
def execute_cypher_text(self, cypher_text: str) -> dict:
"""直接执行Cypher文本"""
stats = {'total': 0, 'success': 0, 'failed': 0}
try:
statements = re.split(r';\s*\n', cypher_text)
statements = [s.strip() for s in statements if s.strip()]
stats['total'] = len(statements)
for stmt in statements:
try:
self.graph.run(stmt)
stats['success'] += 1
except Exception as e:
stats['failed'] += 1
print(f"执行失败: {stmt[:50]}... \n错误: {str(e)[:100]}")
return stats
except Exception as e:
print(f"执行失败: {stmt[:100]}... \n完整错误: {str(e)}") # 原为str(e)[:100]
return stats
def execute_cypher_file(self, file_path: str) -> dict: # 确保方法名称正确
"""执行Cypher文件"""
stats = {'total': 0, 'success': 0, 'failed': 0}
try:
with open(file_path, 'r', encoding='utf-8') as f:
cypher_script = f.read()
statements = re.split(r';\s*\n', cypher_script)
statements = [s.strip() for s in statements if s.strip()]
stats['total'] = len(statements)
for stmt in statements:
try:
self.graph.run(stmt)
stats['success'] += 1
except Exception as e:
stats['failed'] += 1
print(f"执行失败: {stmt[:50]}... \n错误: {str(e)[:100]}")
return stats
except Exception as e:
print(f"文件错误: {str(e)}")
return stats
Loading…
Cancel
Save