Python Multithreading Fundamentals and Synchronization Techniques
Process vs Thread Comparison
Functional Differences
- Processes enable multitasking at the application level (e.g., running multiple QQ instances simultaneously)
- Threads enable multitasking within a single process (e.g., multiple chat windows in one QQ instance)
Definitional Distinctions
- A process is the fundamental unit of resource allocation in an operating system
- A thread is a lightweight execution unit within a process, serving as the basic scheduling unit for the CPU. Threads maintain minimal resources (program counter, register set, stack) but share all process resources
Key Differences
- Every running program has at least one process, and every process contains at least one thread
- Threads have smaller granularity than processes (fewer resources), enabling higher concurrency
- Processes maintain independent memory spaces, while threads share memory, significantly improving efficiency
- Threads cannot execute independently and must exist within a process context
Pros and Cons
Threads offer lower execution overhead but complicate resource management and protection. Processes provide better isolation but incur higher overhead.
Multithreading with threading Module
Python's threading module provides a higher-level interface built on top of the low-level _thread module (named thread in Python 2).
Single-threaded Execution
#coding=utf-8
import time
def send_message():
    """Print a status line, then block the calling thread for one second."""
    print("Sending message...")
    time.sleep(1)
if __name__ == "__main__":
    # Five sequential calls: each one blocks for its one-second sleep before
    # the next call starts.
    for _ in range(5):
        send_message()
Multi-threaded Execution
#coding=utf-8
import threading
import time
def send_message():
    """Print a status line, then block the calling thread for one second."""
    print("Sending message...")
    time.sleep(1)
if __name__ == "__main__":
    # Start five threads back to back without waiting on any of them, so
    # their one-second sleeps overlap instead of adding up.
    for _ in range(5):
        worker = threading.Thread(target=send_message)
        worker.start()
Thread Lifecycle Management
#coding=utf-8
import threading
from time import sleep, ctime
def play_music():
    """Print three numbered "Playing track" lines, pausing one second after each."""
    for i in range(3):
        print(f"Playing track {i}")
        sleep(1)
def display_video():
    """Print three numbered "Displaying frame" lines, pausing one second after each."""
    for i in range(3):
        print(f"Displaying frame {i}")
        sleep(1)
if __name__ == '__main__':
    print(f'Started at: {ctime()}')
    audio_thread = threading.Thread(target=play_music)
    video_thread = threading.Thread(target=display_video)
    audio_thread.start()
    video_thread.start()
    # No join() is called, so this line is printed while both workers are
    # still running. The interpreter itself only exits once these
    # non-daemon threads have finished.
    print(f'Completed at: {ctime()}')
Active Thread Monitoring
#coding=utf-8
import threading
from time import sleep, ctime
def worker_task():
    """Print three numbered progress lines, pausing one second after each."""
    for i in range(3):
        print(f"Worker executing iteration {i}")
        sleep(1)
if __name__ == '__main__':
    print(f'Start time: {ctime()}')
    threads = []
    for _ in range(2):
        t = threading.Thread(target=worker_task)
        threads.append(t)
        t.start()

    # Poll every half second. threading.enumerate() also lists the main
    # thread, so a count of 1 means every worker has finished.
    while True:
        active_count = len(threading.enumerate())
        print(f'Active threads: {active_count}')
        if active_count <= 1:
            break
        sleep(0.5)
Thread Implementation Best Practices
Encapsulating Thread Logic
#coding=utf-8
import threading
import time
class CustomThread(threading.Thread):
    """Thread subclass that keeps its work in an overridden run()."""

    def run(self):
        """Print three messages tagged with this thread's name, one per second."""
        for i in range(3):
            time.sleep(1)
            message = f"Thread {self.name} iteration {i}"
            print(message)
if __name__ == '__main__':
    # start() runs CustomThread.run() in a new thread; calling run() directly
    # would execute it in the current thread instead.
    worker = CustomThread()
    worker.start()
Thread Execution Order
#coding=utf-8
import threading
import time
class RandomThread(threading.Thread):
    """Thread subclass used to show that several threads interleave their output."""

    def run(self):
        """Print three "<thread name> - Step <i>" lines, one per second."""
        for i in range(3):
            time.sleep(1)
            print(f"{self.name} - Step {i}")
def launch_threads():
    """Create and start five RandomThread instances without waiting for them."""
    for _ in range(5):
        t = RandomThread()
        t.start()
if __name__ == '__main__':
    launch_threads()
Global Variable Sharing
Basic Shared Resource Example
from threading import Thread
import time
# Module-level value that every thread in this example reads or writes.
shared_counter = 100


