You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

171 lines
6.8 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# 详解Python + Selenium 批量采集微信公众号搭建自己的微信公众号每日AI简报告别信息焦虑
# https://blog.csdn.net/k352733625/article/details/149222945
# 微信爬爬猫---公众号文章抓取代码分析
# https://blog.csdn.net/yajuanpi4899/article/details/121584268
import asyncio
import datetime
import json
import logging
import random
import re
import requests
from Util.PostgreSQLUtil import init_postgres_pool
from Util.WxGzhUtil import init_wechat_browser, get_article_content
async def get_wechat_sources():
"""从t_wechat_source表获取微信公众号列表"""
try:
pool = await init_postgres_pool()
async with pool.acquire() as conn:
rows = await conn.fetch('SELECT account_id, account_name FROM t_wechat_source')
return [dict(row) for row in rows]
finally:
await pool.close()
"""
# 查看selenium版本
pip show selenium
4.34.2
# 查看Chrome浏览器版本
chrome://version/
138.0.7204.101 (正式版本) 64 位)
# 下载驱动包
https://googlechromelabs.github.io/chrome-for-testing/
https://storage.googleapis.com/chrome-for-testing-public/138.0.7204.94/win64/chromedriver-win64.zip
"""
import time
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service as ChromeService
if __name__ == '__main__':
# 从文件cookies.txt中获取
with open('cookies.txt', 'r', encoding='utf-8') as f:
content = f.read()
# 使用json还原为json对象
cookies = json.loads(content)
# "expiry": 1787106233
# 检查是否有过期时间
expiry = cookies["expiry"]
if expiry:
# 换算出过期时间
expiry_time = time.localtime(expiry)
expiry_date = time.strftime("%Y-%m-%d %H:%M:%S", expiry_time)
print("cookies的过期时间一般是4天cookies过期时间", expiry_date)
# 获取当前时间戳
current_timestamp = time.time()
# 检查是否已过期
if current_timestamp > expiry:
print("Cookie已过期")
exit()
# 移除expiry属性
del cookies["expiry"]
options = Options()
options.add_argument('-headless') # 无头参数,调试时可以注释掉
# 设置headers - 使用微信内置浏览器的User-Agent
header = {
"HOST": "mp.weixin.qq.com",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.116 Safari/537.36 QBCore/4.0.1301.400 QQBrowser/9.0.2524.400 Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2875.116 Safari/537.36 NetType/WIFI MicroMessenger/7.0.20.1781(0x6700143B) WindowsWechat(0x63010200)",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "zh-CN,zh;q=0.8,en-US;q=0.6,en;q=0.5;q=0.4",
"Connection": "keep-alive"
}
service = ChromeService(executable_path=r"C:\Windows\System32\chromedriver.exe")
driver = webdriver.Chrome(service=service, options=options) # 删除这行
# 使用统一的初始化方式
driver = init_wechat_browser()
# 方法3使用requests库发送请求获取重定向URL
url = 'https://mp.weixin.qq.com'
response = requests.get(url=url, allow_redirects=False, cookies=cookies)
if 'Location' in response.headers:
redirect_url = response.headers.get("Location")
print("重定向URL:", redirect_url)
token_match = re.findall(r'token=(\d+)', redirect_url)
if token_match:
token = token_match[0]
print("获取到的token:", token)
logging.info("微信token:" + token)
article_urls = []
# 初始化浏览器
driver = init_wechat_browser()
# 获取公众号列表
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
gzlist = loop.run_until_complete(get_wechat_sources())
finally:
loop.close()
# 爬取文章
for item in gzlist:
account_name = item["account_name"]
account_id = item["account_id"]
# 搜索微信公众号的接口地址
search_url = 'https://mp.weixin.qq.com/cgi-bin/searchbiz?'
# 搜索微信公众号接口需要传入的参数有三个变量微信公众号token、随机数random、搜索的微信公众号名字
query_id = {
'action': 'search_biz',
'token': token,
'lang': 'zh_CN',
'f': 'json',
'ajax': '1',
'random': random.random(),
'query': account_name,
'begin': '0',
'count': '5'
}
# 打开搜索微信公众号接口地址需要传入相关参数信息如cookies、params、headers
search_response = requests.get(search_url, cookies=cookies, headers=header, params=query_id)
# 取搜索结果中的第一个公众号
lists = search_response.json().get('list')[0]
# 获取这个公众号的fakeid后面爬取公众号文章需要此字段
fakeid = lists.get('fakeid')
logging.info("fakeid:" + fakeid)
# 微信公众号文章接口地址
appmsg_url = 'https://mp.weixin.qq.com/cgi-bin/appmsg?'
# 搜索文章需要传入几个参数登录的公众号token、要爬取文章的公众号fakeid、随机数random
query_id_data = {
'token': token,
'lang': 'zh_CN',
'f': 'json',
'ajax': '1',
'random': random.random(),
'action': 'list_ex',
'begin': '0', # 不同页此参数变化变化规则为每页加5
'count': '5',
'query': '',
'fakeid': fakeid,
'type': '9'
}
# 打开搜索的微信公众号文章列表页
query_fakeid_response = requests.get(appmsg_url, cookies=cookies, headers=header, params=query_id_data)
fakeid_list = query_fakeid_response.json().get('app_msg_list')
for item in fakeid_list:
# 采集item示例
article_url = item.get('link')
article_title = item.get('title')
publish_time = datetime.datetime.fromtimestamp(int(item.get("update_time"))).strftime('%Y-%m-%d %H:%M:%S')
# 直接获取并显示文章内容
print(f"正在处理文章: {article_title} ({publish_time})")
content = get_article_content(article_url)
print(f"文章内容预览: {content[:200]}...")
time.sleep(1)
# 关闭浏览器
driver.quit()