Fading Coder

One Final Commit for the Last Sprint

Home > Tech > Content

Concurrency Patterns in Web Scraping

Tech May 12 3

Coroutines

Executing Multiple Tasks Concurrently

import asyncio


async def task_one():
    """Five rounds of: announce the task, pause one second, print 123."""
    rounds_left = 5
    while rounds_left:
        print('task-one...')
        await asyncio.sleep(1)
        print(123)
        rounds_left -= 1


async def task_two():
    """Five rounds of: announce the task, pause one second, print 456."""
    rounds_left = 5
    while rounds_left:
        print('task-two...')
        await asyncio.sleep(1)
        print(456)
        rounds_left -= 1


# Legacy style: drive the coroutines from an explicitly obtained event loop.
# asyncio.wait() rejects bare coroutine objects on Python 3.11+ (TypeError),
# so each coroutine is wrapped in a task before being waited on.
loop = asyncio.get_event_loop()
coro_list = [task_one(), task_two()]
task_list = [loop.create_task(coro) for coro in coro_list]
loop.run_until_complete(asyncio.wait(task_list))

Running Coroutines in Modern Python Versions

# For Python 3.10+

import asyncio


async def task_one():
    """Announce the task and yield to the loop for one second, five times."""
    iteration = 0
    while iteration < 5:
        print('task-one...')
        await asyncio.sleep(1)
        iteration += 1


async def task_two():
    """Announce the task and yield to the loop for one second, five times."""
    iteration = 0
    while iteration < 5:
        print('task-two...')
        await asyncio.sleep(1)
        iteration += 1


async def main():
    """Schedule both demo coroutines as tasks and wait until each has finished."""
    scheduled = []
    for coroutine_factory in (task_one, task_two):
        scheduled.append(asyncio.create_task(coroutine_factory()))
    await asyncio.wait(scheduled)


# Script entry point: asyncio.run() creates an event loop, runs main() to
# completion and closes the loop afterwards.
asyncio.run(main())

Retrieving Results from Coroutine Tasks

import asyncio
from asyncio import as_completed


async def task_one():
    """Wait three seconds, then return the first greeting."""
    greeting = 'hello world - 1'
    await asyncio.sleep(3)
    return greeting


async def task_two():
    """Wait ten seconds, then return the second greeting."""
    greeting = 'hello world - 2'
    await asyncio.sleep(10)
    return greeting


if __name__ == '__main__':
    # Obtain an event loop explicitly (the style used before asyncio.run())
    # and build the two coroutine objects consumed by the approaches below.
    loop = asyncio.get_event_loop()
    coro_list = [task_one(), task_two()]

    # Retrieve results - approach one: asyncio.wait() returns (done, pending)
    # sets; each finished task exposes its return value through .result().
    # done, pending = loop.run_until_complete(asyncio.wait(coro_list))
    # for task in done:
    #     print(task.result())

    # Retrieve results - approach two (blocking until longest task completes):
    # asyncio.gather() returns all results as one list, in argument order.
    # result_list = loop.run_until_complete(asyncio.gather(*coro_list))
    # print(result_list)

    # Retrieve results - approach three: as_completed() yields awaitables in
    # completion order, so each result is printed as soon as it is available.
    for task in as_completed(coro_list):
        res = loop.run_until_complete(task)
        print(res)

Implementing a Web Crawler Using Coroutines

import asyncio
import requests
from functools import partial
from bs4 import BeautifulSoup


url = 'https://movie.douban.com/top250?start={}&filter='
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36'
}


async def fetch_movie_info(page_number):
    """Download one Douban Top-250 page in a worker thread and print its titles.

    ``requests`` is blocking, so the call is handed to the loop's default
    executor to keep the event loop responsive.

    Args:
        page_number: Zero-based page index; the ``start`` offset sent to the
            site is ``page_number * 25``.
    """
    # Look the loop up here instead of relying on a module-level ``loop``
    # variable, which does not exist when the coroutine is started through
    # asyncio.run(main()).
    loop = asyncio.get_running_loop()
    response = await loop.run_in_executor(
        None,
        partial(requests.get, url.format(page_number * 25), headers=headers),
    )
    soup = BeautifulSoup(response.text, 'lxml')
    for title in soup.find_all('div', class_='hd'):
        print(title.get_text())


async def main():
    """Launch one fetch task per page (0-9) and wait for all of them to end."""
    task_list = []
    for page_index in range(10):
        task_list.append(asyncio.create_task(fetch_movie_info(page_index)))
    await asyncio.wait(task_list)


if __name__ == '__main__':
    # Legacy style for Python versions below 3.10: obtain the loop explicitly.
    # The coroutines are wrapped in tasks because asyncio.wait() no longer
    # accepts bare coroutine objects on Python 3.11+.
    loop = asyncio.get_event_loop()
    coro_list = [fetch_movie_info(i) for i in range(10)]
    loop.run_until_complete(asyncio.wait([loop.create_task(coro) for coro in coro_list]))

    # Use asyncio.run() for Python 3.10+
    # asyncio.run(main())

    # Creating event loops
    # loop = asyncio.new_event_loop()  # Always creates a new loop
    # loop = asyncio.get_event_loop()  # Returns existing or creates a new one
    # loop = asyncio.get_running_loop()  # Returns the running loop or raises RuntimeError