def increment_worker():
    """Add 3 to the module-level shared_counter, then print the new value."""
    global shared_counter
    for _ in range(3):
        shared_counter += 1
    print(f"Increment worker result: {shared_counter}")
def display_worker():
    """Print the current value of the module-level shared_counter."""
    # The declaration is optional for a read, but makes the sharing explicit.
    global shared_counter
    print(f"Display worker sees: {shared_counter}")
if __name__ == "__main__":
    print(f"Initial value: {shared_counter}")
    t1 = Thread(target=increment_worker)
    t1.start()
    # Wait for the writer to finish rather than sleeping for a fixed time, so
    # the reader is guaranteed to observe the updated value.
    t1.join()
    t2 = Thread(target=display_worker)
    t2.start()
Mutable Object Sharing
from threading import Thread
import time
def modify_collection(data_list):
    """Append 44 to data_list in place and print the result.

    Args:
        data_list: The list to mutate; the caller's object itself is changed.
    """
    data_list.append(44)
    print(f"Modified list: {data_list}")
def read_collection(data_list):
    """Wait one second, then print data_list as it looks at that moment.

    Args:
        data_list: The list to print; it is not modified here.
    """
    time.sleep(1)
    print(f"Read list: {data_list}")
if __name__ == "__main__":
    # Both threads receive the same list object, not a copy.
    shared_data = [11, 22, 33]
    t1 = Thread(target=modify_collection, args=(shared_data,))
    t1.start()
    t2 = Thread(target=read_collection, args=(shared_data,))
    t2.start()
Synchronization Concepts
Race Condition Demonstration
from threading import Thread
import time
# Module-level counter that both threads update without any locking.
race_variable = 0


def increment_operation(iterations=1000000):
    """Add 1 to race_variable `iterations` times without synchronization.

    The read-modify-write in `+=` is not protected by a lock, so updates can
    be lost when several threads run this function at the same time.

    Args:
        iterations: Number of increments to perform.
    """
    global race_variable
    for _ in range(iterations):
        race_variable += 1
    print(f"Operation complete: {race_variable}")
if __name__ == "__main__":
    p1 = Thread(target=increment_operation)
    p1.start()
    p2 = Thread(target=increment_operation)
    p2.start()
    # Deliberately not joined: this prints whatever value the counter has
    # reached so far, which is part of what the demonstration shows.
    print(f"Final value: {race_variable}")
Mutex Lock Implementation
Basic Mutex Usage
from threading import Thread, Lock
import time
# Counter shared by all workers; every update happens under access_lock.
protected_counter = 0
access_lock = Lock()


def safe_increment(iterations=1000000):
    """Add 1 to protected_counter `iterations` times, holding access_lock for each update.

    Args:
        iterations: Number of increments to perform.
    """
    global protected_counter
    for _ in range(iterations):
        # `with` releases the lock even if the body raises.
        with access_lock:
            protected_counter += 1
    print(f"Thread finished with count: {protected_counter}")
if __name__ == "__main__":
    worker1 = Thread(target=safe_increment)
    worker2 = Thread(target=safe_increment)
    worker1.start()
    worker2.start()
    # Wait for both workers so the total below is final.
    worker1.join()
    worker2.join()
    print(f"Final protected count: {protected_counter}")
Thread-Specific Data
Non-Shared Variables
import threading
from time import sleep
def isolated_task(delay):
    """Increment a local variable after sleeping, then print it with the current thread.

    local_value lives in this call's own frame, so each thread running the
    function works on its own copy.

    Args:
        delay: Seconds to sleep before incrementing.
    """
    local_value = 1
    sleep(delay)
    local_value += 1
    print(f"Thread {threading.current_thread()} has value: {local_value}")
if __name__ == "__main__":
    # Different delays, same function: each thread still ends with its own
    # local_value of 2.
    t1 = threading.Thread(target=isolated_task, args=(5,))
    t2 = threading.Thread(target=isolated_task, args=(1,))
    t1.start()
    t2.start()
Deadlock Scenarios
Classic Deadlock Example
import threading
import time
class ThreadA(threading.Thread):
    """Deadlock-demo worker that takes resource1 first, then resource2."""

    def run(self):
        """Hold resource1 for one second, then block until resource2 can be taken.

        ThreadB takes the same two locks in the opposite order, so running
        both together is expected to deadlock; that is the point of the demo.
        """
        with resource1:
            print(f"{self.name} acquired resource1")
            time.sleep(1)
            with resource2:
                print(f"{self.name} acquired resource2")
