main
HuangHai 7 days ago
parent 3af27fe9ff
commit 6d44aac079

@ -42,6 +42,11 @@ if not logger.handlers:
handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
logger.addHandler(handler)
# HTTP headers passed to the mp.weixin.qq.com requests in this module:
# a desktop-Chrome User-Agent plus the WeChat platform Referer.
header = {
    'User-Agent': (
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
        'AppleWebKit/537.36 (KHTML, like Gecko) '
        'Chrome/138.0.0.0 Safari/537.36'
    ),
    'Referer': 'https://mp.weixin.qq.com/',
}
async def get_wechat_sources():
"""从t_wechat_source表获取微信公众号列表"""
@ -101,139 +106,136 @@ async def save_article_to_db(pool, article_title, account_name, article_url, pub
logging.error(f"保存文章失败: {e}")
if __name__ == '__main__':
# 从文件cookies.txt中获取
async def initialize_wechat_session():
    """Load the saved WeChat cookies, start the browser and fetch the session token.

    Steps:
      1. Read ``cookies.txt`` (a JSON object) from the working directory.
      2. Remove its ``expiry`` entry; if that timestamp is already in the
         past the process is terminated.
      3. Store the browser returned by ``init_wechat_browser()`` in the
         module-level ``driver``.
      4. Request ``https://mp.weixin.qq.com`` without following redirects
         and parse ``token=<digits>`` out of the ``Location`` header.

    Returns:
        ``(cookies, token)`` on success; ``(None, None)`` when the response
        has no ``Location`` header or the redirect URL carries no token.

    Raises:
        SystemExit: the cookie's ``expiry`` timestamp has passed (exit
            status 1, after logging the error).
    """
    global driver

    with open('cookies.txt', 'r', encoding='utf-8') as f:
        cookies = json.load(f)

    # "expiry" is bookkeeping, not a cookie to send, so it is always removed
    # before the dict is handed to requests. pop() tolerates files that do
    # not carry the key at all.
    expiry = cookies.pop("expiry", None)
    if expiry:
        if time.time() > expiry:
            logger.error("Cookie已过期")
            # Non-zero status so callers/schedulers can tell this was a failure.
            raise SystemExit(1)
        expiry_date = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(expiry))
        logger.info("cookies的过期时间一般是4天cookies过期时间%s", expiry_date)

    driver = init_wechat_browser()

    # The token is taken from the redirect target, so the redirect itself
    # must not be followed. A timeout keeps a stalled connection from
    # hanging the crawler forever.
    response = requests.get(
        url='https://mp.weixin.qq.com',
        allow_redirects=False,
        cookies=cookies,
        timeout=30,
    )
    redirect_url = response.headers.get("Location")
    if redirect_url is None:
        return None, None

    logger.info("重定向URL:%s", redirect_url)
    token_match = re.search(r'token=(\d+)', redirect_url)
    if token_match is None:
        return None, None

    token = token_match.group(1)
    logger.info("获取到的token:%s", token)
    return cookies, token
async def get_wechat_account_list(cookies, token, account_name):
    """Search for an official account by name and return its ``fakeid``.

    Sends one request to the ``searchbiz`` endpoint with ``account_name`` as
    the query and uses the first entry of the ``list`` field in the JSON
    response.

    Args:
        cookies: Cookie dict forwarded to ``requests``.
        token: Session token sent as the ``token`` query parameter.
        account_name: Name of the account to search for.

    Returns:
        The ``fakeid`` of the first search hit, or ``None`` when the response
        has no ``list`` field, the list is empty, or the hit has no
        ``fakeid``.
    """
    search_url = 'https://mp.weixin.qq.com/cgi-bin/searchbiz?'
    query_id = {
        'action': 'search_biz',
        'token': token,
        'lang': 'zh_CN',
        'f': 'json',
        'ajax': '1',
        'random': random.random(),
        'query': account_name,
        'begin': '0',
        'count': '5',
    }
    # NOTE: requests is synchronous, so this call blocks the event loop while
    # it is in flight; the timeout bounds how long that can last.
    search_response = requests.get(
        search_url,
        cookies=cookies,
        headers=header,
        params=query_id,
        timeout=30,
    )
    matches = search_response.json().get('list')
    if not matches:
        # Missing or empty result list: report it instead of failing with an
        # opaque TypeError/IndexError on the [0] lookup.
        logger.warning("未搜索到公众号: %s", account_name)
        return None
    return matches[0].get('fakeid')
