HuangHai 7 days ago
commit b9cb8beb09

@ -1,4 +1,4 @@
# routes/QuestionController.py
# routes/TeachingModelController.py
from fastapi import APIRouter, Request, Response, Depends
from auth.dependencies import *

@ -23,7 +23,7 @@ async def lifespan(app: FastAPI):
await init_database()
# 启动异步任务
asyncio.create_task(train_document_task())
# asyncio.create_task(train_document_task())
yield
await shutdown_database()
@ -52,7 +52,7 @@ app.include_router(theme_router, prefix="/api/theme", tags=["theme"])
# 文档相关
app.include_router(document_router, prefix="/api/document", tags=["document"])
# 问题相关(大模型应用)
app.include_router(question_router, prefix="/api/question", tags=["question"])
app.include_router(teaching_model_router, prefix="/api/teaching/model", tags=["question"])
# 字典相关(Dm)
app.include_router(dm_router, prefix="/api/dm", tags=["dm"])
# 测试相关

@ -2,10 +2,10 @@
from api.controller.LoginController import router as login_router
from api.controller.DocumentController import router as document_router
from api.controller.ThemeController import router as theme_router
from api.controller.QuestionController import router as question_router
from api.controller.TeachingModelController import router as teaching_model_router
from api.controller.TestController import router as test_router
from api.controller.DmController import router as dm_router
from api.controller.UserController import router as user_router
# 导出所有路由
__all__ = ["login_router", "document_router", "theme_router", "question_router", "dm_router", "test_router", "user_router"]
__all__ = ["login_router", "document_router", "theme_router", "teaching_model_router", "dm_router", "test_router", "user_router"]

@ -12,11 +12,13 @@ WORKING_DIR = f"./output"
# 后台任务,监控是否有新的未训练的文档进行训练
async def train_document_task():
print("线程5秒后开始运行【监控是否有新的未训练的文档进行训练】")
num = 1
await asyncio.sleep(5) # 使用 asyncio.sleep 而不是 time.sleep
# 这里放置你的线程逻辑
while True:
# 这里可以放置你的线程要执行的代码
logging.info("开始查询是否有未训练的文档")
logging.info("开始查询是否有未训练的文档:" + str(num))
num = num + 1
no_train_document_sql: str = " SELECT * FROM t_ai_teaching_model_document WHERE is_deleted = 0 and train_flag = 0 ORDER BY create_time DESC"
no_train_document_result = await find_by_sql(no_train_document_sql, ())
if not no_train_document_result:
@ -49,4 +51,4 @@ async def train_document_task():
# execute_sql(update_sql)
# 添加适当的等待时间,避免频繁查询
await asyncio.sleep(60) # 每分钟查询一次
await asyncio.sleep(60) # 每分钟查询一次

@ -17,7 +17,7 @@ async def create_pool():
password=POSTGRES_PASSWORD,
database=POSTGRES_DATABASE,
min_size=1, # 设置连接池最小连接数
max_size=100 # 设置连接池最大连接数
max_size=10 # 设置连接池最大连接数
)
async def get_connection():

@ -2,7 +2,7 @@
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="jdk" jdkName="Python 3.10 (4)" jdkType="Python SDK" />
<orderEntry type="jdk" jdkName="D:\anaconda3\envs\py310" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="PyDocumentationSettings">

@ -3,5 +3,5 @@
<component name="Black">
<option name="sdkName" value="D:\anaconda3\envs\lightrag" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10 (4)" project-jdk-type="Python SDK" />
<component name="ProjectRootManager" version="2" project-jdk-name="D:\anaconda3\envs\py310" project-jdk-type="Python SDK" />
</project>

File diff suppressed because one or more lines are too long