class ThreadB(threading.Thread):
    """Deadlock-demo worker that takes resource2 first, then resource1."""

    def run(self):
        """Hold resource2 for one second, then block until resource1 can be taken.

        ThreadA takes the same two locks in the opposite order, so running
        both together is expected to deadlock; that is the point of the demo.
        """
        with resource2:
            print(f"{self.name} acquired resource2")
            time.sleep(1)
            with resource1:
                print(f"{self.name} acquired resource1")
# Created at module level so ThreadA.run and ThreadB.run can reach them as globals.
resource1 = threading.Lock()
resource2 = threading.Lock()

if __name__ == "__main__":
    # Starting both workers is expected to hang: each ends up holding one
    # lock while waiting for the other.
    t1 = ThreadA()
    t2 = ThreadB()
    t1.start()
    t2.start()
Synchronized Thread Execution
Sequential Thread Pattern
from threading import Thread, Lock
from time import sleep
class Phase1(Thread):
    """First stage of the lock-chained Phase1 -> Phase2 -> Phase3 cycle."""

    def run(self):
        """Loop forever: wait for lock1, print, sleep 0.5s, then release lock2."""
        while True:
            # A blocking acquire() always returns True, so no `if` is needed.
            lock1.acquire()
            print("Phase 1 executing")
            sleep(0.5)
            # Hand over to Phase2, which is waiting on lock2.
            lock2.release()
class Phase2(Thread):
    """Second stage of the lock-chained Phase1 -> Phase2 -> Phase3 cycle."""

    def run(self):
        """Loop forever: wait for lock2, print, sleep 0.5s, then release lock3."""
        while True:
            # A blocking acquire() always returns True, so no `if` is needed.
            lock2.acquire()
            print("Phase 2 executing")
            sleep(0.5)
            # Hand over to Phase3, which is waiting on lock3.
            lock3.release()
class Phase3(Thread):
    """Third stage of the lock-chained Phase1 -> Phase2 -> Phase3 cycle."""

    def run(self):
        """Loop forever: wait for lock3, print, sleep 0.5s, then release lock1."""
        while True:
            # A blocking acquire() always returns True, so no `if` is needed.
            lock3.acquire()
            print("Phase 3 executing")
            sleep(0.5)
            # Close the cycle by handing back to Phase1, which waits on lock1.
            lock1.release()
# lock1 starts free; lock2 and lock3 start held, so only Phase1 can run first
# and each phase then unblocks the next one.
lock1 = Lock()
lock2 = Lock()
lock2.acquire()
lock3 = Lock()
lock3.acquire()

if __name__ == "__main__":
    p1 = Phase1()
    p2 = Phase2()
    p3 = Phase3()
    p1.start()
    p2.start()
    p3.start()
Producer-Consumer Pattern
Queue-based Implementation
#encoding=utf-8
import threading
import time
from queue import Queue
class Producer(threading.Thread):
    """Worker that tops up the shared `buffer` queue in batches of 100."""

    def run(self):
        """Loop forever: add 100 items while fewer than 1000 are queued, then sleep 0.5s."""
        # `buffer` is the module-level Queue created in the __main__ block; it
        # is only read here, so no `global` declaration is required.
        item_id = 0
        while True:
            if buffer.qsize() < 1000:
                for _ in range(100):
                    item_id += 1
                    product = f'Item-{item_id}'
                    buffer.put(product)
                    print(f"Produced: {product}")
            time.sleep(0.5)
class Consumer(threading.Thread):
    """Worker that drains the shared `buffer` queue three items at a time."""

    def run(self):
        """Loop forever: take 3 items while more than 100 are queued, then sleep 1s."""
        # `buffer` is the module-level Queue created in the __main__ block; it
        # is only read here, so no `global` declaration is required.
        while True:
            # qsize() is only a snapshot; get() blocks if another consumer
            # empties the queue in between.
            if buffer.qsize() > 100:
                for _ in range(3):
                    product = buffer.get()
                    print(f"{self.name} consumed: {product}")
            time.sleep(1)
if __name__ == '__main__':
    buffer = Queue()
    # Seed the queue with 500 items, above the consumers' 100-item threshold,
    # so they have work before the producers' first batch arrives.
    for i in range(500):
        buffer.put(f'Startup-{i}')
    producers = [Producer() for _ in range(2)]
    consumers = [Consumer() for _ in range(5)]
    for p in producers:
        p.start()
    for c in consumers:
        c.start()
Queue Operations
- put() - Add items to the queue
- get() - Retrieve items from the queue
- qsize() - Check the (approximate) queue size
Producer-Consumer Benefits
The pattern decouples production and consumption rates through a buffered queue, preventing either party from blocking the other. The queue acts as a mediator, balancing processing capabilities between threads.