Prerequisites
- Install required Python packages:
pip install pyquery requests selenium xlwt
- Download PhantomJS 2.1.1 for Windows from the official repository. Extract the archive, and move the
phantomjs.exe file to your Python installation's Scripts directory (e.g., C:\Python39\Scripts).
Full Implementation Code
import time
import re
import json
import os
from urllib.parse import quote
from pyquery import PyQuery as pq
from selenium import webdriver
import xlwt
import requests
class WeChatArticleScraper:
    """Scrape articles from a WeChat official account via Sogou search.

    Pipeline: locate the account on Sogou WeChat search, render its
    JavaScript-driven homepage with PhantomJS, extract the article nodes,
    then save each article's HTML to disk and its metadata to an Excel
    workbook and a JSON file.
    """

    def __init__(self, account_name):
        """Set up the search URL, HTTP session, Excel sheet and output dir.

        Args:
            account_name: Display name of the target official account.
        """
        self.account_name = account_name
        # Sogou WeChat Search URL (type=1 = official-account search).
        self.sogou_search_url = f"http://weixin.sogou.com/weixin?type=1&query={quote(account_name)}&ie=utf8&s_from=input&_sug_=n&_sug_type_="
        # Request headers to mimic a real browser.
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        self.timeout = 10
        self.session = requests.Session()
        # Excel workbook for article metadata.
        self.workbook = xlwt.Workbook(encoding='utf-8')
        self.worksheet = self.workbook.add_sheet(
            f"Articles_{time.strftime('%Y%m%d')}")
        self.excel_headers = ["ID", "Publish Date", "Title", "URL", "Summary"]
        # Write the Excel header row (row 0).
        for col, header in enumerate(self.excel_headers):
            self.worksheet.write(0, col, header)
        # Next Excel row / running article ID (row 0 holds the headers).
        self.article_counter = 1
        # Directory where article HTML / Excel / JSON output is stored.
        self.output_dir = account_name
        # Fixed: exist_ok avoids the exists()/makedirs() race condition.
        os.makedirs(self.output_dir, exist_ok=True)

    @staticmethod
    def _sanitize_filename(name):
        """Replace characters that are invalid in Windows/Unix filenames."""
        return re.sub(r'[\\/:*?"<>|]', '_', name)

    def log(self, message):
        """Print *message* prefixed with a timestamp for debugging."""
        timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
        print(f"[{timestamp}] {message}")

    def fetch_search_results(self):
        """Return the HTML of the Sogou WeChat search results page."""
        self.log(f"Initiating search for official account: {self.account_name}")
        response = self.session.get(
            self.sogou_search_url, headers=self.headers, timeout=self.timeout)
        response.encoding = 'utf-8'
        return response.text

    def get_account_homepage_url(self, search_html):
        """Extract the official account homepage URL from search results.

        Returns:
            The homepage URL string, or None when no account was found.
        """
        doc = pq(search_html)
        account_url = doc('div.txt-box p.tit a').attr('href')
        if not account_url:
            self.log("Failed to locate official account in search results")
            return None
        self.log(f"Official account homepage found: {account_url}")
        return account_url

    def render_dynamic_page(self, url):
        """Render *url* with PhantomJS and return the resulting HTML.

        The account/article pages load their content via JavaScript, so a
        plain HTTP GET does not see the article list.
        """
        self.log(f"Rendering dynamic page content: {url}")
        # Update the executable_path to match your PhantomJS location.
        driver = webdriver.PhantomJS(
            executable_path=r'C:\Python39\Scripts\phantomjs.exe')
        try:
            driver.get(url)
            time.sleep(3)  # Allow time for JavaScript to load content
            return driver.execute_script(
                "return document.documentElement.outerHTML")
        finally:
            # Fixed: always terminate the browser process, even when
            # rendering raises — otherwise PhantomJS instances leak.
            driver.quit()

    def detect_anti_scraping(self, page_html):
        """Return True when Sogou's verification (CAPTCHA) page is shown."""
        doc = pq(page_html)
        if doc('#verify_change').text():
            self.log("Anti-scraping verification detected. Retry later or use a proxy.")
            return True
        return False

    def extract_article_nodes(self, account_page_html):
        """Return the PyQuery selection of article containers on the page."""
        doc = pq(account_page_html)
        # Two container classes are used depending on how many articles
        # the account has published (more/less than 10).
        articles = doc('div.weui_media_box.appmsg') or doc('div.weui_msg_card')
        self.log(f"Found {len(articles)} articles on the account page")
        return articles

    def get_cover_image(self, article_node):
        """Extract the cover image URL from an article node ('' if none)."""
        style_attr = article_node('.weui_media_hd').attr('style')
        if not style_attr:
            return ""
        match = re.search(r'background-image:url\((.*?)\)', style_attr)
        if not match:
            return ""
        # Fixed: the CSS url(...) value may be wrapped in quotes.
        img_url = match.group(1).strip('\'"')
        self.log(f"Cover image URL: {img_url}")
        return img_url

    def fetch_article_content_html(self, article_url):
        """Render an article page and return the #js_content inner HTML.

        May return None when the page has no #js_content element.
        """
        page_html = self.render_dynamic_page(article_url)
        doc = pq(page_html)
        return doc('#js_content').html()

    def save_article_to_html(self, title, publish_date, content):
        """Write article content to <output_dir>/<title>_<date>.html."""
        # Fixed: sanitize ALL filename-invalid characters — titles with
        # ':', '*', '?', '<', '>' or '|' previously produced invalid
        # Windows paths.
        safe_title = self._sanitize_filename(title)
        safe_date = self._sanitize_filename(publish_date)
        file_path = f"{self.output_dir}/{safe_title}_{safe_date}.html"
        with open(file_path, 'w', encoding='utf-8') as f:
            # Fixed: #js_content may be missing, making content None;
            # f.write(None) would raise TypeError.
            f.write(content or "")
        self.log(f"Article saved to: {file_path}")

    def process_single_article(self, article_node):
        """Process one article: extract data, save to Excel and HTML.

        Returns:
            A metadata dict, or None when the node has no title or URL.
        """
        title = article_node('h4.weui_media_title').text().strip()
        if not title:
            self.log("Skipping article with empty title")
            return None
        # NOTE(review): the WeChat profile page stores the link in a
        # non-standard 'hrefs' attribute (not 'href') — intentional.
        article_relative_url = article_node('h4.weui_media_title').attr('hrefs')
        if not article_relative_url:
            # Fixed: a missing attribute previously yielded ".../None" URLs.
            self.log(f"Skipping article without URL: {title}")
            return None
        article_url = f"http://mp.weixin.qq.com{article_relative_url}"
        summary = article_node('.weui_media_desc').text().strip()
        publish_date = article_node('.weui_media_extra_info').text().strip()
        cover_img = self.get_cover_image(article_node)
        content_html = self.fetch_article_content_html(article_url)
        # Save full article HTML to disk.
        self.save_article_to_html(title, publish_date, content_html)
        # Append a metadata row; save after every row so an interrupted
        # run still leaves a usable workbook.
        row = self.article_counter
        self.worksheet.write(row, 0, self.article_counter)
        self.worksheet.write(row, 1, publish_date)
        self.worksheet.write(row, 2, title)
        self.worksheet.write(row, 3, article_url)
        self.worksheet.write(row, 4, summary)
        self.workbook.save(f"{self.output_dir}/{self.account_name}_articles.xls")
        self.article_counter += 1
        # Return structured article data.
        return {
            'title': title,
            'url': article_url,
            'summary': summary,
            'publish_date': publish_date,
            'cover_image_url': cover_img,
            'content_html': content_html
        }

    def process_all_articles(self, article_nodes):
        """Process every extracted article and dump all metadata to JSON."""
        articles_data = []
        total = len(article_nodes)
        # Fixed: .items() already yields PyQuery-wrapped elements; the old
        # code indexed into the single-element selection (node[1]) and
        # raised IndexError on every iteration.
        for idx, node in enumerate(article_nodes.items(), 1):
            self.log(f"Processing article {idx}/{total}")
            article_data = self.process_single_article(node)
            if article_data:
                articles_data.append(article_data)
        # Save structured data as JSON.
        json_path = f"{self.output_dir}/{self.account_name}_articles.json"
        with open(json_path, 'w', encoding='utf-8') as f:
            json.dump(articles_data, f, ensure_ascii=False, indent=2)
        self.log(f"Structured article data saved to: {json_path}")
        return articles_data

    def run(self):
        """Main entry point: search → homepage → render → extract → save."""
        # Step 1: Get search results from Sogou.
        search_html = self.fetch_search_results()
        # Step 2: Extract official account homepage URL.
        account_url = self.get_account_homepage_url(search_html)
        if not account_url:
            return
        # Step 3: Render the dynamic account page with PhantomJS.
        account_page_html = self.render_dynamic_page(account_url)
        # Step 4: Abort if anti-scraping measures are triggered.
        if self.detect_anti_scraping(account_page_html):
            return
        # Step 5: Extract article elements from the account page.
        article_nodes = self.extract_article_nodes(account_page_html)
        if not article_nodes:
            self.log("No articles found on the official account page")
            return
        # Step 6: Process and save all articles.
        self.process_all_articles(article_nodes)
        self.log("Scraping process completed successfully!")
def _main():
    """Prompt for an account name and run the scraper against it."""
    account = input("Enter WeChat official account name to scrape: ").strip()
    # Fall back to a default account when nothing was entered.
    account = account or "DataBureau"
    WeChatArticleScraper(account).run()


if __name__ == "__main__":
    _main()
Key Notes
- PhantomJS Path: Update the
executable_path in render_dynamic_page() to match your actual PhantomJS installation location.
- Anti-Scraping: If the scraper detects anti-scraping verification, try again later or use a proxy IP to avoid blocks.
- Output: All scraped data (HTML articles, Excel metadata, JSON structured data) is saved to a directory named after the target official account.