"""Feed unprocessed rows of ``t_wechat_articles`` into the RAG store.

Every row with ``is_finish = 0`` is inserted via ``rag.ainsert`` and then
flagged with ``is_finish = 1``.
"""

import asyncio
import logging

from Util.DocxUtil import get_docx_content_by_pandoc
from Util.LightRagUtil import initialize_pg_rag
from Util.PostgreSQLUtil import init_postgres_pool

logger = logging.getLogger('lightrag')
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
logger.addHandler(handler)

# NOTE(review): the 'lightrag' logger keeps propagation enabled, so its records
# presumably reach the root handler installed here as well and are printed
# twice (once per format) -- confirm this is intended.
logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.INFO)

# With the PostgreSQL backend this directory is not actually used, but the
# project code currently requires the argument, so a value is supplied anyway.
WORKING_DIR = "./output"


async def get_unprocessed_articles() -> list:
    """Fetch the articles from ``t_wechat_articles`` that are not processed yet.

    Returns:
        A list of dicts, one per row with ``is_finish = 0``, holding the
        columns ``id``, ``source``, ``title`` and ``content``.
    """
    # Acquire the pool before entering ``try`` so that a failure to create it
    # propagates as-is instead of being masked by an UnboundLocalError raised
    # from the ``finally`` clause.
    pool = await init_postgres_pool()
    try:
        async with pool.acquire() as conn:
            rows = await conn.fetch('''
                SELECT id, source, title, content
                FROM t_wechat_articles
                WHERE is_finish = 0
            ''')
            return [dict(row) for row in rows]
    finally:
        await pool.close()


async def _process_article(article: dict, workspace: str) -> None:
    """Insert one article into the RAG store and flag it as finished.

    Args:
        article: Row dict with the keys ``id``, ``source``, ``title`` and
            ``content``.
        workspace: Workspace name handed to ``initialize_pg_rag``.

    Any exception raised while inserting or updating propagates to the caller;
    the row is only flagged after ``rag.ainsert`` has returned.
    """
    # Source and title combined serve as the document name.
    docx_name = f"{article['source']}_{article['title']}"
    content = article["content"]
    logger.info(f"开始处理文档: {docx_name}")

    # Bind both resources up front so the cleanup below never touches an
    # unbound name or a leftover object from a previous article.
    rag = None
    pool = None
    try:
        rag = await initialize_pg_rag(WORKING_DIR=WORKING_DIR, workspace=workspace)
        await rag.ainsert(input=content, file_paths=[docx_name])

        # Mark the article as processed.
        pool = await init_postgres_pool()
        async with pool.acquire() as conn:
            await conn.execute('''
                UPDATE t_wechat_articles
                SET is_finish = 1
                WHERE id = $1
            ''', article["id"])
    finally:
        # Nested so the pool is still closed when finalizing the RAG storages
        # raises.
        try:
            if rag is not None:
                await rag.finalize_storages()
        finally:
            if pool is not None:
                await pool.close()


async def main() -> None:
    """Process every unprocessed article in turn.

    Processing stops at the first article that raises; the exception is not
    caught here.
    """
    # Fetch the unprocessed articles.
    articles = await get_unprocessed_articles()
    logger.info(f"共获取到{len(articles)}篇未处理的文章")

    workspace = 'ChangChun'
    for article in articles:
        await _process_article(article, workspace)


if __name__ == "__main__":
    asyncio.run(main())