@ -4,21 +4,36 @@
# 微信爬爬猫---公众号文章抓取代码分析
# https://blog.csdn.net/yajuanpi4899/article/details/121584268
"""
# 查看selenium版本
pip show selenium
4.34.2
# 查看Chrome浏览器版本
chrome://version/
138.0.7204.101 (正式版本) 64
# 下载驱动包
https://googlechromelabs.github.io/chrome-for-testing/
https://storage.googleapis.com/chrome-for-testing-public/138.0.7204.94/win64/chromedriver-win64.zip
"""
import asyncio
import datetime
import json
import logging
import random
import re
import time
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service as ChromeService
import requests
from Util.LightRagUtil import initialize_pg_rag
from Util.PostgreSQLUtil import init_postgres_pool
from Util.WxGzhUtil import init_wechat_browser, get_article_content
# 删除重复的日志配置,只保留以下内容
logger = logging.getLogger('WeiXinGongZhongHao')
logger = logging.getLogger('WxGzh')
logger.setLevel(logging.INFO)
# 确保只添加一个handler
@ -27,6 +42,11 @@ if not logger.handlers:
handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
logger.addHandler(handler)
# 添加微信请求头
header = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36',
'Referer': 'https://mp.weixin.qq.com/'
}
async def get_wechat_sources():
"""从t_wechat_source表获取微信公众号列表"""
@ -39,24 +59,6 @@ async def get_wechat_sources():
await pool.close()
"""
# 查看selenium版本
pip show selenium
4.34.2
# 查看Chrome浏览器版本
chrome://version/
138.0.7204.101 (正式版本) 64
# 下载驱动包
https://googlechromelabs.github.io/chrome-for-testing/
https://storage.googleapis.com/chrome-for-testing-public/138.0.7204.94/win64/chromedriver-win64.zip
"""
import time
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service as ChromeService
async def is_article_exist(pool, article_url):
"""检查文章URL是否已存在数据库中"""
try:
@ -72,12 +74,23 @@ async def is_article_exist(pool, article_url):
return False # 出错时默认返回False避免影响正常流程
async def save_article_to_db(pool, article_title, account_name, article_url, publish_time, content, id):
# 先检查文章是否已存在
if await is_article_exist(pool, article_url):
logger.info(f"文章已存在,跳过保存: {article_url}")
return
async def save_article_to_db(pool, article_title, account_name, article_url, publish_time, content, source_id):
# 在这里调用 lightrag进行知识库构建
workspace = 'ChangChun'
# 使用PG库后这个是没有用的,但目前的项目代码要求必传,就写一个吧。
WORKING_DIR = f"./output"
docx_name = f"{account_name}_{article_title}" # 组合来源和标题作为文档名
logger.info(f"开始处理文档: {docx_name}")
try:
rag = await initialize_pg_rag(WORKING_DIR=WORKING_DIR, workspace=workspace)
await rag.ainsert(input=content, file_paths=[docx_name])
finally:
if rag:
await rag.finalize_storages()
if pool:
await pool.close()
logger.info(f"保存文档到知识库成功: {docx_name}")
try:
async with pool.acquire() as conn:
await conn.execute('''
@ -85,139 +98,155 @@ async def save_article_to_db(pool, article_title, account_name, article_url, pub
(title, source, url, publish_time, content, source_id)
VALUES ($1, $2, $3, $4, $5, $6)
''', article_title, account_name, article_url,
publish_time, content, id)
publish_time, content, source_id)
except Exception as e:
logging.error(f"保存文章失败: {e}")
if __name__ == '__main__':
# 从文件cookies.txt中获取
async def initialize_wechat_session():
    """Initialize the WeChat MP session: load cookies from cookies.txt and obtain a token.

    Returns:
        (cookies, token) on success, (None, None) when no token could be
        extracted from the redirect. Exits the process outright when the
        stored cookie has already expired.
    """
    global driver  # browser handle is shared with module-level cleanup code
    with open('cookies.txt', 'r', encoding='utf-8') as f:
        cookies = json.loads(f.read())
    # NOTE(review): assumes the cookie dump always carries an "expiry" key
    # (e.g. "expiry": 1787106233) — confirm against the cookie exporter.
    expiry = cookies["expiry"]
    if expiry:
        expiry_time = time.localtime(expiry)
        expiry_date = time.strftime("%Y-%m-%d %H:%M:%S", expiry_time)
        if time.time() > expiry:
            logger.error("Cookie已过期")
            exit()
        # requests cannot send the non-standard "expiry" attribute — drop it.
        del cookies["expiry"]
        logger.info(f"cookies的过期时间一般是4天cookies过期时间%s" % expiry_date)
    # Fix: removed dead leftovers from the pre-refactor code — two separate
    # Options() setups, an unused headers dict, and an unused ChromeService
    # pointing at a hard-coded chromedriver path. Browser startup is fully
    # encapsulated in the shared helper now.
    driver = init_wechat_browser()
    # Request the MP home page without following redirects: the redirect
    # target URL carries the session token as a query parameter.
    url = 'https://mp.weixin.qq.com'
    response = requests.get(url=url, allow_redirects=False, cookies=cookies)
    if 'Location' in response.headers:
        redirect_url = response.headers.get("Location")
        logger.info(f"重定向URL:%s" % redirect_url)
        token_match = re.findall(r'token=(\d+)', redirect_url)
        if token_match:
            token = token_match[0]
            logger.info(f"获取到的token:%s" % token)
            return cookies, token
    return None, None
async def get_wechat_account_list(cookies, token, account_name):
    """Search the MP platform for an official account by name and return its fakeid.

    The fakeid uniquely identifies the account and is required by the
    article-list endpoint.
    """
    params = {
        'action': 'search_biz',
        'token': token,
        'lang': 'zh_CN',
        'f': 'json',
        'ajax': '1',
        'random': random.random(),
        'query': account_name,
        'begin': '0',
        'count': '5',
    }
    resp = requests.get(
        'https://mp.weixin.qq.com/cgi-bin/searchbiz?',
        cookies=cookies,
        headers=header,
        params=params,
    )
    # Take the first (best-ranked) match from the search results.
    first_match = resp.json().get('list')[0]
    return first_match.get('fakeid')
