From d2a79f0c939317d335aa977e53c2f3010e290b08 Mon Sep 17 00:00:00 2001 From: HuangHai <10402852@qq.com> Date: Wed, 16 Jul 2025 09:18:27 +0800 Subject: [PATCH] 'commit' --- dsLightRag/WxGzh/T2_CollectArticle.py | 20 +++++++-- dsLightRag/WxGzh/T3_TrainIntoKG.py | 63 --------------------------- 2 files changed, 17 insertions(+), 66 deletions(-) delete mode 100644 dsLightRag/WxGzh/T3_TrainIntoKG.py diff --git a/dsLightRag/WxGzh/T2_CollectArticle.py b/dsLightRag/WxGzh/T2_CollectArticle.py index 26ae2377..0338bae8 100644 --- a/dsLightRag/WxGzh/T2_CollectArticle.py +++ b/dsLightRag/WxGzh/T2_CollectArticle.py @@ -27,6 +27,8 @@ import time from selenium.webdriver.chrome.options import Options from selenium.webdriver.chrome.service import Service as ChromeService import requests + +from Util.LightRagUtil import initialize_pg_rag from Util.PostgreSQLUtil import init_postgres_pool from Util.WxGzhUtil import init_wechat_browser, get_article_content @@ -72,9 +74,21 @@ async def save_article_to_db(pool, article_title, account_name, article_url, pub if await is_article_exist(pool, article_url): logger.info(f"文章已存在,跳过保存: {article_url}") return - # 准备在这里调用 lightrag进行知识库构建 - # TODO - + # 在这里调用 lightrag进行知识库构建 + workspace = 'ChangChun' + # 使用PG库后,这个是没有用的,但目前的项目代码要求必传,就写一个吧。 + WORKING_DIR = f"./output" + docx_name = f"{account_name}_{article_title}" # 组合来源和标题作为文档名 + logger.info(f"开始处理文档: {docx_name}") + try: + rag = await initialize_pg_rag(WORKING_DIR=WORKING_DIR, workspace=workspace) + await rag.ainsert(input=content, file_paths=[docx_name]) + finally: + if rag: + await rag.finalize_storages() + if pool: + await pool.close() + logger.info(f"保存文档到知识库成功: {docx_name}") try: async with pool.acquire() as conn: await conn.execute(''' diff --git a/dsLightRag/WxGzh/T3_TrainIntoKG.py b/dsLightRag/WxGzh/T3_TrainIntoKG.py deleted file mode 100644 index 86473413..00000000 --- a/dsLightRag/WxGzh/T3_TrainIntoKG.py +++ /dev/null @@ -1,63 +0,0 @@ -import asyncio -import logging - -from Util.DocxUtil import get_docx_content_by_pandoc -from Util.LightRagUtil import initialize_pg_rag -from Util.PostgreSQLUtil import init_postgres_pool - -logger = logging.getLogger('lightrag') -logger.setLevel(logging.INFO) -handler = logging.StreamHandler() -handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) -logger.addHandler(handler) -logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.INFO) - -# 使用PG库后,这个是没有用的,但目前的项目代码要求必传,就写一个吧。 -WORKING_DIR = f"./output" - - -async def get_unprocessed_articles(): - """从t_wechat_articles表获取未处理的文章""" - try: - pool = await init_postgres_pool() - async with pool.acquire() as conn: - rows = await conn.fetch(''' - SELECT id, source, title, content - FROM t_wechat_articles - WHERE is_finish = 0 - ''') - return [dict(row) for row in rows] - finally: - await pool.close() - -async def main(): - # 获取未处理的文章 - articles = await get_unprocessed_articles() - logger.info(f"共获取到{len(articles)}篇未处理的文章") - - for article in articles: - workspace = 'ChangChun' - docx_name = f"{article['source']}_{article['title']}" # 组合来源和标题作为文档名 - content = article["content"] # 使用文章内容 - - logger.info(f"开始处理文档: {docx_name}") - try: - rag = await initialize_pg_rag(WORKING_DIR=WORKING_DIR, workspace=workspace) - await rag.ainsert(input=content, file_paths=[docx_name]) - - # 标记为已处理 - pool = await init_postgres_pool() - async with pool.acquire() as conn: - await conn.execute(''' - UPDATE t_wechat_articles - SET is_finish = 1 - WHERE id = $1 - ''', article["id"]) - finally: - if rag: - await rag.finalize_storages() - if pool: - await pool.close() - -if __name__ == "__main__": - asyncio.run(main())