diff --git a/dsLightRag/Util/__pycache__/LightRagUtil.cpython-310.pyc b/dsLightRag/Util/__pycache__/LightRagUtil.cpython-310.pyc
index 9bc1e54d..d89e8018 100644
Binary files a/dsLightRag/Util/__pycache__/LightRagUtil.cpython-310.pyc and b/dsLightRag/Util/__pycache__/LightRagUtil.cpython-310.pyc differ
diff --git a/dsLightRag/WxGzh/T3_TrainIntoKG.py b/dsLightRag/WxGzh/T3_TrainIntoKG.py
index c7de823c..86473413 100644
--- a/dsLightRag/WxGzh/T3_TrainIntoKG.py
+++ b/dsLightRag/WxGzh/T3_TrainIntoKG.py
@@ -3,7 +3,7 @@ import logging
 
 from Util.DocxUtil import get_docx_content_by_pandoc
 from Util.LightRagUtil import initialize_pg_rag
-
+from Util.PostgreSQLUtil import init_postgres_pool
 
 logger = logging.getLogger('lightrag')
 logger.setLevel(logging.INFO)
@@ -15,54 +15,49 @@ logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.INFO)
 # With the PG backend this is unused, but the current project code requires it, so just provide one.
 WORKING_DIR = f"./output"
 
-#### Fill in the two fields below carefully ####
-# 1. Workspace [knowledge-base name]
-# 2. Document name [must be unique, because it is used as a query condition later]
-tasks = [
-    # { # Su Shi
-    #     "workspace": "SuShi", "docx_name": "苏轼.docx",
-    # },
-    # { # Chemistry
-    #     "workspace": "Chemistry", "docx_name": "Chemistry.docx",
-    # },
-    # { # Geometry
-    #     "workspace": "JiHe", "docx_name": "JiHe.docx",
-    # },
-    # { # Math
-    #     "workspace": "Math", "docx_name": "Math.docx",
-    # },
-    # { # Records of the Grand Historian
-    #     "workspace": "ShiJi", "docx_name": "少年读史记张嘉骅.docx",
-    # },
-    # { # Introductions to Changchun first-batch high schools
-    #     "workspace": "ChangChun", "docx_name": "长春市一批次高中学校介绍.docx",
-    # },
-    # { # 2024 admission score lines for 43 Changchun high schools
-    #     "workspace": "ChangChun", "docx_name": "2024长春43所高中录取分数线.docx",
-    # },
-    { # Changchun 2025 high school entrance exam minimum control lines per batch
-        "workspace": "ChangChun", "docx_name": "长春市2025年中考各批次录取最低控制线.docx",
-    }
-]
-for task in tasks:
-    task["docx_path"] = "./static/Txt/" + task["docx_name"]  # 3. Document path (Python passes the dict by reference)
+async def get_unprocessed_articles():
+    """Fetch unprocessed articles from the t_wechat_articles table."""
+    try:
+        pool = await init_postgres_pool()
+        async with pool.acquire() as conn:
+            rows = await conn.fetch('''
+                SELECT id, source, title, content
+                FROM t_wechat_articles
+                WHERE is_finish = 0
+            ''')
+            return [dict(row) for row in rows]
+    finally:
+        await pool.close()
 
 
 async def main():
-    for task in tasks:
-        workspace = task["workspace"]
-        docx_name = task["docx_name"]
-        docx_path = task["docx_path"]
-        logger.info(f"Start processing document: {docx_name}" + ", %s documents in total, currently number %s.", len(tasks), tasks.index(task) + 1)
+    # Fetch the articles that have not been processed yet
+    articles = await get_unprocessed_articles()
+    logger.info(f"Fetched {len(articles)} unprocessed articles")
+
+    for article in articles:
+        workspace = 'ChangChun'
+        docx_name = f"{article['source']}_{article['title']}"  # Combine source and title as the document name
+        content = article["content"]  # Use the article content directly
+
+        logger.info(f"Start processing document: {docx_name}")
         try:
             rag = await initialize_pg_rag(WORKING_DIR=WORKING_DIR, workspace=workspace)
-            # Read the content of the docx file
-            content = get_docx_content_by_pandoc(docx_path)
-            await rag.ainsert(input=content, file_paths=[docx_name])  # also pass the source file name
+            await rag.ainsert(input=content, file_paths=[docx_name])
+
+            # Mark the article as processed
+            pool = await init_postgres_pool()
+            async with pool.acquire() as conn:
+                await conn.execute('''
+                    UPDATE t_wechat_articles
+                    SET is_finish = 1
+                    WHERE id = $1
+                ''', article["id"])
         finally:
             if rag:
                 await rag.finalize_storages()
-
+            if pool:
+                await pool.close()
 
 if __name__ == "__main__":
     asyncio.run(main())
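
Review note (not part of the patch): in the patched main(), both rag and pool are first assigned inside the try block, so a failure in initialize_pg_rag() or rag.ainsert() makes the finally block raise NameError on the unbound name, and a fresh connection pool is opened and closed for every article; the get_docx_content_by_pandoc import is also left unused. The sketch below shows one way the loop could look with those points addressed. It assumes the same project helpers the patch uses (init_postgres_pool, initialize_pg_rag, rag.ainsert, rag.finalize_storages) and the same t_wechat_articles schema.

# Sketch only: single shared pool for the whole run, rag bound before each try.
import asyncio

from Util.LightRagUtil import initialize_pg_rag
from Util.PostgreSQLUtil import init_postgres_pool

WORKING_DIR = "./output"


async def main():
    pool = await init_postgres_pool()
    try:
        # One pool serves both the SELECT and the per-article UPDATE.
        async with pool.acquire() as conn:
            rows = await conn.fetch(
                "SELECT id, source, title, content FROM t_wechat_articles WHERE is_finish = 0")
        articles = [dict(row) for row in rows]

        for article in articles:
            docx_name = f"{article['source']}_{article['title']}"
            rag = None  # bound before try, so finally cannot hit NameError
            try:
                rag = await initialize_pg_rag(WORKING_DIR=WORKING_DIR, workspace="ChangChun")
                await rag.ainsert(input=article["content"], file_paths=[docx_name])
                # Only mark the article as finished after a successful insert.
                async with pool.acquire() as conn:
                    await conn.execute(
                        "UPDATE t_wechat_articles SET is_finish = 1 WHERE id = $1",
                        article["id"])
            finally:
                if rag:
                    await rag.finalize_storages()
    finally:
        await pool.close()


if __name__ == "__main__":
    asyncio.run(main())

Initializing rag to None before each try and keeping one pool for the whole run means the finally clauses only clean up resources that were actually created, and articles keep is_finish = 0 whenever their insert fails, so the next run picks them up again.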