Python Multithreading: Practical Techniques for Concurrent Execution

Using _thread and threading Modules

Python provides two primary modules for thread-based concurrency. The _thread module offers low-level primitives for thread management and mutex locks, while threading builds on top of _thread to provide a higher-level, more comprehensive interface.

Low-Level Threading with _thread

import _thread
import time

# Global resource shared across threads
brick_stack = [f"brick_{i}" for i in range(1, 101)]

def move_bricks():
    while True:
        try:
            # Another thread may empty the list between an emptiness check
            # and pop(), so pop directly and treat IndexError as "done"
            brick = brick_stack.pop()
        except IndexError:
            break
        print(f"{_thread.get_ident()} removed: {brick}, remaining: {len(brick_stack)}")
        time.sleep(0.1)

def main():
    # Spawn 100 threads to move bricks
    for _ in range(100):
        _thread.start_new_thread(move_bricks, ())
    # _thread has no join(), so sleep long enough for the workers to finish
    time.sleep(10)

if __name__ == "__main__":
    main()

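The call _thread.start_new_thread(move_bricks, ()) starts a new thread that executes the move_bricks function. The second argument is the tuple of positional arguments, which is empty here. Because _thread offers no join(), main() sleeps to keep the program alive while the workers run.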
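Sleeping for a fixed time is fragile: too short and workers are cut off, too long and the program idles. One common workaround at the _thread level is to give each worker a lock that it releases when it finishes. The sketch below illustrates that idea; run_and_wait and runner are hypothetical helper names, not part of the _thread module.

```python
import _thread

def run_and_wait(worker, count):
    """Start `count` threads running worker(index) and wait for all of them."""
    done_locks = []
    for index in range(count):
        done = _thread.allocate_lock()
        done.acquire()  # held until the worker finishes

        # Default arguments bind the current index and lock to this runner
        def runner(index=index, done=done):
            try:
                worker(index)
            finally:
                done.release()  # signal completion, even on error

        _thread.start_new_thread(runner, ())
        done_locks.append(done)

    # Acquiring each lock blocks until its worker has released it
    for done in done_locks:
        done.acquire()

if __name__ == "__main__":
    results = []
    run_and_wait(lambda i: results.append(i * i), 5)
    print(sorted(results))  # [0, 1, 4, 9, 16]
```

In practice the threading module's join() does this bookkeeping for you, which is one reason to prefer it.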

High-Level Threading with threading

import threading
import time

brick_stack = [f"brick_{i}" for i in range(1, 101)]

def move_bricks():
    while True:
        try:
            # Pop directly; checking for emptiness first would race
            # with the other workers
            brick = brick_stack.pop()
        except IndexError:
            break
        print(f"{threading.current_thread().name} removed: {brick}, remaining: {len(brick_stack)}")
        time.sleep(0.1)

def main():
    # Create and start 100 threads
    workers = [threading.Thread(target=move_bricks, name=f"worker-{i}") for i in range(100)]
    for t in workers:
        t.start()
    # Wait for every worker instead of sleeping for a fixed time
    for t in workers:
        t.join()

if __name__ == "__main__":
    main()

Creating a thread with threading.Thread(target=move_bricks, name="worker", args=()) returns a new thread object. The thread does not run until .start() is called.
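To make the lifecycle concrete, here is a small sketch that passes positional and keyword arguments to the target and checks is_alive() before start() and after join(). The greet function and demo_thread_lifecycle helper are illustrative names only.

```python
import threading

def greet(greetings, name, punctuation="!"):
    greetings.append(f"Hello, {name}{punctuation}")

def demo_thread_lifecycle():
    greetings = []
    t = threading.Thread(
        target=greet,
        name="greeter",
        args=(greetings, "Ada"),          # positional arguments for greet
        kwargs={"punctuation": "?"},      # keyword arguments for greet
    )
    alive_before_start = t.is_alive()     # False: nothing runs until start()
    t.start()
    t.join()
    alive_after_join = t.is_alive()       # False: the thread has finished
    return alive_before_start, alive_after_join, greetings

if __name__ == "__main__":
    print(demo_thread_lifecycle())  # (False, False, ['Hello, Ada?'])
```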

Thread Synchronization and Control

Blocking the Main Thread with join()

import threading
import time

tasks = [f"task_{i}" for i in range(10)]

def process_tasks():
    while tasks:
        task = tasks.pop()
        print(f"{threading.current_thread().name} processing {task}")
        time.sleep(0.2)

def main():
    t = threading.Thread(target=process_tasks, daemon=False)
    t.start()
    t.join()  # Wait for thread t to finish before proceeding
    print("All tasks completed.")

if __name__ == "__main__":
    main()

.join() blocks the calling thread (usually the main thread) until the thread on which it is called terminates.
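join() also accepts an optional timeout in seconds. It always returns None, so the way to tell whether the thread actually finished is to call is_alive() afterwards. A minimal sketch, using a threading.Event purely as a stand-in for a long-running job (slow_task and demo_join_timeout are illustrative names):

```python
import threading

def slow_task(stop_event):
    # Stand-in for long work: waits until told to stop (or 5 seconds pass)
    stop_event.wait(timeout=5)

def demo_join_timeout():
    stop_event = threading.Event()
    t = threading.Thread(target=slow_task, args=(stop_event,))
    t.start()

    t.join(timeout=0.1)           # give up waiting after 0.1 seconds
    still_running = t.is_alive()  # True: the timeout expired first

    stop_event.set()              # let the worker finish
    t.join()                      # now wait without a timeout
    return still_running, t.is_alive()

if __name__ == "__main__":
    print(demo_join_timeout())  # (True, False)
```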

Daemon vs Non-Daemon Threads

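Daemon threads are stopped abruptly when the program exits, which happens once the main thread and all other non-daemon threads have finished, regardless of what the daemon thread is doing at that moment. A non-daemon thread must complete before the program can exit. In the example below, the main thread ends after about 2 seconds, so the daemon thread is cut off before it completes its 10 iterations.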

import threading
import time

def background_task():
    for i in range(10):
        print(f"Daemon thread iteration {i}")
        time.sleep(0.5)

def main():
    daemon_thread = threading.Thread(target=background_task, daemon=True)
    daemon_thread.start()
    time.sleep(2)
    print("Main thread ending.")

if __name__ == "__main__":
    main()
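The daemon flag can also be set through the thread's daemon attribute, but only before start() is called; changing it on a running thread raises RuntimeError. The sketch below demonstrates this, with demo_daemon_flag as an illustrative helper name.

```python
import threading

def demo_daemon_flag():
    release = threading.Event()
    t = threading.Thread(target=release.wait)  # blocks until release.set()
    t.daemon = True        # allowed: the thread has not started yet
    t.start()

    try:
        t.daemon = False   # not allowed once the thread is running
        error = None
    except RuntimeError as exc:
        error = str(exc)

    release.set()          # let the worker finish
    t.join()
    return t.daemon, error

if __name__ == "__main__":
    is_daemon, error = demo_daemon_flag()
    print(is_daemon)  # True
    print(error)
```

If neither the daemon argument nor the attribute is set, a new thread inherits the daemon status of the thread that created it.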

Locking Shared Resources

Thread locks prevent race conditions when multiple threads access shared data.

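import threading

counter = 0
lock = threading.Lock()

def increment():
    global counter
    for _ in range(1000000):
        # The with statement acquires the lock and always releases it,
        # even if the body raises an exception
        with lock:
            counter += 1
    print(f"{threading.current_thread().name}: {counter}")

def main():
    threads = [threading.Thread(target=increment, name=f"t{i}") for i in range(3)]
    for t in threads:
        t.start()
    # Wait for all threads so the final total can be read safely
    for t in threads:
        t.join()
    print(f"Final counter: {counter}")  # 3000000

if __name__ == "__main__":
    main()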
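A global counter plus a global lock works, but it relies on every caller remembering to take the lock. One way to reduce that risk is to keep the lock next to the data it protects. The following is a minimal sketch of that idea; SafeCounter and count_in_threads are hypothetical names introduced for illustration.

```python
import threading

class SafeCounter:
    """A counter whose updates are protected by its own lock."""

    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()

    def increment(self):
        with self._lock:          # only one thread updates at a time
            self._value += 1

    @property
    def value(self):
        with self._lock:
            return self._value

def count_in_threads(thread_count, increments_per_thread):
    counter = SafeCounter()

    def work():
        for _ in range(increments_per_thread):
            counter.increment()

    threads = [threading.Thread(target=work) for _ in range(thread_count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter.value

if __name__ == "__main__":
    print(count_in_threads(4, 10000))  # 40000
```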

Semaphore for Limiting Concurrent Access

A semaphore controls how many threads can access a given resource simultaneously.

import threading
import time

semaphore = threading.Semaphore(5)  # Allow up to 5 concurrent accesses

def enter_station(passenger_id):
    print(f"Passenger {passenger_id} arrives.")
    # The with statement acquires a slot and releases it on exit
    with semaphore:
        print(f"Passenger {passenger_id} enters.")
        time.sleep(2)
    print(f"Passenger {passenger_id} leaves.")

def main():
    for i in range(20):
        t = threading.Thread(target=enter_station, args=(i,))
        t.start()

if __name__ == "__main__":
    main()
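To check that a semaphore really caps concurrency, one option is to record how many threads are inside the guarded section at once. The sketch below does this with a BoundedSemaphore, which additionally raises ValueError if it is released more times than it was acquired. The measure_peak helper and its state dictionary are illustrative, not part of the standard library.

```python
import threading
import time

def measure_peak(limit, workers):
    """Run `workers` threads through a gate of size `limit`; return peak occupancy."""
    gate = threading.BoundedSemaphore(limit)
    state_lock = threading.Lock()
    state = {"inside": 0, "peak": 0}

    def visit():
        with gate:
            with state_lock:
                state["inside"] += 1
                state["peak"] = max(state["peak"], state["inside"])
            time.sleep(0.05)  # simulate time spent holding the resource
            with state_lock:
                state["inside"] -= 1

    threads = [threading.Thread(target=visit) for _ in range(workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state["peak"]

if __name__ == "__main__":
    peak = measure_peak(limit=3, workers=10)
    print(f"Peak concurrent visitors: {peak}")  # never more than 3
```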

Queues and Thread Pools

Queue for Safe Inter-Thread Communication

The queue.Queue class provides a thread-safe FIFO structure, ideal for producer-consumer patterns.

import random
import threading
import time
from queue import Queue

dishes = ["pizza", "pasta", "salad", "soup", "burger", "sushi", "taco", "curry", "ramen", "dumpling"]
order_queue = Queue(maxsize=10)

def chef():
    while True:
        if order_queue.full():
            print("Kitchen full. Waiting for orders to be taken.")
            order_queue.join()
        dish = random.choice(dishes)
        order_queue.put(dish)
        print(f"Chef cooked: {dish}")
        time.sleep(0.3)

def customer():
    time.sleep(5)  # Let chef start
    while True:
        if order_queue.empty():
            print("No dishes ready. Waiting...")
            time.sleep(1)
        else:
            dish = order_queue.get()
            print(f"Customer ate: {dish}")
            order_queue.task_done()

def main():
    t1 = threading.Thread(target=chef)
    t2 = threading.Thread(target=customer)
    t1.start()
    t2.start()

if __name__ == "__main__":
    main()
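By default, Queue.put() blocks while the queue is full and Queue.get() blocks while it is empty, so explicit full() and empty() checks are often unnecessary. The example above also runs forever. A common way to let both sides finish is to send a sentinel value that tells the consumer to stop. This is a minimal sketch of that variation; SENTINEL, producer, consumer, and run_kitchen are names chosen for illustration.

```python
import threading
from queue import Queue

SENTINEL = None  # assumed marker meaning "no more dishes"

def producer(q, items):
    for item in items:
        q.put(item)       # blocks while the queue is full
    q.put(SENTINEL)       # tell the consumer to stop

def consumer(q, eaten):
    while True:
        item = q.get()    # blocks while the queue is empty
        if item is SENTINEL:
            break
        eaten.append(item)

def run_kitchen(items, maxsize=3):
    q = Queue(maxsize=maxsize)
    eaten = []
    threads = [
        threading.Thread(target=producer, args=(q, items)),
        threading.Thread(target=consumer, args=(q, eaten)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return eaten

if __name__ == "__main__":
    print(run_kitchen(["pizza", "pasta", "salad", "soup", "burger"]))
    # ['pizza', 'pasta', 'salad', 'soup', 'burger']
```

With a single consumer, the dishes come out in the order they were put in, since Queue is FIFO.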

Thread Pool for Controlled Parallelism

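A thread pool manages a fixed number of threads, reusing them for multiple tasks. This contrasts with a semaphore, which limits concurrent access but still creates one thread per task. The example below uses the third-party threadpool package, which must be installed separately (pip install threadpool).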

import threading
import threadpool
import time

tasks = [f"brick_{i}" for i in range(100)]

def move_brick(brick):
    print(f"{threading.current_thread().name} moved {brick}")
    time.sleep(0.1)

def main():
    pool = threadpool.ThreadPool(5)  # 5 worker threads
    requests = threadpool.makeRequests(move_brick, tasks)
    for req in requests:
        pool.putRequest(req)
    pool.wait()

if __name__ == "__main__":
    main()
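The standard library offers a comparable facility in concurrent.futures.ThreadPoolExecutor, which needs no extra installation. The following sketch shows the same brick-moving task with it; this is an alternative to the threadpool package, not a description of how that package works. The move_all helper and the "mover" thread name prefix are illustrative choices.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

def move_brick(brick):
    time.sleep(0.01)  # simulate work
    return f"{threading.current_thread().name} moved {brick}"

def move_all(bricks, workers=5):
    # The with block waits for all submitted work before exiting
    with ThreadPoolExecutor(max_workers=workers, thread_name_prefix="mover") as pool:
        # map() yields results in the same order as the inputs
        return list(pool.map(move_brick, bricks))

if __name__ == "__main__":
    for line in move_all([f"brick_{i}" for i in range(20)]):
        print(line)
```

Because the pool reuses its workers, at most five distinct thread names appear in the output no matter how many bricks are moved.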

Using the tomorrow Library for Easy Concurrency

The tomorrow library provides a decorator to run functions concurrently.

Installation: pip install tomorrow

import time
from random import random
from tomorrow import threads
import threading

@threads(10)
def perform_action(action_name):
    print(f"{threading.current_thread().name} is doing: {action_name}")
    time.sleep(random())

def main():
    actions = ["listening to music", "chatting", "eating", "coding", "reading"]
    for action in actions:
        perform_action(action)
        time.sleep(0.2)

if __name__ == "__main__":
    main()

The @threads(10) decorator limits the pool to 10 simultaneous threads.
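For readers who prefer to avoid an extra dependency, a similar decorator can be approximated with the standard library. The sketch below is an illustration of the general idea only and makes no claim about how tomorrow is implemented internally; run_in_threads is a hypothetical name, and the decorated function returns a concurrent.futures.Future rather than the plain result.

```python
import functools
import threading
import time
from concurrent.futures import ThreadPoolExecutor

def run_in_threads(max_workers):
    """Hypothetical decorator: run each call in a shared pool of worker threads."""
    def decorator(func):
        pool = ThreadPoolExecutor(max_workers=max_workers)

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Returns immediately with a Future; the work runs in the pool
            return pool.submit(func, *args, **kwargs)

        return wrapper
    return decorator

@run_in_threads(10)
def perform_action(action_name):
    time.sleep(0.05)  # simulate work
    return f"{threading.current_thread().name} finished: {action_name}"

if __name__ == "__main__":
    futures = [perform_action(a) for a in ["coding", "reading", "eating"]]
    for future in futures:
        print(future.result())  # result() blocks until that call completes
```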