Using aiohttp for Asynchronous HTTP Requests

The requests library does not support asynchronous operations. To use it within asyncio, a thread pool must be employed. The aiohttp library provides native asynchronous capabilities for HTTP clients.

# pip install aiohttp

import asyncio
import aiohttp

url = 'https://www.baidu.com'
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36'
}


# Fetching a webpage using async context manager
async def fetch_baidu_page():
    """Request the configured URL with aiohttp and print the response body."""
    async with aiohttp.ClientSession() as session, session.get(url, headers=headers) as response:
        page_text = await response.text()
        print(page_text)


async def main():
    """Run fetch_baidu_page() as a task and wait for it to finish.

    fetch_baidu_page() prints the page itself and has no return value, so the
    task result is deliberately not printed (it would only emit ``None``).
    """
    task = asyncio.create_task(fetch_baidu_page())
    await task


if __name__ == '__main__':
    # For Python versions below 3.10: obtain the event loop explicitly and
    # run the coroutine on it until it completes.
    loop = asyncio.get_event_loop()
    loop.run_until_complete(fetch_baidu_page())

    # For Python 3.10+: asyncio.run() creates and closes the loop itself.
    # asyncio.run(main())

Handling Return Values from Async Tasks

import time
import asyncio
import aiohttp

url = 'https://www.baidu.com'
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36'
}


async def fetch_baidu_page():
    """Fetch the configured URL, pause three seconds, and return the body text."""
    async with aiohttp.ClientSession() as session, session.get(url, headers=headers) as response:
        # The pause happens while the response is still open, as in the demo.
        await asyncio.sleep(3)
        page_text = await response.text()
        return page_text


# Method one
# async def main():
#     result_1 = await fetch_baidu_page()  # Blocks execution
#     result_2 = await fetch_baidu_page()
#     print(result_1)
#     print(result_2)

# Method two
# async def main():
#     task = [asyncio.create_task(fetch_baidu_page()) for _ in range(2)]
#     done, pending = await asyncio.wait(task)
#     for temp in done:
#         print(temp.result())

# Method three
# async def main():
#     task = [asyncio.create_task(fetch_baidu_page()) for _ in range(2)]
#     result_list = await asyncio.gather(*task)
#     print(result_list)


# Method four

def callback(task):
    """Done-callback: print the result held by the finished *task*."""
    outcome = task.result()
    print(outcome)


async def main():
    """Start two page fetches concurrently; print each result via a done-callback.

    Every callback is attached before the first ``await`` so that no task can
    finish while it still has no callback registered.
    """
    task_list = [asyncio.create_task(fetch_baidu_page()) for _ in range(2)]
    for task in task_list:
        task.add_done_callback(callback)
    for task in task_list:
        await task


if __name__ == '__main__':
    # Time the whole run to show that the two fetches overlap instead of
    # running back to back.
    start = time.time()
    asyncio.run(main())
    end = time.time()
    print(f'Total time: {end - start}')

Fetching Movie Data with Async Crawling

import asyncio
import aiohttp
from bs4 import BeautifulSoup

url = 'https://movie.douban.com/top250?start={}&filter='
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36'
}


async def fetch_movie_info(page):
    """Fetch one listing page and return the text of every ``div.hd`` element.

    Args:
        page: Zero-based page index; the ``start`` offset is ``page * 25``.

    Returns:
        A list with one text entry per matched element.
    """
    page_url = url.format(page * 25)
    async with aiohttp.ClientSession() as session:
        async with session.get(url=page_url, headers=headers) as response:
            markup = await response.text()
            soup = BeautifulSoup(markup, 'lxml')
            return [node.get_text() for node in soup.find_all('div', class_='hd')]


async def main():
    """Fetch pages 0-9 concurrently and print each page's titles with a count."""
    jobs = []
    for page_number in range(10):
        jobs.append(asyncio.create_task(fetch_movie_info(page_number)))
    pages = await asyncio.gather(*jobs)
    for titles in pages:
        print(titles, f'{len(titles)} items')


if __name__ == '__main__':
    # Run the crawler only when executed as a script, not when imported.
    asyncio.run(main())

Using aiomysql for Database Operations

In Python 3, async/await keywords enable asynchronous operations. For example, aiohttp replaces requests for HTTP calls, and aiomysql can replace PyMySQL for MySQL access.

# pip install aiomysql

import asyncio
import aiomysql


async def get_database_result():
    """Connect to the local MySQL server, read every row of tx_work and print them."""
    connection_args = {
        'host': 'localhost',
        'port': 3306,
        'user': 'root',
        'password': 'root',
        'db': 'py_spider',
    }
    async with aiomysql.connect(**connection_args) as db:
        async with db.cursor() as cursor:
            await cursor.execute('select * from tx_work;')
            rows = await cursor.fetchall()
            print(rows)


# Script entry point: run the query coroutine on a fresh event loop.
asyncio.run(get_database_result())

Async Web Crawler Example: Car Information from Che168

Using asyncio to scrape car specifications from Che168 and store them in MySQL.

import redis
import chardet
import hashlib
import asyncio
import aiohttp
import aiomysql
from lxml import etree