async def get_article_list(cookies, token, fakeid, begin=0, count=5):
    """Fetch one page of an official account's article list.

    Sends one ``list_ex`` request to the ``appmsg`` endpoint.

    Args:
        cookies: Cookie dict forwarded to ``requests``.
        token: Session token sent as the ``token`` query parameter.
        fakeid: Identifier of the account whose articles are listed.
        begin: Offset sent as the ``begin`` parameter. Defaults to 0, the
            value that was previously hard-coded.
        count: Page size sent as the ``count`` parameter. Defaults to 5, the
            value that was previously hard-coded.

    Returns:
        The ``app_msg_list`` entries of the JSON response, or an empty list
        when the response does not contain that field.
    """
    appmsg_url = 'https://mp.weixin.qq.com/cgi-bin/appmsg?'
    query_id_data = {
        'token': token,
        'lang': 'zh_CN',
        'f': 'json',
        'ajax': '1',
        'random': random.random(),
        'action': 'list_ex',
        'begin': str(begin),
        'count': str(count),
        'query': '',
        'fakeid': fakeid,
        'type': '9',
    }
    # NOTE: requests is synchronous, so this call blocks the event loop while
    # it is in flight; the timeout bounds how long that can last.
    query_fakeid_response = requests.get(
        appmsg_url,
        cookies=cookies,
        headers=header,
        params=query_id_data,
        timeout=30,
    )
    # Callers iterate the result, so normalise a missing field to [].
    return query_fakeid_response.json().get('app_msg_list') or []
async def process_single_article(article_info, account_info, cookies, token):
    """Download one article and store it in the database.

    Articles whose title contains "试卷" are skipped.

    Args:
        article_info: Mapping with the keys ``link``, ``title`` and
            ``update_time`` (a POSIX timestamp, converted with
            ``datetime.fromtimestamp``).
        account_info: Mapping with the keys ``account_name`` and ``id``.
        cookies: Not used by this function; kept so the call signature
            stays unchanged.
        token: Not used by this function; kept so the call signature stays
            unchanged.

    Returns:
        ``True`` when the article was saved; ``False`` when it was skipped
        or when any step failed (the error is logged).
    """
    # Everything runs on the caller's event loop. Creating a second loop and
    # calling run_until_complete() from inside a coroutine raises
    # RuntimeError, so plain `await` is used throughout.
    pool = None
    try:
        # Parsing happens inside the try so that one malformed entry is
        # logged and skipped instead of aborting the whole account.
        article_url = article_info.get('link')
        article_title = article_info.get('title')
        publish_time = datetime.datetime.fromtimestamp(int(article_info.get("update_time")))
        if '试卷' in article_title:
            return False

        pool = await init_postgres_pool()
        content = get_article_content(article_url)
        await save_article_to_db(
            pool,
            article_title,
            account_info["account_name"],
            article_url,
            publish_time,
            content,
            account_info["id"],
        )
        return True
    except Exception as e:
        logger.error("处理文章时出错: %s", e)
        return False
    finally:
        # pool stays None when we bailed out before it was created.
        if pool is not None:
            await pool.close()
async def process_wechat_account(account_info, cookies, token):
    """Crawl the listed articles of a single official account.

    Resolves the account's ``fakeid``, fetches its article list and hands
    each entry to ``process_single_article``, pausing one second after every
    article.

    Args:
        account_info: Mapping with at least the key ``account_name``; it is
            passed through unchanged to ``process_single_article``.
        cookies: Cookie dict forwarded to the request helpers.
        token: Session token forwarded to the request helpers.

    Returns:
        The number of articles for which ``process_single_article`` returned
        a truthy value (0 when the account could not be resolved).
    """
    cnt = 0
    account_name = account_info["account_name"]

    fakeid = await get_wechat_account_list(cookies, token, account_name)
    if not fakeid:
        # Without a fakeid the article listing cannot be scoped to this
        # account, so skip the request entirely.
        logger.warning("未获取到公众号fakeid: %s", account_name)
        return cnt

    articles = await get_article_list(cookies, token, fakeid)
    for article in articles or []:
        if await process_single_article(article, account_info, cookies, token):
            cnt += 1
        # Throttle between articles; asyncio.sleep yields to the event loop,
        # whereas time.sleep would freeze every other task for the duration.
        await asyncio.sleep(1)

    logger.info("成功获取公众号: %s %s篇文章。", account_name, cnt)
    return cnt
