Python Multithreading: Practical Techniques for Concurrent Execution
Using _thread and threading Modules
Python provides two primary modules for thread-based concurrency. The _thread module offers low-level primitives for thread management and mutex locks, while threading builds on top of _thread to provide a higher-level, more comprehensive interface.
Low-Level Threading with _thread
import _thread
import time
# Global resource shared across threads.
brick_stack = [f"brick_{i}" for i in range(1, 101)]


def move_bricks(delay=0.1):
    """Pop bricks off the shared stack until it is empty.

    delay: seconds to pause after each brick. The default keeps the
    original pacing; pass 0 to drain the stack instantly (e.g. in tests).

    The original checked emptiness and then popped -- with 100 threads
    another worker can empty the stack between the two steps, raising
    IndexError. Popping inside try/except (EAFP) closes that race,
    because list.pop() itself is atomic under the GIL.
    """
    while True:
        try:
            brick = brick_stack.pop()
        except IndexError:
            break  # stack drained, possibly by another thread
        print(f"{_thread.get_ident()} removed: {brick}, remaining: {len(brick_stack)}")
        time.sleep(delay)
def main():
    """Launch 100 worker threads and keep the main thread alive.

    _thread offers no join(); sleeping is the crude way to keep the
    process running long enough for the workers to finish.
    """
    worker_count = 100
    for _worker in range(worker_count):
        _thread.start_new_thread(move_bricks, ())
    # No join available in _thread -- give workers time to drain the stack.
    time.sleep(10)


if __name__ == "__main__":
    main()
The call _thread.start_new_thread(move_bricks, ()) starts a new thread that executes the move_bricks function.
High-Level Threading with threading
import threading
import time
# Global resource shared across threads.
brick_stack = [f"brick_{i}" for i in range(1, 101)]


def move_bricks(delay=0.1):
    """Pop bricks off the shared stack until it is empty.

    delay: seconds to pause after each brick. The default keeps the
    original pacing; pass 0 to drain the stack instantly (e.g. in tests).

    The original checked emptiness and then popped -- with 100 threads
    another worker can empty the stack between the two steps, raising
    IndexError. Popping inside try/except (EAFP) closes that race,
    because list.pop() itself is atomic under the GIL.
    """
    while True:
        try:
            brick = brick_stack.pop()
        except IndexError:
            break  # stack drained, possibly by another thread
        print(f"{threading.current_thread().name} removed: {brick}, remaining: {len(brick_stack)}")
        time.sleep(delay)
def main():
    """Start 100 named worker threads, then wait for them to finish."""
    workers = [
        threading.Thread(target=move_bricks, name=f"worker-{i}")
        for i in range(100)
    ]
    for worker in workers:
        worker.start()
    # Crude synchronization: sleep long enough for the stack to drain.
    time.sleep(10)


if __name__ == "__main__":
    main()
Creating a thread with threading.Thread(target=move_bricks, name=f"worker-{i}") returns a new thread object; the optional args tuple supplies positional arguments to the target. The thread does not run until .start() is called.
Thread Synchronization and Control
Blocking the Main Thread with join()
import threading
import time
# Work items consumed by the worker thread.
tasks = [f"task_{i}" for i in range(10)]


def process_tasks(delay=0.2):
    """Drain the shared task list, printing each item as it is processed.

    delay: pause after each task. The default keeps the original pacing;
    pass 0 to drain instantly (e.g. in tests).

    Pops inside try/except (EAFP) instead of `while tasks:` so the
    function stays correct even if several threads drain the same list
    concurrently (check-then-pop is a race in that case).
    """
    while True:
        try:
            task = tasks.pop()
        except IndexError:
            break  # list exhausted
        print(f"{threading.current_thread().name} processing {task}")
        time.sleep(delay)
def main():
    """Run the task processor on a worker thread and wait for it to finish."""
    worker = threading.Thread(target=process_tasks, daemon=False)
    worker.start()
    # Block here until the worker has drained every task.
    worker.join()
    print("All tasks completed.")


