You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

155 lines
6.5 KiB

1 week ago
# 详解Python + Selenium 批量采集微信公众号搭建自己的微信公众号每日AI简报告别信息焦虑
# https://blog.csdn.net/k352733625/article/details/149222945
# 微信爬爬猫---公众号文章抓取代码分析
# https://blog.csdn.net/yajuanpi4899/article/details/121584268
1 week ago
1 week ago
import datetime
1 week ago
import json
1 week ago
import logging
import random
import re
import requests
"""
# 查看selenium版本
pip show selenium
4.34.2
# 查看Chrome浏览器版本
chrome://version/
138.0.7204.101 (正式版本) 64
# 下载驱动包
https://googlechromelabs.github.io/chrome-for-testing/
https://storage.googleapis.com/chrome-for-testing-public/138.0.7204.94/win64/chromedriver-win64.zip
"""
import time
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service as ChromeService
if __name__ == '__main__':
1 week ago
# 从文件cookies.txt中获取
with open('cookies.txt', 'r', encoding='utf-8') as f:
content = f.read()
# 使用json还原为json对象
cookies = json.loads(content)
1 week ago
# "expiry": 1787106233
# 检查是否有过期时间
expiry=cookies["expiry"]
if expiry:
# 换算出过期时间
expiry_time = time.localtime(expiry)
expiry_date = time.strftime("%Y-%m-%d %H:%M:%S", expiry_time)
print("cookies的过期时间一般是4天cookies过期时间", expiry_date)
# 获取当前时间戳
current_timestamp = time.time()
# 检查是否已过期
if current_timestamp > expiry:
print("Cookie已过期")
exit()
1 week ago
options = Options()
1 week ago
options.add_argument('-headless') # 无头参数,调试时可以注释掉
1 week ago
# 设置headers - 使用微信内置浏览器的User-Agent
header = {
"HOST": "mp.weixin.qq.com",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.116 Safari/537.36 QBCore/4.0.1301.400 QQBrowser/9.0.2524.400 Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2875.116 Safari/537.36 NetType/WIFI MicroMessenger/7.0.20.1781(0x6700143B) WindowsWechat(0x63010200)",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "zh-CN,zh;q=0.8,en-US;q=0.6,en;q=0.5;q=0.4",
"Connection": "keep-alive"
}
service = ChromeService(executable_path=r"C:\Windows\System32\chromedriver.exe")
driver = webdriver.Chrome(service=service, options=options)
# 方法3使用requests库发送请求获取重定向URL
url = 'https://mp.weixin.qq.com'
response = requests.get(url=url, allow_redirects=False, cookies=cookies)
if 'Location' in response.headers:
redirect_url = response.headers.get("Location")
print("重定向URL:", redirect_url)
token_match = re.findall(r'token=(\d+)', redirect_url)
if token_match:
token = token_match[0]
print("获取到的token:", token)
logging.info("微信token:" + token)
article_urls = []
gzlist = [{"account_name": "长春教育八卦阵", "account_id": "jybg100"}]
for item in gzlist:
account_name = item["account_name"]
account_id = item["account_id"]
# 搜索微信公众号的接口地址
search_url = 'https://mp.weixin.qq.com/cgi-bin/searchbiz?'
# 搜索微信公众号接口需要传入的参数有三个变量微信公众号token、随机数random、搜索的微信公众号名字
query_id = {
'action': 'search_biz',
'token': token,
'lang': 'zh_CN',
'f': 'json',
'ajax': '1',
'random': random.random(),
'query': account_name,
'begin': '0',
'count': '5'
}
# 打开搜索微信公众号接口地址需要传入相关参数信息如cookies、params、headers
search_response = requests.get(search_url, cookies=cookies, headers=header, params=query_id)
# 取搜索结果中的第一个公众号
lists = search_response.json().get('list')[0]
# 获取这个公众号的fakeid后面爬取公众号文章需要此字段
fakeid = lists.get('fakeid')
logging.info("fakeid:" + fakeid)
# 微信公众号文章接口地址
appmsg_url = 'https://mp.weixin.qq.com/cgi-bin/appmsg?'
# 搜索文章需要传入几个参数登录的公众号token、要爬取文章的公众号fakeid、随机数random
query_id_data = {
'token': token,
'lang': 'zh_CN',
'f': 'json',
'ajax': '1',
'random': random.random(),
'action': 'list_ex',
'begin': '0', # 不同页此参数变化变化规则为每页加5
'count': '5',
'query': '',
'fakeid': fakeid,
'type': '9'
}
# 打开搜索的微信公众号文章列表页
query_fakeid_response = requests.get(appmsg_url, cookies=cookies, headers=header, params=query_id_data)
fakeid_list = query_fakeid_response.json().get('app_msg_list')
for item in fakeid_list:
# 采集item示例
new_article = {
'title': item.get('title'),
'article_url': item.get('link'),
'account_id': account_id,
'account_name': account_name,
'publish_time': datetime.datetime.fromtimestamp(int(item.get("update_time"))).strftime(
'%Y-%m-%d %H:%M:%S'),
'collection_time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
logging.info("new_article:", new_article)
1 week ago
article_urls.append({"title": item.get('title'), "url": item.get('link'),
"publish_time": datetime.datetime.fromtimestamp(int(item.get("update_time"))).strftime(
'%Y-%m-%d %H:%M:%S')})
1 week ago
time.sleep(1)
for x in article_urls:
print(x)
1 week ago
# 将返回的地址写入到文件
with open('article_urls.txt', 'w', encoding='utf-8') as f:
1 week ago
for record in article_urls:
f.write(record['title']+" "+record['publish_time']+" "+record['url'] + '\n')
1 week ago
# 关闭浏览器
driver.quit()
print("所有文章爬取完成!")