You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

75 lines
2.3 KiB

import time
from pathlib import Path

from py2neo import Graph, Transaction
class Neo4jExecutor:
    """Run Cypher statements and script files against Neo4j through py2neo.

    Attributes:
        graph: The py2neo ``Graph`` every statement is sent through.
        batch_size: Number of statements executed between throttle pauses.
        delay: Pause, in seconds, inserted after each full batch.
    """

    def __init__(self, uri, auth):
        """Create the ``Graph`` connection and set the batching defaults.

        Args:
            uri: Connection URI handed to ``Graph``.
            auth: Credentials handed to ``Graph`` as ``auth``.
        """
        self.graph = Graph(uri, auth=auth)
        self.batch_size = 5  # statements per batch
        self.delay = 0.1  # pause between batches, in seconds

    def execute_cypher_file(self, file_path: str) -> dict:
        """Execute every statement of a Cypher script file.

        The script is split on top-level ``;`` and each statement is sent on
        its own. After every ``batch_size`` statements the method sleeps for
        ``delay`` seconds (skipped after the final batch).

        Args:
            file_path: Path of a UTF-8 encoded Cypher script.

        Returns:
            A dict with the keys ``total``, ``success``, ``failed`` (statement
            counts) and ``duration`` (seconds, rounded to milliseconds). All
            values are zero when the file cannot be read.
        """
        stats = {
            'total': 0,
            'success': 0,
            'failed': 0,
            'duration': 0
        }
        try:
            cypher_script = Path(file_path).read_text(encoding='utf-8')
        except (OSError, ValueError, TypeError) as e:
            # Unreadable, undecodable or invalid path: report it and return
            # the zeroed counters instead of raising.
            print(f"文件读取失败: {str(e)}")
            return stats

        statements = self._split_statements(cypher_script)
        total = len(statements)
        stats['total'] = total
        batch_size = max(1, self.batch_size)  # guard against a zero batch size

        started = time.perf_counter()
        for index, stmt in enumerate(statements, start=1):
            if self._run_statement(stmt):
                stats['success'] += 1
            else:
                stats['failed'] += 1
            # Throttle between batches, but never after the last statement.
            if index % batch_size == 0 and index < total and self.delay > 0:
                time.sleep(self.delay)
        stats['duration'] = round(time.perf_counter() - started, 3)
        return stats

    def execute_cypher(self, cypher_script: str, params: dict = None) -> dict:
        """Run one Cypher statement, optionally parameterized.

        Args:
            cypher_script: The Cypher text to execute.
            params: Query parameters; ``None`` is treated as no parameters.

        Returns:
            Whatever ``stats()`` of the run result returns, or an empty dict
            when execution raises.
        """
        if params is None:
            params = {}
        try:
            result = self.graph.run(cypher_script, parameters=params)
            return result.stats()
        except Exception as e:
            print(f"执行失败: {str(e)}")
            return {}

    @staticmethod
    def _split_statements(cypher_script: str) -> list:
        """Split a script into statements on ``;`` found outside literals.

        Semicolons inside single-, double- or backtick-quoted text are kept.
        ``//`` line comments and ``/* */`` block comments are dropped so that
        quotes or semicolons inside them cannot confuse the split. Empty
        statements are discarded.
        """
        statements = []
        current = []
        quote = None  # the quote character we are inside, or None
        length = len(cypher_script)
        i = 0
        while i < length:
            ch = cypher_script[i]
            if quote is not None:
                current.append(ch)
                if ch == '\\' and quote != '`' and i + 1 < length:
                    # Backslash escape in a string literal: keep the escaped
                    # character so an escaped quote does not end the literal.
                    current.append(cypher_script[i + 1])
                    i += 2
                    continue
                if ch == quote:
                    quote = None
            elif ch in ('"', "'", '`'):
                quote = ch
                current.append(ch)
            elif cypher_script.startswith('//', i):
                newline = cypher_script.find('\n', i)
                i = length if newline == -1 else newline
                continue
            elif cypher_script.startswith('/*', i):
                closing = cypher_script.find('*/', i + 2)
                i = length if closing == -1 else closing + 2
                current.append(' ')  # keep surrounding tokens separated
                continue
            elif ch == ';':
                stmt = ''.join(current).strip()
                if stmt:
                    statements.append(stmt)
                current = []
            else:
                current.append(ch)
            i += 1
        tail = ''.join(current).strip()
        if tail:
            statements.append(tail)
        return statements

    def _run_statement(self, stmt: str) -> bool:
        """Run one statement on the graph and report whether it counts as done.

        An error whose text contains ``already exists`` is reported and
        treated as success, so re-running a script that creates constraints
        does not count them as failures. Any other error is printed and
        yields ``False``.
        """
        try:
            self.graph.run(stmt)
        except Exception as e:
            if 'already exists' in str(e):
                print(f"忽略已存在的约束: {stmt[:50]}...")
                return True
            print(f"语句执行失败: {stmt[:60]}... \n错误信息: {str(e)}")
            return False
        return True

    def _execute_single(self, tx: "Transaction", stmt: str):
        """Run one statement on ``tx``, ignoring 'already exists' errors.

        Any other exception is re-raised to the caller.
        """
        try:
            tx.run(stmt)
        except Exception as e:
            if 'already exists' in str(e):
                print(f"忽略已存在的约束: {stmt[:50]}...")
            else:
                raise

    def _retry_single(self, stmt: str) -> bool:
        """Run one statement directly on the graph.

        Returns:
            ``True`` on success; ``False`` after printing the error otherwise.
        """
        try:
            self.graph.run(stmt)
            return True
        except Exception as e:
            print(f"语句执行失败: {stmt[:60]}... \n错误信息: {str(e)}")
            return False
# Usage example
if __name__ == "__main__":
    import os

    # Connection settings may be overridden with NEO4J_URI, NEO4J_USER and
    # NEO4J_PASSWORD; the previous literals remain as fallbacks so the script
    # behaves as before when none of them is set.
    # NOTE(review): the fallback password is stored in source control —
    # consider rotating it and dropping the default.
    executor = Neo4jExecutor(
        uri=os.environ.get("NEO4J_URI", "neo4j://10.10.21.20:7687"),
        auth=(
            os.environ.get("NEO4J_USER", "neo4j"),
            os.environ.get("NEO4J_PASSWORD", "DsideaL4r5t6y7u"),
        ),
    )

    # Execute the Cypher script file and print a summary. ``.get`` keeps the
    # summary from raising if a counter is absent from the returned dict.
    result = executor.execute_cypher_file("knowledge_graph.cypher")
    print("\n执行结果:")
    print(f"- 总语句数: {result.get('total', 0)}")
    print(f"- 成功: {result.get('success', 0)}")
    print(f"- 失败: {result.get('failed', 0)}")
    print(f"- 耗时: {result.get('duration', 0)}")