# Walkthrough (part 1): Python + Selenium batch collection of WeChat official
# accounts, to build a personal daily AI briefing from official-account posts.
# https://blog.csdn.net/k352733625/article/details/149222945
# Analysis of official-account article scraping code ("WeChat crawler cat").
# https://blog.csdn.net/yajuanpi4899/article/details/121584268
"""
Periodically collect recent articles from WeChat official accounts.

Environment notes:

# Check the selenium version
pip show selenium
4.34.2
# Check the Chrome browser version
chrome://version/
138.0.7204.101 (official build) (64-bit)
# Download the matching driver package
https://googlechromelabs.github.io/chrome-for-testing/
https://storage.googleapis.com/chrome-for-testing-public/138.0.7204.94/win64/chromedriver-win64.zip
"""
import asyncio
import datetime
import json
import logging
import random
import re
import time

import requests
from selenium.webdriver.chrome.options import Options

from Util.LightRagUtil import initialize_pg_rag, initialize_rag
from Util.PostgreSQLUtil import init_postgres_pool
from Util.WxGzhUtil import init_wechat_browser, get_article_content

# Single logging configuration for this script.
logger = logging.getLogger('WxGzh')
logger.setLevel(logging.INFO)

# Attach a handler only once so records are not emitted twice.
if not logger.handlers:
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
    logger.addHandler(handler)

# Request headers sent with the WeChat backend queries.
header = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36',
    'Referer': 'https://mp.weixin.qq.com/'
}

# Pause between two collection rounds (also used after a failed round).
POLL_INTERVAL_SECONDS = 30 * 60
# Upper bound for every blocking HTTP request so a stalled connection
# cannot hang the collector forever.
REQUEST_TIMEOUT_SECONDS = 30


async def get_wechat_sources():
    """Return every row of ``t_wechat_source`` as a list of dicts.

    A pool is opened for this single query and closed afterwards.
    """
    # Create the pool outside the ``try`` so that a failure here is reported
    # as-is instead of being masked by an unbound ``pool`` in ``finally``.
    pool = await init_postgres_pool()
    try:
        async with pool.acquire() as conn:
            rows = await conn.fetch('SELECT * FROM t_wechat_source')
            return [dict(row) for row in rows]
    finally:
        await pool.close()


async def is_article_exist(pool, article_url):
    """Return True when ``article_url`` is already stored in ``t_wechat_articles``.

    Any database error is logged and reported as "not existing" (False) so
    that the normal flow is not interrupted.
    """
    try:
        async with pool.acquire() as conn:
            row = await conn.fetchrow('''
                SELECT 1
                FROM t_wechat_articles
                WHERE url = $1 LIMIT 1
            ''', article_url)
            return row is not None
    except Exception as e:
        # Use the module logger: calling ``logging.error`` on the root logger
        # would implicitly install a root handler and duplicate all output.
        logger.error(f"检查文章存在性失败: {e}")
        return False


async def save_article_to_db(pool, article_title, account_name, article_url, publish_time, content,
                             source_id):
    """Index an article into the knowledge base, then insert it into the database.

    Indexing is best-effort: a failure there is logged and the database
    insert is still attempted.

    Returns:
        True when the row was inserted into ``t_wechat_articles``, False when
        the insert failed (the error is logged, not raised).
    """
    # Build the knowledge base with LightRAG.
    WORKING_DIR = "../Topic/ChangChun"
    docx_name = f"{account_name}_{article_title}"  # source + title used as the document name
    logger.info(f"开始处理文档: {docx_name}")

    rag = None
    try:
        # Note from the original author: the default setup uses NetworkX.
        rag = await initialize_rag(WORKING_DIR)
        await rag.ainsert(content)
        logger.info(f"索引完成: {docx_name}")
    except Exception as e:
        logger.error(f"An error occurred: {e}")
    finally:
        # ``rag`` stays None when initialisation itself failed.
        if rag is not None:
            await rag.finalize_storages()

    try:
        async with pool.acquire() as conn:
            await conn.execute('''
                INSERT INTO t_wechat_articles
                    (title, source, url, publish_time, content, source_id)
                VALUES ($1, $2, $3, $4, $5, $6)
            ''', article_title, account_name, article_url, publish_time, content, source_id)
        logger.info(f"保存文档到知识库成功: {docx_name}")
        return True
    except Exception as e:
        logger.error(f"保存文章失败: {e}")
        return False


async def initialize_wechat_session():
    """Load cookies, start the browser and obtain the backend token.

    Reads ``cookies.txt`` (JSON), starts the browser via
    ``init_wechat_browser`` (stored in the module-global ``driver``) and
    requests the WeChat backend home page; the token is taken from the
    ``Location`` redirect header.

    Returns:
        ``(cookies, token)`` on success, ``(None, None)`` when no token could
        be extracted from the redirect.

    Raises:
        SystemExit: when the stored cookie expiry timestamp is in the past.
    """
    global driver

    with open('cookies.txt', 'r', encoding='utf-8') as f:
        cookies = json.loads(f.read())

    # ``expiry`` is bookkeeping, not a real cookie: always remove it, and
    # tolerate files that do not carry it.
    expiry = cookies.pop("expiry", None)
    if expiry and time.time() > expiry:
        logger.error("Cookie已过期")
        raise SystemExit

    driver = init_wechat_browser()

    response = requests.get(url='https://mp.weixin.qq.com', allow_redirects=False, cookies=cookies,
                            timeout=REQUEST_TIMEOUT_SECONDS)
    redirect_url = response.headers.get("Location")
    if redirect_url:
        token_match = re.search(r'token=(\d+)', redirect_url)
        if token_match:
            return cookies, token_match.group(1)
    return None, None


