# Request headers sent with every mp.weixin.qq.com back-end API call.
header = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36',
    'Referer': 'https://mp.weixin.qq.com/'
}

# Seconds to wait for a single HTTP request; without a timeout `requests`
# can block forever on a stalled connection.
REQUEST_TIMEOUT = 10

# NOTE: logger, get_wechat_sources, save_article_to_db, init_postgres_pool,
# get_article_content and init_wechat_browser are defined or imported
# elsewhere in this module.


def _quit_browser():
    """Quit the shared browser driver if one was started; safe to call twice."""
    global driver
    browser = globals().get("driver")
    if browser is None:
        return
    try:
        browser.quit()
    finally:
        driver = None


async def initialize_wechat_session():
    """Initialise the WeChat session from ``cookies.txt``.

    Loads the saved cookies, rejects them if their ``expiry`` timestamp has
    passed, starts the shared browser (stored in the module-level ``driver``)
    and extracts the session token from the login redirect.

    Returns:
        ``(cookies, token)`` on success, ``(None, None)`` when the cookies
        are expired or no token could be found in the redirect URL.
    """
    global driver

    with open('cookies.txt', 'r', encoding='utf-8') as f:
        cookies = json.load(f)

    # `expiry` is bookkeeping for this script only; it must not be sent to
    # the server as a cookie, so it is always stripped before use.
    expiry = cookies.pop("expiry", None)
    if expiry and time.time() > expiry:
        logger.error("Cookie已过期")
        return None, None

    driver = init_wechat_browser()

    # With valid cookies the home page answers with a redirect whose
    # Location carries the token; redirects are not followed so that the
    # header can be read.
    url = 'https://mp.weixin.qq.com'
    response = requests.get(url=url, allow_redirects=False, cookies=cookies,
                            timeout=REQUEST_TIMEOUT)

    redirect_url = response.headers.get("Location")
    if redirect_url:
        token_match = re.search(r'token=(\d+)', redirect_url)
        if token_match:
            return cookies, token_match.group(1)

    return None, None


async def get_wechat_account_list(cookies, token, account_name):
    """Look up the ``fakeid`` of the official account named *account_name*.

    Returns:
        The ``fakeid`` of the first search hit, or ``None`` when the search
        returns no accounts.
    """
    search_url = 'https://mp.weixin.qq.com/cgi-bin/searchbiz?'
    query_id = {
        'action': 'search_biz',
        'token': token,
        'lang': 'zh_CN',
        'f': 'json',
        'ajax': '1',
        'random': random.random(),
        'query': account_name,
        'begin': '0',
        'count': '5'
    }

    search_response = requests.get(search_url, cookies=cookies, headers=header,
                                   params=query_id, timeout=REQUEST_TIMEOUT)
    matches = search_response.json().get('list') or []
    if not matches:
        return None
    # Only the first search result is used.
    return matches[0].get('fakeid')


async def get_article_list(cookies, token, fakeid):
    """Fetch the first page of articles of the account identified by *fakeid*.

    Returns:
        The ``app_msg_list`` entries of the response, or an empty list when
        the response does not contain that key.
    """
    appmsg_url = 'https://mp.weixin.qq.com/cgi-bin/appmsg?'
    query_id_data = {
        'token': token,
        'lang': 'zh_CN',
        'f': 'json',
        'ajax': '1',
        'random': random.random(),
        'action': 'list_ex',
        'begin': '0',
        'count': '5',
        'query': '',
        'fakeid': fakeid,
        'type': '9'
    }

    query_fakeid_response = requests.get(appmsg_url, cookies=cookies, headers=header,
                                         params=query_id_data, timeout=REQUEST_TIMEOUT)
    return query_fakeid_response.json().get('app_msg_list') or []


async def process_single_article(article_info, account_info, cookies, token):
    """Download one article and store it in the database.

    Articles whose title contains '试卷' (exam papers) are skipped.
    *cookies* and *token* are accepted for interface symmetry and are not
    used here.

    Returns:
        ``True`` when the article was saved, ``False`` when it was skipped
        or any step failed (the failure is logged, not raised).
    """
    article_title = article_info.get('title') or ''
    if '试卷' in article_title:
        return False

    article_url = article_info.get('link')
    pool = None
    try:
        # Parsed inside the try so a malformed timestamp only fails this
        # article instead of aborting the whole run.
        publish_time = datetime.datetime.fromtimestamp(int(article_info.get("update_time")))
        pool = await init_postgres_pool()
        content = get_article_content(article_url)
        await save_article_to_db(pool, article_title, account_info["account_name"],
                                 article_url, publish_time, content, account_info["id"])
        return True
    except Exception as e:
        logger.exception("处理文章时出错: %s", e)
        return False
    finally:
        if pool is not None:
            await pool.close()


async def process_wechat_account(account_info, cookies, token):
    """Collect the latest articles of one official account.

    Returns:
        The number of articles that were saved successfully.
    """
    account_name = account_info["account_name"]
    fakeid = await get_wechat_account_list(cookies, token, account_name)
    if not fakeid:
        logger.warning("未找到公众号: %s", account_name)
        return 0

    cnt = 0
    for article in await get_article_list(cookies, token, fakeid):
        if await process_single_article(article, account_info, cookies, token):
            cnt += 1
        # Pause between articles to avoid being rate-limited; asyncio.sleep
        # keeps the event loop responsive, unlike time.sleep.
        await asyncio.sleep(1)

    logger.info("成功获取公众号: %s %s篇文章。", account_name, cnt)
    return cnt


async def main():
    """Entry point: open a session, then crawl every configured account."""
    try:
        cookies, token = await initialize_wechat_session()
        if not cookies or not token:
            logger.error("初始化微信会话失败")
            return

        account_list = await get_wechat_sources()
        for account in account_list:
            try:
                await process_wechat_account(account, cookies, token)
            except (requests.RequestException, ValueError):
                # Network failure or a non-JSON reply for one account must
                # not stop the remaining accounts from being processed.
                logger.exception("处理公众号失败: %s", account["account_name"])
    finally:
        # Always release the browser, including on early return or error.
        _quit_browser()


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()