if __name__ == "__main__":
    main()
.join() blocks the calling thread (usually the main thread) until the thread on which it is called terminates.
Daemon vs Non-Daemon Threads
A daemon thread is terminated automatically when the program exits — that is, when the last non-daemon thread finishes — regardless of its own execution state. A non-daemon thread must complete before the program can exit.
import threading
import time
def background_task(iterations=10, delay=0.5):
    """Print a heartbeat message `iterations` times, pausing between them.

    iterations: number of messages (default reproduces the original 10).
    delay: seconds between messages (default reproduces the original 0.5).

    Parameterized so callers and tests can shorten the run; calling it
    with no arguments behaves exactly like the original.
    """
    for i in range(iterations):
        print(f"Daemon thread iteration {i}")
        time.sleep(delay)
def main():
    """Demonstrate that a daemon thread dies when the main thread ends."""
    worker = threading.Thread(target=background_task, daemon=True)
    worker.start()
    # Let the daemon run for a while; it is killed when main() returns
    # and the interpreter shuts down, whether or not its loop finished.
    time.sleep(2)
    print("Main thread ending.")


if __name__ == "__main__":
    main()
Locking Shared Resources
Thread locks prevent race conditions when multiple threads access shared data.
import threading
# Shared mutable state and the lock that guards it.
counter = 0
lock = threading.Lock()


def increment(iterations=1_000_000):
    """Atomically bump the shared counter `iterations` times.

    iterations: number of increments (default reproduces the original
    1,000,000-step loop).

    `counter += 1` is a read-modify-write and is not atomic in Python,
    so each step is wrapped in the lock. Using `with lock:` instead of
    manual acquire()/release() guarantees the lock is released even if
    an exception escapes the body.
    """
    global counter
    for _ in range(iterations):
        with lock:
            counter += 1
    print(f"{threading.current_thread().name}: {counter}")
def main():
    """Run three incrementing threads and wait for all of them.

    The original started the threads but never joined them, so main()
    returned while the increments were still in flight; joining ensures
    the program ends only after all three workers have finished.
    """
    threads = [threading.Thread(target=increment, name=f"t{i}") for i in range(3)]
    for t in threads:
        t.start()
    for t in threads:
        # Wait for every worker so all increments are applied before exit.
        t.join()


if __name__ == "__main__":
    main()
Semaphore for Limiting Concurrent Access
A semaphore controls how many threads can access a given resource simultaneously.
import threading
import time
# Allow up to 5 concurrent accesses: at most 5 passengers inside at once.
semaphore = threading.Semaphore(5)


def enter_station(passenger_id, dwell=2):
    """Simulate one passenger passing through the station.

    passenger_id: label used in the progress messages.
    dwell: seconds spent inside (default reproduces the original 2s stay).

    Holding the semaphore via `with` guarantees the permit is returned
    even if the body raises -- the original acquire()/release() pair
    would leak a permit on an exception, eventually blocking everyone.
    """
    print(f"Passenger {passenger_id} arrives.")
    with semaphore:
        print(f"Passenger {passenger_id} enters.")
        time.sleep(dwell)
    print(f"Passenger {passenger_id} leaves.")
def main():
    """Send 20 passengers through the station concurrently."""
    for passenger_id in range(20):
        worker = threading.Thread(target=enter_station, args=(passenger_id,))
        worker.start()


if __name__ == "__main__":
    main()