class CarCrawler:
    """Crawl Che168 listing pages, fetch each car's parameters and store them in MySQL.

    MD5 fingerprints of stored records are kept in the Redis set
    ``car:filter`` so the same record is not inserted twice.
    """

    def __init__(self):
        self.url = 'https://www.che168.com/china/a0_0msdgscncgpi1ltocsp{}exf4x0/?pvareaid=102179#currengpostion'
        self.api_url = 'https://cacheapigo.che168.com/CarProduct/GetParam.ashx?specid={}'
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36'
        }

        self.redis_client = redis.Redis()

    @staticmethod
    def _decode_page(content):
        """Decode raw listing-page bytes using the encoding guessed by chardet."""
        encoding = chardet.detect(content)['encoding']
        if encoding in ('GB2312', 'ISO-8859-1'):
            return content.decode('gbk')
        print('Request frequency too high...')
        # chardet reports None when it cannot guess; bytes.decode(None) would
        # raise TypeError, so fall back to a lenient UTF-8 decode.
        return content.decode(encoding or 'utf-8', errors='replace')

    async def get_car_ids(self, page, session, pool):
        """Fetch listing page *page* and crawl the details of every car id on it.

        Args:
            page: Listing page number substituted into the listing URL.
            session: aiohttp session shared by all requests.
            pool: aiomysql connection pool passed through to the save step.
        """
        async with session.get(self.url.format(page), headers=self.headers) as response:
            content = await response.read()
        tree = etree.HTML(self._decode_page(content))
        id_list = tree.xpath("//ul[@class='viewlist_ul']/li/@specid")
        if id_list:
            print('IDs:', id_list)
            tasks = [asyncio.create_task(self.fetch_car_info(spec_id, session, pool)) for spec_id in id_list]
            await asyncio.wait(tasks)

    async def fetch_car_info(self, spec_id, session, pool):
        """Query the parameter API for *spec_id* and persist the extracted fields."""
        async with session.get(self.api_url.format(spec_id), headers=self.headers) as response:
            result = await response.json()
            groups = result['result'].get('paramtypeitems')
            if groups:
                basics = groups[0]['paramitems']
                dimensions = groups[1]['paramitems']
                # Key order is significant: the fingerprint is the MD5 of str(item).
                item = {
                    'name': basics[0]['value'],
                    'price': basics[1]['value'],
                    'brand': basics[2]['value'],
                    'altitude': dimensions[2]['value'],
                    'breadth': dimensions[1]['value'],
                    'length': dimensions[0]['value'],
                }
                print(item)
                await self.save_car_info(item, pool)
            else:
                print('No data available...')

    @staticmethod
    def generate_md5(data_dict):
        """Return the hex MD5 digest of ``str(data_dict)``."""
        return hashlib.md5(str(data_dict).encode()).hexdigest()

    async def save_car_info(self, item, pool):
        """Insert *item* into ``car_info`` unless its fingerprint is already known."""
        value_md5 = self.generate_md5(item)
        # Check for duplicates before borrowing a pooled connection; sadd()
        # returns 0 when the fingerprint is already in the set.
        if not self.redis_client.sadd('car:filter', value_md5):
            print('Data already exists...')
            return

        sql = """
            insert into car_info values (%s, %s, %s, %s, %s, %s, %s);
        """
        async with pool.acquire() as conn:
            async with conn.cursor() as cursor:
                try:
                    await cursor.execute(sql, (
                        0,
                        item['name'],
                        item['price'],
                        item['brand'],
                        item['altitude'],
                        item['breadth'],
                        item['length'],
                    ))
                    await conn.commit()
                    print('Data saved successfully:', item)
                except Exception as e:
                    print('Failed to save data:', e)
                    await conn.rollback()
                    # Nothing was stored, so forget the fingerprint; otherwise
                    # the record would be reported as a duplicate forever.
                    self.redis_client.srem('car:filter', value_md5)

    async def main(self):
        """Create the table if needed, then crawl listing pages 1-10 concurrently."""
        async with aiomysql.create_pool(user='root', password='123456', db='py_spider') as pool:
            async with pool.acquire() as conn:
                async with conn.cursor() as cursor:
                    create_table_sql = """
                        create table car_info(
                            id int primary key auto_increment,
                            name varchar(100),
                            price varchar(100),
                            brand varchar(100),
                            altitude varchar(100),
                            breadth varchar(100),
                            length varchar(100)
                        );
                    """

                    check_table_sql = "show tables like 'car_info';"
                    # The result of execute() is falsy when no table matched.
                    result = await cursor.execute(check_table_sql)
                    if not result:
                        await cursor.execute(create_table_sql)

            async with aiohttp.ClientSession() as session:
                tasks = [asyncio.create_task(self.get_car_ids(page, session, pool)) for page in range(1, 11)]
                await asyncio.wait(tasks)


if __name__ == '__main__':
    # Build the crawler and run its async entry point on a fresh event loop.
    crawler = CarCrawler()
    asyncio.run(crawler.main())

Async Web Crawler Example: Car Information from Che168 to MongoDB

import redis
import chardet
import hashlib
import asyncio
import aiohttp
from lxml import etree
from motor.motor_asyncio import AsyncIOMotorClient