async def get_article_list(cookies, token, fakeid):
    """Fetch the first page of articles published by the account identified by fakeid."""
    request_params = {
        'token': token,
        'lang': 'zh_CN',
        'f': 'json',
        'ajax': '1',
        'random': random.random(),
        'action': 'list_ex',
        'begin': '0',
        'count': '5',
        'query': '',
        'fakeid': fakeid,
        'type': '9',
    }
    list_response = requests.get(
        'https://mp.weixin.qq.com/cgi-bin/appmsg?',
        cookies=cookies,
        headers=header,
        params=request_params,
    )
    return list_response.json().get('app_msg_list')
article_urls = []
# 获取公众号列表
async def process_single_article(article_info, account_info, cookies, token):
    """Download one article and persist it; return True only when it was newly saved."""
    article_url = article_info.get('link')
    article_title = article_info.get('title')
    publish_time = datetime.datetime.fromtimestamp(int(article_info.get("update_time")))
    # Exam-paper posts are filtered out entirely.
    if '试卷' in article_title:
        return False
    pool = None
    try:
        pool = await init_postgres_pool()
        # Deduplicate on URL before fetching the article body.
        if await is_article_exist(pool, article_url):
            logger.info(f'文章已存在,跳过保存: {account_info["account_name"]}-{article_title}')
            return False
        body = get_article_content(article_url)
        await save_article_to_db(
            pool,
            article_title,
            account_info["account_name"],
            article_url,
            publish_time,
            body,
            account_info["id"],
        )
        return True
    except Exception as e:
        logger.error(f"处理文章时出错: {e}")
        return False
    finally:
        if pool is not None:
            await pool.close()
async def process_wechat_account(account_info, cookies, token):
    """Crawl every listed article of one official account.

    Returns:
        int: the number of articles that were newly saved.
    """
    cnt = 0
    fakeid = await get_wechat_account_list(cookies, token, account_info["account_name"])
    articles = await get_article_list(cookies, token, fakeid)
    for article in articles:
        success = await process_single_article(article, account_info, cookies, token)
        if success:
            cnt += 1
        # Fix: time.sleep(1) blocked the asyncio event loop inside this
        # async function; the non-blocking variant keeps the same 1s
        # per-article throttle without stalling other tasks.
        await asyncio.sleep(1)
    logger.info(f"成功获取公众号: {account_info['account_name']} {cnt}篇文章。")
    return cnt
async def main():
    """Entry loop: crawl all configured accounts, then wait 30 minutes and repeat."""
    while True:
        try:
            logger.info("开始执行微信公众号文章采集任务")
            cookies, token = await initialize_wechat_session()
            if not cookies or not token:
                logger.error("初始化微信会话失败")
                # Fix: the bare `continue` retried immediately, busy-looping
                # against cookies.txt and mp.weixin.qq.com; back off like the
                # other failure paths before trying again.
                await asyncio.sleep(30 * 60)
                continue
            account_list = await get_wechat_sources()
            for account in account_list:
                await process_wechat_account(account, cookies, token)
            logger.info("本次采集任务完成等待30分钟后再次执行")
            await asyncio.sleep(30 * 60)  # 30-minute pause between rounds
        except Exception as e:
            logger.error(f"主循环发生错误: {e}")
            await asyncio.sleep(30 * 60)  # same back-off after an error
        finally:
            # The browser is (re)created by initialize_wechat_session each
            # round; make sure it is shut down before the next iteration.
            if 'driver' in globals():
                driver.quit()
if __name__ == '__main__':
    # Drive the long-running crawler on a dedicated event loop.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # Fix: dropped the leftover `gzlist = loop.run_until_complete(
        # get_wechat_sources())` — its result was never used (the old
        # top-level crawl loop that consumed it was removed) and it cost an
        # extra DB round-trip; main() loads the account list itself each round.
        loop.run_until_complete(main())
    finally:
        loop.close()
