diff --git a/dsAiTeachingModel/config/Config.py b/dsAiTeachingModel/Config/Config.py similarity index 100% rename from dsAiTeachingModel/config/Config.py rename to dsAiTeachingModel/Config/Config.py diff --git a/dsAiTeachingModel/config/__init__.py b/dsAiTeachingModel/Config/__init__.py similarity index 100% rename from dsAiTeachingModel/config/__init__.py rename to dsAiTeachingModel/Config/__init__.py diff --git a/dsAiTeachingModel/api/controller/QuestionController.py b/dsAiTeachingModel/api/controller/TeachingModelController.py similarity index 90% rename from dsAiTeachingModel/api/controller/QuestionController.py rename to dsAiTeachingModel/api/controller/TeachingModelController.py index 48b7ed39..881472e5 100644 --- a/dsAiTeachingModel/api/controller/QuestionController.py +++ b/dsAiTeachingModel/api/controller/TeachingModelController.py @@ -1,4 +1,4 @@ -# routes/QuestionController.py +# routes/TeachingModelController.py from fastapi import APIRouter, Request, Response, Depends from auth.dependencies import * diff --git a/dsAiTeachingModel/main.py b/dsAiTeachingModel/main.py index 8f99a901..72613efa 100644 --- a/dsAiTeachingModel/main.py +++ b/dsAiTeachingModel/main.py @@ -23,7 +23,7 @@ async def lifespan(app: FastAPI): await init_database() # 启动异步任务 - asyncio.create_task(train_document_task()) + # asyncio.create_task(train_document_task()) yield await shutdown_database() @@ -52,7 +52,7 @@ app.include_router(theme_router, prefix="/api/theme", tags=["theme"]) # 文档相关 app.include_router(document_router, prefix="/api/document", tags=["document"]) # 问题相关(大模型应用) -app.include_router(question_router, prefix="/api/question", tags=["question"]) +app.include_router(teaching_model_router, prefix="/api/teaching/model", tags=["question"]) # 字典相关(Dm) app.include_router(dm_router, prefix="/api/dm", tags=["dm"]) # 测试相关 diff --git a/dsAiTeachingModel/routes/__init__.py b/dsAiTeachingModel/routes/__init__.py index 4fa720b9..f985e11c 100644 --- a/dsAiTeachingModel/routes/__init__.py +++ b/dsAiTeachingModel/routes/__init__.py @@ -2,10 +2,10 @@ from api.controller.LoginController import router as login_router from api.controller.DocumentController import router as document_router from api.controller.ThemeController import router as theme_router -from api.controller.QuestionController import router as question_router +from api.controller.TeachingModelController import router as teaching_model_router from api.controller.TestController import router as test_router from api.controller.DmController import router as dm_router from api.controller.UserController import router as user_router # 导出所有路由 -__all__ = ["login_router", "document_router", "theme_router", "question_router", "dm_router", "test_router", "user_router"] +__all__ = ["login_router", "document_router", "theme_router", "teaching_model_router", "dm_router", "test_router", "user_router"] diff --git a/dsAiTeachingModel/tasks/BackgroundTasks.py b/dsAiTeachingModel/tasks/BackgroundTasks.py index d43dc190..b5eb4a24 100644 --- a/dsAiTeachingModel/tasks/BackgroundTasks.py +++ b/dsAiTeachingModel/tasks/BackgroundTasks.py @@ -12,11 +12,13 @@ WORKING_DIR = f"./output" # 后台任务,监控是否有新的未训练的文档进行训练 async def train_document_task(): print("线程5秒后开始运行【监控是否有新的未训练的文档进行训练】") + num = 1 await asyncio.sleep(5) # 使用 asyncio.sleep 而不是 time.sleep # 这里放置你的线程逻辑 while True: # 这里可以放置你的线程要执行的代码 - logging.info("开始查询是否有未训练的文档") + logging.info("开始查询是否有未训练的文档:" + str(num)) + num = num + 1 no_train_document_sql: str = " SELECT * FROM t_ai_teaching_model_document WHERE is_deleted = 0 and train_flag = 0 ORDER BY create_time DESC" no_train_document_result = await find_by_sql(no_train_document_sql, ()) if not no_train_document_result: @@ -49,4 +51,4 @@ async def train_document_task(): # execute_sql(update_sql) # 添加适当的等待时间,避免频繁查询 - await asyncio.sleep(60) # 每分钟查询一次 + await asyncio.sleep(60) # 每分钟查询一次 diff --git a/dsAiTeachingModel/utils/Database.py b/dsAiTeachingModel/utils/Database.py index 4ac15243..4010390d 100644 --- a/dsAiTeachingModel/utils/Database.py +++ b/dsAiTeachingModel/utils/Database.py @@ -17,7 +17,7 @@ async def create_pool(): password=POSTGRES_PASSWORD, database=POSTGRES_DATABASE, min_size=1, # 设置连接池最小连接数 - max_size=100 # 设置连接池最大连接数 + max_size=10 # 设置连接池最大连接数 ) async def get_connection(): diff --git a/dsLightRag/.idea/dsLightRag.iml b/dsLightRag/.idea/dsLightRag.iml index d8ecea1d..9c386072 100644 --- a/dsLightRag/.idea/dsLightRag.iml +++ b/dsLightRag/.idea/dsLightRag.iml @@ -2,7 +2,7 @@ - + diff --git a/dsLightRag/.idea/misc.xml b/dsLightRag/.idea/misc.xml index 0bad5868..0f9b3bc1 100644 --- a/dsLightRag/.idea/misc.xml +++ b/dsLightRag/.idea/misc.xml @@ -3,5 +3,5 @@ - + \ No newline at end of file diff --git a/dsLightRag/Config/__pycache__/Config.cpython-310.pyc b/dsLightRag/Config/__pycache__/Config.cpython-310.pyc index c551f7f6..73bb1c97 100644 Binary files a/dsLightRag/Config/__pycache__/Config.cpython-310.pyc and b/dsLightRag/Config/__pycache__/Config.cpython-310.pyc differ diff --git a/dsLightRag/Test/Test/Logs/article_bfc50bb7d7.html b/dsLightRag/Test/Test/Logs/article_bfc50bb7d7.html deleted file mode 100644 index cd460649..00000000 --- a/dsLightRag/Test/Test/Logs/article_bfc50bb7d7.html +++ /dev/null @@ -1,162 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - -
- -
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/dsLightRag/Util/__pycache__/LightRagUtil.cpython-310.pyc b/dsLightRag/Util/__pycache__/LightRagUtil.cpython-310.pyc index 9bc1e54d..d89e8018 100644 Binary files a/dsLightRag/Util/__pycache__/LightRagUtil.cpython-310.pyc and b/dsLightRag/Util/__pycache__/LightRagUtil.cpython-310.pyc differ diff --git a/dsLightRag/Util/__pycache__/PostgreSQLUtil.cpython-310.pyc b/dsLightRag/Util/__pycache__/PostgreSQLUtil.cpython-310.pyc index d2b6d3e8..4e49862d 100644 Binary files a/dsLightRag/Util/__pycache__/PostgreSQLUtil.cpython-310.pyc and b/dsLightRag/Util/__pycache__/PostgreSQLUtil.cpython-310.pyc differ diff --git a/dsLightRag/WxGzh/T2_CollectArticle.py b/dsLightRag/WxGzh/T2_CollectArticle.py index a6e073bb..051e82dc 100644 --- a/dsLightRag/WxGzh/T2_CollectArticle.py +++ b/dsLightRag/WxGzh/T2_CollectArticle.py @@ -4,21 +4,36 @@ # 微信爬爬猫---公众号文章抓取代码分析 # https://blog.csdn.net/yajuanpi4899/article/details/121584268 +""" +# 查看selenium版本 +pip show selenium +4.34.2 + +# 查看Chrome浏览器版本 +chrome://version/ +138.0.7204.101 (正式版本) (64 位) +# 下载驱动包 +https://googlechromelabs.github.io/chrome-for-testing/ +https://storage.googleapis.com/chrome-for-testing-public/138.0.7204.94/win64/chromedriver-win64.zip +""" import asyncio import datetime import json import logging import random import re - +import time +from selenium.webdriver.chrome.options import Options +from selenium.webdriver.chrome.service import Service as ChromeService import requests +from Util.LightRagUtil import initialize_pg_rag from Util.PostgreSQLUtil import init_postgres_pool from Util.WxGzhUtil import init_wechat_browser, get_article_content # 删除重复的日志配置,只保留以下内容 -logger = logging.getLogger('WeiXinGongZhongHao') +logger = logging.getLogger('WxGzh') logger.setLevel(logging.INFO) # 确保只添加一个handler @@ -27,6 +42,11 @@ if not logger.handlers: handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) logger.addHandler(handler) +# 添加微信请求头 +header = { + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36', + 'Referer': 'https://mp.weixin.qq.com/' +} async def get_wechat_sources(): """从t_wechat_source表获取微信公众号列表""" @@ -39,24 +59,6 @@ async def get_wechat_sources(): await pool.close() -""" -# 查看selenium版本 -pip show selenium -4.34.2 - -# 查看Chrome浏览器版本 -chrome://version/ -138.0.7204.101 (正式版本) (64 位) - -# 下载驱动包 -https://googlechromelabs.github.io/chrome-for-testing/ -https://storage.googleapis.com/chrome-for-testing-public/138.0.7204.94/win64/chromedriver-win64.zip -""" -import time -from selenium.webdriver.chrome.options import Options -from selenium.webdriver.chrome.service import Service as ChromeService - - async def is_article_exist(pool, article_url): """检查文章URL是否已存在数据库中""" try: @@ -72,12 +74,23 @@ async def is_article_exist(pool, article_url): return False # 出错时默认返回False,避免影响正常流程 -async def save_article_to_db(pool, article_title, account_name, article_url, publish_time, content, id): - # 先检查文章是否已存在 - if await is_article_exist(pool, article_url): - logger.info(f"文章已存在,跳过保存: {article_url}") - return +async def save_article_to_db(pool, article_title, account_name, article_url, publish_time, content, source_id): + # 在这里调用 lightrag进行知识库构建 + workspace = 'ChangChun' + # 使用PG库后,这个是没有用的,但目前的项目代码要求必传,就写一个吧。 + WORKING_DIR = f"./output" + docx_name = f"{account_name}_{article_title}" # 组合来源和标题作为文档名 + logger.info(f"开始处理文档: {docx_name}") + try: + rag = await initialize_pg_rag(WORKING_DIR=WORKING_DIR, workspace=workspace) + await rag.ainsert(input=content, file_paths=[docx_name]) + finally: + if rag: + await rag.finalize_storages() + if pool: + await pool.close() + logger.info(f"保存文档到知识库成功: {docx_name}") try: async with pool.acquire() as conn: await conn.execute(''' @@ -85,139 +98,155 @@ async def save_article_to_db(pool, article_title, account_name, article_url, pub (title, source, url, publish_time, content, source_id) VALUES ($1, $2, $3, $4, $5, $6) ''', article_title, account_name, article_url, - publish_time, content, id) + publish_time, content, source_id) except Exception as e: logging.error(f"保存文章失败: {e}") -if __name__ == '__main__': - # 从文件cookies.txt中获取 +async def initialize_wechat_session(): + """初始化微信会话,获取cookies和token""" with open('cookies.txt', 'r', encoding='utf-8') as f: content = f.read() - # 使用json还原为json对象 cookies = json.loads(content) - # "expiry": 1787106233 - # 检查是否有过期时间 + global driver # 添加这行 expiry = cookies["expiry"] if expiry: - # 换算出过期时间 - expiry_time = time.localtime(expiry) - expiry_date = time.strftime("%Y-%m-%d %H:%M:%S", expiry_time) - - # 获取当前时间戳 current_timestamp = time.time() - # 检查是否已过期 if current_timestamp > expiry: logger.error("Cookie已过期") exit() - # 移除expiry属性 + del cookies["expiry"] - logger.info(f"cookies的过期时间一般是4天,cookies过期时间:%s" % expiry_date) - options = Options() - options.add_argument('-headless') # 无头参数,调试时可以注释掉 - # 设置headers - 使用微信内置浏览器的User-Agent - header = { - "HOST": "mp.weixin.qq.com", - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.116 Safari/537.36 QBCore/4.0.1301.400 QQBrowser/9.0.2524.400 Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2875.116 Safari/537.36 NetType/WIFI MicroMessenger/7.0.20.1781(0x6700143B) WindowsWechat(0x63010200)", - "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", - "Accept-Encoding": "gzip, deflate, br", - "Accept-Language": "zh-CN,zh;q=0.8,en-US;q=0.6,en;q=0.5;q=0.4", - "Connection": "keep-alive" - } - service = ChromeService(executable_path=r"C:\Windows\System32\chromedriver.exe") - # 使用统一的初始化方式 + options = Options() + options.add_argument('-headless') driver = init_wechat_browser() - # 方法3:使用requests库发送请求获取重定向URL url = 'https://mp.weixin.qq.com' response = requests.get(url=url, allow_redirects=False, cookies=cookies) + if 'Location' in response.headers: redirect_url = response.headers.get("Location") - logger.info(f"重定向URL:%s"%redirect_url) token_match = re.findall(r'token=(\d+)', redirect_url) if token_match: token = token_match[0] - logger.info(f"获取到的token:%s"%token) + return cookies, token + + return None, None + + +async def get_wechat_account_list(cookies, token, account_name): + """获取指定公众号的fakeid""" + search_url = 'https://mp.weixin.qq.com/cgi-bin/searchbiz?' + query_id = { + 'action': 'search_biz', + 'token': token, + 'lang': 'zh_CN', + 'f': 'json', + 'ajax': '1', + 'random': random.random(), + 'query': account_name, + 'begin': '0', + 'count': '5' + } + + search_response = requests.get(search_url, cookies=cookies, headers=header, params=query_id) + lists = search_response.json().get('list')[0] + return lists.get('fakeid') + + +async def get_article_list(cookies, token, fakeid): + """获取公众号文章列表""" + appmsg_url = 'https://mp.weixin.qq.com/cgi-bin/appmsg?' + query_id_data = { + 'token': token, + 'lang': 'zh_CN', + 'f': 'json', + 'ajax': '1', + 'random': random.random(), + 'action': 'list_ex', + 'begin': '0', + 'count': '5', + 'query': '', + 'fakeid': fakeid, + 'type': '9' + } + + query_fakeid_response = requests.get(appmsg_url, cookies=cookies, headers=header, params=query_id_data) + return query_fakeid_response.json().get('app_msg_list') - article_urls = [] - # 获取公众号列表 +async def process_single_article(article_info, account_info, cookies, token): + """处理单篇文章""" + article_url = article_info.get('link') + article_title = article_info.get('title') + publish_time = datetime.datetime.fromtimestamp(int(article_info.get("update_time"))) + + if '试卷' in article_title: + return False + + try: + pool = await init_postgres_pool() + # 先检查文章是否已存在 + if await is_article_exist(pool, article_url): + logger.info(f'文章已存在,跳过保存: {account_info["account_name"]}-{article_title}') + return False + content = get_article_content(article_url) + await save_article_to_db(pool, article_title, account_info["account_name"], + article_url, publish_time, content, account_info["id"]) + return True + except Exception as e: + logger.error(f"处理文章时出错: {e}") + return False + finally: + if 'pool' in locals(): + await pool.close() + + +async def process_wechat_account(account_info, cookies, token): + """处理单个公众号的所有文章""" + cnt = 0 + fakeid = await get_wechat_account_list(cookies, token, account_info["account_name"]) + articles = await get_article_list(cookies, token, fakeid) + + for article in articles: + success = await process_single_article(article, account_info, cookies, token) + if success: + cnt += 1 + time.sleep(1) + + logger.info(f"成功获取公众号: {account_info['account_name']} {cnt}篇文章。") + return cnt + + +async def main(): + """主函数""" + while True: + try: + logger.info("开始执行微信公众号文章采集任务") + cookies, token = await initialize_wechat_session() + if not cookies or not token: + logger.error("初始化微信会话失败") + continue + + account_list = await get_wechat_sources() + for account in account_list: + await process_wechat_account(account, cookies, token) + + logger.info("本次采集任务完成,等待30分钟后再次执行") + await asyncio.sleep(30 * 60) # 30分钟 + except Exception as e: + logger.error(f"主循环发生错误: {e}") + await asyncio.sleep(30 * 60) # 出错后也等待30分钟 + finally: + if 'driver' in globals(): + driver.quit() + + +if __name__ == '__main__': loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: - gzlist = loop.run_until_complete(get_wechat_sources()) + loop.run_until_complete(main()) finally: loop.close() - - # 爬取文章 - for item in gzlist: - account_name = item["account_name"] - account_id = item["account_id"] - id = item["id"] - # 搜索微信公众号的接口地址 - search_url = 'https://mp.weixin.qq.com/cgi-bin/searchbiz?' - # 搜索微信公众号接口需要传入的参数,有三个变量:微信公众号token、随机数random、搜索的微信公众号名字 - query_id = { - 'action': 'search_biz', - 'token': token, - 'lang': 'zh_CN', - 'f': 'json', - 'ajax': '1', - 'random': random.random(), - 'query': account_name, - 'begin': '0', - 'count': '5' - } - # 打开搜索微信公众号接口地址,需要传入相关参数信息如:cookies、params、headers - search_response = requests.get(search_url, cookies=cookies, headers=header, params=query_id) - # 取搜索结果中的第一个公众号 - lists = search_response.json().get('list')[0] - # 获取这个公众号的fakeid,后面爬取公众号文章需要此字段 - fakeid = lists.get('fakeid') - logging.info("fakeid:" + fakeid) - # 微信公众号文章接口地址 - appmsg_url = 'https://mp.weixin.qq.com/cgi-bin/appmsg?' - # 搜索文章需要传入几个参数:登录的公众号token、要爬取文章的公众号fakeid、随机数random - query_id_data = { - 'token': token, - 'lang': 'zh_CN', - 'f': 'json', - 'ajax': '1', - 'random': random.random(), - 'action': 'list_ex', - 'begin': '0', # 不同页,此参数变化,变化规则为每页加5 - 'count': '5', - 'query': '', - 'fakeid': fakeid, - 'type': '9' - } - # 打开搜索的微信公众号文章列表页 - query_fakeid_response = requests.get(appmsg_url, cookies=cookies, headers=header, params=query_id_data) - fakeid_list = query_fakeid_response.json().get('app_msg_list') - - for item in fakeid_list: - article_url = item.get('link') - article_title = item.get('title') - publish_time = datetime.datetime.fromtimestamp(int(item.get("update_time"))) - - if '试卷' in article_title: # 过滤掉试卷 - continue - - logger.info(f"正在处理文章: {article_title} ({publish_time})") - content = get_article_content(article_url) - - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: - pool = loop.run_until_complete(init_postgres_pool()) - loop.run_until_complete( - save_article_to_db(pool, article_title, account_name, article_url, publish_time, content, - id)) - finally: - loop.run_until_complete(pool.close()) - loop.close() - - time.sleep(1) - # 关闭浏览器 - driver.quit() diff --git a/dsLightRag/WxGzh/T3_TrainIntoKG.py b/dsLightRag/WxGzh/T3_TrainIntoKG.py deleted file mode 100644 index 86473413..00000000 --- a/dsLightRag/WxGzh/T3_TrainIntoKG.py +++ /dev/null @@ -1,63 +0,0 @@ -import asyncio -import logging - -from Util.DocxUtil import get_docx_content_by_pandoc -from Util.LightRagUtil import initialize_pg_rag -from Util.PostgreSQLUtil import init_postgres_pool - -logger = logging.getLogger('lightrag') -logger.setLevel(logging.INFO) -handler = logging.StreamHandler() -handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) -logger.addHandler(handler) -logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.INFO) - -# 使用PG库后,这个是没有用的,但目前的项目代码要求必传,就写一个吧。 -WORKING_DIR = f"./output" - - -async def get_unprocessed_articles(): - """从t_wechat_articles表获取未处理的文章""" - try: - pool = await init_postgres_pool() - async with pool.acquire() as conn: - rows = await conn.fetch(''' - SELECT id, source, title, content - FROM t_wechat_articles - WHERE is_finish = 0 - ''') - return [dict(row) for row in rows] - finally: - await pool.close() - -async def main(): - # 获取未处理的文章 - articles = await get_unprocessed_articles() - logger.info(f"共获取到{len(articles)}篇未处理的文章") - - for article in articles: - workspace = 'ChangChun' - docx_name = f"{article['source']}_{article['title']}" # 组合来源和标题作为文档名 - content = article["content"] # 使用文章内容 - - logger.info(f"开始处理文档: {docx_name}") - try: - rag = await initialize_pg_rag(WORKING_DIR=WORKING_DIR, workspace=workspace) - await rag.ainsert(input=content, file_paths=[docx_name]) - - # 标记为已处理 - pool = await init_postgres_pool() - async with pool.acquire() as conn: - await conn.execute(''' - UPDATE t_wechat_articles - SET is_finish = 1 - WHERE id = $1 - ''', article["id"]) - finally: - if rag: - await rag.finalize_storages() - if pool: - await pool.close() - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/dsLightRag/static/ChangChun.html b/dsLightRag/static/ChangChun.html index b71919b8..7b628e3a 100644 --- a/dsLightRag/static/ChangChun.html +++ b/dsLightRag/static/ChangChun.html @@ -217,6 +217,10 @@
力旺实验中学今年的中考成绩怎么样?
+
+ 在730分占比中,哪些学校表现优秀? +
+