class CarCrawler:
    """Crawl Che168 listing pages, fetch each car's parameters and store them in MongoDB.

    MD5 fingerprints of stored records are kept in the Redis set
    ``car:filter`` so the same record is not inserted twice.
    """

    def __init__(self):
        self.url = 'https://www.che168.com/china/a0_0msdgscncgpi1ltocsp{}exf4x0/?pvareaid=102179#currengpostion'
        self.api_url = 'https://cacheapigo.che168.com/CarProduct/GetParam.ashx?specid={}'
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36'
        }

        self.redis_client = redis.Redis()
        # Despite its name this attribute holds the py_spider.car_info collection.
        self.mongo_client = AsyncIOMotorClient('localhost', 27017)['py_spider']['car_info']

    @staticmethod
    def _decode_page(content):
        """Decode raw listing-page bytes using the encoding guessed by chardet."""
        encoding = chardet.detect(content)['encoding']
        if encoding in ('GB2312', 'ISO-8859-1'):
            return content.decode('gbk')
        print('Request frequency too high...')
        # chardet reports None when it cannot guess; bytes.decode(None) would
        # raise TypeError, so fall back to a lenient UTF-8 decode.
        return content.decode(encoding or 'utf-8', errors='replace')

    async def get_car_ids(self, page, session):
        """Fetch listing page *page* and crawl the details of every car id on it."""
        async with session.get(self.url.format(page), headers=self.headers) as response:
            content = await response.read()
        tree = etree.HTML(self._decode_page(content))
        id_list = tree.xpath("//ul[@class='viewlist_ul']/li/@specid")
        if id_list:
            print('IDs:', id_list)
            tasks = [asyncio.create_task(self.fetch_car_info(spec_id, session)) for spec_id in id_list]
            await asyncio.wait(tasks)

    async def fetch_car_info(self, spec_id, session):
        """Query the parameter API for *spec_id* and persist the extracted fields."""
        async with session.get(self.api_url.format(spec_id), headers=self.headers) as response:
            result = await response.json()
            groups = result['result'].get('paramtypeitems')
            if groups:
                basics = groups[0]['paramitems']
                dimensions = groups[1]['paramitems']
                # Key order is significant: the fingerprint is the MD5 of str(item).
                item = {
                    'name': basics[0]['value'],
                    'price': basics[1]['value'],
                    'brand': basics[2]['value'],
                    'altitude': dimensions[2]['value'],
                    'breadth': dimensions[1]['value'],
                    'length': dimensions[0]['value'],
                }
                print(item)
                await self.save_car_info(item)
            else:
                print('No data available...')

    @staticmethod
    def generate_md5(data_dict):
        """Return the hex MD5 digest of ``str(data_dict)``."""
        return hashlib.md5(str(data_dict).encode()).hexdigest()

    async def save_car_info(self, item):
        """Insert *item* into the collection unless its fingerprint is already known."""
        md5_hash = self.generate_md5(item)
        # sadd() returns 0 when the fingerprint is already in the set.
        if not self.redis_client.sadd('car:filter', md5_hash):
            print('Data already exists...')
            return
        try:
            await self.mongo_client.insert_one(item)
            print('Data saved successfully:', item)
        except Exception as e:
            print('Failed to save data:', e)
            # Nothing was stored, so forget the fingerprint; otherwise the
            # record would be reported as a duplicate forever.
            self.redis_client.srem('car:filter', md5_hash)

    async def main(self):
        """Crawl listing pages 1-10 concurrently over one shared HTTP session."""
        async with aiohttp.ClientSession() as session:
            tasks = [asyncio.create_task(self.get_car_ids(page, session)) for page in range(1, 11)]
            await asyncio.wait(tasks)


if __name__ == '__main__':
    # The crawler (and the Motor client it creates) is built before the loop
    # is obtained, and the loop is then driven explicitly.
    # NOTE(review): presumably the explicit loop is used so the Motor client
    # and the crawl share one event loop - confirm before switching to asyncio.run().
    crawler = CarCrawler()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(crawler.main())

Threading

Basic Thread Concepts

import time
import threading


def work_1():
    """Print 'work_1' once per second, five times."""
    ticks = 0
    while ticks < 5:
        print('work_1')
        time.sleep(1)
        ticks += 1


def work_2():
    """Print 'work_2' once per second, five times."""
    ticks = 0
    while ticks < 5:
        print('work_2')
        time.sleep(1)
        ticks += 1


# One thread per worker function.
t1 = threading.Thread(target=work_1)
t2 = threading.Thread(target=work_2)

# A daemon thread does not keep the interpreter alive: the program exits once
# only daemon threads remain. The flag must be set before start().
t2.daemon = True
t1.start()
t2.start()

Fetching Movie Data with Threads

import requests
import threading
from lxml import etree

url = 'https://movie.douban.com/top250?start={}&filter='
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36'
}


def fetch_movie_info(page_number):
    """Download one listing page and print the titles selected by the XPath.

    Args:
        page_number: Zero-based page index; the ``start`` offset is
            ``page_number * 25``.
    """
    page_url = url.format(page_number * 25)
    html_text = requests.get(page_url, headers=headers).text
    titles = etree.HTML(html_text).xpath("//div[@class='hd']/a/span[1]/text()")
    print(titles)


if __name__ == '__main__':
    # One thread per page (0-9); args must be a tuple, hence the trailing comma.
    thread_list = [threading.Thread(target=fetch_movie_info, args=(page,)) for page in range(10)]
    for thread in thread_list:
        thread.start()

Using ThreadPoolExecutor

