# 详解(一)Python + Selenium 批量采集微信公众号,搭建自己的微信公众号每日AI简报,告别信息焦虑 # https://blog.csdn.net/k352733625/article/details/149222945 # 微信爬爬猫---公众号文章抓取代码分析 # https://blog.csdn.net/yajuanpi4899/article/details/121584268 import asyncio import datetime import json import logging import random import re import requests from Util.PostgreSQLUtil import init_postgres_pool from Util.WxGzhUtil import init_wechat_browser, get_article_content # 在程序开始时添加以下配置 logging.basicConfig( level=logging.INFO, # 设置日志级别为INFO format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) # 或者如果你想更详细地控制日志输出 logger = logging.getLogger('WeiXinGongZhongHao') logger.setLevel(logging.INFO) handler = logging.StreamHandler() handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) logger.addHandler(handler) async def get_wechat_sources(): """从t_wechat_source表获取微信公众号列表""" try: pool = await init_postgres_pool() async with pool.acquire() as conn: rows = await conn.fetch('SELECT * FROM t_wechat_source') return [dict(row) for row in rows] finally: await pool.close() """ # 查看selenium版本 pip show selenium 4.34.2 # 查看Chrome浏览器版本 chrome://version/ 138.0.7204.101 (正式版本) (64 位) # 下载驱动包 https://googlechromelabs.github.io/chrome-for-testing/ https://storage.googleapis.com/chrome-for-testing-public/138.0.7204.94/win64/chromedriver-win64.zip """ import time from selenium.webdriver.chrome.options import Options from selenium.webdriver.chrome.service import Service as ChromeService async def is_article_exist(pool, article_url): """检查文章URL是否已存在数据库中""" try: async with pool.acquire() as conn: row = await conn.fetchrow(''' SELECT 1 FROM t_wechat_articles WHERE url = $1 LIMIT 1 ''', article_url) return row is not None except Exception as e: logging.error(f"检查文章存在性失败: {e}") return False # 出错时默认返回False,避免影响正常流程 async def save_article_to_db(pool, article_title, account_name, article_url, publish_time, content, id): # 先检查文章是否已存在 if await is_article_exist(pool, article_url): logging.info(f"文章已存在,跳过保存: {article_url}") return try: async with pool.acquire() as conn: await conn.execute(''' INSERT INTO t_wechat_articles (title, source, url, publish_time, content, source_id) VALUES ($1, $2, $3, $4, $5, $6) ''', article_title, account_name, article_url, publish_time, content, id) except Exception as e: logging.error(f"保存文章失败: {e}") if __name__ == '__main__': # 从文件cookies.txt中获取 with open('cookies.txt', 'r', encoding='utf-8') as f: content = f.read() # 使用json还原为json对象 cookies = json.loads(content) # "expiry": 1787106233 # 检查是否有过期时间 expiry = cookies["expiry"] if expiry: # 换算出过期时间 expiry_time = time.localtime(expiry) expiry_date = time.strftime("%Y-%m-%d %H:%M:%S", expiry_time) logger.info(f"cookies的过期时间一般是4天,cookies过期时间:%s" % expiry_date) # 获取当前时间戳 current_timestamp = time.time() # 检查是否已过期 if current_timestamp > expiry: logger.error("Cookie已过期") exit() # 移除expiry属性 del cookies["expiry"] options = Options() options.add_argument('-headless') # 无头参数,调试时可以注释掉 # 设置headers - 使用微信内置浏览器的User-Agent header = { "HOST": "mp.weixin.qq.com", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.116 Safari/537.36 QBCore/4.0.1301.400 QQBrowser/9.0.2524.400 Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2875.116 Safari/537.36 NetType/WIFI MicroMessenger/7.0.20.1781(0x6700143B) WindowsWechat(0x63010200)", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", "Accept-Encoding": "gzip, deflate, br", "Accept-Language": "zh-CN,zh;q=0.8,en-US;q=0.6,en;q=0.5;q=0.4", "Connection": "keep-alive" } service = ChromeService(executable_path=r"C:\Windows\System32\chromedriver.exe") # 使用统一的初始化方式 driver = init_wechat_browser() # 方法3:使用requests库发送请求获取重定向URL url = 'https://mp.weixin.qq.com' response = requests.get(url=url, allow_redirects=False, cookies=cookies) if 'Location' in response.headers: redirect_url = response.headers.get("Location") logger.info("重定向URL:", redirect_url) token_match = re.findall(r'token=(\d+)', redirect_url) if token_match: token = token_match[0] logger.info("获取到的token:", token) article_urls = [] # 获取公众号列表 loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: gzlist = loop.run_until_complete(get_wechat_sources()) finally: loop.close() # 爬取文章 for item in gzlist: account_name = item["account_name"] account_id = item["account_id"] id = item["id"] # 搜索微信公众号的接口地址 search_url = 'https://mp.weixin.qq.com/cgi-bin/searchbiz?' # 搜索微信公众号接口需要传入的参数,有三个变量:微信公众号token、随机数random、搜索的微信公众号名字 query_id = { 'action': 'search_biz', 'token': token, 'lang': 'zh_CN', 'f': 'json', 'ajax': '1', 'random': random.random(), 'query': account_name, 'begin': '0', 'count': '5' } # 打开搜索微信公众号接口地址,需要传入相关参数信息如:cookies、params、headers search_response = requests.get(search_url, cookies=cookies, headers=header, params=query_id) # 取搜索结果中的第一个公众号 lists = search_response.json().get('list')[0] # 获取这个公众号的fakeid,后面爬取公众号文章需要此字段 fakeid = lists.get('fakeid') logging.info("fakeid:" + fakeid) # 微信公众号文章接口地址 appmsg_url = 'https://mp.weixin.qq.com/cgi-bin/appmsg?' # 搜索文章需要传入几个参数:登录的公众号token、要爬取文章的公众号fakeid、随机数random query_id_data = { 'token': token, 'lang': 'zh_CN', 'f': 'json', 'ajax': '1', 'random': random.random(), 'action': 'list_ex', 'begin': '0', # 不同页,此参数变化,变化规则为每页加5 'count': '5', 'query': '', 'fakeid': fakeid, 'type': '9' } # 打开搜索的微信公众号文章列表页 query_fakeid_response = requests.get(appmsg_url, cookies=cookies, headers=header, params=query_id_data) fakeid_list = query_fakeid_response.json().get('app_msg_list') for item in fakeid_list: article_url = item.get('link') article_title = item.get('title') publish_time = datetime.datetime.fromtimestamp(int(item.get("update_time"))) if '试卷' in article_title: # 过滤掉试卷 continue logger.info(f"正在处理文章: {article_title} ({publish_time})") content = get_article_content(article_url) loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: pool = loop.run_until_complete(init_postgres_pool()) loop.run_until_complete( save_article_to_db(pool, article_title, account_name, article_url, publish_time, content, id)) finally: loop.run_until_complete(pool.close()) loop.close() time.sleep(1) # 关闭浏览器 driver.quit()