You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
100 lines
3.2 KiB
100 lines
3.2 KiB
import datetime
|
|
import random
|
|
import requests
|
|
from selenium import webdriver
|
|
from selenium.webdriver.chrome.options import Options
|
|
from selenium.webdriver.chrome.service import Service as ChromeService
|
|
from selenium.webdriver.common.by import By
|
|
|
|
def init_wechat_browser():
|
|
"""初始化微信爬虫浏览器实例"""
|
|
options = Options()
|
|
options.add_argument('-headless')
|
|
service = ChromeService(executable_path=r"C:\Windows\System32\chromedriver.exe")
|
|
return webdriver.Chrome(service=service, options=options)
|
|
|
|
def get_wechat_articles(account_name, account_id, token, cookies, header):
|
|
"""获取指定公众号的文章列表"""
|
|
article_urls = []
|
|
|
|
# 搜索微信公众号的接口地址
|
|
search_url = 'https://mp.weixin.qq.com/cgi-bin/searchbiz?'
|
|
query_id = {
|
|
'action': 'search_biz',
|
|
'token': token,
|
|
'lang': 'zh_CN',
|
|
'f': 'json',
|
|
'ajax': '1',
|
|
'random': random.random(),
|
|
'query': account_name,
|
|
'begin': '0',
|
|
'count': '5'
|
|
}
|
|
|
|
# 完整实现搜索和获取文章逻辑
|
|
search_response = requests.get(search_url, cookies=cookies, headers=header, params=query_id)
|
|
lists = search_response.json().get('list')[0]
|
|
fakeid = lists.get('fakeid')
|
|
|
|
# 微信公众号文章接口
|
|
appmsg_url = 'https://mp.weixin.qq.com/cgi-bin/appmsg?'
|
|
query_id_data = {
|
|
'token': token,
|
|
'lang': 'zh_CN',
|
|
'f': 'json',
|
|
'ajax': '1',
|
|
'random': random.random(),
|
|
'action': 'list_ex',
|
|
'begin': '0',
|
|
'count': '5',
|
|
'query': '',
|
|
'fakeid': fakeid,
|
|
'type': '9'
|
|
}
|
|
|
|
query_fakeid_response = requests.get(appmsg_url, cookies=cookies, headers=header, params=query_id_data)
|
|
fakeid_list = query_fakeid_response.json().get('app_msg_list')
|
|
|
|
for item in fakeid_list:
|
|
article_urls.append({
|
|
'title': item.get('title'),
|
|
'url': item.get('link'),
|
|
'publish_time': datetime.datetime.fromtimestamp(int(item.get("update_time"))).strftime('%Y-%m-%d %H:%M:%S')
|
|
})
|
|
|
|
return article_urls
|
|
|
|
def get_article_content(url):
|
|
"""
|
|
获取微信公众号文章内容
|
|
:param url: 文章URL
|
|
:return: 文章内容文本
|
|
"""
|
|
options = Options()
|
|
options.add_argument('-headless')
|
|
service = ChromeService(executable_path=r"C:\Windows\System32\chromedriver.exe")
|
|
driver = webdriver.Chrome(service=service, options=options)
|
|
|
|
try:
|
|
driver.get(url)
|
|
html_content = driver.find_element(By.CLASS_NAME, "rich_media").text
|
|
|
|
# 处理内容,提取空行后的文本
|
|
lines = html_content.split('\n')
|
|
content_after_empty_line = ""
|
|
found_empty_line = False
|
|
|
|
for line in lines:
|
|
if not found_empty_line and line.strip() == "":
|
|
found_empty_line = True
|
|
continue
|
|
|
|
if found_empty_line:
|
|
content_after_empty_line += line + "\n"
|
|
|
|
if not found_empty_line:
|
|
content_after_empty_line = html_content
|
|
|
|
return content_after_empty_line.replace("\n\n", "\n")
|
|
finally:
|
|
driver.quit() |