main
HuangHai 2 weeks ago
parent 8d69e2c707
commit 2b433cdf07

@@ -7,7 +7,7 @@ from lightrag.kg.shared_storage import initialize_pipeline_status
 from lightrag.utils import EmbeddingFunc
 from Config.Config import EMBED_DIM, EMBED_MAX_TOKEN_SIZE, LLM_MODEL_NAME
-from Util.LightRagUtil import embedding_func, llm_model_func
+from Util.LightRagUtil import embedding_func, llm_model_func, initialize_pg_rag
 # Add the following configuration at program start
 logging.basicConfig(
@@ -23,44 +23,11 @@ handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
 logger.addHandler(handler)
 WORKING_DIR = f"./dickens-pg"
 logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.INFO)
-# AGE
-os.environ["AGE_GRAPH_NAME"] = "dickens"
-os.environ["POSTGRES_HOST"] = "10.10.14.208"
-os.environ["POSTGRES_PORT"] = "5432"
-os.environ["POSTGRES_USER"] = "postgres"
-os.environ["POSTGRES_PASSWORD"] = "postgres"
-os.environ["POSTGRES_DATABASE"] = "rag"
-async def initialize_pg_rag():
-    rag = LightRAG(
-        working_dir=WORKING_DIR,
-        llm_model_func=llm_model_func,
-        llm_model_name=LLM_MODEL_NAME,
-        llm_model_max_async=4,
-        llm_model_max_token_size=32768,
-        enable_llm_cache_for_entity_extract=True,
-        embedding_func=EmbeddingFunc(
-            embedding_dim=EMBED_DIM,
-            max_token_size=EMBED_MAX_TOKEN_SIZE,
-            func=embedding_func
-        ),
-        kv_storage="PGKVStorage",
-        doc_status_storage="PGDocStatusStorage",
-        graph_storage="PGGraphStorage",
-        vector_storage="PGVectorStorage",
-        auto_manage_storages_states=False,
-    )
-    await rag.initialize_storages()
-    await initialize_pipeline_status()
-    return rag
 async def main():
     try:
-        rag = await initialize_pg_rag()
+        rag = await initialize_pg_rag(WORKING_DIR)
         with open(f"../Txt/sushi.txt", "r", encoding="utf-8") as f:
             await rag.ainsert(f.read())
     finally:

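Both scripts previously carried their own copy of the PostgreSQL environment setup and initialize_pg_rag(); this commit moves both into Util.LightRagUtil and parameterizes the working directory. A minimal sketch of what the ingestion script reduces to after the refactor — the finally: block is cut off by the diff, and the finalize_storages() call is an assumption based on auto_manage_storages_states=False, which leaves storage lifecycle to the caller:

import asyncio
from Util.LightRagUtil import initialize_pg_rag

WORKING_DIR = "./dickens-pg"

async def main():
    rag = None
    try:
        rag = await initialize_pg_rag(WORKING_DIR)
        with open("../Txt/sushi.txt", "r", encoding="utf-8") as f:
            await rag.ainsert(f.read())  # ingest the document into the PG-backed stores
    finally:
        if rag is not None:
            await rag.finalize_storages()  # assumed cleanup; storage states are caller-managed

if __name__ == "__main__":
    asyncio.run(main())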
@@ -8,7 +8,7 @@ from lightrag.kg.shared_storage import initialize_pipeline_status
 from lightrag.utils import EmbeddingFunc
 from Config.Config import LLM_MODEL_NAME, EMBED_DIM, EMBED_MAX_TOKEN_SIZE
-from Util.LightRagUtil import configure_logging, print_stream, llm_model_func, embedding_func
+from Util.LightRagUtil import configure_logging, print_stream, llm_model_func, embedding_func, initialize_pg_rag
 # Add the following configuration at program start
 logging.basicConfig(
@@ -23,56 +23,18 @@ handler = logging.StreamHandler()
 handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
 logger.addHandler(handler)
-ROOT_DIR = '.'
-WORKING_DIR = f"{ROOT_DIR}/dickens-pg"
+WORKING_DIR = f"./dickens-pg"
 logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.INFO)
 if not os.path.exists(WORKING_DIR):
     os.mkdir(WORKING_DIR)
-# AGE
-os.environ["AGE_GRAPH_NAME"] = "dickens"
-os.environ["POSTGRES_HOST"] = "10.10.14.208"
-os.environ["POSTGRES_PORT"] = "5432"
-os.environ["POSTGRES_USER"] = "postgres"
-os.environ["POSTGRES_PASSWORD"] = "postgres"
-os.environ["POSTGRES_DATABASE"] = "rag"
-async def initialize_pg_rag():
-    rag = LightRAG(
-        working_dir=WORKING_DIR,
-        llm_model_func=llm_model_func,
-        llm_model_name=LLM_MODEL_NAME,
-        llm_model_max_async=4,
-        llm_model_max_token_size=32768,
-        enable_llm_cache_for_entity_extract=True,
-        embedding_func=EmbeddingFunc(
-            embedding_dim=EMBED_DIM,
-            max_token_size=EMBED_MAX_TOKEN_SIZE,
-            func=embedding_func
-        ),
-        kv_storage="PGKVStorage",
-        doc_status_storage="PGDocStatusStorage",
-        graph_storage="PGGraphStorage",
-        vector_storage="PGVectorStorage",
-        auto_manage_storages_states=False,
-    )
-    await rag.initialize_storages()
-    await initialize_pipeline_status()
-    return rag
 async def main():
     try:
-        rag = await initialize_pg_rag()
+        rag = await initialize_pg_rag(WORKING_DIR)
         resp = await rag.aquery(
-            #"Who are the members of Su Shi's family?",
-            "What happened to Dahui?",
+            # "Who are the members of Su Shi's family?",
+            "Su Shi and gourmet food",
             param=QueryParam(mode="hybrid", stream=True),
             # hybrid naive
         )
         if inspect.isasyncgen(resp):
             await print_stream(resp)

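The query script asks for a streamed response (stream=True), and the diff ends at the inspect.isasyncgen check, so the non-streaming branch is elided. A hedged sketch of the usual pattern for consuming such a response — print_stream from Util.LightRagUtil presumably does something close to the streaming branch:

import inspect

async def consume(resp):
    # With stream=True, aquery() may return an async generator of text chunks
    # or a plain string, depending on the model backend; handle both cases.
    if inspect.isasyncgen(resp):
        async for chunk in resp:
            print(chunk, end="", flush=True)  # emit chunks as they arrive
        print()
    else:
        print(resp)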
@@ -143,3 +143,39 @@ def create_embedding_func():
             base_url=EMBED_BASE_URL,
         ),
     )
+# AGE
+os.environ["AGE_GRAPH_NAME"] = "dickens"
+os.environ["POSTGRES_HOST"] = "10.10.14.208"
+os.environ["POSTGRES_PORT"] = "5432"
+os.environ["POSTGRES_USER"] = "postgres"
+os.environ["POSTGRES_PASSWORD"] = "postgres"
+os.environ["POSTGRES_DATABASE"] = "rag"
+async def initialize_pg_rag(WORKING_DIR):
+    rag = LightRAG(
+        working_dir=WORKING_DIR,
+        llm_model_func=llm_model_func,
+        llm_model_name=LLM_MODEL_NAME,
+        llm_model_max_async=4,
+        llm_model_max_token_size=32768,
+        enable_llm_cache_for_entity_extract=True,
+        embedding_func=EmbeddingFunc(
+            embedding_dim=EMBED_DIM,
+            max_token_size=EMBED_MAX_TOKEN_SIZE,
+            func=embedding_func
+        ),
+        kv_storage="PGKVStorage",
+        doc_status_storage="PGDocStatusStorage",
+        graph_storage="PGGraphStorage",
+        vector_storage="PGVectorStorage",
+        auto_manage_storages_states=False,
+    )
+    await rag.initialize_storages()
+    await initialize_pipeline_status()
+    return rag
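With the helper consolidated here, a caller only needs to supply the working directory. A minimal usage sketch — names reuse the scripts above, and finalize_storages() is again an assumption following from auto_manage_storages_states=False:

from lightrag import QueryParam
from Util.LightRagUtil import initialize_pg_rag

async def demo():
    rag = await initialize_pg_rag("./dickens-pg")
    try:
        answer = await rag.aquery(
            "Su Shi and gourmet food",
            param=QueryParam(mode="hybrid"),  # non-streaming variant of the query script
        )
        print(answer)
    finally:
        await rag.finalize_storages()  # assumed; storage states are caller-managed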

@@ -1,109 +0,0 @@
-import re
-from typing import List, Tuple
-from py2neo import Graph, Node, Relationship, Subgraph
-from Config import Config
-def clear(db):
-    # Wipe all data
-    db.run("MATCH (n) DETACH DELETE n")
-    # Drop constraints and indexes in separate steps
-    try:
-        # Drop constraints
-        constraints = db.run("SHOW CONSTRAINTS YIELD name").data()
-        for constr in constraints:
-            db.run(f"DROP CONSTRAINT `{constr['name']}`")
-        # Drop indexes
-        indexes = db.run("SHOW INDEXES YIELD name, type WHERE type <> 'LOOKUP'").data()
-        for idx in indexes:
-            db.run(f"DROP INDEX `{idx['name']}`")
-    except Exception as e:
-        print(f"Drop operation failed: {e}")
-def create_subgraph(db: Graph, nodes: List[Node], relations: List[Tuple[Node, str, Node]]) -> None:
-    """Create a subgraph in a single operation"""
-    subgraph = Subgraph(
-        nodes=nodes,
-        relationships=[Relationship(start, rel_type, end) for start, rel_type, end in relations]
-    )
-    db.create(subgraph)
-def tx_create(db: Graph, nodes: List[Node], relations: List[Tuple[Node, str, Node]]) -> None:
-    """Create data inside a transaction"""
-    try:
-        tx = db.begin()
-        subgraph = Subgraph(
-            nodes=nodes,
-            relationships=[Relationship(start, rel_type, end) for start, rel_type, end in relations]
-        )
-        tx.create(subgraph)
-        db.commit(tx)
-    except Exception as e:
-        db.rollback(tx)
-        print(f"Transaction failed: {str(e)}")
-        raise
-class Neo4jExecutor:
-    # Class variables holding the default connection configuration
-    NEO4J_URI = Config.NEO4J_URI
-    NEO4J_AUTH = Config.NEO4J_AUTH
-    def __init__(self, uri=None, auth=None):
-        # Use the defaults or the passed-in parameters
-        self.graph = Graph(uri or self.NEO4J_URI,
-                           auth=auth or self.NEO4J_AUTH)
-    @classmethod
-    def create_default(cls):
-        """Create an executor with the default configuration"""
-        return cls(cls.NEO4J_URI, cls.NEO4J_AUTH)
-    # New: execute raw Cypher text directly
-    def execute_cypher_text(self, cypher_text: str) -> dict:
-        """Execute Cypher statements given as text"""
-        stats = {'total': 0, 'success': 0, 'failed': 0}
-        try:
-            statements = re.split(r';\s*\n', cypher_text)
-            statements = [s.strip() for s in statements if s.strip()]
-            stats['total'] = len(statements)
-            for stmt in statements:
-                try:
-                    self.graph.run(stmt)
-                    stats['success'] += 1
-                except Exception as e:
-                    stats['failed'] += 1
-                    print(f"Statement failed: {stmt[:50]}... \nError: {str(e)[:100]}")
-            return stats
-        except Exception as e:
-            print(f"Execution failed: {stmt[:100]}... \nFull error: {str(e)}")  # was str(e)[:100]
-            return stats
-    def execute_cypher_file(self, file_path: str) -> dict:  # make sure the method name is correct
-        """Execute a Cypher file"""
-        stats = {'total': 0, 'success': 0, 'failed': 0}
-        try:
-            with open(file_path, 'r', encoding='utf-8') as f:
-                cypher_script = f.read()
-            statements = re.split(r';\s*\n', cypher_script)
-            statements = [s.strip() for s in statements if s.strip()]
-            stats['total'] = len(statements)
-            for stmt in statements:
-                try:
-                    self.graph.run(stmt)
-                    stats['success'] += 1
-                except Exception as e:
-                    stats['failed'] += 1
-                    print(f"Statement failed: {stmt[:50]}... \nError: {str(e)[:100]}")
-            return stats
-        except Exception as e:
-            print(f"File error: {str(e)}")
-            return stats
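For reference, the deleted executor was presumably driven along these lines — a sketch reconstructed from its own methods, with a hypothetical Cypher file path:

executor = Neo4jExecutor.create_default()  # connects via Config.NEO4J_URI / NEO4J_AUTH
stats = executor.execute_cypher_file("init_graph.cypher")  # hypothetical path
print(f"total={stats['total']}, success={stats['success']}, failed={stats['failed']}")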