import requests
from lxml import etree
from concurrent.futures import ThreadPoolExecutor, as_completed

url = 'https://movie.douban.com/top250?start={}&filter='
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36'
}


def fetch_movie_info(page_number):
    """Download one listing page and return the titles selected by the XPath.

    Args:
        page_number: Zero-based page index; the ``start`` offset is
            ``page_number * 25``.

    Returns:
        The list produced by the XPath query.
    """
    page_url = url.format(page_number * 25)
    html_text = requests.get(page_url, headers=headers).text
    return etree.HTML(html_text).xpath("//div[@class='hd']/a/span[1]/text()")


if __name__ == '__main__':
    # At most five pages are fetched at the same time; leaving the with-block
    # waits for all submitted work to finish.
    with ThreadPoolExecutor(max_workers=5) as pool:
        futures = [pool.submit(fetch_movie_info, page) for page in range(10)]
        # Alternative: iterate in submission order, blocking on each future in turn.
        # for future in futures:
        #     print(future.result())

        # as_completed() yields futures as they finish, so output order
        # follows completion order rather than page order.
        for future in as_completed(futures):
            print(future.result())

Multiprocessing

Process Basics

from multiprocessing import Process

# Schematic example: `func` is the callable to run in the child process and
# `arg` its single positional argument. `args` must be a tuple, so a
# one-element tuple needs the trailing comma; `(,)` is not valid Python.
p = Process(target=func, args=(arg,))
# A daemon process is terminated when the parent process exits.
p.daemon = True
p.start()

Using JoinableQueue in Processes

import pymongo
import requests
import jsonpath
from multiprocessing import Process, JoinableQueue as Queue

url = 'https://careers.tencent.com/tencentcareer/api/post/Query'
headers = {
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
}

mongo_client = pymongo.MongoClient()
db = mongo_client['py_spider']['process_tx_work']


def _first_match(record, expression):
    """Return the first jsonpath match in *record*, or None when nothing matches."""
    matches = jsonpath.jsonpath(record, expression)
    # jsonpath.jsonpath() returns False, not an empty list, on no match.
    return matches[0] if matches else None


def fetch_job_info(page_num, queue):
    """Fetch one page of job postings and put one dict per posting on *queue*.

    Args:
        page_num: Value sent as the ``pageIndex`` query parameter.
        queue: Joinable queue that receives the extracted posting dicts.
    """
    params = {
        "timestamp": "1741526713421",
        "countryId": "",
        "cityId": "",
        "bgIds": "",
        "productId": "",
        "categoryId": "",
        "parentCategoryId": "",
        "attrId": "",
        "keyword": "python",
        "pageIndex": page_num,
        "pageSize": "10",
        "language": "zh-cn",
        "area": "cn"
    }

    response = requests.get(url, params=params, headers=headers).json()
    # Decide "no data" once, up front, instead of letting a TypeError from any
    # single posting abort the rest of the page.
    posts = (response.get('Data') or {}).get('Posts')
    if not posts:
        print('No data:', params.get('pageIndex'))
        return

    for info in posts:
        # A field missing from one posting is stored as None.
        queue.put({
            'recruit_post_name': _first_match(info, '$..RecruitPostName'),
            'country_name': _first_match(info, '$..CountryName'),
            'location_name': _first_match(info, '$..LocationName'),
            'category_name': _first_match(info, '$..CategoryName'),
            'responsibility': _first_match(info, '$..Responsibility'),
            'last_update_time': _first_match(info, '$..LastUpdateTime'),
        })


def save_job_info(queue):
    """Consume posting dicts from *queue* forever and insert each into MongoDB.

    Args:
        queue: Joinable queue filled by the fetch processes.
    """
    while True:
        dict_info = queue.get()
        try:
            db.insert_one(dict_info)
            print('Data inserted successfully:', dict_info)
        except Exception as e:
            print('Failed to insert data:', e)
        finally:
            # Always acknowledge the item: a skipped task_done() would make
            # queue.join() in the parent process wait forever.
            queue.task_done()


if __name__ == '__main__':
    # Queue shared by the fetch processes (producers) and the saver (consumer).
    dict_info_queue = Queue()

    # One fetch process per page, pages 1-38.
    process_list = list()
    for page in range(1, 39):
        p_get_info = Process(target=fetch_job_info, args=(page, dict_info_queue))
        process_list.append(p_get_info)

    p_save_job = Process(target=save_job_info, args=(dict_info_queue,))

    for p in process_list:
        p.start()

    # The saver loops forever, so it runs as a daemon and is terminated when
    # the main process exits.
    p_save_job.daemon = True
    p_save_job.start()

    # Wait until every page has been fetched ...
    for p in process_list:
        p.join()

    # ... and until every queued item has been acknowledged with task_done().
    dict_info_queue.join()

    print('Task completed...')

    mongo_client.close()
    print('Connection closed...')

Practical Examples

Case 1: Using Queue and Threading for Iqiyi Movies

import pymongo
import requests
import threading
from queue import Queue


