# Detailed walkthrough (part 1): batch-collecting WeChat official accounts with
# Python + Selenium to build your own daily AI briefing
# https://blog.csdn.net/k352733625/article/details/149222945
# WeChat crawler -- analysis of official-account article scraping code
# https://blog.csdn.net/yajuanpi4899/article/details/121584268
"""
# Check the selenium version
pip show selenium
4.34.2

# Check the Chrome browser version
chrome://version/
138.0.7204.101 (official build) (64-bit)

# Download the driver package
https://googlechromelabs.github.io/chrome-for-testing/
https://storage.googleapis.com/chrome-for-testing-public/138.0.7204.94/win64/chromedriver-win64.zip
"""
import asyncio
import datetime
import json
import logging
import random
import re
import time

from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service as ChromeService
import requests

from Util.LightRagUtil import initialize_pg_rag
from Util.PostgreSQLUtil import init_postgres_pool
from Util.WxGzhUtil import init_wechat_browser, get_article_content

# Duplicate logging configuration was removed; only the setup below is kept.
logger = logging.getLogger('WxGzh')
logger.setLevel(logging.INFO)
# Make sure only one handler is ever attached.
if not logger.handlers:
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
    logger.addHandler(handler)

# Request headers sent to the WeChat endpoints.
header = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36',
    'Referer': 'https://mp.weixin.qq.com/'
}

# Upper bound (seconds) for every HTTP call; without it `requests` can block
# forever and stall the whole collection loop.
REQUEST_TIMEOUT = 30

# Pause between two collection cycles, in seconds.
POLL_INTERVAL_SECONDS = 30 * 60

# Object returned by init_wechat_browser(); set by initialize_wechat_session()
# and released by main() at the end of every cycle.
driver = None


async def get_wechat_sources():
    """Return every row of ``t_wechat_source`` as a list of dicts.

    A pool is created for this single query and closed before returning.
    """
    # Create the pool outside the try block so that a failed initialisation
    # is reported as-is instead of being masked by an unbound `pool` in finally.
    pool = await init_postgres_pool()
    try:
        async with pool.acquire() as conn:
            rows = await conn.fetch('SELECT * FROM t_wechat_source')
            return [dict(row) for row in rows]
    finally:
        await pool.close()


async def is_article_exist(pool, article_url):
    """Return True when ``article_url`` is already stored in ``t_wechat_articles``.

    Any database error is logged and reported as "not present" (False) so
    that the normal flow is not interrupted.
    """
    try:
        async with pool.acquire() as conn:
            row = await conn.fetchrow(
                '''
                SELECT 1
                FROM t_wechat_articles
                WHERE url = $1
                LIMIT 1
                ''',
                article_url,
            )
            return row is not None
    except Exception as e:
        logger.error(f"检查文章存在性失败: {e}")
        return False


async def save_article_to_db(pool, article_title, account_name, article_url, publish_time, content, source_id):
    """Insert the article into the knowledge base, then into ``t_wechat_articles``.

    Args:
        pool: Connection pool used for the INSERT. The pool is owned by the
            caller and is NOT closed here.
        article_title: Title of the article.
        account_name: Name of the official account; stored in column ``source``.
        article_url: URL of the article.
        publish_time: Publication time stored in column ``publish_time``.
        content: Article body; also the text handed to the RAG instance.
        source_id: Value stored in column ``source_id``.

    Returns:
        True when the database row was written, False when the INSERT failed
        (the failure is logged).

    Raises:
        Whatever ``initialize_pg_rag`` / ``rag.ainsert`` raise; those errors
        are not handled here.
    """
    # Call LightRAG here to build the knowledge base.
    # With the PG backend this value is unused, but the project code currently
    # requires it to be passed.
    workspace = 'ChangChun'
    working_dir = "./output"
    # Combine source and title as the document name.
    docx_name = f"{account_name}_{article_title}"
    logger.info(f"开始处理文档: {docx_name}")

    rag = None
    try:
        rag = await initialize_pg_rag(WORKING_DIR=working_dir, workspace=workspace)
        await rag.ainsert(input=content, file_paths=[docx_name])
    finally:
        # `rag` stays None when initialisation itself failed.
        if rag is not None:
            await rag.finalize_storages()
    logger.info(f"保存文档到知识库成功: {docx_name}")

    # The pool must still be open at this point: it was previously closed in
    # the finally block above, which made this INSERT fail every time.
    try:
        async with pool.acquire() as conn:
            await conn.execute(
                '''
                INSERT INTO t_wechat_articles
                    (title, source, url, publish_time, content, source_id)
                VALUES ($1, $2, $3, $4, $5, $6)
                ''',
                article_title, account_name, article_url, publish_time, content, source_id,
            )
    except Exception as e:
        logger.error(f"保存文章失败: {e}")
        return False
    return True


async def initialize_wechat_session():
    """Load cookies from ``cookies.txt``, start the browser and fetch the token.

    Returns:
        ``(cookies, token)`` when the redirect ``Location`` header of
        https://mp.weixin.qq.com contains ``token=<digits>``, otherwise
        ``(None, None)``.

    Raises:
        SystemExit: when the stored ``expiry`` timestamp lies in the past.
    """
    global driver

    with open('cookies.txt', 'r', encoding='utf-8') as f:
        cookies = json.load(f)

    # `expiry` is bookkeeping, not a cookie: remove it before the dict is sent.
    # It is compared against time.time(), so it is presumably epoch seconds.
    expiry = cookies.pop("expiry", None)
    if expiry and time.time() > expiry:
        logger.error("Cookie已过期")
        # Exit with a non-zero status; the `exit()` helper is only injected by
        # the `site` module and reported success (status 0).
        raise SystemExit(1)

    driver = init_wechat_browser()

    response = requests.get(
        url='https://mp.weixin.qq.com',
        allow_redirects=False,
        cookies=cookies,
        timeout=REQUEST_TIMEOUT,
    )
    # The token is taken from the redirect target rather than from a page body.
    token_match = re.search(r'token=(\d+)', response.headers.get("Location", ""))
    if token_match:
        return cookies, token_match.group(1)
    return None, None


