# 详解Python + Selenium 批量采集微信公众号搭建自己的微信公众号每日AI简报告别信息焦虑
# https://blog.csdn.net/k352733625/article/details/149222945
# 微信爬爬猫---公众号文章抓取代码分析
# https://blog.csdn.net/yajuanpi4899/article/details/121584268
"""
# Check the installed selenium version
pip show selenium
4.34.2

# Check the Chrome browser version
chrome://version/
138.0.7204.101 (official build, 64-bit)

# Download the matching driver package
https://googlechromelabs.github.io/chrome-for-testing/
https://storage.googleapis.com/chrome-for-testing-public/138.0.7204.94/win64/chromedriver-win64.zip
"""
import asyncio
import datetime
import json
import logging
import random
import re
import sys
import time

import requests
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service as ChromeService

from Util.LightRagUtil import initialize_pg_rag
from Util.PostgreSQLUtil import init_postgres_pool
from Util.WxGzhUtil import init_wechat_browser, get_article_content

# Single logging configuration for this script: one named logger at INFO level.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'

logger = logging.getLogger('WxGzh')
logger.setLevel(logging.INFO)

# Attach the console handler only when the logger has none yet, so that
# importing this module more than once does not duplicate every message.
if not logger.handlers:
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter(_LOG_FORMAT))
    logger.addHandler(handler)


# HTTP headers sent with every request to the WeChat platform endpoints below.
header = {
    'User-Agent': (
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
        'AppleWebKit/537.36 (KHTML, like Gecko) '
        'Chrome/138.0.0.0 Safari/537.36'
    ),
    'Referer': 'https://mp.weixin.qq.com/',
}


async def get_wechat_sources():
    """Load the configured WeChat official accounts from ``t_wechat_source``.

    A pool is obtained from ``init_postgres_pool`` for this single query and
    closed again before the function returns, whether or not the query
    succeeds.

    Returns:
        list[dict]: One plain ``dict`` per row returned by
        ``SELECT * FROM t_wechat_source``.
    """
    # Create the pool *before* entering ``try``: if creation itself fails there
    # is nothing to close, and touching an unbound ``pool`` in ``finally``
    # would replace the real error with an UnboundLocalError.
    pool = await init_postgres_pool()
    try:
        async with pool.acquire() as conn:
            rows = await conn.fetch('SELECT * FROM t_wechat_source')
            return [dict(row) for row in rows]
    finally:
        await pool.close()


async def is_article_exist(pool, article_url):
    """Report whether an article URL is already stored in ``t_wechat_articles``.

    Args:
        pool: Connection pool to borrow a connection from; it is not closed
            here.
        article_url: URL to look up in the ``url`` column.

    Returns:
        bool: ``True`` when a matching row exists. ``False`` when none exists
        *or* when the lookup itself fails (deliberate best-effort, so a
        database hiccup does not stop the surrounding flow).
    """
    try:
        async with pool.acquire() as conn:
            row = await conn.fetchrow(
                '''
                SELECT 1
                FROM t_wechat_articles
                WHERE url = $1 LIMIT 1
                ''',
                article_url,
            )
            return row is not None
    except Exception as e:
        # Use the module logger, not the root-level ``logging.error``: the
        # latter implicitly installs a root handler, after which every
        # 'WxGzh' message would be printed twice.
        logger.error(f"检查文章存在性失败: {e}")
        return False  # Default to "not present" on error.


async def save_article_to_db(pool, article_title, account_name, article_url, publish_time, content, source_id):
    """Index one article in the LightRAG knowledge base, then record it in PostgreSQL.

    Articles whose URL is already present in ``t_wechat_articles`` are skipped
    entirely (no indexing, no insert).

    Args:
        pool: Connection pool owned by the caller. It is used for the
            duplicate check and for the INSERT and is never closed here.
        article_title: Stored in the ``title`` column; also part of the
            document name passed to the knowledge base.
        account_name: Stored in the ``source`` column; also part of the
            document name.
        article_url: Stored in the ``url`` column; key for the duplicate check.
        publish_time: Stored in the ``publish_time`` column.
        content: Article text; inserted into the knowledge base and stored in
            the ``content`` column.
        source_id: Stored in the ``source_id`` column.

    Raises:
        Exception: Anything raised by ``initialize_pg_rag`` or ``rag.ainsert``
            propagates to the caller. A failing INSERT is only logged.
    """
    # Skip articles that were stored on a previous run.
    if await is_article_exist(pool, article_url):
        logger.info(f"文章已存在,跳过保存: {account_name}-{article_title}")
        return

    # Build the knowledge base entry with LightRAG.
    workspace = 'ChangChun'
    # Has no effect with the PG storage backend, but the project API still
    # requires a value to be passed.
    working_dir = "./output"
    docx_name = f"{account_name}_{article_title}"  # source + title as document name
    logger.info(f"开始处理文档: {docx_name}")

    # Bind ``rag`` first so ``finally`` never sees an unbound name when
    # ``initialize_pg_rag`` itself raises.
    rag = None
    try:
        rag = await initialize_pg_rag(WORKING_DIR=working_dir, workspace=workspace)
        await rag.ainsert(input=content, file_paths=[docx_name])
    finally:
        if rag is not None:
            await rag.finalize_storages()
        # The pool must NOT be closed here: it belongs to the caller and is
        # still needed for the INSERT below. Closing it at this point made
        # every INSERT fail against a closed pool.
    logger.info(f"保存文档到知识库成功: {docx_name}")

    try:
        async with pool.acquire() as conn:
            await conn.execute(
                '''
                INSERT INTO t_wechat_articles
                    (title, source, url, publish_time, content, source_id)
                VALUES ($1, $2, $3, $4, $5, $6)
                ''',
                article_title, account_name, article_url,
                publish_time, content, source_id,
            )
    except Exception as e:
        # Module logger rather than root-level ``logging.error`` so output
        # goes through the single configured 'WxGzh' handler.
        logger.error(f"保存文章失败: {e}")


