Concurrency Patterns in Web Scraping
Coroutines
Executing Multiple Tasks Concurrently
import asyncio
async def task_one():
for _ in range(5):
print('task-one...')
await asyncio.sleep(1)
print(123)
async def task_two():
for _ in range(5):
print('task-two...')
await asyncio.sleep(1)
print(456)
# Legacy pattern: drive the event loop manually. Passing bare coroutines to
# asyncio.wait() was deprecated in Python 3.8 and removed in 3.11.
loop = asyncio.get_event_loop()
coro_list = [task_one(), task_two()]
loop.run_until_complete(asyncio.wait(coro_list))
Running Coroutines in Modern Python Versions
# Modern pattern: asyncio.run() (available since Python 3.7)
import asyncio
async def task_one():
for _ in range(5):
print('task-one...')
await asyncio.sleep(1)
async def task_two():
for _ in range(5):
print('task-two...')
await asyncio.sleep(1)
async def main():
tasks = [asyncio.create_task(task_one()), asyncio.create_task(task_two())]
await asyncio.wait(tasks)
asyncio.run(main())
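When you only need every task to finish, asyncio.gather is a more compact alternative to create_task plus asyncio.wait; the body of main() above could be reduced to a single line:
    await asyncio.gather(task_one(), task_two())
gather schedules the coroutines as tasks itself and, when they return values, hands the results back in submission order.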
Retrieving Results from Coroutine Tasks
import asyncio
from asyncio import as_completed
async def task_one():
await asyncio.sleep(3)
return 'hello world - 1'
async def task_two():
await asyncio.sleep(10)
return 'hello world - 2'
if __name__ == '__main__':
loop = asyncio.get_event_loop()
coro_list = [task_one(), task_two()]
# Retrieve results - approach one
# done, pending = loop.run_until_complete(asyncio.wait(coro_list))
# for task in done:
# print(task.result())
    # Retrieve results - approach two (gather blocks until the slowest task finishes and returns results in submission order)
# result_list = loop.run_until_complete(asyncio.gather(*coro_list))
# print(result_list)
    # Retrieve results - approach three (yields results in completion order).
    # Legacy style: on current Python, iterate as_completed inside a coroutine instead.
    for task in as_completed(coro_list):
        res = loop.run_until_complete(task)
        print(res)
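For current Python, a minimal sketch of the same completion-order retrieval driven by asyncio.run, reusing task_one and task_two from above:
async def main():
    for coro in asyncio.as_completed([task_one(), task_two()]):
        res = await coro  # results arrive in completion order: task_one (3s) before task_two (10s)
        print(res)
asyncio.run(main())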
Implementing a Web Crawler Using Coroutines
import asyncio
import requests
from functools import partial
from bs4 import BeautifulSoup
url = 'https://movie.douban.com/top250?start={}&filter='
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36'
}
async def fetch_movie_info(page_number):
    # Uses the global `loop` created in __main__. When running via asyncio.run(),
    # uncomment the next line to use the running loop instead.
    # loop = asyncio.get_running_loop()
    # requests is synchronous, so hand it off to the default thread pool executor
    response = await loop.run_in_executor(None, partial(requests.get, url.format(page_number * 25), headers=headers))
soup = BeautifulSoup(response.text, 'lxml')
div_list = soup.find_all('div', class_='hd')
for title in div_list:
print(title.get_text())
async def main():
task_list = [asyncio.create_task(fetch_movie_info(i)) for i in range(10)]
await asyncio.wait(task_list)
if __name__ == '__main__':
    # Legacy execution style (pre-3.10)
    loop = asyncio.get_event_loop()
    coro_list = [fetch_movie_info(i) for i in range(10)]
    loop.run_until_complete(asyncio.wait(coro_list))
    # Preferred on current Python: asyncio.run() (requires uncommenting
    # get_running_loop() inside fetch_movie_info, since no global loop exists then)
    # asyncio.run(main())
    # Creating event loops
    # loop = asyncio.new_event_loop()    # Always creates a new loop
    # loop = asyncio.get_event_loop()    # Returns the current loop or creates one
    # loop = asyncio.get_running_loop()  # Returns the running loop, or raises RuntimeError outside a coroutine
Using aiohttp for Asynchronous HTTP Requests
The requests library is synchronous, so calling it directly inside a coroutine blocks the entire event loop; to use it with asyncio, each call must be handed off to a thread pool (as in the run_in_executor example above). The aiohttp library is a natively asynchronous HTTP client, so no thread pool is needed.
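For reference, a minimal sketch of the thread-pool workaround using asyncio.to_thread (Python 3.9+); the URLs are just placeholders:
import asyncio
import requests
async def fetch(page_url):
    # requests.get runs in a worker thread, so the event loop stays responsive
    return await asyncio.to_thread(requests.get, page_url, timeout=10)
async def main():
    responses = await asyncio.gather(fetch('https://example.com'), fetch('https://example.org'))
    for response in responses:
        print(response.status_code)
asyncio.run(main())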
# pip install aiohttp
import asyncio
import aiohttp
url = 'https://www.baidu.com'
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36'
}
# Fetching a webpage using async context managers
async def fetch_baidu_page():
    async with aiohttp.ClientSession() as session:
        async with session.get(url, headers=headers) as response:
            return await response.text()
async def main():
    task = asyncio.create_task(fetch_baidu_page())
    result = await task
    print(result)
if __name__ == '__main__':
    # For Python versions below 3.10
    loop = asyncio.get_event_loop()
    print(loop.run_until_complete(fetch_baidu_page()))
    # For current Python
    # asyncio.run(main())
Handling Return Values from Async Tasks
import time
import asyncio
import aiohttp
url = 'https://www.baidu.com'
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36'
}
async def fetch_baidu_page():
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers) as response:
            await asyncio.sleep(3)  # artificial delay to make the timing difference visible
return await response.text()
# Method one
# async def main():
# result_1 = await fetch_baidu_page()  # awaited one after the other, so no concurrency
# result_2 = await fetch_baidu_page()
# print(result_1)
# print(result_2)
# Method two
# async def main():
# task = [asyncio.create_task(fetch_baidu_page()) for _ in range(2)]
# done, pending = await asyncio.wait(task)
# for temp in done:
# print(temp.result())
# Method three
# async def main():
# task = [asyncio.create_task(fetch_baidu_page()) for _ in range(2)]
# result_list = await asyncio.gather(*task)
# print(result_list)
# Method four
def callback(task):
    # Invoked automatically when the task finishes
    print(task.result())
async def main():
task_list = [asyncio.create_task(fetch_baidu_page()) for _ in range(2)]
for task in task_list:
task.add_done_callback(callback)
await task
if __name__ == '__main__':
start = time.time()
asyncio.run(main())
end = time.time()
print(f'Total time: {end - start}')
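Because the two requests run concurrently, the total time printed should be close to the 3-second artificial delay, rather than the roughly 6 seconds the sequential version in method one would take.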
Fetching Movie Data with Async Crawling
import asyncio
import aiohttp
from bs4 import BeautifulSoup
url = 'https://movie.douban.com/top250?start={}&filter='
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36'
}
async def fetch_movie_info(page):
async with aiohttp.ClientSession() as session:
async with session.get(url=url.format(page * 25), headers=headers) as response:
soup = BeautifulSoup(await response.text(), 'lxml')
div_list = soup.find_all('div', class_='hd')
result_list = list()
for title in div_list:
result_list.append(title.get_text())
return result_list
async def main():
task_list = [asyncio.create_task(fetch_movie_info(page_number)) for page_number in range(10)]
result_list = await asyncio.gather(*task_list)
for temp in result_list:
print(temp, str(len(temp)) + ' items')
if __name__ == '__main__':
asyncio.run(main())
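Real sites throttle aggressive clients, so it is often worth capping how many requests are in flight at once. A minimal sketch using asyncio.Semaphore (the function name and the limit of 3 are illustrative):
import asyncio
import aiohttp
async def polite_fetch(session, semaphore, page_url):
    async with semaphore:  # wait for a free slot before issuing the request
        async with session.get(page_url) as response:
            return await response.text()
async def main(urls):
    semaphore = asyncio.Semaphore(3)  # at most 3 concurrent requests
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*(polite_fetch(session, semaphore, u) for u in urls))
# e.g. asyncio.run(main(['https://example.com']))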
Using aiomysql for Database Operations
Python's async/await syntax only pays off when every I/O library in the chain is asynchronous: just as aiohttp replaces requests for HTTP calls, aiomysql replaces PyMySQL for MySQL access.
# pip install aiomysql
import asyncio
import aiomysql
async def get_database_result():
async with aiomysql.connect(host='localhost', port=3306, user='root', password='root', db='py_spider') as db:
async with db.cursor() as cursor:
sql = 'select * from tx_work;'
await cursor.execute(sql)
result = await cursor.fetchall()
print(result)
asyncio.run(get_database_result())
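For anything beyond a one-off query, aiomysql also provides connection pooling via aiomysql.create_pool(), which the next example uses to share connections across concurrent tasks.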
Async Web Crawler Example: Car Information from Che168
Using asyncio to scrape car specifications from Che168 and store them in MySQL.
import redis
import chardet
import hashlib
import asyncio
import aiohttp
import aiomysql
from lxml import etree
class CarCrawler:
def __init__(self):
self.url = 'https://www.che168.com/china/a0_0msdgscncgpi1ltocsp{}exf4x0/?pvareaid=102179#currengpostion'
self.api_url = 'https://cacheapigo.che168.com/CarProduct/GetParam.ashx?specid={}'
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36'
}
self.redis_client = redis.Redis()
async def get_car_ids(self, page, session, pool):
async with session.get(self.url.format(page), headers=self.headers) as response:
            content = await response.read()
            encoding = chardet.detect(content)['encoding']
            if encoding == 'GB2312' or encoding == 'ISO-8859-1':
                # chardet often reports GBK pages as GB2312/ISO-8859-1; decode as GBK
                result = content.decode('gbk')
            else:
                # A different encoding usually means an anti-crawl page was served
                print('Request frequency too high...')
                result = content.decode(encoding)
tree = etree.HTML(result)
id_list = tree.xpath("//ul[@class='viewlist_ul']/li/@specid")
if id_list:
print('IDs:', id_list)
tasks = [asyncio.create_task(self.fetch_car_info(spec_id, session, pool)) for spec_id in id_list]
await asyncio.wait(tasks)
async def fetch_car_info(self, spec_id, session, pool):
async with session.get(self.api_url.format(spec_id), headers=self.headers) as response:
result = await response.json()
if result['result'].get('paramtypeitems'):
item = dict()
item['name'] = result['result']['paramtypeitems'][0]['paramitems'][0]['value']
item['price'] = result['result']['paramtypeitems'][0]['paramitems'][1]['value']
item['brand'] = result['result']['paramtypeitems'][0]['paramitems'][2]['value']
item['altitude'] = result['result']['paramtypeitems'][1]['paramitems'][2]['value']
item['breadth'] = result['result']['paramtypeitems'][1]['paramitems'][1]['value']
item['length'] = result['result']['paramtypeitems'][1]['paramitems'][0]['value']
print(item)
await self.save_car_info(item, pool)
else:
print('No data available...')
@staticmethod
def generate_md5(data_dict):
md5 = hashlib.md5()
md5.update(str(data_dict).encode())
return md5.hexdigest()
async def save_car_info(self, item, pool):
async with pool.acquire() as conn:
async with conn.cursor() as cursor:
value_md5 = self.generate_md5(item)
redis_result = self.redis_client.sadd('car:filter', value_md5)
if redis_result:
sql = """
insert into car_info values (%s, %s, %s, %s, %s, %s, %s);
"""
try:
await cursor.execute(sql, (
0,
item['name'],
item['price'],
item['brand'],
item['altitude'],
item['breadth'],
item['length'],
))
await conn.commit()
print('Data saved successfully:', item)
except Exception as e:
print('Failed to save data:', e)
await conn.rollback()
else:
print('Data already exists...')
async def main(self):
async with aiomysql.create_pool(user='root', password='123456', db='py_spider') as pool:
async with pool.acquire() as conn:
async with conn.cursor() as cursor:
create_table_sql = """
create table car_info(
id int primary key auto_increment,
name varchar(100),
price varchar(100),
brand varchar(100),
altitude varchar(100),
breadth varchar(100),
length varchar(100)
);
"""
check_table_sql = "show tables like 'car_info';"
result = await cursor.execute(check_table_sql)
if not result:
await cursor.execute(create_table_sql)
async with aiohttp.ClientSession() as session:
tasks = [asyncio.create_task(self.get_car_ids(page, session, pool)) for page in range(1, 11)]
await asyncio.wait(tasks)
if __name__ == '__main__':
crawler = CarCrawler()
asyncio.run(crawler.main())
Async Web Crawler Example: Car Information from Che168 to MongoDB
# pip install motor
import redis
import chardet
import hashlib
import asyncio
import aiohttp
from lxml import etree
from motor.motor_asyncio import AsyncIOMotorClient
class CarCrawler:
def __init__(self):
self.url = 'https://www.che168.com/china/a0_0msdgscncgpi1ltocsp{}exf4x0/?pvareaid=102179#currengpostion'
self.api_url = 'https://cacheapigo.che168.com/CarProduct/GetParam.ashx?specid={}'
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36'
}
self.redis_client = redis.Redis()
self.mongo_client = AsyncIOMotorClient('localhost', 27017)['py_spider']['car_info']
async def get_car_ids(self, page, session):
async with session.get(self.url.format(page), headers=self.headers) as response:
            content = await response.read()
            encoding = chardet.detect(content)['encoding']
            if encoding == 'GB2312' or encoding == 'ISO-8859-1':
                # chardet often reports GBK pages as GB2312/ISO-8859-1; decode as GBK
                result = content.decode('gbk')
            else:
                # A different encoding usually means an anti-crawl page was served
                print('Request frequency too high...')
                result = content.decode(encoding)
tree = etree.HTML(result)
id_list = tree.xpath("//ul[@class='viewlist_ul']/li/@specid")
if id_list:
print('IDs:', id_list)
tasks = [asyncio.create_task(self.fetch_car_info(spec_id, session)) for spec_id in id_list]
await asyncio.wait(tasks)
async def fetch_car_info(self, spec_id, session):
async with session.get(self.api_url.format(spec_id), headers=self.headers) as response:
result = await response.json()
if result['result'].get('paramtypeitems'):
item = dict()
item['name'] = result['result']['paramtypeitems'][0]['paramitems'][0]['value']
item['price'] = result['result']['paramtypeitems'][0]['paramitems'][1]['value']
item['brand'] = result['result']['paramtypeitems'][0]['paramitems'][2]['value']
item['altitude'] = result['result']['paramtypeitems'][1]['paramitems'][2]['value']
item['breadth'] = result['result']['paramtypeitems'][1]['paramitems'][1]['value']
item['length'] = result['result']['paramtypeitems'][1]['paramitems'][0]['value']
print(item)
await self.save_car_info(item)
else:
print('No data available...')
@staticmethod
def generate_md5(data_dict):
md5 = hashlib.md5()
md5.update(str(data_dict).encode())
return md5.hexdigest()
async def save_car_info(self, item):
md5_hash = self.generate_md5(item)
redis_result = self.redis_client.sadd('car:filter', md5_hash)
if redis_result:
try:
await self.mongo_client.insert_one(item)
print('Data saved successfully:', item)
except Exception as e:
print('Failed to save data:', e)
else:
print('Data already exists...')
async def main(self):
async with aiohttp.ClientSession() as session:
tasks = [asyncio.create_task(self.get_car_ids(page, session)) for page in range(1, 11)]
await asyncio.wait(tasks)
if __name__ == '__main__':
    crawler = CarCrawler()
    # The motor client created in __init__ binds to the loop returned by
    # get_event_loop(), so drive that same loop here; asyncio.run() would
    # create a fresh loop and the client would be attached to the wrong one.
    loop = asyncio.get_event_loop()
    loop.run_until_complete(crawler.main())
Threading
Basic Thread Concepts
import time
import threading
def work_1():
for _ in range(5):
print('work_1')
time.sleep(1)
def work_2():
for _ in range(5):
print('work_2')
time.sleep(1)
t1 = threading.Thread(target=work_1)
t2 = threading.Thread(target=work_2)
t2.daemon = True  # daemon threads are killed once all non-daemon threads finish
t1.start()
t2.start()
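A minimal sketch, reusing the same workers, of waiting for threads explicitly with join():
t1 = threading.Thread(target=work_1)
t2 = threading.Thread(target=work_2)
t1.start()
t2.start()
t1.join()  # block until work_1 finishes
t2.join()  # block until work_2 finishes
print('all workers done')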
Fetching Movie Data with Threads
import requests
import threading
from lxml import etree
url = 'https://movie.douban.com/top250?start={}&filter='
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36'
}
def fetch_movie_info(page_number):
response = requests.get(url.format(page_number * 25), headers=headers).text
tree = etree.HTML(response)
result = tree.xpath("//div[@class='hd']/a/span[1]/text()")
print(result)
if __name__ == '__main__':
    thread_list = [threading.Thread(target=fetch_movie_info, args=(page,)) for page in range(10)]
    for thread in thread_list:
        thread.start()
    for thread in thread_list:
        thread.join()  # wait for every page before exiting
Using ThreadPoolExecutor
import requests
from lxml import etree
from concurrent.futures import ThreadPoolExecutor, as_completed
url = 'https://movie.douban.com/top250?start={}&filter='
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36'
}
def fetch_movie_info(page_number):
response = requests.get(url.format(page_number * 25), headers=headers).text
tree = etree.HTML(response)
result = tree.xpath("//div[@class='hd']/a/span[1]/text()")
return result
if __name__ == '__main__':
with ThreadPoolExecutor(max_workers=5) as pool:
futures = [pool.submit(fetch_movie_info, page) for page in range(10)]
# for future in futures:
# print(future.result())
for future in as_completed(futures):
print(future.result())
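When results are wanted in submission order rather than completion order, ThreadPoolExecutor.map is a more compact alternative; a sketch reusing fetch_movie_info:
with ThreadPoolExecutor(max_workers=5) as pool:
    # map yields results lazily, in the order the pages were submitted
    for result in pool.map(fetch_movie_info, range(10)):
        print(result)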
Multiprocessing
Process Basics
from multiprocessing import Process
def func(name):
    print('child process:', name)
if __name__ == '__main__':  # required on platforms that spawn processes (Windows/macOS)
    p = Process(target=func, args=('worker',))  # args must be a tuple
    p.daemon = True  # daemon processes are terminated when the main process exits
    p.start()
    p.join()
Using JoinableQueue in Processes
import pymongo
import requests
import jsonpath
from multiprocessing import Process, JoinableQueue as Queue
url = 'https://careers.tencent.com/tencentcareer/api/post/Query'
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
}
# Module-level client inherited by child processes via fork; note that
# pymongo documents MongoClient as not fork-safe in general
mongo_client = pymongo.MongoClient()
db = mongo_client['py_spider']['process_tx_work']
def fetch_job_info(page_num, queue):
params = {
"timestamp": "1741526713421",
"countryId": "",
"cityId": "",
"bgIds": "",
"productId": "",
"categoryId": "",
"parentCategoryId": "",
"attrId": "",
"keyword": "python",
"pageIndex": page_num,
"pageSize": "10",
"language": "zh-cn",
"area": "cn"
}
response = requests.get(url, params=params, headers=headers).json()
try:
for info in response['Data']['Posts']:
job_info_dict = dict()
job_info_dict['recruit_post_name'] = jsonpath.jsonpath(info, '$..RecruitPostName')[0]
job_info_dict['country_name'] = jsonpath.jsonpath(info, '$..CountryName')[0]
job_info_dict['location_name'] = jsonpath.jsonpath(info, '$..LocationName')[0]
job_info_dict['category_name'] = jsonpath.jsonpath(info, '$..CategoryName')[0]
job_info_dict['responsibility'] = jsonpath.jsonpath(info, '$..Responsibility')[0]
job_info_dict['last_update_time'] = jsonpath.jsonpath(info, '$..LastUpdateTime')[0]
queue.put(job_info_dict)
except TypeError:
print('No data:', params.get('pageIndex'))
def save_job_info(queue):
while True:
dict_info = queue.get()
db.insert_one(dict_info)
print('Data inserted successfully:', dict_info)
queue.task_done()
if __name__ == '__main__':
dict_info_queue = Queue()
process_list = list()
for page in range(1, 39):
p_get_info = Process(target=fetch_job_info, args=(page, dict_info_queue))
process_list.append(p_get_info)
p_save_job = Process(target=save_job_info, args=(dict_info_queue,))
for p in process_list:
p.start()
    p_save_job.daemon = True  # dies with the main process once the queue is drained
    p_save_job.start()
    for p in process_list:
        p.join()
    dict_info_queue.join()  # block until every queued item has been marked task_done()
print('Task completed...')
mongo_client.close()
print('Connection closed...')
Practical Examples
Case 1: Using Queue and Threading for Iqiyi Movies
import pymongo
import requests
import threading
from queue import Queue
class IqiyiCrawler:
def __init__(self):
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36'
}
self.mongo_client = pymongo.MongoClient()
self.db = self.mongo_client['py_spider']['thread_aqy_movie']
self.api_url = 'https://pcw-api.iqiyi.com/search/recommend/list?channel_id=2&data_type=1&mode=11&page_id={}&ret_num=48&session=85dd981b69cead4b60f6d980438a5664&three_category_id=15;must'
self.url_queue = Queue()
self.json_queue = Queue()
self.content_dict_queue = Queue()
def get_urls(self):
for page in range(1, 11):
self.url_queue.put(self.api_url.format(page))
def fetch_api_data(self):
while True:
url = self.url_queue.get()
response = requests.get(url, headers=self.headers).json()
self.json_queue.put(response)
self.url_queue.task_done()
def parse_movie_info(self):
while True:
json_info = self.json_queue.get()
for movie in json_info['data']['list']:
item = dict()
item['title'] = movie['title']
item['playUrl'] = movie['playUrl']
item['description'] = movie['description']
self.content_dict_queue.put(item)
self.json_queue.task_done()
def save_movie_info(self):
while True:
item = self.content_dict_queue.get()
self.db.insert_one(item)
print('Data saved successfully:', item)
self.content_dict_queue.task_done()
def main(self):
thread_list = list()
self.get_urls()
for _ in range(3):
t_get_json = threading.Thread(target=self.fetch_api_data)
thread_list.append(t_get_json)
for _ in range(3):
t_parse = threading.Thread(target=self.parse_movie_info)
thread_list.append(t_parse)
t_save_info = threading.Thread(target=self.save_movie_info)
thread_list.append(t_save_info)
        for thread_obj in thread_list:
            thread_obj.daemon = True  # let the process exit once the queues are drained
            thread_obj.start()
        for q in [self.url_queue, self.json_queue, self.content_dict_queue]:
            q.join()  # wait until every item has been processed and marked task_done()
self.mongo_client.close()
print('Crawling finished...')
if __name__ == '__main__':
iqiyi = IqiyiCrawler()
iqiyi.main()
Case 2: Using ThreadPoolExecutor for Baidu Jobs
import pymysql
import requests
from dbutils.pooled_db import PooledDB
from concurrent.futures import ThreadPoolExecutor, as_completed
class BaiduJobSpider:
def __init__(self):
self.pool = PooledDB(
creator=pymysql,
maxconnections=5,
mincached=1,
maxcached=2,
maxshared=3,
blocking=True,
host='localhost',
port=3306,
user='root',
password='123456',
database='py_spider',
charset='utf8'
)
self.api_url = 'https://talent.baidu.com/httservice/getPostListNew'
self.headers = {
"Accept": "application/json, text/plain, */*",
"Accept-Language": "zh-CN,zh;q=0.9",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Content-Type": "application/x-www-form-urlencoded;charset=UTF-8",
"Origin": "https://talent.baidu.com",
"Pragma": "no-cache",
"Referer": "https://talent.baidu.com/jobs/social-list?search=",
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36",
"sec-ch-ua": "\"Chromium\";v=\"136\", \"Google Chrome\";v=\"136\", \"Not.A/Brand\";v=\"99\"",
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"macOS\""
}
def fetch_job_info(self, page_num):
post_data = {
'recruitType': 'SOCIAL',
'pageSize': 10,
'keyWord': '',
'curPage': page_num,
'projectType': '',
}
cookies = {
"BIDUPSID": "76C653EDEB40C10F7F060DF3835854E3",
"PSTM": "1742473418",
"BAIDUID": "AD2B22AA47105A367D8F718EF45D4DB7:FG=1",
"H_WISE_SIDS": "62325_62969_63018_63056",
"BAIDUID_BFESS": "AD2B22AA47105A367D8F718EF45D4DB7:FG=1",
"ZFY": "wyT2CVZBsmiD2N7HpYAmmlD2cuhAfyL:Bl:AO7H4Z:BHN0:C",
"H_PS_PSSID": "61027_62325_62484_62969_63056_63140_63188_63195_63211_63241_63248_63253_63266_63074",
"Hm_lvt_50e85ccdd6c1e538eb1290bc92327926": "1747748555",
"HMACCOUNT": "B332573DD7B815D7",
"Hm_lpvt_50e85ccdd6c1e538eb1290bc92327926": "1747748942",
"RT": '"z=1&dm=baidu.com&si=abb7eb1e-1e10-4864-89ec-1bf5c2cae096&ss=mawkd7ob&sl=4&tt=41q9&bcn=https%3A%2F%2Ffclog.baidu.com%2Flog%2Fweirwood%3Ftype%3Dperf&ld=8gcx"'
}
response = requests.post(self.api_url, headers=self.headers, data=post_data, cookies=cookies).json()
return response
def parse_job_info(self, response):
job_list = response['data']['list']
for job_info in job_list:
            education = job_info['education'] if job_info['education'] else 'N/A'  # fall back when the field is empty
name = job_info['name']
service_condition = job_info['serviceCondition']
self.save_job_info(0, name, education, service_condition)
def save_job_info(self, *args):
with self.pool.connection() as db:
with db.cursor() as cursor:
sql = "insert into baidu_work_thread_pool values (%s, %s, %s, %s);"
try:
cursor.execute(sql, args)
db.commit()
print(f'Saved data successfully: {args}')
except Exception as e:
db.rollback()
print(f'Failed to save data: {e}')
def create_table(self):
with self.pool.connection() as db:
with db.cursor() as cursor:
sql = """
create table if not exists baidu_work_thread_pool(
id int primary key auto_increment,
name varchar(100),
education varchar(200),
service_condition text
);
"""
try:
cursor.execute(sql)
print('Table created successfully')
except Exception as e:
print(f'Failed to create table: {e}')
    def main(self):
        self.create_table()
        with ThreadPoolExecutor(max_workers=10) as pool:
            futures = [pool.submit(self.fetch_job_info, page) for page in range(1, 51)]
            for future in as_completed(futures):
                # parse each page as soon as its request completes; the with-block
                # also waits for these parse tasks before shutting the pool down
                pool.submit(self.parse_job_info, future.result())
if __name__ == '__main__':
spider = BaiduJobSpider()
spider.main()
Case 3: Using Queue and Multiprocessing for Mango TV Movies
import redis
import pymongo
import hashlib
import requests
from multiprocessing import Process, JoinableQueue as Queue
class MovieCrawler:
redis_client = redis.Redis(host='localhost', port=6379, db=0)
mongo_client = pymongo.MongoClient(host='localhost', port=27017)
mongo_db = mongo_client['py_spider']['process_mongo_movie']
def __init__(self):
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36'
}
self.api_url = 'https://pianku.api.mgtv.com/rider/list/pcweb/v3'
self.params_queue = Queue()
self.json_queue = Queue()
self.content_queue = Queue()
def populate_params(self):
for page in range(1, 6):
params_dict = {
"allowedRC": "1",
"platform": "pcweb",
"channelId": "2",
"pn": page,
"pc": "80",
"hudong": "1",
"_support": "10000000",
"kind": "19",
"area": "10",
"year": "all",
"chargeInfo": "a1",
"sort": "c2",
"feature": "all"
}
self.params_queue.put(params_dict)
def fetch_movie_info(self):
while True:
params_dict = self.params_queue.get()
response = requests.get(url=self.api_url, params=params_dict, headers=self.headers).json()
self.json_queue.put(response)
self.params_queue.task_done()
def parse_movie_info(self):
while True:
response = self.json_queue.get()
movie_list = response['data']['hitDocs']
for movie in movie_list:
item = dict()
item['title'] = movie['title']
item['subtitle'] = movie['subtitle']
item['story'] = movie['story']
self.content_queue.put(item)
self.json_queue.task_done()
@staticmethod
def generate_md5(data_dict):
md5_hash = hashlib.md5(str(data_dict).encode('utf-8')).hexdigest()
return md5_hash
def save_movie_info(self):
while True:
item = self.content_queue.get()
md5_hash = self.generate_md5(item)
redis_result = self.redis_client.sadd('process_mg_movie:filter', md5_hash)
if redis_result:
try:
self.mongo_db.insert_one(item)
print('Data inserted successfully:', item)
except Exception as e:
print('Failed to insert data:', e)
else:
print('Data already exists...')
self.content_queue.task_done()
def close_spider(self):
self.mongo_client.close()
self.redis_client.close()
print('Crawling task completed...')
def main(self):
self.populate_params()
process_list = list()
for _ in range(3):
p_fetch_movie = Process(target=self.fetch_movie_info)
process_list.append(p_fetch_movie)
p_parse_movie = Process(target=self.parse_movie_info)
process_list.append(p_parse_movie)
p_save_movie = Process(target=self.save_movie_info)
process_list.append(p_save_movie)
for process_obj in process_list:
process_obj.daemon = True
process_obj.start()
for q in [self.params_queue, self.json_queue, self.content_queue]:
q.join()
self.close_spider()
if __name__ == '__main__':
crawler = MovieCrawler()
crawler.main()
Case 4: Async Hero Skins Download
# pip install aiohttp aiofile
import os
import asyncio
import aiohttp
import aiofile
class HeroSkinDownloader:
def __init__(self):
self.json_url = 'https://pvp.qq.com/web201605/js/herolist.json'
self.skin_url = 'https://game.gtimg.cn/images/yxzj/img201606/skin/hero-info/{}/{}-bigskin-{}.jpg'
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36'
}
    async def download_image(self, session, e_name, c_name):
        # Probe skin ids in order; stop at the first missing skin (non-200 response)
        for skin_id in range(1, 31):
async with session.get(self.skin_url.format(e_name, e_name, skin_id), headers=self.headers) as response:
if response.status == 200:
content = await response.read()
async with aiofile.async_open('./images/' + c_name + '-' + str(skin_id) + '.jpg', 'wb') as f:
await f.write(content)
print('Downloaded:', c_name + '-' + str(skin_id) + '.jpg')
else:
break
async def main(self):
task_list = list()
async with aiohttp.ClientSession() as session:
async with session.get(self.json_url, headers=self.headers) as response:
result = await response.json(content_type=None)
for item in result:
e_name = item['ename']
c_name = item['cname']
task_obj = asyncio.create_task(self.download_image(session, e_name, c_name))
task_list.append(task_obj)
await asyncio.wait(task_list)
if __name__ == '__main__':
    if not os.path.exists('./images'):
        os.mkdir('./images')
    downloader = HeroSkinDownloader()
    # Legacy loop style; asyncio.run(downloader.main()) also works here
    loop = asyncio.get_event_loop()
    loop.run_until_complete(downloader.main())