Building a Bilibili Programming Course Data Pipeline with PyQt and Pyecharts
To construct a data pipeline for Bilibili programming course analysis, we implement a multi-threaded PyQt application that scrapes, processes, and visualizes video data. The system integrates web scraping, database storage, and real-time chart updates.
Data Scraping and Processing
A custom QThread subclass handles background data collection from Bilibili search results, keeping the UI responsive while pages are fetched. The scraper extracts video metadata including titles, authors, view counts, comments, and durations. Data is categorized based on programming language keywords found in video titles.
from PyQt5.QtCore import QThread, pyqtSignal
import requests
from lxml import etree
class ScraperThread(QThread):
    """Worker thread that scrapes Bilibili search results for programming courses.

    Pages are fetched one at a time starting from ``current_page``. Every video
    card whose title contains one of ``programming_keywords`` is reported
    through ``data_ready`` as
    ``(link, title, author, category, views, comments, duration)``.
    """

    data_ready = pyqtSignal(str, str, str, str, int, int, str)

    def __init__(self, max_pages=34, request_timeout=10):
        """Prepare the scraper.

        Args:
            max_pages: Last search-result page to fetch (inclusive).
            request_timeout: Seconds to wait for each HTTP request before
                giving up.
        """
        super().__init__()
        # "Keep going" flag: stop() clears it and run() re-arms it on exit.
        self.is_running = True
        self.current_page = 1
        self.max_pages = max_pages
        self.request_timeout = request_timeout
        self.programming_keywords = [
            'C语言', 'C++', 'Python', 'PHP', 'Algorithm',
            'Java', 'Go', 'MySQL', 'C#', 'Scratch', 'Web', 'Computer'
        ]

    def run(self):
        """Fetch result pages until the page limit is reached or stop() is called.

        A failed request ends the run without advancing ``current_page``, so a
        later start() retries the same page. Records whose counts cannot be
        parsed are skipped.
        """
        # Local import keeps this block self-contained.
        import logging

        logger = logging.getLogger(__name__)
        base_url = 'https://search.bilibili.com/all'
        params = {
            'keyword': '编程课程',
            'search_source': '5'
        }
        headers = {
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        card_info = '//div[@class="bili-video-card__info--right"]'
        card_stats = '//div[@class="bili-video-card__stats--left"]'

        try:
            while self.current_page <= self.max_pages and self.is_running:
                params['page'] = self.current_page
                try:
                    # Without a timeout a stalled connection would block this
                    # thread (and anyone wait()-ing on it) indefinitely.
                    response = requests.get(
                        base_url,
                        params=params,
                        headers=headers,
                        timeout=self.request_timeout,
                    )
                except requests.RequestException:
                    logger.exception(
                        "Request for page %s failed; stopping scraper",
                        self.current_page,
                    )
                    break

                html_content = response.content.decode('utf-8')
                parsed_html = etree.HTML(html_content)

                video_links = parsed_html.xpath(card_info + '/a/@href')
                video_titles = parsed_html.xpath(card_info + '//h3/@title')
                video_authors = parsed_html.xpath(
                    card_info + '//span[@class="bili-video-card__info--author"]/text()'
                )
                view_counts = parsed_html.xpath(card_stats + '/span[1]/span/text()')
                comment_counts = parsed_html.xpath(card_stats + '/span[2]/span/text()')
                durations = parsed_html.xpath(
                    '//div[@class="bili-video-card__stats"]'
                    '/span[@class="bili-video-card__stats__duration"]/text()'
                )

                # NOTE(review): the six lists are paired by position, so a card
                # missing any one field would shift later records out of
                # alignment -- confirm against the live markup.
                for link, title, author, views, comments, duration in zip(
                    video_links, video_titles, video_authors,
                    view_counts, comment_counts, durations
                ):
                    category = self._categorize_video(title)
                    if not category:
                        continue
                    try:
                        processed_views = self._normalize_count(views)
                        processed_comments = self._normalize_count(comments)
                    except ValueError:
                        logger.warning(
                            "Skipping %r: unparsable counts %r / %r",
                            link, views, comments,
                        )
                        continue
                    self.data_ready.emit(
                        link, title, author, category,
                        processed_views, processed_comments, duration
                    )
                self.current_page += 1
        finally:
            # Re-arm the flag so a later start() resumes from current_page
            # instead of exiting immediately after an earlier stop().
            self.is_running = True

    def _categorize_video(self, title):
        """Return the first keyword contained in ``title`` (case-insensitive), or None.

        NOTE(review): this is a plain substring test, so short keywords can
        match inside longer words (e.g. 'Go' in 'Google', 'Java' in
        'JavaScript') -- confirm this is intended.
        """
        lower_title = title.lower()
        for keyword in self.programming_keywords:
            if keyword.lower() in lower_title:
                return keyword
        return None

    def _normalize_count(self, count_str):
        """Convert a display count such as ``'1.2万'`` or ``'345'`` to an int.

        Handles the ``万`` (x10,000) and ``亿`` (x100,000,000) suffixes.

        Raises:
            ValueError: If the text is not numeric after removing the suffix.
        """
        text = count_str.strip()
        for suffix, factor in (('亿', 100000000), ('万', 10000)):
            if suffix in text:
                # round() rather than int(): the float product can land just
                # below the intended integer and int() would truncate it.
                return round(float(text.replace(suffix, '')) * factor)
        return int(text)

    def stop(self):
        """Ask run() to finish after the page currently being processed."""
        self.is_running = False
Database Integration
Video data is stored in a MySQL database with a structured schema:
import pymysql
def initialize_database_connection():
    """Open and return a PyMySQL connection to the local ``bilibili_data`` database."""
    settings = {
        'host': 'localhost',
        'user': 'root',
        'password': 'your_password',
        'database': 'bilibili_data',
        # utf8mb4 is requested so the full Unicode range can be stored.
        'charset': 'utf8mb4',
    }
    return pymysql.connect(**settings)
def create_videos_table(cursor):
    """Create the ``videos`` table through ``cursor`` if it does not exist yet.

    Args:
        cursor: Object exposing ``execute(sql)``, e.g. a PyMySQL cursor.
    """
    # Assembled line by line; the resulting statement text is unchanged.
    ddl = (
        "\n"
        "CREATE TABLE IF NOT EXISTS videos (\n"
        "video_id VARCHAR(50) PRIMARY KEY,\n"
        "title TEXT,\n"
        "author VARCHAR(100),\n"
        "category VARCHAR(50),\n"
        "view_count INT,\n"
        "comment_count INT,\n"
        "duration VARCHAR(20),\n"
        "created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP\n"
        ")\n"
    )
    cursor.execute(ddl)
def insert_video_data(cursor, video_data):
    """Insert one video row, or refresh its counts if ``video_id`` already exists.

    Args:
        cursor: Object exposing ``execute(sql, params)``, e.g. a PyMySQL cursor.
        video_data: Seven values in column order: video_id, title, author,
            category, view_count, comment_count, duration.
    """
    # On a duplicate primary key only the two counters are overwritten.
    upsert_statement = (
        "\n"
        "INSERT INTO videos (video_id, title, author, category, view_count, comment_count, duration)\n"
        "VALUES (%s, %s, %s, %s, %s, %s, %s)\n"
        "ON DUPLICATE KEY UPDATE\n"
        "view_count = VALUES(view_count),\n"
        "comment_count = VALUES(comment_count)\n"
    )
    cursor.execute(upsert_statement, video_data)
Data Visualization with Pyecharts
The application generates multiple chart types to analyze the collected data:
Top 20 Categories by Views
from pyecharts.charts import Bar
from pyecharts import options as opts
def create_view_count_chart(cursor):
    """Build a bar chart of summed view counts per category and render it to HTML.

    The query keeps at most 20 categories, highest total first. The chart is
    written to ``top_views_chart.html``.

    Args:
        cursor: Database cursor used to run the aggregate query.

    Returns:
        The configured ``Bar`` chart.
    """
    cursor.execute(
        "\n"
        "SELECT category, SUM(view_count) as total_views\n"
        "FROM videos\n"
        "GROUP BY category\n"
        "ORDER BY total_views DESC\n"
        "LIMIT 20\n"
    )

    # Split the (category, total) rows into the two axis series.
    labels = []
    totals = []
    for record in cursor.fetchall():
        labels.append(record[0])
        totals.append(record[1])

    bar = (
        Bar()
        .add_xaxis(labels)
        .add_yaxis("Total Views", totals)
        .set_global_opts(
            title_opts=opts.TitleOpts(title="Top 20 Programming Categories by Views"),
            xaxis_opts=opts.AxisOpts(
                # Tilted, smaller labels for the category names.
                axislabel_opts=opts.LabelOpts(rotate=-45, font_size=10)
            ),
            yaxis_opts=opts.AxisOpts(name="View Count"),
        )
    )
    bar.render("top_views_chart.html")
    return bar
Comment Analysis Chart
def create_comment_analysis_chart(cursor):
    """Build a horizontal bar chart of the 20 most-commented rows and render it.

    The chart is written to ``comments_chart.html``.

    NOTE(review): each bar is one video row but is labelled with its category,
    so several bars can share a label although the title says "Videos" --
    confirm whether the video title was meant to be the label.

    Args:
        cursor: Database cursor used to run the query.

    Returns:
        The configured ``Bar`` chart.
    """
    cursor.execute(
        "\n"
        "SELECT category, comment_count\n"
        "FROM videos\n"
        "ORDER BY comment_count DESC\n"
        "LIMIT 20\n"
    )

    labels = []
    comment_totals = []
    for record in cursor.fetchall():
        labels.append(record[0])
        comment_totals.append(record[1])

    bar = (
        Bar()
        .add_xaxis(labels)
        .add_yaxis("Comments", comment_totals)
        # Swap the axes so the bars run horizontally.
        .reversal_axis()
        .set_global_opts(
            title_opts=opts.TitleOpts(title="Top 20 Videos by Comments"),
            xaxis_opts=opts.AxisOpts(name="Comment Count"),
        )
    )
    bar.render("comments_chart.html")
    return bar
Category Distribution Funnel
from pyecharts.charts import Funnel
from pyecharts.globals import ThemeType
def create_category_funnel(cursor):
    """Build a funnel chart of video counts per category and render it to HTML.

    The chart is written to ``category_funnel.html``.

    Args:
        cursor: Database cursor used to run the count query.

    Returns:
        The configured ``Funnel`` chart.
    """
    cursor.execute("SELECT category, COUNT(*) as video_count FROM videos GROUP BY category")

    # Order the (category, count) pairs by count, largest first.
    pairs = list(cursor.fetchall())
    pairs.sort(key=lambda pair: pair[1], reverse=True)

    chart = Funnel(init_opts=opts.InitOpts(theme=ThemeType.LIGHT))
    chart.add(
        "Video Count",
        pairs,
        gap=2,
        label_opts=opts.LabelOpts(position="inside"),
    )
    chart.set_global_opts(
        title_opts=opts.TitleOpts(title="Programming Category Distribution")
    )
    chart.render("category_funnel.html")
    return chart
PyQt Application Interface
The main application window manages data collection, visualization, and user interaction:
from PyQt5.QtWidgets import QMainWindow, QTableWidgetItem
from PyQt5.QtCore import Qt
class DataAnalysisApp(QMainWindow):
    """Main window: runs the scraper, stores and lists its results, shows charts.

    NOTE(review): the widgets used here (``data_table``, ``start_button``,
    ``visualize_button``, ``stats_label`` and the three web views) are not
    created in this class; presumably they come from the Qt Designer UI --
    confirm they exist before ``setup_ui`` runs.
    """

    def __init__(self):
        super().__init__()
        # Created before any signal is connected so the data slot can never
        # observe a missing attribute.
        self.category_counts = {}
        self.setup_ui()
        self.initialize_components()

    def setup_ui(self):
        """Set the window title and geometry, then configure table and buttons."""
        self.setWindowTitle("Bilibili Course Analyzer")
        self.setGeometry(100, 100, 1400, 800)
        # Initialize UI components from Qt Designer file
        self.initialize_table_widget()
        self.setup_control_buttons()

    def initialize_table_widget(self):
        """Configure the seven result columns and enable header sorting."""
        self.data_table.setColumnCount(7)
        headers = [
            'Video ID', 'Title', 'Author', 'Category',
            'Views', 'Comments', 'Duration'
        ]
        self.data_table.setHorizontalHeaderLabels(headers)
        self.data_table.setColumnWidth(1, 400)
        self.data_table.setSortingEnabled(True)

    def setup_control_buttons(self):
        """Connect the start/stop and visualize buttons to their handlers."""
        self.start_button.clicked.connect(self.toggle_data_collection)
        self.visualize_button.clicked.connect(self.generate_visualizations)

    def initialize_components(self):
        """Open the database, ensure the table exists and create the scraper."""
        self.db_connection = initialize_database_connection()
        self.db_cursor = self.db_connection.cursor()
        create_videos_table(self.db_cursor)
        self.scraper_thread = ScraperThread()
        self.scraper_thread.data_ready.connect(self.process_video_data)

    def toggle_data_collection(self):
        """Start the scraper if it is idle, otherwise ask it to stop."""
        if self.scraper_thread.isRunning():
            self.scraper_thread.stop()
            self.start_button.setText("Start Collection")
        else:
            self.scraper_thread.start()
            self.start_button.setText("Stop Collection")

    def process_video_data(self, video_id, title, author, category, views, comments, duration):
        """Persist one scraped video, append it to the table and update the stats.

        Connected to ``ScraperThread.data_ready``.
        """
        # Insert into database
        video_record = (video_id, title, author, category, views, comments, duration)
        insert_video_data(self.db_cursor, video_record)
        self.db_connection.commit()

        # Update UI table. Sorting is suspended while the row is filled: with
        # sorting on, the row can move as soon as its sort-column cell is set,
        # and the remaining cells would be written into a different row.
        sorting_was_enabled = self.data_table.isSortingEnabled()
        self.data_table.setSortingEnabled(False)
        try:
            row_position = self.data_table.rowCount()
            self.data_table.insertRow(row_position)
            for col, value in enumerate(video_record):
                item = QTableWidgetItem()
                # Keep the native type: ints then sort numerically instead of
                # as text ("100" before "20").
                item.setData(Qt.DisplayRole, value)
                item.setTextAlignment(Qt.AlignCenter)
                self.data_table.setItem(row_position, col, item)
        finally:
            self.data_table.setSortingEnabled(sorting_was_enabled)

        # Update category statistics
        self.category_counts[category] = self.category_counts.get(category, 0) + 1
        self.update_statistics_display()

    def generate_visualizations(self):
        """Build the three charts from the database and show them in the web views."""
        # Create all visualization charts
        view_chart = create_view_count_chart(self.db_cursor)
        comment_chart = create_comment_analysis_chart(self.db_cursor)
        funnel_chart = create_category_funnel(self.db_cursor)
        # Display charts in web views
        self.display_chart_in_view(view_chart, self.views_webview)
        self.display_chart_in_view(comment_chart, self.comments_webview)
        self.display_chart_in_view(funnel_chart, self.funnel_webview)

    def display_chart_in_view(self, chart, webview):
        """Render ``chart`` to ``temp_chart.html`` and load that HTML into ``webview``."""
        chart.render("temp_chart.html")
        with open("temp_chart.html", "r", encoding="utf-8") as f:
            html_content = f.read()
        webview.setHtml(html_content)

    def update_statistics_display(self):
        """Show per-category counts, sorted by category name, in the stats label."""
        parts = [
            f"{category}: {count} "
            for category, count in sorted(self.category_counts.items())
        ]
        self.stats_label.setText("Categories: " + "".join(parts))

    def closeEvent(self, event):
        """Stop the scraper, wait for it to finish and close the database."""
        self.scraper_thread.stop()
        self.scraper_thread.wait()
        self.db_cursor.close()
        self.db_connection.close()
        event.accept()
Application Structure
The complete application consists of these modules:
- main.py - Application entry point
- scraper.py - Web scraping thread implementation
- database.py - Database connection and operations
- charts.py - Data visualization functions
- ui_mainwindow.py - Qt Designer generated interface
- app_logic.py - Main application logic and event handling
Key Features
- Multi-threaded Data Collection: Background scraping prevents UI freezing
- Real-time Data Processing: Immediate categorization and normalization
- Interactive Visualizations: Multiple chart types with automatic updates
- Persistent Storage: MySQL database for data retention
- Responsive Interface: PyQt-based GUI with tabbed navigation
- Error Handling: Graceful recovery from network or database issues
Performance Considerations
- Implement request rate limiting to avoid IP blocking
- Use connection pooling for database operations
- Cache frequently accessed visualization data
- Implement data validation before insertion
- Add progress indicators for long-running operations