Queues and Thread Pools
Queue for Safe Inter-Thread Communication
The queue.Queue class provides a thread-safe FIFO structure, ideal for producer-consumer patterns.
import random
import threading
import time
from queue import Queue
# Menu the chef cooks from, one random dish per order.
dishes = ["pizza", "pasta", "salad", "soup", "burger", "sushi", "taco", "curry", "ramen", "dumpling"]
# Thread-safe hand-off between chef (producer) and customer (consumer);
# maxsize=10 caps how far the chef can get ahead of the customer.
order_queue = Queue(maxsize=10)
def chef():
    """Producer: cook random dishes forever, keeping at most 10 pending.

    Runs until the process exits. Back-pressure comes from Queue.put(),
    which blocks while the queue is at maxsize and resumes as soon as
    one slot frees. (The original called order_queue.join() when full,
    but join() waits until EVERY queued dish has been task_done()'d --
    i.e. the queue is completely drained -- not until one slot opens.)
    """
    while True:
        if order_queue.full():
            print("Kitchen full. Waiting for orders to be taken.")
        dish = random.choice(dishes)
        # Blocks while the queue is full; the correct wait-for-space.
        order_queue.put(dish)
        print(f"Chef cooked: {dish}")
        time.sleep(0.3)
def customer():
    """Consumer: eat dishes as they become ready, forever.

    The original polled empty() and slept -- a busy-wait that also races
    (another consumer could drain the queue between the check and the
    get). A blocking get() is the idiomatic, race-free replacement.
    """
    time.sleep(5)  # Let chef start
    while True:
        # Blocks until a dish is available -- no polling needed.
        dish = order_queue.get()
        print(f"Customer ate: {dish}")
        # Mark the dish consumed so Queue.join() bookkeeping stays correct.
        order_queue.task_done()
def main():
    """Run the producer and consumer side by side until interrupted."""
    workers = (
        threading.Thread(target=chef),
        threading.Thread(target=customer),
    )
    for worker in workers:
        worker.start()


if __name__ == "__main__":
    main()
Thread Pool for Controlled Parallelism
A thread pool manages a fixed number of threads, reusing them for multiple tasks. This contrasts with a semaphore, which limits concurrent access but still creates many threads.
import threading
import threadpool
import time
# The 100 work items handed to the pool.
tasks = [f"brick_{i}" for i in range(100)]


def move_brick(brick):
    """Report that the current worker moved *brick*, then pause briefly."""
    worker_name = threading.current_thread().name
    print(f"{worker_name} moved {brick}")
    time.sleep(0.1)
def main():
    """Process every task on a fixed pool of 5 worker threads.

    NOTE(review): the original used the third-party `threadpool`
    package, which has been unmaintained for many years. The standard
    library's concurrent.futures provides the same fixed-size pool with
    no external dependency.
    """
    from concurrent.futures import ThreadPoolExecutor

    with ThreadPoolExecutor(max_workers=5) as pool:  # 5 worker threads
        # map() schedules one move_brick call per task; leaving the
        # with-block waits for all of them to finish -- the stdlib
        # equivalent of threadpool's pool.wait().
        pool.map(move_brick, tasks)


if __name__ == "__main__":
    main()
Using the tomorrow Library for Easy Concurrency
The tomorrow library provides a decorator to run functions concurrently.
Installation: pip install tomorrow
import time
from random import random
from tomorrow import threads
import threading
@threads(10)  # tomorrow: run calls on a pool of up to 10 threads
def perform_action(action_name):
    # Report which worker thread is handling this action, then sleep a
    # random 0-1s to simulate work.
    # NOTE(review): with @threads the call appears to return immediately
    # and the body runs on a pool thread -- confirm against the tomorrow
    # library documentation.
    print(f"{threading.current_thread().name} is doing: {action_name}")
    time.sleep(random())
def main():
    """Kick off each action concurrently via the @threads decorator."""
    activity_list = [
        "listening to music",
        "chatting",
        "eating",
        "coding",
        "reading",
    ]
    for activity in activity_list:
        # Returns immediately; the work happens on a pool thread.
        perform_action(activity)
    # Brief grace period before the interpreter begins shutdown.
    time.sleep(0.2)


if __name__ == "__main__":
    main()
The @threads(10) decorator limits the pool to 10 simultaneous threads.