async def initialize_wechat_session():
    """Load the saved login cookies, start the browser and obtain a session token.

    Steps:
      1. Read ``cookies.txt`` (JSON object) from the current directory.
      2. If it carries a truthy ``expiry`` timestamp that lies in the past,
         log an error and terminate the process.
      3. Start the browser through ``init_wechat_browser`` and keep it in the
         module-level ``driver`` variable.
      4. Request ``https://mp.weixin.qq.com`` without following redirects and
         extract ``token=<digits>`` from the ``Location`` header.

    Returns:
        tuple: ``(cookies, token)`` on success, where ``cookies`` is the loaded
        dict without its ``expiry`` entry and ``token`` is a string of digits;
        ``(None, None)`` when no token could be extracted.

    Raises:
        SystemExit: When the stored cookies have expired (exit status 1).
    """
    global driver

    with open('cookies.txt', 'r', encoding='utf-8') as f:
        cookies = json.loads(f.read())

    # ``expiry`` is removed from the dict before the cookies are sent. ``pop``
    # with a default also tolerates files that have no such entry.
    expiry = cookies.pop("expiry", None)
    if expiry and time.time() > expiry:
        logger.error("Cookie已过期")
        # ``sys.exit`` instead of the interactive-only ``exit()`` helper, with
        # a non-zero status so callers can tell the run failed.
        sys.exit(1)

    driver = init_wechat_browser()

    # A bounded timeout keeps a stalled connection from hanging the script.
    response = requests.get(
        url='https://mp.weixin.qq.com',
        allow_redirects=False,
        cookies=cookies,
        timeout=30,
    )

    redirect_url = response.headers.get("Location")
    if redirect_url:
        token_match = re.search(r'token=(\d+)', redirect_url)
        if token_match:
            return cookies, token_match.group(1)
    return None, None
async def get_wechat_account_list(cookies, token, account_name):
    """Look up the ``fakeid`` of an official account by searching for its name.

    Sends a ``search_biz`` query to the ``cgi-bin/searchbiz`` endpoint and
    takes the first entry of the ``list`` field in the JSON response.

    Args:
        cookies: Cookie dict sent with the request.
        token: Session token sent as the ``token`` query parameter.
        account_name: Search string sent as the ``query`` parameter.

    Returns:
        The ``fakeid`` value of the first search hit, or ``None`` when the
        response has no ``list`` entries (or the hit has no ``fakeid``).
    """
    search_url = 'https://mp.weixin.qq.com/cgi-bin/searchbiz?'
    query_id = {
        'action': 'search_biz',
        'token': token,
        'lang': 'zh_CN',
        'f': 'json',
        'ajax': '1',
        'random': random.random(),
        'query': account_name,
        'begin': '0',
        'count': '5',
    }

    # A bounded timeout keeps a stalled connection from hanging the script.
    search_response = requests.get(
        search_url, cookies=cookies, headers=header, params=query_id, timeout=30
    )

    # ``list`` may be missing (e.g. an error payload) or empty (no match);
    # indexing it blindly raised TypeError / IndexError.
    matches = search_response.json().get('list')
    if not matches:
        logger.warning(f"未找到公众号: {account_name}")
        return None
    return matches[0].get('fakeid')
