# Walkthrough: Python + Selenium batch-crawling of WeChat official accounts to build your own daily AI briefing and escape information overload
# https://blog.csdn.net/k352733625/article/details/149222945
# "WeChat crawler cat" -- analysis of official-account article scraping code
# https://blog.csdn.net/yajuanpi4899/article/details/121584268
"""
# 查看selenium版本
pip show selenium
4.34.2
# 查看Chrome浏览器版本
chrome://version/
138.0.7204.101 (正式版本) 64 位)
# 下载驱动包
https://googlechromelabs.github.io/chrome-for-testing/
https://storage.googleapis.com/chrome-for-testing-public/138.0.7204.94/win64/chromedriver-win64.zip
"""
import asyncio
import datetime
import json
import logging
import random
import re
import time
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service as ChromeService
import requests
from Util.LightRagUtil import initialize_pg_rag
from Util.PostgreSQLUtil import init_postgres_pool
from Util.WxGzhUtil import init_wechat_browser, get_article_content

# Duplicate logging configuration removed; keep only the following
logger = logging.getLogger('WxGzh')
logger.setLevel(logging.INFO)
# Make sure only a single handler is attached
if not logger.handlers:
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
    logger.addHandler(handler)


async def get_wechat_sources():
    """Fetch the list of WeChat official accounts from the t_wechat_source table."""
    pool = await init_postgres_pool()
    try:
        async with pool.acquire() as conn:
            rows = await conn.fetch('SELECT * FROM t_wechat_source')
            return [dict(row) for row in rows]
    finally:
        await pool.close()
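
# Columns of t_wechat_source assumed by this crawler (inferred from the field
# access in the main loop below; the real table may hold more columns):
#   id, account_name, account_id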


async def is_article_exist(pool, article_url):
    """Check whether an article URL is already stored in the database."""
    try:
        async with pool.acquire() as conn:
            row = await conn.fetchrow('''
                SELECT 1
                FROM t_wechat_articles
                WHERE url = $1
                LIMIT 1
            ''', article_url)
            return row is not None
    except Exception as e:
        logger.error(f"Failed to check article existence: {e}")
        return False  # Default to False on error so the normal flow is not interrupted


async def save_article_to_db(pool, article_title, account_name, article_url, publish_time, content, source_id):
    # Skip if the article has already been saved
    if await is_article_exist(pool, article_url):
        logger.info(f"Article already exists, skipping save: {article_url}")
        return
    # Build the knowledge base here via LightRAG
    workspace = 'ChangChun'
    # Unused once the PG backend is in place, but the current project code requires it, so set a placeholder.
    WORKING_DIR = "./output"
    docx_name = f"{account_name}_{article_title}"  # Combine source and title as the document name
    logger.info(f"Processing document: {docx_name}")
    rag = None
    try:
        rag = await initialize_pg_rag(WORKING_DIR=WORKING_DIR, workspace=workspace)
        await rag.ainsert(input=content, file_paths=[docx_name])
    finally:
        if rag:
            await rag.finalize_storages()
    logger.info(f"Document saved to the knowledge base: {docx_name}")
    # Note: the pool is owned by the caller, which closes it after this call;
    # it must stay open for the INSERT below.
    try:
        async with pool.acquire() as conn:
            await conn.execute('''
                INSERT INTO t_wechat_articles
                    (title, source, url, publish_time, content, source_id)
                VALUES ($1, $2, $3, $4, $5, $6)
            ''', article_title, account_name, article_url,
                publish_time, content, source_id)
    except Exception as e:
        logger.error(f"Failed to save article: {e}")
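
# A sketch of the t_wechat_articles DDL this module assumes; the column names
# come from the INSERT above, but the types are assumptions, not project facts:
# CREATE TABLE t_wechat_articles (
#     id           serial PRIMARY KEY,
#     title        text,
#     source       text,
#     url          text UNIQUE,
#     publish_time timestamp,
#     content      text,
#     source_id    integer
# );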


if __name__ == '__main__':
    # Load the login cookies exported to cookies.txt
    with open('cookies.txt', 'r', encoding='utf-8') as f:
        content = f.read()
    # Restore the JSON object
    cookies = json.loads(content)
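    # Expected cookies.txt layout (an assumption inferred from the keys used
    # below, not confirmed by the source): a single JSON object of cookie
    # name/value pairs exported after logging in to mp.weixin.qq.com, plus an
    # "expiry" timestamp, e.g.
    # {"slave_sid": "...", "slave_user": "...", "expiry": 1721000000}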
    # Check whether an expiry time is present; .get() avoids a KeyError when the key is missing
    expiry = cookies.get("expiry")
    if expiry:
        # Convert the expiry timestamp into a readable date
        expiry_time = time.localtime(expiry)
        expiry_date = time.strftime("%Y-%m-%d %H:%M:%S", expiry_time)
        # Current timestamp
        current_timestamp = time.time()
        # Abort if the cookie has already expired
        if current_timestamp > expiry:
            logger.error("Cookie has expired")
            exit()
        # Remove the expiry attribute; it is not a real cookie value
        del cookies["expiry"]
        logger.info("Cookies usually last about 4 days; this one expires at %s" % expiry_date)
    options = Options()
    options.add_argument('--headless')  # Headless mode; comment out when debugging
    # Request headers
    header = {
        "HOST": "mp.weixin.qq.com",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.116 Safari/537.36 QBCore/4.0.1301.400 QQBrowser/9.0.2524.400 Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2875.116 Safari/537.36 NetType/WIFI MicroMessenger/7.0.20.1781(0x6700143B) WindowsWechat(0x63010200)",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "Accept-Encoding": "gzip, deflate, br",
        "Accept-Language": "zh-CN,zh;q=0.8,en-US;q=0.6,en;q=0.5,*;q=0.4",
        "Connection": "keep-alive"
    }
    service = ChromeService(executable_path=r"C:\Windows\System32\chromedriver.exe")
    # Use the project's unified browser initialisation
    driver = init_wechat_browser()
    # Use requests (without following redirects) to obtain the redirect URL, which carries the token
    url = 'https://mp.weixin.qq.com'
    response = requests.get(url=url, allow_redirects=False, cookies=cookies)
    token = None
    if 'Location' in response.headers:
        redirect_url = response.headers.get("Location")
        logger.info("Redirect URL: %s" % redirect_url)
        token_match = re.findall(r'token=(\d+)', redirect_url)
        if token_match:
            token = token_match[0]
            logger.info("Token obtained: %s" % token)
    if token is None:
        logger.error("Failed to obtain a token; the cookies are probably invalid or expired")
        exit()
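    # The token authenticates every cgi-bin call below; it appears in the
    # redirect Location only while the saved cookies still represent a live session.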
    article_urls = []
    # Fetch the official-account list
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        gzlist = loop.run_until_complete(get_wechat_sources())
    finally:
        loop.close()
    # Crawl the articles of each account
    for item in gzlist:
        account_name = item["account_name"]
        account_id = item["account_id"]
        id = item["id"]
        # Endpoint for searching official accounts
        search_url = 'https://mp.weixin.qq.com/cgi-bin/searchbiz?'
        # The search endpoint takes three variable parameters: the login token, a random number, and the account name to search for
        query_id = {
            'action': 'search_biz',
            'token': token,
            'lang': 'zh_CN',
            'f': 'json',
            'ajax': '1',
            'random': random.random(),
            'query': account_name,
            'begin': '0',
            'count': '5'
        }
        # Call the search endpoint, passing cookies, params and headers
        search_response = requests.get(search_url, cookies=cookies, headers=header, params=query_id)
        # Take the first account from the search results
        lists = search_response.json().get('list')[0]
        # Grab the account's fakeid; it is needed later to crawl the account's articles
        fakeid = lists.get('fakeid')
        logger.info("fakeid:" + fakeid)
        # Endpoint for listing an account's articles
        appmsg_url = 'https://mp.weixin.qq.com/cgi-bin/appmsg?'
        # Listing articles requires the logged-in account's token, the target account's fakeid, and a random number
        query_id_data = {
            'token': token,
            'lang': 'zh_CN',
            'f': 'json',
            'ajax': '1',
            'random': random.random(),
            'action': 'list_ex',
            'begin': '0',  # Varies per page, increasing by 5 each page; see the pagination sketch below
            'count': '5',
            'query': '',
            'fakeid': fakeid,
            'type': '9'
        }
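        # Pagination sketch (an assumption: the original code fetches only the
        # first page). To walk further pages, bump 'begin' by 5 per request:
        #     for page in range(max_pages):
        #         query_id_data['begin'] = str(page * 5)
        #         query_id_data['random'] = random.random()
        #         # ...fetch and process as below...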
        # Fetch the account's article list page
        query_fakeid_response = requests.get(appmsg_url, cookies=cookies, headers=header, params=query_id_data)
        fakeid_list = query_fakeid_response.json().get('app_msg_list')
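        # Each app_msg_list entry carries at least 'link', 'title' and an
        # 'update_time' unix timestamp (fields inferred from the usage below).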
        for article in fakeid_list:
            article_url = article.get('link')
            article_title = article.get('title')
            publish_time = datetime.datetime.fromtimestamp(int(article.get("update_time")))
            if '试卷' in article_title:  # Skip exam papers ('试卷'); the Zhizhi Physics account posts many of them, and this education-news crawler has no use for them
                continue
            logger.info(f"Processing article: {article_title} ({publish_time})")
            logger.info(f"Fetching content of article: {article_title}...")
            content = get_article_content(article_url)
            logger.info(f"Fetched content of article: {article_title}.")
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            pool = None
            try:
                pool = loop.run_until_complete(init_postgres_pool())
                loop.run_until_complete(
                    save_article_to_db(pool, article_title, account_name, article_url, publish_time, content, id))
            finally:
                if pool:
                    loop.run_until_complete(pool.close())
                loop.close()
            time.sleep(1)  # Brief pause between articles to avoid hammering the endpoint
    # Close the browser
    driver.quit()