Python Concurrency: Processes, Threads, and Synchronization Techniques
Global Interpreter Lock (GIL)
The Global Interpreter Lock (GIL) is a mutex that protects access to Python objects, preventing multiple native threads from executing Python bytecode at the same time within a single process. It's not a feature of the Python language itself but rather an implementation detail of the CPython interpreter.
The GIL functions similarly to a mutual exclusion lock, addressing resource competition among multiple threads in the interpreter. When one thread is executing, it acquires the GIL, and only when it releases it can another thread acquire it and execute.
Impact of GIL on Python Programs
- Due to the GIL, only one thread can execute Python bytecode at any given time in a single process.
- Although threads are the operating system's basic unit of CPU scheduling and dispatch, the GIL prevents Python threads within one process from executing bytecode on multiple CPU cores in parallel.
- The GIL is released when a thread performs blocking I/O, and CPython also forces a thread switch after a configurable interval (see `sys.getswitchinterval()`). This makes Python's multithreading well suited to I/O-bound tasks but largely ineffective for CPU-bound ones.
- While the GIL ensures that individual bytecode instructions are executed without interference from other threads, thread switches can occur between any bytecode instructions.
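The effect is easy to observe: running a pure-Python CPU-bound function in two threads takes roughly as long as running it twice in a row, because only one thread executes bytecode at a time. A minimal sketch (exact timings will vary by machine):

```python
import threading
import time

def count_down(n):
    # Pure-Python CPU-bound loop: holds the GIL except at forced switches
    while n > 0:
        n -= 1

N = 2_000_000

start = time.perf_counter()
count_down(N)
count_down(N)
sequential = time.perf_counter() - start

start = time.perf_counter()
t1 = threading.Thread(target=count_down, args=(N,))
t2 = threading.Thread(target=count_down, args=(N,))
t1.start(); t2.start()
t1.join(); t2.join()
threaded = time.perf_counter() - start

# On CPython the threaded run is not ~2x faster; it is usually about
# the same as (or slower than) the sequential run.
print(f"sequential: {sequential:.2f}s, threaded: {threaded:.2f}s")
```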
Parallelism vs Concurrency
Most modern operating systems support running multiple tasks simultaneously. Each task typically corresponds to a program, and each running program is a process. In other words, a process is an instance of a program in execution.
Parallelism
Parallelism refers to the simultaneous execution of multiple instructions on multiple processors at the same moment.
Concurrency
Concurrency means that a single processor executes only one task at any instant but switches rapidly between tasks, creating the illusion of parallel execution from a macro perspective.
Threads and Processes
The relationship between processes and threads can be summarized as follows: An operating system can execute multiple tasks simultaneously, with each task being a process. Each process can, in turn, execute multiple tasks, with each task being a thread.
Threads
A thread is the smallest unit of program execution, consisting of a sequence of executable instructions. It is a component of a process, and a single process can contain multiple threads. In a multithreaded program, one thread is the main thread: it runs from program start to finish, while the other threads are created (and may terminate) during its execution.
Thread Safety: Thread safety refers to ensuring data consistency and integrity when multiple threads compete for the same resources. This is typically achieved through locking mechanisms that provide mutual exclusion at critical data points.
Thread Safety Issues: These usually arise when multiple threads in an environment have write access to shared variables (global or static), potentially leading to data inconsistency or corruption. To prevent such issues, locking mechanisms are commonly implemented.
Processes
A process is the operating system's fundamental unit of resource allocation (CPU, memory, disk, network) and scheduling. Each running program corresponds to a process.
Multithreading in Python
Using the `threading` Module
import time
import threading
def task_one(param1, param2):
print(param1)
time.sleep(2)
print(param2)
def task_two():
print('Second task - part 1')
time.sleep(4)
print('Second task - part 2')
if __name__ == '__main__':
worker1 = threading.Thread(target=task_one, args=('Task 1 - Part 1', 'Task 1 - Part 2',))
worker2 = threading.Thread(target=task_two)
# Set as daemon thread - will terminate when main thread ends
    worker2.daemon = True  # setDaemon() is deprecated since Python 3.10
worker1.start()
worker2.start()
start_time = time.time()
# Block main thread until worker1 completes
worker1.join()
end_time = time.time()
print('Program completed. Thread blocking time: {}'.format(round(end_time-start_time, 2)))
"""
Output:
Task 1 - Part 1
Second task - part 1
Task 1 - Part 2
Program completed. Thread blocking time: 2.02
"""
- Since worker2 is a daemon thread, it is killed when the main thread exits (roughly 2 seconds in, once `worker1.join()` returns), so 'Second task - part 2' is never printed.
- Because worker1 is joined, the main thread blocks for approximately 2.02 seconds until worker1 completes.
"""
Alternative threading approach: Inheriting from Thread class
---
Suitable for more complex threading implementations
"""
import time
import threading
class CustomThread1(threading.Thread):
def __init__(self, params, name):
super().__init__(name=name) # Set thread name
self.param1 = params[0]
self.param2 = params[1]
def run(self):
print(self.param1)
time.sleep(2)
print(self.param2)
class CustomThread2(threading.Thread):
def run(self):
print('Alternative thread - part 1')
time.sleep(4)
print('Alternative thread - part 2')
if __name__ == '__main__':
worker1 = CustomThread1(('First parameter', 'Second parameter'), 'custom_thread_1')
worker2 = CustomThread2()
    # Set as daemon thread - will terminate when the main thread ends
    worker2.daemon = True
worker1.start()
worker2.start()
start_time = time.time()
# Block main thread until worker1 completes
worker1.join()
end_time = time.time()
print('Program completed. Thread blocking time: {}'.format(round(end_time - start_time, 2)))
"""
First parameter
Alternative thread - part 1
Second parameter
Program completed. Thread blocking time: 2.0
"""
Thread Communication
Methods for communication between threads:
- Shared variables: a global variable visible to all threads (not thread-safe without a lock)
- Queue: Using `queue.Queue()` with common methods including:
- `Queue.qsize()`: Returns the queue size
- `Queue.empty()`: Returns True if the queue is empty, False otherwise
- `Queue.full()`: Returns True if the queue is full, False otherwise
- `Queue.get([block[, timeout]])`: Retrieves an item from the queue with optional timeout
- `Queue.get_nowait()`: Equivalent to `Queue.get(False)`
- `Queue.put(item[, block[, timeout]])`: Adds an item to the queue, optionally blocking with a timeout
- `Queue.put_nowait(item)`: Equivalent to `Queue.put(item, False)`
- `Queue.task_done()`: Signals that a task is completed
- `Queue.join()`: Blocks until all items in the queue are processed
import queue
import threading
import time
termination_flag = 0
class WorkerThread(threading.Thread):
def __init__(self, thread_id, name, task_queue):
threading.Thread.__init__(self)
self.thread_id = thread_id
self.name = name
self.task_queue = task_queue
def run(self):
print("Starting thread: " + self.name)
self.process_tasks(self.name, self.task_queue)
print("Exiting thread: " + self.name)
    def process_tasks(self, thread_name, task_queue):
while not termination_flag:
queue_lock.acquire()
if not task_queue.empty():
data = task_queue.get()
queue_lock.release()
print("%s processing %s" % (thread_name, data))
else:
queue_lock.release()
time.sleep(1)
thread_list = ["Worker-1", "Worker-2", "Worker-3"]
task_list = ["Alpha", "Beta", "Gamma", "Delta", "Epsilon"]
queue_lock = threading.Lock()
task_queue = queue.Queue(10)
threads = []
thread_id = 1
# Create new threads
for t_name in thread_list:
thread = WorkerThread(thread_id, t_name, task_queue)
thread.start()
threads.append(thread)
thread_id += 1
# Populate queue
queue_lock.acquire()
for item in task_list:
task_queue.put(item)
queue_lock.release()
# Wait for queue to empty
while not task_queue.empty():
pass
# Signal threads to exit
termination_flag = 1
# Wait for all threads to complete
for t in threads:
t.join()
print("Main thread exiting")
"""
Output:
Starting thread: Worker-1
Starting thread: Worker-2
Starting thread: Worker-3
Worker-2 processing Alpha
Worker-3 processing Beta
Worker-1 processing Gamma
Worker-1 processing DeltaWorker-3 processing Epsilon
Exiting thread: Worker-3Exiting thread: Worker-1
Exiting thread: Worker-2
"""
Thread Synchronization
Thread Lock (`Lock`)
- Locks impact performance; once a lock is acquired by a thread, it must be released, otherwise other threads will wait indefinitely.
- Locks can cause deadlocks (resources not released, circular waiting).
"""
Locking ensures that a computation and assignment operation completes before yielding resources
---
Thread locking is conceptually similar to transaction locks in databases, making potentially unsafe operations atomic
"""
import threading
lock = threading.Lock() # Create a lock
counter = 0
def increment():
global counter
for i in range(1000000):
lock.acquire() # Acquire lock
counter += 1
lock.release() # Release lock
def decrement():
global counter
for i in range(1000000):
lock.acquire()
counter -= 1
lock.release()
if __name__ == '__main__':
print('Starting')
t1 = threading.Thread(target=increment)
t2 = threading.Thread(target=decrement)
t1.start()
t2.start()
t1.join()
t2.join()
print(counter)
print('Completed')
Reentrant Lock (`RLock`)
A reentrant lock can be acquired multiple times within the same thread, but each acquisition must be matched with a corresponding release.
from threading import Thread, RLock
from time import sleep
lock = RLock()
containers = [0 for _ in range(10)] # 10 containers
def fill_container():
for x in range(10):
if containers[x] == 0:
sleep(0.1) # Filling container takes time
containers[x] += 1
return x
def fill_with_lock(L):
    while not all(containers):  # Stop once every container is filled
        if L.acquire(False):    # Non-blocking acquire: returns False if the lock is held
            fill_container()
            L.release()
threads = [Thread(target=fill_with_lock, args=[lock]) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
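The example above never acquires the lock twice from one thread; the point of an `RLock` is that the same thread can re-acquire it without deadlocking, which matters when one locked method calls another locked method. A minimal sketch:

```python
import threading

rlock = threading.RLock()

class Account:
    def __init__(self):
        self.balance = 0

    def deposit(self, amount):
        with rlock:               # First acquisition
            self.balance += amount

    def deposit_twice(self, amount):
        with rlock:               # Second acquisition by the same thread:
            self.deposit(amount)  # with a plain Lock this would deadlock
            self.deposit(amount)

account = Account()
account.deposit_twice(50)
print(account.balance)  # 100
```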
Condition Variables
Condition variables implement `__enter__` and `__exit__` methods, allowing them to be used with context managers via the `with` statement.
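A Condition pairs a lock with wait/notify: a consumer waits until a predicate holds, and the producer notifies after changing the shared state. A minimal sketch:

```python
import threading

condition = threading.Condition()
items = []
consumed = []

def consumer():
    with condition:              # Acquires the underlying lock
        while not items:         # Always re-check the predicate after waking
            condition.wait()     # Releases the lock while waiting
        consumed.append(items.pop())

def producer():
    with condition:
        items.append("data")
        condition.notify()       # Wake up one waiting thread

c = threading.Thread(target=consumer)
c.start()
producer()
c.join()
print(consumed)  # ['data']
```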
Semaphores
Semaphores are used to control the number of threads that can access a resource simultaneously, limiting concurrency.
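For example, a `BoundedSemaphore(3)` admits at most three threads into the guarded section at once. The sketch below records the peak number of concurrent workers:

```python
import threading
import time

semaphore = threading.BoundedSemaphore(3)  # At most 3 threads inside at once
active = 0
peak = 0
counter_lock = threading.Lock()

def worker():
    global active, peak
    with semaphore:              # Blocks if 3 threads are already inside
        with counter_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)         # Simulate work while holding a slot
        with counter_lock:
            active -= 1

threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print("peak concurrency:", peak)  # Never exceeds 3
```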
Thread Pools
Thread pools can be implemented using `ThreadPoolExecutor` from the `concurrent.futures` module.
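A minimal sketch with `ThreadPoolExecutor`, which handles thread creation, reuse, and result collection:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fetch(n):
    # Stand-in for an I/O-bound task (e.g. an HTTP request)
    time.sleep(0.01)
    return n * n

with ThreadPoolExecutor(max_workers=4) as pool:
    # map() preserves input order; submit() would return individual Futures
    results = list(pool.map(fetch, range(5)))

print(results)  # [0, 1, 4, 9, 16]
```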
Multiprocessing in Python
Using the `multiprocessing` Module
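The multiprocessing API mirrors threading: replace `Thread` with `Process`. Each process runs its own interpreter with its own GIL, so CPU-bound work can use multiple cores. A minimal sketch:

```python
import multiprocessing

def square(n, result_queue):
    # Runs in a separate process with its own interpreter and GIL
    result_queue.put(n * n)

if __name__ == '__main__':
    result_queue = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=square, args=(i, result_queue))
             for i in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(sorted(result_queue.get() for _ in procs))  # [0, 1, 4, 9]
```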
Process Pools
Process pools can be created using `ProcessPoolExecutor` from `concurrent.futures` or the `Pool` class from the `multiprocessing` module.
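A minimal sketch with `multiprocessing.Pool`; `ProcessPoolExecutor` offers the same pattern through the `concurrent.futures` interface:

```python
from multiprocessing import Pool

def cube(n):
    return n ** 3

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        # map() blocks until all results are ready, preserving input order
        results = pool.map(cube, range(5))
    print(results)  # [0, 1, 8, 27, 64]
```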
Inter-Process Communication
- Data between processes is isolated and independent.
- `Queue`: Standard `queue.Queue` cannot be used with `multiprocessing.Process()`. Instead, `multiprocessing.Queue` should be used.
- `multiprocessing.Queue` cannot be used with process pools. For communication within pools, use `Queue` from `multiprocessing.Manager`.
from queue import Queue            # thread-safe, but only within one process
from multiprocessing import Queue  # for communication between Process objects
from multiprocessing import Manager
Manager().Queue()                  # for communication with process-pool workers
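A sketch of passing a Manager queue to pool workers; a manager proxy is picklable, whereas passing a plain `multiprocessing.Queue` into a pool raises a `RuntimeError`:

```python
from multiprocessing import Manager, Pool

def record(args):
    value, managed_queue = args
    managed_queue.put(value * 2)   # The Manager proxy forwards this call

if __name__ == '__main__':
    with Manager() as manager:
        managed_queue = manager.Queue()
        with Pool(processes=2) as pool:
            pool.map(record, [(i, managed_queue) for i in range(3)])
        print(sorted(managed_queue.get() for _ in range(3)))  # [0, 2, 4]
```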
Pipe Communication
Pipes connect exactly two endpoints. `Pipe()` returns two connection objects, and by default each end can both send and receive.
from multiprocessing import Process, Pipe
def producer(pipe):
pipe.send("Sample data")
def consumer(pipe):
print(pipe.recv())
if __name__ == '__main__':
receive_pipe, send_pipe = Pipe()
prod_process = Process(target=producer, args=(send_pipe,))
cons_process = Process(target=consumer, args=(receive_pipe,))
prod_process.start()
cons_process.start()
prod_process.join()
cons_process.join()
"""
Output:
Sample data
"""
Shared Memory through Manager
The `Manager` class can be used to maintain shared memory between processes.
from multiprocessing import Process, Manager
def update_shared(shared_dict, key, value):
shared_dict[key] = value
if __name__ == '__main__':
shared_dict = Manager().dict()
process1 = Process(target=update_shared, args=(shared_dict, 1, 'First value'))
process2 = Process(target=update_shared, args=(shared_dict, 2, 'Second value'))
process1.start()
process2.start()
process1.join()
process2.join()
print(shared_dict)
"""
Output:
{1: 'First value', 2: 'Second value'}
"""
Types of Variables Managed by Manager
- `BoundedSemaphore(value)`
- `Condition(lock)`
- `Event()`
- `Lock()`
- `Namespace()`
- `Queue(maxsize)`
- `RLock()`
- `Semaphore(value)`
- `Array(typecode, sequence)`
- `Value(typecode, value)`
- `dict(sequence)`
- `list(sequence)`
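For instance, a Manager list behaves like a list shared across processes; each operation on the proxy is forwarded to the manager process. A minimal sketch:

```python
from multiprocessing import Process, Manager

def append_squares(shared_list, values):
    for v in values:
        shared_list.append(v * v)  # The proxy forwards append() to the manager

if __name__ == '__main__':
    with Manager() as manager:
        shared_list = manager.list()
        p1 = Process(target=append_squares, args=(shared_list, [1, 2]))
        p2 = Process(target=append_squares, args=(shared_list, [3, 4]))
        p1.start(); p2.start()
        p1.join(); p2.join()
        print(sorted(shared_list))  # [1, 4, 9, 16]
```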