main
HuangHai 1 week ago
parent 73033e0333
commit f639b4c9fc

@ -3,7 +3,7 @@ import logging
from Util.DocxUtil import get_docx_content_by_pandoc from Util.DocxUtil import get_docx_content_by_pandoc
from Util.LightRagUtil import initialize_pg_rag from Util.LightRagUtil import initialize_pg_rag
from Util.PostgreSQLUtil import init_postgres_pool
logger = logging.getLogger('lightrag') logger = logging.getLogger('lightrag')
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
@ -15,54 +15,49 @@ logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.INFO)
# 使用PG库后这个是没有用的,但目前的项目代码要求必传,就写一个吧。 # 使用PG库后这个是没有用的,但目前的项目代码要求必传,就写一个吧。
WORKING_DIR = f"./output" WORKING_DIR = f"./output"
#### 下面两个要注意写清楚内容 ####
# 1、工作空间【知识库名称】
# 2、文档名称【不允许出现重复因为后面需要以此为条件查询】
tasks = [
# { # 苏轼
# "workspace": "SuShi", "docx_name": "苏轼.docx",
# },
# { # 化学
# "workspace": "Chemistry", "docx_name": "Chemistry.docx",
# },
# { # 几何
# "workspace": "JiHe", "docx_name": "JiHe.docx",
# },
# { # 数学
# "workspace": "Math", "docx_name": "Math.docx",
# },
# { # 史记
# "workspace": "ShiJi", "docx_name": "少年读史记张嘉骅.docx",
# },
# { # 长春市一批次高中学校介绍
# "workspace": "ChangChun", "docx_name": "长春市一批次高中学校介绍.docx",
# },
# { # 2024长春43所高中录取分数线
# "workspace": "ChangChun", "docx_name": "2024长春43所高中录取分数线.docx",
# },
{ # 长春市2025年中考各批次录取最低控制线
"workspace": "ChangChun", "docx_name": "长春市2025年中考各批次录取最低控制线.docx",
}
]
for task in tasks:
task["docx_path"] = "./static/Txt/" + task["docx_name"] # 3、文档路径 python是按引用传递的&
async def get_unprocessed_articles():
"""从t_wechat_articles表获取未处理的文章"""
try:
pool = await init_postgres_pool()
async with pool.acquire() as conn:
rows = await conn.fetch('''
SELECT id, source, title, content
FROM t_wechat_articles
WHERE is_finish = 0
''')
return [dict(row) for row in rows]
finally:
await pool.close()
async def main(): async def main():
for task in tasks: # 获取未处理的文章
workspace = task["workspace"] articles = await get_unprocessed_articles()
docx_name = task["docx_name"] logger.info(f"共获取到{len(articles)}篇未处理的文章")
docx_path = task["docx_path"]
logger.info(f"开始处理文档: {docx_name}" + ",共%s个文档,当前是第%s个。", len(tasks), tasks.index(task) + 1) for article in articles:
workspace = 'ChangChun'
docx_name = f"{article['source']}_{article['title']}" # 组合来源和标题作为文档名
content = article["content"] # 使用文章内容
logger.info(f"开始处理文档: {docx_name}")
try: try:
rag = await initialize_pg_rag(WORKING_DIR=WORKING_DIR, workspace=workspace) rag = await initialize_pg_rag(WORKING_DIR=WORKING_DIR, workspace=workspace)
# 获取docx文件的内容 await rag.ainsert(input=content, file_paths=[docx_name])
content = get_docx_content_by_pandoc(docx_path)
await rag.ainsert(input=content, file_paths=[docx_name]) # 添加来源参数 # 标记为已处理
pool = await init_postgres_pool()
async with pool.acquire() as conn:
await conn.execute('''
UPDATE t_wechat_articles
SET is_finish = 1
WHERE id = $1
''', article["id"])
finally: finally:
if rag: if rag:
await rag.finalize_storages() await rag.finalize_storages()
if pool:
await pool.close()
if __name__ == "__main__": if __name__ == "__main__":
asyncio.run(main()) asyncio.run(main())

Loading…
Cancel
Save