main
黄海 5 months ago
parent a87bd06177
commit 623fd11a53

@ -1,8 +1,6 @@
import json
from Neo4j.Neo4jExecutor import *
from Neo4j.Util import clear
def json_to_cypher(data):
"""将知识体系JSON转换为Cypher插入脚本"""

@ -1,7 +1,49 @@
import re
from Util import *
from Config import *
from py2neo import Graph, Node, Relationship, Subgraph
from typing import List, Tuple
def clear(db):
# 清空数据
db.run("MATCH (n) DETACH DELETE n")
# 分步删除约束和索引
try:
# 删除约束
constraints = db.run("SHOW CONSTRAINTS YIELD name").data()
for constr in constraints:
db.run(f"DROP CONSTRAINT `{constr['name']}`")
# 删除索引
indexes = db.run("SHOW INDEXES YIELD name, type WHERE type <> 'LOOKUP'").data()
for idx in indexes:
db.run(f"DROP INDEX `{idx['name']}`")
except Exception as e:
print(f"删除操作失败: {e}")
def create_subgraph(db: Graph, nodes: List[Node], relations: List[Tuple[Node, str, Node]]) -> None:
"""统一创建子图"""
subgraph = Subgraph(
nodes=nodes,
relationships=[Relationship(start, rel_type, end) for start, rel_type, end in relations]
)
db.create(subgraph)
def tx_create(db: Graph, nodes: List[Node], relations: List[Tuple[Node, str, Node]]) -> None:
"""事务方式创建数据"""
try:
tx = db.begin()
subgraph = Subgraph(
nodes=nodes,
relationships=[Relationship(start, rel_type, end) for start, rel_type, end in relations]
)
tx.create(subgraph)
db.commit(tx)
except Exception as e:
db.rollback(tx)
print(f"事务操作失败: {str(e)}")
raise
class Neo4jExecutor:
def __init__(self, uri, auth):

@ -1,48 +0,0 @@
from py2neo import Graph, Node, Relationship, Subgraph
from typing import List, Tuple
# 修正版本查询代码
#version = db.run("CALL dbms.components() YIELD versions UNWIND versions AS version RETURN version").evaluate()
#print(f"Neo4j 版本: {version}")
def clear(db):
# 清空数据
db.run("MATCH (n) DETACH DELETE n")
# 分步删除约束和索引
try:
# 删除约束
constraints = db.run("SHOW CONSTRAINTS YIELD name").data()
for constr in constraints:
db.run(f"DROP CONSTRAINT `{constr['name']}`")
# 删除索引
indexes = db.run("SHOW INDEXES YIELD name, type WHERE type <> 'LOOKUP'").data()
for idx in indexes:
db.run(f"DROP INDEX `{idx['name']}`")
except Exception as e:
print(f"删除操作失败: {e}")
def create_subgraph(db: Graph, nodes: List[Node], relations: List[Tuple[Node, str, Node]]) -> None:
"""统一创建子图"""
subgraph = Subgraph(
nodes=nodes,
relationships=[Relationship(start, rel_type, end) for start, rel_type, end in relations]
)
db.create(subgraph)
def tx_create(db: Graph, nodes: List[Node], relations: List[Tuple[Node, str, Node]]) -> None:
"""事务方式创建数据"""
try:
tx = db.begin()
subgraph = Subgraph(
nodes=nodes,
relationships=[Relationship(start, rel_type, end) for start, rel_type, end in relations]
)
tx.create(subgraph)
db.commit(tx)
except Exception as e:
db.rollback(tx)
print(f"事务操作失败: {str(e)}")
raise
Loading…
Cancel
Save