from py2neo import Graph, Transaction from pathlib import Path class Neo4jExecutor: def __init__(self, uri, auth): self.graph = Graph(uri, auth=auth) self.batch_size = 5 # 每批执行的语句数量 self.delay = 0.1 # 批处理间隔(秒) def execute_cypher_file(self, file_path: str) -> dict: """执行Cypher脚本文件""" stats = { 'total': 0, 'success': 0, 'failed': 0, 'duration': 0 } try: cypher_script = Path(file_path).read_text(encoding='utf-8') return self.execute_cypher(cypher_script, stats) except Exception as e: print(f"文件读取失败: {str(e)}") return stats # 在Neo4jExecutor类中添加参数处理 def execute_cypher(self, cypher_script: str, params: dict = None) -> dict: """支持参数化查询""" if params is None: params = {} try: result = self.graph.run(cypher_script, parameters=params) return result.stats() except Exception as e: print(f"执行失败: {str(e)}") return {} def _execute_single(self, tx: Transaction, stmt: str): """执行单个语句""" try: tx.run(stmt) except Exception as e: if 'already exists' in str(e): print(f"忽略已存在的约束: {stmt[:50]}...") else: raise def _retry_single(self, stmt: str) -> bool: """重试单个语句""" try: self.graph.run(stmt) return True except Exception as e: print(f"语句执行失败: {stmt[:60]}... \n错误信息: {str(e)}") return False # 使用示例 if __name__ == "__main__": # 初始化执行器 executor = Neo4jExecutor( uri="neo4j://10.10.21.20:7687", auth=("neo4j", "DsideaL4r5t6y7u") ) # 执行Cypher文件 result = executor.execute_cypher_file("knowledge_graph.cypher") print(f"\n执行结果:") print(f"- 总语句数: {result['total']}") print(f"- 成功: {result['success']}") print(f"- 失败: {result['failed']}") print(f"- 耗时: {result['duration']}秒")