You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

86 lines
4.9 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import asyncio
import os
from utils.Database import *
from utils.DocxUtil import get_docx_content_by_pandoc
from utils.LightRagUtil import initialize_rag
# Finer-grained control over log output: the 'lightrag' logger is set to INFO
# and gets its own stream handler whose format shows the timestamp, logger
# name, level and message.
handler = logging.StreamHandler()
handler.setFormatter(
    logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
)
logger = logging.getLogger('lightrag')
logger.setLevel(logging.INFO)
logger.addHandler(handler)
async def _set_document_train_flag(document_id, train_flag):
    """Write ``train_flag`` for the document row whose id is ``document_id``."""
    # NOTE(review): the SQL is still built by concatenation because the
    # placeholder style accepted by execute_sql is not visible here; the id
    # comes from a row read back from the database.
    update_sql: str = f" UPDATE t_ai_teaching_model_document SET train_flag = {train_flag!s} WHERE id = {document_id!s}"
    await execute_sql(update_sql, ())


async def _train_document(working_dir, document_name, document_path):
    """Insert the docx content at ``document_path`` into the RAG store in ``working_dir``.

    Errors are logged rather than raised, matching the original best-effort
    behaviour. Storages are finalized only if the RAG instance was created.
    """
    # Reset per call so `finally` never touches an unbound or stale instance
    # when initialize_rag itself fails.
    rag = None
    try:
        # Note (from the original author): the default setup uses NetworkX.
        rag = await initialize_rag(working_dir)
        # Extract the docx file's content.
        content = get_docx_content_by_pandoc(document_path)
        await rag.ainsert(content, ids=[document_name], file_paths=[document_name])
        logger.info(f"Inserted content from {document_name}")
    except Exception as e:
        # logger.exception keeps the traceback alongside the original message.
        logger.exception(f"An error occurred: {e}")
    finally:
        if rag is not None:
            await rag.finalize_storages()


async def _delete_document(working_dir, document_name):
    """Delete the document identified by ``document_name`` from the RAG store in ``working_dir``.

    Errors are logged rather than raised, matching the original best-effort
    behaviour. Storages are finalized only if the RAG instance was created.
    """
    rag = None
    try:
        # Note (from the original author): the default setup uses NetworkX.
        rag = await initialize_rag(working_dir)
        await rag.adelete_by_doc_id(doc_id=document_name)
        logger.info(f"Deleted content from {document_name}")
    except Exception as e:
        logger.exception(f"An error occurred: {e}")
    finally:
        if rag is not None:
            await rag.finalize_storages()


async def _refresh_theme_search_flag(theme_id):
    """Set the theme's ``search_flag`` to 1 if it has any document with train_flag = 2, else 0."""
    select_document_sql: str = " SELECT * FROM t_ai_teaching_model_document WHERE theme_id = " + str(theme_id) + " and train_flag = 2"
    select_document_result = await find_by_sql(select_document_sql, ())
    # Truthiness check (not `is None`) so that an empty result list is also
    # treated as "no trained documents", consistent with the pending-document
    # check in train_document_task.
    search_flag = 1 if select_document_result else 0
    update_theme_sql: str = f" UPDATE t_ai_teaching_model_theme SET search_flag = {search_flag} WHERE id = {theme_id!s}"
    await execute_sql(update_theme_sql, ())


# Background task: watch for documents that still need to be trained or deleted.
async def train_document_task():
    """Poll the document table forever and process one pending document per cycle.

    After an initial 5-second delay, each cycle queries for rows with
    ``train_flag`` in (0, 3), newest first, and handles only the first row:

    * ``train_flag == 0``: set the flag to 1, insert the document into the
      RAG store, then set the flag to 2.
    * ``train_flag == 3``: set the flag to 4, delete the document from the
      RAG store, then set the flag to 5.

    The owning theme's ``search_flag`` is then refreshed, and the task sleeps
    for 10 seconds before the next cycle. The coroutine never returns.
    """
    print(datetime.datetime.now(), "线程5秒后开始运行【监控是否有需要处理的文档】")
    await asyncio.sleep(5)  # asyncio.sleep, not time.sleep, so the event loop is not blocked
    while True:
        # Use the configured module logger throughout; the original mixed in
        # root-logger calls that bypass the handler configured for `logger`.
        logger.info("开始查询是否有待处理文档:")
        no_train_document_sql: str = " SELECT * FROM t_ai_teaching_model_document WHERE train_flag in (0,3) ORDER BY create_time DESC"
        no_train_document_result = await find_by_sql(no_train_document_sql, ())
        logger.info(no_train_document_result)
        if not no_train_document_result:
            print(datetime.datetime.now(), "没有需要处理的文档")
        else:
            print(datetime.datetime.now(), f"存在未训练的文档{len(no_train_document_result)}")
            # Only the first row is handled per cycle; train_flag decides
            # whether it is trained or deleted.
            document = no_train_document_result[0]
            theme = await find_by_id("t_ai_teaching_model_theme", "id", document["theme_id"])
            document_name = document["document_name"] + "." + document["document_suffix"]
            working_dir = "Topic/" + theme["short_name"]
            document_path = document["document_path"]
            remaining = len(no_train_document_result) - 1
            if document["train_flag"] == 0:
                # Mark as in progress before the work starts.
                await _set_document_train_flag(document["id"], 1)
                logger.info(f"开始处理文档:{document_name}, 还有{remaining}个文档需要处理!")
                await _train_document(working_dir, document_name, document_path)
                # NOTE(review): as in the original, the flag becomes 2 even
                # when training logged an error — confirm whether a distinct
                # failure state is wanted.
                await _set_document_train_flag(document["id"], 2)
            elif document["train_flag"] == 3:
                await _set_document_train_flag(document["id"], 4)
                logger.info(f"开始删除文档:{document_name}, 还有{remaining}个文档需要处理!")
                await _delete_document(working_dir, document_name)
                # NOTE(review): flag becomes 5 even when deletion logged an error.
                await _set_document_train_flag(document["id"], 5)
            # Refresh the theme's overall searchable state.
            await _refresh_theme_search_flag(theme["id"])
        # Wait between polls to avoid querying too often.
        await asyncio.sleep(10)  # every 10 seconds during development
        # await asyncio.sleep(600)  # every ten minutes