From cba58a63e1c10daa0f0c8b0684d3046aa9d1fef4 Mon Sep 17 00:00:00 2001 From: chengminglong <123204464@qq.com> Date: Mon, 21 Jul 2025 15:45:56 +0800 Subject: [PATCH] =?UTF-8?q?=E6=95=99=E8=82=B2=E5=9E=82=E7=9B=B4=E9=A2=86?= =?UTF-8?q?=E5=9F=9F=E5=A4=A7=E6=A8=A1=E5=9E=8B=E5=B9=B3=E5=8F=B0=20Modify?= =?UTF-8?q?=20by=20Kalman.CHENG=20=E2=98=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dsAiTeachingModel/tasks/BackgroundTasks.py | 84 ++++++++++++++-------- 1 file changed, 55 insertions(+), 29 deletions(-) diff --git a/dsAiTeachingModel/tasks/BackgroundTasks.py b/dsAiTeachingModel/tasks/BackgroundTasks.py index def88d74..6b929378 100644 --- a/dsAiTeachingModel/tasks/BackgroundTasks.py +++ b/dsAiTeachingModel/tasks/BackgroundTasks.py @@ -14,46 +14,72 @@ logger.addHandler(handler) # 后台任务,监控是否有新的未训练的文档进行训练 async def train_document_task(): - print("线程5秒后开始运行【监控是否有新的未训练的文档进行训练】") + print(datetime.datetime.now(), "线程5秒后开始运行【监控是否有需要处理的文档】") await asyncio.sleep(5) # 使用 asyncio.sleep 而不是 time.sleep # 这里放置你的线程逻辑 while True: # 这里可以放置你的线程要执行的代码 - logging.info("开始查询是否有未训练的文档:") - no_train_document_sql: str = " SELECT * FROM t_ai_teaching_model_document WHERE train_flag = 0 ORDER BY create_time DESC" + logging.info("开始查询是否有待处理文档:") + no_train_document_sql: str = " SELECT * FROM t_ai_teaching_model_document WHERE train_flag in (0,3) ORDER BY create_time DESC" no_train_document_result = await find_by_sql(no_train_document_sql, ()) logger.info(no_train_document_result) if not no_train_document_result: - logging.info("没有未训练的文档") + print(datetime.datetime.now(), "没有需要处理的文档") else: - logging.info("存在未训练的文档" + str(len(no_train_document_result))+"个") - + print(datetime.datetime.now(), "存在未训练的文档" + str(len(no_train_document_result))+"个") + # 这里可以根据train_flag的值来判断是训练还是删除 document = no_train_document_result[0] theme = await find_by_id("t_ai_teaching_model_theme", "id", document["theme_id"]) - # 训练开始前,更新训练状态 - update_sql: str = " UPDATE t_ai_teaching_model_document SET train_flag = 1 WHERE id = " + str(document["id"]) - await execute_sql(update_sql, ()) document_name = document["document_name"] + "." + document["document_suffix"] - WORKING_DIR = "Topic/" + theme["short_name"] + working_dir = "Topic/" + theme["short_name"] document_path = document["document_path"] - logging.info(f"开始处理文档:{document_name}, 还有{len(no_train_document_result) - 1}个文档需要处理!") - # 训练代码开始 - try: - # 注意:默认设置使用NetworkX - rag = await initialize_rag(WORKING_DIR) - # 获取docx文件的内容 - content = get_docx_content_by_pandoc(document_path) - await rag.ainsert(content, ids=[document_name], file_paths=[document_name]) - logger.info(f"Inserted content from {document_name}") - except Exception as e: - logger.error(f"An error occurred: {e}") - finally: - await rag.finalize_storages() - # 训练结束,更新训练状态 - update_document_sql: str = " UPDATE t_ai_teaching_model_document SET train_flag = 2 WHERE id = " + str(document["id"]) - await execute_sql(update_document_sql, ()) - if theme["search_flag"] == 0: + if document["train_flag"] == 0: + # 训练文档 + update_sql: str = " UPDATE t_ai_teaching_model_document SET train_flag = 1 WHERE id = " + str(document["id"]) + await execute_sql(update_sql, ()) + logging.info(f"开始处理文档:{document_name}, 还有{len(no_train_document_result) - 1}个文档需要处理!") + # 训练代码开始 + try: + # 注意:默认设置使用NetworkX + rag = await initialize_rag(working_dir) + # 获取docx文件的内容 + content = get_docx_content_by_pandoc(document_path) + await rag.ainsert(content, ids=[document_name], file_paths=[document_name]) + logger.info(f"Inserted content from {document_name}") + except Exception as e: + logger.error(f"An error occurred: {e}") + finally: + await rag.finalize_storages() + # 训练结束,更新训练状态 + update_document_sql: str = " UPDATE t_ai_teaching_model_document SET train_flag = 2 WHERE id = " + str(document["id"]) + await execute_sql(update_document_sql, ()) + elif document["train_flag"] == 3: + update_sql: str = " UPDATE t_ai_teaching_model_document SET train_flag = 4 WHERE id = " + str(document["id"]) + await execute_sql(update_sql, ()) + logging.info(f"开始删除文档:{document_name}, 还有{len(no_train_document_result) - 1}个文档需要处理!") + # 删除文档开始 + try: + # 注意:默认设置使用NetworkX + rag = await initialize_rag(working_dir) + await rag.adelete_by_doc_id(doc_id = document_name) + logger.info(f"Deleted content from {document_name}") + except Exception as e: + logger.error(f"An error occurred: {e}") + finally: + await rag.finalize_storages() + # 删除结束,更新训练状态 + update_document_sql: str = " UPDATE t_ai_teaching_model_document SET train_flag = 5 WHERE id = " + str(document["id"]) + await execute_sql(update_document_sql, ()) + + # 整体更新主题状态 + select_document_sql: str = " SELECT * FROM t_ai_teaching_model_document WHERE theme_id = " + str(theme["id"]) + " and train_flag = 2" + select_document_result = await find_by_sql(select_document_sql, ()) + if select_document_result is None: + update_theme_sql: str = " UPDATE t_ai_teaching_model_theme SET search_flag = 0 WHERE id = " + str(theme["id"]) + await execute_sql(update_theme_sql, ()) + else: update_theme_sql: str = " UPDATE t_ai_teaching_model_theme SET search_flag = 1 WHERE id = " + str(theme["id"]) await execute_sql(update_theme_sql, ()) - # 添加适当的等待时间,避免频繁查询 - await asyncio.sleep(60) # 每分钟查询一次 + # 添加适当的等待时间,避免频繁查询 + await asyncio.sleep(10) # 开发阶段每10秒一次 + # await asyncio.sleep(600) # 每十分钟查询一次