class IqiyiCrawler:
    """Three-stage threaded pipeline: fetch API pages, parse movies, store in MongoDB.

    The stages are connected by queues; ``main()`` finishes once every queue
    has been fully processed.
    """

    def __init__(self):
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36'
        }
        self.mongo_client = pymongo.MongoClient()
        self.db = self.mongo_client['py_spider']['thread_aqy_movie']
        self.api_url = 'https://pcw-api.iqiyi.com/search/recommend/list?channel_id=2&data_type=1&mode=11&page_id={}&ret_num=48&session=85dd981b69cead4b60f6d980438a5664&three_category_id=15;must'

        self.url_queue = Queue()
        self.json_queue = Queue()
        self.content_dict_queue = Queue()

    def get_urls(self):
        """Queue the API URLs for pages 1-10."""
        for page in range(1, 11):
            self.url_queue.put(self.api_url.format(page))

    def fetch_api_data(self):
        """Worker loop: download each queued URL and queue the decoded JSON."""
        while True:
            url = self.url_queue.get()
            try:
                # The timeout keeps a stalled connection from blocking the
                # worker, and with it url_queue.join(), forever.
                response = requests.get(url, headers=self.headers, timeout=10).json()
                self.json_queue.put(response)
            except Exception as e:
                print('Failed to fetch page:', url, e)
            finally:
                # Acknowledge even on failure; otherwise join() never returns.
                self.url_queue.task_done()

    def parse_movie_info(self):
        """Worker loop: turn each JSON payload into per-movie dicts."""
        while True:
            json_info = self.json_queue.get()
            try:
                for movie in json_info['data']['list']:
                    self.content_dict_queue.put({
                        'title': movie['title'],
                        'playUrl': movie['playUrl'],
                        'description': movie['description'],
                    })
            except Exception as e:
                print('Failed to parse page:', e)
            finally:
                self.json_queue.task_done()

    def save_movie_info(self):
        """Worker loop: insert each movie dict into MongoDB."""
        while True:
            item = self.content_dict_queue.get()
            try:
                self.db.insert_one(item)
                print('Data saved successfully:', item)
            except Exception as e:
                print('Failed to save data:', e)
            finally:
                self.content_dict_queue.task_done()

    def main(self):
        """Start all workers as daemon threads and wait for the queues to drain."""
        thread_list = list()
        self.get_urls()

        for _ in range(3):
            thread_list.append(threading.Thread(target=self.fetch_api_data))

        for _ in range(3):
            thread_list.append(threading.Thread(target=self.parse_movie_info))

        thread_list.append(threading.Thread(target=self.save_movie_info))

        # The workers loop forever, so they run as daemons and end with the
        # main thread.
        for thread_obj in thread_list:
            thread_obj.daemon = True
            thread_obj.start()

        # Join in pipeline order: a stage queues its output before it
        # acknowledges its input.
        for q in [self.url_queue, self.json_queue, self.content_dict_queue]:
            q.join()

        self.mongo_client.close()
        print('Crawling finished...')


if __name__ == '__main__':
    # Build the crawler and run the threaded pipeline to completion.
    iqiyi = IqiyiCrawler()
    iqiyi.main()

Case 2: Using ThreadPoolExecutor for Baidu Jobs

import pymysql
import requests
from dbutils.pooled_db import PooledDB
from concurrent.futures import ThreadPoolExecutor, as_completed


