from py2neo import Graph, Node, Relationship, Subgraph
from typing import List, Tuple

# Corrected version-query snippet (kept for reference):
# version = db.run("CALL dbms.components() YIELD versions UNWIND versions AS version RETURN version").evaluate()
# print(f"Neo4j 版本: {version}")


def _quote_identifier(name: str) -> str:
    """Return *name* wrapped in backticks for use as a Cypher identifier.

    Embedded backticks are doubled so that a name containing "`" cannot
    terminate the quoted identifier early and corrupt the statement.
    """
    escaped = str(name).replace("`", "``")
    return f"`{escaped}`"


def _build_subgraph(
    nodes: List[Node], relations: List[Tuple[Node, str, Node]]
) -> Subgraph:
    """Build a Subgraph from nodes and (start, rel_type, end) triples."""
    return Subgraph(
        nodes=nodes,
        relationships=[
            Relationship(start, rel_type, end)
            for start, rel_type, end in relations
        ],
    )


def clear(db: Graph) -> None:
    """Delete all data, then drop every constraint and non-LOOKUP index.

    The data deletion is not guarded: a failure there propagates to the
    caller. Dropping constraints and indexes is best-effort: the first
    failure is printed and the remaining drops are skipped, but no
    exception is raised.

    Args:
        db: Graph-like object exposing ``run(cypher)``.
    """
    # Remove every node together with its relationships.
    db.run("MATCH (n) DETACH DELETE n")

    # Drop constraints first, then indexes, as two separate steps.
    try:
        constraints = db.run("SHOW CONSTRAINTS YIELD name").data()
        for constr in constraints:
            db.run(f"DROP CONSTRAINT {_quote_identifier(constr['name'])}")

        # The index list is fetched after the constraints are gone;
        # LOOKUP indexes are excluded by the query itself.
        indexes = db.run(
            "SHOW INDEXES YIELD name, type WHERE type <> 'LOOKUP'"
        ).data()
        for idx in indexes:
            db.run(f"DROP INDEX {_quote_identifier(idx['name'])}")
    except Exception as e:
        print(f"删除操作失败: {e}")


def create_subgraph(
    db: Graph, nodes: List[Node], relations: List[Tuple[Node, str, Node]]
) -> None:
    """Create the given nodes and relationships as a single subgraph.

    Args:
        db: Graph to create the subgraph in (via ``db.create``).
        nodes: Nodes to include in the subgraph.
        relations: ``(start, rel_type, end)`` triples; each becomes a
            ``Relationship(start, rel_type, end)``.
    """
    db.create(_build_subgraph(nodes, relations))


def transactional_create(
    db: Graph, nodes: List[Node], relations: List[Tuple[Node, str, Node]]
) -> None:
    """Create the given nodes and relationships inside one transaction.

    Args:
        db: Graph used to begin, commit and roll back the transaction.
        nodes: Nodes to include in the subgraph.
        relations: ``(start, rel_type, end)`` triples; each becomes a
            ``Relationship(start, rel_type, end)``.

    Raises:
        Exception: Whatever ``db.begin()`` raises propagates unchanged.
            Any error while building, creating or committing the subgraph
            is re-raised after a rollback and a printed message.
    """
    # Begin outside the try block: if begin() itself fails there is no
    # transaction to roll back, and the handler must not touch an unbound
    # ``tx`` (which would mask the real error with UnboundLocalError).
    tx = db.begin()
    try:
        tx.create(_build_subgraph(nodes, relations))
        db.commit(tx)
    except Exception as e:
        db.rollback(tx)
        print(f"事务操作失败: {str(e)}")
        raise