# 爬取文章
for item in gzlist:
account_name = item["account_name"]
account_id = item["account_id"]
id = item["id"]
# 搜索微信公众号的接口地址
search_url = 'https://mp.weixin.qq.com/cgi-bin/searchbiz?'
# 搜索微信公众号接口需要传入的参数有三个变量微信公众号token、随机数random、搜索的微信公众号名字
query_id = {
'action': 'search_biz',
'token': token,
'lang': 'zh_CN',
'f': 'json',
'ajax': '1',
'random': random.random(),
'query': account_name,
'begin': '0',
'count': '5'
}
# 打开搜索微信公众号接口地址需要传入相关参数信息如cookies、params、headers
search_response = requests.get(search_url, cookies=cookies, headers=header, params=query_id)
# 取搜索结果中的第一个公众号
lists = search_response.json().get('list')[0]
# 获取这个公众号的fakeid后面爬取公众号文章需要此字段
fakeid = lists.get('fakeid')
logging.info("fakeid:" + fakeid)
# 微信公众号文章接口地址
appmsg_url = 'https://mp.weixin.qq.com/cgi-bin/appmsg?'
# 搜索文章需要传入几个参数登录的公众号token、要爬取文章的公众号fakeid、随机数random
query_id_data = {
'token': token,
'lang': 'zh_CN',
'f': 'json',
'ajax': '1',
'random': random.random(),
'action': 'list_ex',
'begin': '0', # 不同页此参数变化变化规则为每页加5
'count': '5',
'query': '',
'fakeid': fakeid,
'type': '9'
}
# 打开搜索的微信公众号文章列表页
query_fakeid_response = requests.get(appmsg_url, cookies=cookies, headers=header, params=query_id_data)
fakeid_list = query_fakeid_response.json().get('app_msg_list')
for item in fakeid_list:
article_url = item.get('link')
article_title = item.get('title')
publish_time = datetime.datetime.fromtimestamp(int(item.get("update_time")))
if '试卷' in article_title: # 过滤掉试卷
continue
logger.info(f"正在处理文章: {article_title} ({publish_time})")
content = get_article_content(article_url)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
pool = loop.run_until_complete(init_postgres_pool())
loop.run_until_complete(
save_article_to_db(pool, article_title, account_name, article_url, publish_time, content,
id))
finally:
loop.run_until_complete(pool.close())
loop.close()
time.sleep(1)
# 关闭浏览器
driver.quit()

@ -1,63 +0,0 @@
import asyncio
import logging
from Util.DocxUtil import get_docx_content_by_pandoc
from Util.LightRagUtil import initialize_pg_rag
from Util.PostgreSQLUtil import init_postgres_pool
logger = logging.getLogger('lightrag')
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
logger.addHandler(handler)
logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.INFO)
# 使用PG库后这个是没有用的,但目前的项目代码要求必传,就写一个吧。
WORKING_DIR = f"./output"
async def get_unprocessed_articles():
    """Return pending rows (is_finish = 0) from t_wechat_articles.

    Returns:
        list[dict]: one dict per unprocessed article with keys
        id, source, title, content.
    """
    pool = None
    try:
        pool = await init_postgres_pool()
        async with pool.acquire() as conn:
            rows = await conn.fetch('''
                SELECT id, source, title, content
                FROM t_wechat_articles
                WHERE is_finish = 0
            ''')
            return [dict(row) for row in rows]
    finally:
        # Fix: guard the close — if init_postgres_pool() raised, `pool` was
        # unbound and the original finally masked the real exception with a
        # NameError.
        if pool is not None:
            await pool.close()
async def main():
    """Ingest every unprocessed WeChat article into the knowledge base, then mark it done."""
    articles = await get_unprocessed_articles()
    logger.info(f"共获取到{len(articles)}篇未处理的文章")
    for article in articles:
        workspace = 'ChangChun'
        # Combine source and title to form a unique document name.
        docx_name = f"{article['source']}_{article['title']}"
        content = article["content"]
        logger.info(f"开始处理文档: {docx_name}")
        # Fix: pre-bind both resources — in the original, a failure inside
        # initialize_pg_rag() (or before the pool was created) left `rag` /
        # `pool` unbound, and the finally block crashed with a NameError
        # instead of surfacing the real exception.
        rag = None
        pool = None
        try:
            rag = await initialize_pg_rag(WORKING_DIR=WORKING_DIR, workspace=workspace)
            await rag.ainsert(input=content, file_paths=[docx_name])
            # Flag the row as processed only after a successful ingest.
            pool = await init_postgres_pool()
            async with pool.acquire() as conn:
                await conn.execute('''
                    UPDATE t_wechat_articles
                    SET is_finish = 1
                    WHERE id = $1
                ''', article["id"])
        finally:
            if rag is not None:
                await rag.finalize_storages()
            if pool is not None:
                await pool.close()


if __name__ == "__main__":
    asyncio.run(main())

@ -217,6 +217,10 @@
<div class="example-item" onclick="fillExample('力旺实验中学今年的中考成绩怎么样?')">
力旺实验中学今年的中考成绩怎么样?
</div>
<div class="example-item" onclick="fillExample('在730分占比中哪些学校表现优秀')">
在730分占比中哪些学校表现优秀
</div>
</div>
</div>
</div>

Loading…
Cancel
Save