async def get_article_list(cookies, token, fakeid):
    """Fetch the article list of one official account.

    Sends a ``list_ex`` query (``begin=0``, ``count=5``) to the
    ``cgi-bin/appmsg`` endpoint.

    Args:
        cookies: Cookie dict sent with the request.
        token: Session token sent as the ``token`` query parameter.
        fakeid: Account identifier sent as the ``fakeid`` query parameter.

    Returns:
        list: The ``app_msg_list`` field of the JSON response, or an empty
        list when that field is missing or empty, so callers can always
        iterate the result.
    """
    appmsg_url = 'https://mp.weixin.qq.com/cgi-bin/appmsg?'
    query_id_data = {
        'token': token,
        'lang': 'zh_CN',
        'f': 'json',
        'ajax': '1',
        'random': random.random(),
        'action': 'list_ex',
        'begin': '0',
        'count': '5',
        'query': '',
        'fakeid': fakeid,
        'type': '9',
    }
    # A bounded timeout keeps a stalled connection from hanging the script.
    query_fakeid_response = requests.get(
        appmsg_url, cookies=cookies, headers=header, params=query_id_data, timeout=30
    )
    return query_fakeid_response.json().get('app_msg_list') or []
async def process_single_article(article_info, account_info, cookies, token):
    """Download one article and hand it to ``save_article_to_db``.

    Articles whose title contains ``试卷`` are skipped.

    Args:
        article_info: Mapping describing the article; the keys ``link``,
            ``title`` and ``update_time`` (epoch seconds) are read.
        account_info: Mapping describing the account; the keys
            ``account_name`` and ``id`` are read.
        cookies: Not used by this function; kept for signature compatibility.
        token: Not used by this function; kept for signature compatibility.

    Returns:
        bool: ``True`` when the article was fetched and passed to
        ``save_article_to_db`` without an exception; ``False`` when it was
        skipped or any step failed (the failure is logged).
    """
    article_url = article_info.get('link')
    # A missing title is treated as an empty string so the filter below cannot
    # raise on ``None``.
    article_title = article_info.get('title') or ''
    if '试卷' in article_title:
        return False

    pool = None
    try:
        # Parsed inside ``try`` so that a missing or malformed ``update_time``
        # fails only this article instead of aborting the whole run.
        publish_time = datetime.datetime.fromtimestamp(int(article_info.get("update_time")))
        pool = await init_postgres_pool()
        content = get_article_content(article_url)
        await save_article_to_db(pool, article_title, account_info["account_name"],
                                 article_url, publish_time, content, account_info["id"])
        return True
    except Exception as e:
        logger.error(f"处理文章时出错: {e}")
        return False
    finally:
        # Only close a pool that was actually created.
        if pool is not None:
            await pool.close()
async def process_wechat_account(account_info, cookies, token):
    """Fetch and store the listed articles of one official account.

    Args:
        account_info: Mapping describing the account; ``account_name`` is read
            here and the whole mapping is forwarded to
            ``process_single_article``.
        cookies: Cookie dict forwarded to the request helpers.
        token: Session token forwarded to the request helpers.

    Returns:
        int: Number of articles for which ``process_single_article`` returned
        ``True``.
    """
    cnt = 0
    fakeid = await get_wechat_account_list(cookies, token, account_info["account_name"])
    if not fakeid:
        # Without a fakeid there is nothing to query for this account.
        logger.warning(f"未获取到公众号fakeid,跳过: {account_info['account_name']}")
        return cnt

    # Guard against a missing article list so the loop never iterates ``None``.
    articles = await get_article_list(cookies, token, fakeid) or []
    for article in articles:
        if await process_single_article(article, account_info, cookies, token):
            cnt += 1
        # Pause between articles. ``asyncio.sleep`` yields to the event loop,
        # whereas ``time.sleep`` would block it.
        await asyncio.sleep(1)
    logger.info(f"成功获取公众号: {account_info['account_name']} {cnt}篇文章。")
    return cnt
async def main():
    """Run one full scraping pass over every configured official account.

    Initializes the WeChat session, loads the account list via
    ``get_wechat_sources`` and processes each account in turn. The browser
    stored in the module-level ``driver`` is shut down on every exit path.
    """
    try:
        cookies, token = await initialize_wechat_session()
        if not cookies or not token:
            logger.error("初始化微信会话失败")
            return
        account_list = await get_wechat_sources()
        for account in account_list:
            await process_wechat_account(account, cookies, token)
    finally:
        # ``driver`` is only defined once ``initialize_wechat_session`` got as
        # far as starting the browser, so look it up defensively. Quitting in
        # ``finally`` also covers the early return and any exception above.
        browser = globals().get('driver')
        if browser is not None:
            browser.quit()


if __name__ == '__main__':
    # ``asyncio.run`` creates a fresh event loop, runs ``main()`` to completion
    # and closes the loop; unlike the manual new_event_loop/close sequence it
    # also cancels leftover tasks and shuts down async generators.
    asyncio.run(main())