import asyncio import os from utils.Database import * from utils.DocxUtil import get_docx_content_by_pandoc from utils.LightRagUtil import initialize_rag # 更详细地控制日志输出 logger = logging.getLogger('lightrag') logger.setLevel(logging.INFO) handler = logging.StreamHandler() handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) logger.addHandler(handler) # 后台任务,监控是否有新的未训练的文档进行训练 async def train_document_task(): print("线程5秒后开始运行【监控是否有新的未训练的文档进行训练】") await asyncio.sleep(5) # 使用 asyncio.sleep 而不是 time.sleep # 这里放置你的线程逻辑 while True: # 这里可以放置你的线程要执行的代码 logging.info("开始查询是否有未训练的文档:") no_train_document_sql: str = " SELECT * FROM t_ai_teaching_model_document WHERE train_flag = 0 ORDER BY create_time DESC" no_train_document_result = await find_by_sql(no_train_document_sql, ()) logger.info(no_train_document_result) if not no_train_document_result: logging.info("没有未训练的文档") else: logging.info("存在未训练的文档" + str(len(no_train_document_result))+"个") document = no_train_document_result[0] theme = await find_by_id("t_ai_teaching_model_theme", "id", document["theme_id"]) # 训练开始前,更新训练状态 update_sql: str = " UPDATE t_ai_teaching_model_document SET train_flag = 1 WHERE id = " + str(document["id"]) await execute_sql(update_sql, ()) document_name = document["document_name"] + "." + document["document_suffix"] WORKING_DIR = "Topic/" + theme["short_name"] document_path = document["document_path"] logging.info(f"开始处理文档:{document_name}, 还有{len(no_train_document_result) - 1}个文档需要处理!") # 训练代码开始 try: # 注意:默认设置使用NetworkX rag = await initialize_rag(WORKING_DIR) # 获取docx文件的内容 content = get_docx_content_by_pandoc(document_path) await rag.ainsert(content, ids=[document_name], file_paths=[document_name]) logger.info(f"Inserted content from {document_name}") except Exception as e: logger.error(f"An error occurred: {e}") finally: await rag.finalize_storages() # 训练结束,更新训练状态 update_document_sql: str = " UPDATE t_ai_teaching_model_document SET train_flag = 2 WHERE id = " + str(document["id"]) await execute_sql(update_document_sql, ()) if theme["search_flag"] == 0: update_theme_sql: str = " UPDATE t_ai_teaching_model_theme SET search_flag = 1 WHERE id = " + str(theme["id"]) await execute_sql(update_theme_sql, ()) # 添加适当的等待时间,避免频繁查询 await asyncio.sleep(60) # 每分钟查询一次