async def main():
    """Run one full crawl: open the session, then process every account.

    Logs an error and returns early when ``initialize_wechat_session()``
    does not yield both cookies and a token. Otherwise every entry returned
    by ``get_wechat_sources()`` is passed to ``process_wechat_account()`` in
    order. The browser stored in the module-level ``driver`` is quit on the
    way out, whether the crawl succeeded, returned early or raised.
    """
    try:
        cookies, token = await initialize_wechat_session()
        if not cookies or not token:
            logger.error("初始化微信会话失败")
            return

        account_list = await get_wechat_sources()
        for account in account_list:
            await process_wechat_account(account, cookies, token)
    finally:
        # initialize_wechat_session() publishes the browser as the global
        # `driver`; it is absent if start-up failed before that assignment,
        # hence the lookup through globals() instead of a bare name.
        browser = globals().get('driver')
        if browser is not None:
            browser.quit()
# Crawl the articles.
# Inline crawl loop: for every entry of `gzlist` it searches the account,
# lists its articles, downloads each one and saves it to the database.
# NOTE(review): `gzlist`, `token`, `cookies` and `driver` are read here but
# not assigned anywhere in this span — confirm the enclosing scope.
# NOTE(review): this repeats the searchbiz/appmsg requests that
# get_wechat_account_list() and get_article_list() also perform — confirm
# whether this inline version is still meant to run.
for item in gzlist:
cnt = 0
account_name = item["account_name"]
account_id = item["account_id"]
# NOTE: `id` shadows the builtin of the same name from here on.
id = item["id"]
# Endpoint for searching official accounts
search_url = 'https://mp.weixin.qq.com/cgi-bin/searchbiz?'
# The search endpoint takes three variable parameters: the session token, a random number, and the name of the account to search for
query_id = {
'action': 'search_biz',
'token': token,
'lang': 'zh_CN',
'f': 'json',
'ajax': '1',
'random': random.random(),
'query': account_name,
'begin': '0',
'count': '5'
}
# Call the search endpoint, passing cookies, params and headers
search_response = requests.get(search_url, cookies=cookies, headers=header, params=query_id)
# Take the first account in the search results
lists = search_response.json().get('list')[0]
# Read this account's fakeid; it is needed below to list the account's articles
fakeid = lists.get('fakeid')
logging.info("fakeid:" + fakeid)
# Endpoint for listing an account's articles
appmsg_url = 'https://mp.weixin.qq.com/cgi-bin/appmsg?'
# The article query needs: the logged-in account's token, the fakeid of the account being crawled, and a random number
query_id_data = {
'token': token,
'lang': 'zh_CN',
'f': 'json',
'ajax': '1',
'random': random.random(),
'action': 'list_ex',
'begin': '0', # changes from page to page: increases by 5 per page
'count': '5',
'query': '',
'fakeid': fakeid,
'type': '9'
}
# Request the article list of the account found above
query_fakeid_response = requests.get(appmsg_url, cookies=cookies, headers=header, params=query_id_data)
fakeid_list = query_fakeid_response.json().get('app_msg_list')
# NOTE: this loop reuses the name `item`, rebinding the outer loop variable.
for item in fakeid_list:
article_url = item.get('link')
article_title = item.get('title')
publish_time = datetime.datetime.fromtimestamp(int(item.get("update_time")))
if '试卷' in article_title: # skip exam papers; the 致知物理 account publishes many and they are irrelevant to education news
continue
logger.info(f"正在处理文章: {article_title} ({publish_time})")
logger.info(f"正在获取文章: {article_title}内容...")
content = get_article_content(article_url)
logger.info(f"成功获取文章: {article_title}内容。")
# A fresh event loop and a fresh database pool are created for every article.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
pool = loop.run_until_complete(init_postgres_pool())
loop.run_until_complete(
save_article_to_db(pool, article_title, account_name, article_url, publish_time, content, id))
cnt = cnt + 1
finally:
# NOTE(review): `pool` is referenced here even if init_postgres_pool() raised before assigning it.
loop.run_until_complete(pool.close())
loop.close()
# Sleep 1 second to avoid being blocked for requesting too frequently
time.sleep(1)
logger.info(f"成功获取公众号: {account_name} {cnt}篇文章。")
# Close the browser
driver.quit()
if __name__ == '__main__':
    # Script entry point: run main() to completion on a dedicated event loop
    # and close that loop afterwards, even if main() raises.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()

Loading…
Cancel
Save