class BaiduJobSpider:
    """Fetch Baidu job postings with a thread pool and store them in MySQL.

    Database connections come from a DBUtils ``PooledDB`` pool.
    """

    def __init__(self):
        self.pool = PooledDB(
            creator=pymysql,
            maxconnections=5,
            mincached=1,
            maxcached=2,
            maxshared=3,
            blocking=True,
            host='localhost',
            port=3306,
            user='root',
            password='123456',
            database='py_spider',
            charset='utf8'
        )

        self.api_url = 'https://talent.baidu.com/httservice/getPostListNew'
        self.headers = {
            "Accept": "application/json, text/plain, */*",
            "Accept-Language": "zh-CN,zh;q=0.9",
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "Content-Type": "application/x-www-form-urlencoded;charset=UTF-8",
            "Origin": "https://talent.baidu.com",
            "Pragma": "no-cache",
            "Referer": "https://talent.baidu.com/jobs/social-list?search=",
            "Sec-Fetch-Dest": "empty",
            "Sec-Fetch-Mode": "cors",
            "Sec-Fetch-Site": "same-origin",
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36",
            "sec-ch-ua": "\"Chromium\";v=\"136\", \"Google Chrome\";v=\"136\", \"Not.A/Brand\";v=\"99\"",
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": "\"macOS\""
        }

    def fetch_job_info(self, page_num):
        """POST the search form for *page_num* and return the decoded JSON."""
        post_data = {
            'recruitType': 'SOCIAL',
            'pageSize': 10,
            'keyWord': '',
            'curPage': page_num,
            'projectType': '',
        }

        cookies = {
            "BIDUPSID": "76C653EDEB40C10F7F060DF3835854E3",
            "PSTM": "1742473418",
            "BAIDUID": "AD2B22AA47105A367D8F718EF45D4DB7:FG=1",
            "H_WISE_SIDS": "62325_62969_63018_63056",
            "BAIDUID_BFESS": "AD2B22AA47105A367D8F718EF45D4DB7:FG=1",
            "ZFY": "wyT2CVZBsmiD2N7HpYAmmlD2cuhAfyL:Bl:AO7H4Z:BHN0:C",
            "H_PS_PSSID": "61027_62325_62484_62969_63056_63140_63188_63195_63211_63241_63248_63253_63266_63074",
            "Hm_lvt_50e85ccdd6c1e538eb1290bc92327926": "1747748555",
            "HMACCOUNT": "B332573DD7B815D7",
            "Hm_lpvt_50e85ccdd6c1e538eb1290bc92327926": "1747748942",
            "RT": '"z=1&dm=baidu.com&si=abb7eb1e-1e10-4864-89ec-1bf5c2cae096&ss=mawkd7ob&sl=4&tt=41q9&bcn=https%3A%2F%2Ffclog.baidu.com%2Flog%2Fweirwood%3Ftype%3Dperf&ld=8gcx"'
        }

        # The timeout keeps a stalled connection from pinning a pool thread.
        response = requests.post(
            self.api_url, headers=self.headers, data=post_data, cookies=cookies, timeout=10
        ).json()
        return response

    def parse_job_info(self, response):
        """Extract name, education and service condition from *response* and save each job."""
        # Tolerate responses without a usable data/list payload.
        job_list = (response.get('data') or {}).get('list') or []
        if not job_list:
            print('No job data in response')
            return
        for job_info in job_list:
            education = job_info['education'] or '空'
            name = job_info['name']
            service_condition = job_info['serviceCondition']
            # 0 for the auto-increment id column.
            self.save_job_info(0, name, education, service_condition)

    def save_job_info(self, *args):
        """Insert one row; *args* are the column values in table order."""
        with self.pool.connection() as db:
            with db.cursor() as cursor:
                sql = "insert into baidu_work_thread_pool values (%s, %s, %s, %s);"
                try:
                    cursor.execute(sql, args)
                    db.commit()
                    print(f'Saved data successfully: {args}')
                except Exception as e:
                    db.rollback()
                    print(f'Failed to save data: {e}')

    def create_table(self):
        """Create the target table when it does not exist yet."""
        with self.pool.connection() as db:
            with db.cursor() as cursor:
                sql = """
                    create table if not exists baidu_work_thread_pool(
                        id int primary key auto_increment,
                        name varchar(100),
                        education varchar(200),
                        service_condition text
                    );
                """
                try:
                    cursor.execute(sql)
                    print('Table created successfully')
                except Exception as e:
                    print(f'Failed to create table: {e}')

    def main(self):
        """Fetch pages 1-50 in parallel and parse/save each page as it arrives."""
        self.create_table()
        with ThreadPoolExecutor(max_workers=10) as pool:
            fetch_futures = [pool.submit(self.fetch_job_info, page) for page in range(1, 51)]

            parse_futures = []
            for future in as_completed(fetch_futures):
                try:
                    response = future.result()
                except Exception as e:
                    # One failed page must not abort the remaining pages.
                    print(f'Failed to fetch page: {e}')
                    continue
                parse_futures.append(pool.submit(self.parse_job_info, response))

            # Inspect the parse futures too; otherwise their exceptions are lost.
            for future in as_completed(parse_futures):
                error = future.exception()
                if error is not None:
                    print(f'Failed to parse page: {error}')


if __name__ == '__main__':
    # Build the spider (this also creates the DB connection pool) and run it.
    spider = BaiduJobSpider()
    spider.main()

Case 3: Using Queue and Multiprocessing for Mango Movies

import redis
import pymongo
import hashlib
import requests
from multiprocessing import Process, JoinableQueue as Queue