async def get_wechat_account_list(cookies, token, account_name):
    """Return the ``fakeid`` of the first search hit for ``account_name``.

    Returns:
        The ``fakeid`` value, or None when the search response contains no
        result list (or an empty one).
    """
    search_url = 'https://mp.weixin.qq.com/cgi-bin/searchbiz?'
    query_id = {
        'action': 'search_biz',
        'token': token,
        'lang': 'zh_CN',
        'f': 'json',
        'ajax': '1',
        'random': random.random(),
        'query': account_name,
        'begin': '0',
        'count': '5'
    }
    search_response = requests.get(search_url, cookies=cookies, headers=header, params=query_id,
                                   timeout=REQUEST_TIMEOUT_SECONDS)
    # ``list`` may be missing or empty; indexing it blindly would raise.
    candidates = search_response.json().get('list') or []
    if not candidates:
        return None
    return candidates[0].get('fakeid')


async def get_article_list(cookies, token, fakeid):
    """Return the first page (up to 5 entries) of an account's article list.

    Returns:
        The ``app_msg_list`` value of the response, or an empty list when the
        response does not contain it.
    """
    appmsg_url = 'https://mp.weixin.qq.com/cgi-bin/appmsg?'
    query_id_data = {
        'token': token,
        'lang': 'zh_CN',
        'f': 'json',
        'ajax': '1',
        'random': random.random(),
        'action': 'list_ex',
        'begin': '0',
        'count': '5',
        'query': '',
        'fakeid': fakeid,
        'type': '9'
    }
    query_fakeid_response = requests.get(appmsg_url, cookies=cookies, headers=header, params=query_id_data,
                                         timeout=REQUEST_TIMEOUT_SECONDS)
    # Always hand the caller something iterable.
    return query_fakeid_response.json().get('app_msg_list') or []


async def process_single_article(article_info, account_info, cookies, token):
    """Fetch and store one article unless it is filtered out or already stored.

    Articles whose title contains '试卷' are skipped. ``cookies`` and
    ``token`` are accepted for interface compatibility and are not used here.

    Returns:
        True when the article was newly saved, False when it was skipped,
        already present, or any step failed (failures are logged).
    """
    article_url = article_info.get('link')
    article_title = article_info.get('title')

    if article_title and '试卷' in article_title:
        return False

    pool = None
    try:
        # Parsed inside the ``try`` so that one malformed entry is logged and
        # skipped instead of aborting the whole account.
        publish_time = datetime.datetime.fromtimestamp(int(article_info.get("update_time")))

        pool = await init_postgres_pool()

        # Skip articles that were stored by a previous run.
        if await is_article_exist(pool, article_url):
            logger.info(f'文章已存在,跳过保存: {account_info["account_name"]}-{article_title}')
            return False

        content = get_article_content(article_url)
        return await save_article_to_db(pool, article_title, account_info["account_name"], article_url,
                                        publish_time, content, account_info["id"])
    except Exception as e:
        logger.error(f"处理文章时出错: {e}")
        return False
    finally:
        if pool is not None:
            await pool.close()


async def process_wechat_account(account_info, cookies, token):
    """Collect the latest articles of one official account.

    Returns:
        The number of articles that were newly saved.
    """
    cnt = 0
    fakeid = await get_wechat_account_list(cookies, token, account_info["account_name"])
    if not fakeid:
        logger.warning(f"未找到公众号: {account_info['account_name']}")
        return cnt

    articles = await get_article_list(cookies, token, fakeid)

    for article in articles:
        if await process_single_article(article, account_info, cookies, token):
            cnt += 1
        # Throttle between articles without blocking the event loop.
        await asyncio.sleep(1)

    logger.info(f"成功获取公众号: {account_info['account_name']} {cnt}篇文章。")
    return cnt


async def main():
    """Run collection rounds forever, one every ``POLL_INTERVAL_SECONDS``.

    Each round opens a new WeChat session, processes every configured
    account, closes the browser, and then sleeps. A failed round (including a
    failed session initialisation) waits the same interval before retrying.
    """
    while True:
        try:
            logger.info("开始执行微信公众号文章采集任务")
            cookies, token = await initialize_wechat_session()
            if not cookies or not token:
                logger.error("初始化微信会话失败")
            else:
                account_list = await get_wechat_sources()
                for account in account_list:
                    await process_wechat_account(account, cookies, token)
                logger.info("本次采集任务完成,等待30分钟后再次执行")
        except Exception as e:
            logger.error(f"主循环发生错误: {e}")
        finally:
            # Close the browser before sleeping; popping the global also
            # guarantees the same instance is never quit twice.
            browser = globals().pop('driver', None)
            if browser is not None:
                browser.quit()

        # Every path waits before the next round, so a failing session
        # initialisation cannot turn into a tight retry loop.
        await asyncio.sleep(POLL_INTERVAL_SECONDS)


if __name__ == '__main__':
    asyncio.run(main())