async def get_wechat_account_list(cookies, token, account_name):
    """Return the ``fakeid`` of the first search hit for ``account_name``.

    Returns None when the response carries no ``list`` entries.
    """
    search_url = 'https://mp.weixin.qq.com/cgi-bin/searchbiz?'
    query_id = {
        'action': 'search_biz',
        'token': token,
        'lang': 'zh_CN',
        'f': 'json',
        'ajax': '1',
        'random': random.random(),
        'query': account_name,
        'begin': '0',
        'count': '5'
    }
    search_response = requests.get(
        search_url, cookies=cookies, headers=header, params=query_id, timeout=REQUEST_TIMEOUT
    )
    # `list` may be missing or empty; indexing it blindly used to raise.
    matches = search_response.json().get('list') or []
    if not matches:
        return None
    return matches[0].get('fakeid')


async def get_article_list(cookies, token, fakeid):
    """Return the ``app_msg_list`` entries for ``fakeid`` (empty list if absent)."""
    appmsg_url = 'https://mp.weixin.qq.com/cgi-bin/appmsg?'
    query_id_data = {
        'token': token,
        'lang': 'zh_CN',
        'f': 'json',
        'ajax': '1',
        'random': random.random(),
        'action': 'list_ex',
        'begin': '0',
        'count': '5',
        'query': '',
        'fakeid': fakeid,
        'type': '9'
    }
    query_fakeid_response = requests.get(
        appmsg_url, cookies=cookies, headers=header, params=query_id_data, timeout=REQUEST_TIMEOUT
    )
    # Always hand back an iterable so callers can loop without a None check.
    return query_fakeid_response.json().get('app_msg_list') or []


async def process_single_article(article_info, account_info, cookies, token):
    """Fetch and store one article unless it is filtered out or already stored.

    Args:
        article_info: Dict read for the keys ``link``, ``title`` and ``update_time``.
        account_info: Dict read for the keys ``account_name`` and ``id``.
        cookies: Unused; kept for interface compatibility.
        token: Unused; kept for interface compatibility.

    Returns:
        True when the article was written to the database, False when it was
        skipped or any step failed (failures are logged).
    """
    article_url = article_info.get('link')
    article_title = article_info.get('title') or ''

    # Articles whose title contains this keyword are never collected.
    if '试卷' in article_title:
        return False

    pool = None
    try:
        # Parsed inside the try block so that a missing or malformed timestamp
        # only skips this article instead of aborting the whole run.
        publish_time = datetime.datetime.fromtimestamp(int(article_info.get("update_time")))

        pool = await init_postgres_pool()
        # Check first whether the article is already stored.
        if await is_article_exist(pool, article_url):
            logger.info(f'文章已存在,跳过保存: {account_info["account_name"]}-{article_title}')
            return False

        content = get_article_content(article_url)
        return await save_article_to_db(
            pool, article_title, account_info["account_name"], article_url,
            publish_time, content, account_info["id"],
        )
    except Exception as e:
        logger.error(f"处理文章时出错: {e}")
        return False
    finally:
        if pool is not None:
            await pool.close()


async def process_wechat_account(account_info, cookies, token):
    """Process the listed articles of one account and return how many were saved."""
    cnt = 0
    fakeid = await get_wechat_account_list(cookies, token, account_info["account_name"])
    if not fakeid:
        logger.warning(f"未找到公众号: {account_info['account_name']}")
        return cnt

    articles = await get_article_list(cookies, token, fakeid)
    for article in articles:
        if await process_single_article(article, account_info, cookies, token):
            cnt += 1
        # Pause between articles without blocking the event loop.
        await asyncio.sleep(1)

    logger.info(f"成功获取公众号: {account_info['account_name']} {cnt}篇文章。")
    return cnt


async def main():
    """Run one collection cycle every 30 minutes, forever.

    Each cycle initialises the WeChat session, loads the account list from
    the database and processes every account. The browser is released at the
    end of each cycle, before the waiting period starts.
    """
    global driver

    while True:
        try:
            logger.info("开始执行微信公众号文章采集任务")
            cookies, token = await initialize_wechat_session()
            if not cookies or not token:
                logger.error("初始化微信会话失败")
            else:
                account_list = await get_wechat_sources()
                for account in account_list:
                    await process_wechat_account(account, cookies, token)
                logger.info("本次采集任务完成,等待30分钟后再次执行")
        except Exception as e:
            logger.error(f"主循环发生错误: {e}")
        finally:
            if driver is not None:
                try:
                    driver.quit()
                except Exception as e:
                    # A failing shutdown must not terminate the collector.
                    logger.warning(f"关闭浏览器失败: {e}")
                driver = None

        # Wait after every cycle, including a failed session initialisation,
        # which used to retry immediately in a tight loop.
        await asyncio.sleep(POLL_INTERVAL_SECONDS)


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()