class MovieCrawler:
    """Three-stage multiprocess pipeline: fetch MGTV pages, parse movies, store in MongoDB.

    The Redis set ``process_mg_movie:filter`` holds MD5 fingerprints used to
    skip records that were stored before.
    """

    # Clients are class attributes, so they are not part of the instance state.
    # NOTE(review): presumably this keeps them out of the data sent to child
    # processes - confirm.
    redis_client = redis.Redis(host='localhost', port=6379, db=0)
    mongo_client = pymongo.MongoClient(host='localhost', port=27017)
    mongo_db = mongo_client['py_spider']['process_mongo_movie']

    def __init__(self):
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36'
        }
        self.api_url = 'https://pianku.api.mgtv.com/rider/list/pcweb/v3'

        self.params_queue = Queue()
        self.json_queue = Queue()
        self.content_queue = Queue()

    def populate_params(self):
        """Queue the request parameters for pages 1-5."""
        for page in range(1, 6):
            params_dict = {
                "allowedRC": "1",
                "platform": "pcweb",
                "channelId": "2",
                "pn": page,
                "pc": "80",
                "hudong": "1",
                "_support": "10000000",
                "kind": "19",
                "area": "10",
                "year": "all",
                "chargeInfo": "a1",
                "sort": "c2",
                "feature": "all"
            }
            self.params_queue.put(params_dict)

    def fetch_movie_info(self):
        """Worker loop: request each queued parameter set and queue the JSON."""
        while True:
            params_dict = self.params_queue.get()
            try:
                # The timeout keeps a stalled connection from blocking the
                # worker, and with it params_queue.join(), forever.
                response = requests.get(
                    url=self.api_url, params=params_dict, headers=self.headers, timeout=10
                ).json()
                self.json_queue.put(response)
            except Exception as e:
                print('Failed to fetch page:', params_dict.get('pn'), e)
            finally:
                # Acknowledge even on failure; otherwise join() never returns.
                self.params_queue.task_done()

    def parse_movie_info(self):
        """Worker loop: turn each JSON payload into per-movie dicts."""
        while True:
            response = self.json_queue.get()
            try:
                for movie in response['data']['hitDocs']:
                    self.content_queue.put({
                        'title': movie['title'],
                        'subtitle': movie['subtitle'],
                        'story': movie['story'],
                    })
            except Exception as e:
                print('Failed to parse page:', e)
            finally:
                self.json_queue.task_done()

    @staticmethod
    def generate_md5(data_dict):
        """Return the hex MD5 digest of ``str(data_dict)`` encoded as UTF-8."""
        md5_hash = hashlib.md5(str(data_dict).encode('utf-8')).hexdigest()
        return md5_hash

    def save_movie_info(self):
        """Worker loop: store each movie dict whose fingerprint is new."""
        while True:
            item = self.content_queue.get()
            try:
                md5_hash = self.generate_md5(item)
                # sadd() returns 0 when the fingerprint is already in the set.
                if self.redis_client.sadd('process_mg_movie:filter', md5_hash):
                    self.mongo_db.insert_one(item)
                    print('Data inserted successfully:', item)
                else:
                    print('Data already exists...')
            except Exception as e:
                print('Failed to insert data:', e)
            finally:
                self.content_queue.task_done()

    def close_spider(self):
        """Close the MongoDB and Redis clients held by this process."""
        self.mongo_client.close()
        self.redis_client.close()
        print('Crawling task completed...')

    def main(self):
        """Start all workers as daemon processes and wait for the queues to drain."""
        self.populate_params()

        process_list = list()
        for _ in range(3):
            process_list.append(Process(target=self.fetch_movie_info))

        process_list.append(Process(target=self.parse_movie_info))
        process_list.append(Process(target=self.save_movie_info))

        # The workers loop forever, so they run as daemons and are terminated
        # when the main process exits.
        for process_obj in process_list:
            process_obj.daemon = True
            process_obj.start()

        # Join in pipeline order: a stage queues its output before it
        # acknowledges its input.
        for q in [self.params_queue, self.json_queue, self.content_queue]:
            q.join()

        self.close_spider()


if __name__ == '__main__':
    # Build the crawler and run the multiprocess pipeline to completion.
    crawler = MovieCrawler()
    crawler.main()

Case 4: Async Hero Skins Download

import os
import asyncio
import aiohttp
import aiofile


class HeroSkinDownloader:
    """Download the skin images of every hero in the hero-list JSON into ./images."""

    def __init__(self):
        self.json_url = 'https://pvp.qq.com/web201605/js/herolist.json'
        self.skin_url = 'https://game.gtimg.cn/images/yxzj/img201606/skin/hero-info/{}/{}-bigskin-{}.jpg'
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36'
        }

    async def download_image(self, session, e_name, c_name):
        """Download skins 1..30 of one hero, stopping at the first non-200 response.

        Args:
            session: Shared aiohttp session.
            e_name: Hero identifier substituted into the skin URL.
            c_name: Hero name used as the file-name prefix.
        """
        for skin_id in range(1, 31):
            file_name = c_name + '-' + str(skin_id) + '.jpg'
            async with session.get(self.skin_url.format(e_name, e_name, skin_id), headers=self.headers) as response:
                if response.status != 200:
                    # Skin ids are probed in order, so the first miss ends this hero.
                    break
                content = await response.read()
            async with aiofile.async_open('./images/' + file_name, 'wb') as f:
                await f.write(content)
                print('Downloaded:', file_name)

    async def main(self):
        """Fetch the hero list and download all heroes' skins concurrently."""
        # Make sure the target directory exists when the class is used alone.
        os.makedirs('./images', exist_ok=True)
        async with aiohttp.ClientSession() as session:
            async with session.get(self.json_url, headers=self.headers) as response:
                # content_type=None disables aiohttp's content-type check.
                result = await response.json(content_type=None)

            # The list response is released before the downloads start.
            task_list = [
                asyncio.create_task(self.download_image(session, item['ename'], item['cname']))
                for item in result
            ]
            # asyncio.wait() raises ValueError on an empty collection.
            if task_list:
                await asyncio.wait(task_list)


if __name__ == '__main__':
    # exist_ok=True replaces the exists()/mkdir() pair and its check-then-create race.
    os.makedirs('./images', exist_ok=True)

    downloader = HeroSkinDownloader()
    # asyncio.run() creates and closes the event loop itself, replacing the
    # legacy get_event_loop()/run_until_complete() pair.
    asyncio.run(downloader.main())

Related Articles

Understanding Strong and Weak References in Java

Strong References Strong references are the most prevalent type of object referencing in Java. When an object has a strong reference pointing to it, the garbage collector will not reclaim its memory. F...

Comprehensive Guide to SSTI Explained with Payload Bypass Techniques

Introduction Server-Side Template Injection (SSTI) is a vulnerability in web applications where user input is improperly handled within the template engine and executed on the server. This exploit can r...

Implement Image Upload Functionality for Django Integrated TinyMCE Editor

Django’s Admin panel is highly user-friendly, and pairing it with TinyMCE, an effective rich text editor, simplifies content management significantly. Combining the two is particularly useful for bloggi...

Leave a Comment

Anonymous

◎Feel free to join the discussion and share your thoughts.