
Python Concurrency: Processes, Threads, and Synchronization Techniques

Tech · May 12

Global Interpreter Lock (GIL)

The Global Interpreter Lock (GIL) is a mutex that protects access to Python objects, preventing multiple native threads from executing Python bytecode at the same time within a single process. It's not a feature of the Python language itself but rather an implementation detail of the CPython interpreter.

The GIL functions similarly to a mutual exclusion lock, addressing resource competition among multiple threads in the interpreter. When one thread is executing, it acquires the GIL, and only when it releases it can another thread acquire it and execute.

Impact of GIL on Python Programs

  • Due to the GIL, only one thread can execute Python bytecode at any given time in a single process.
  • Since threads run within processes, and threads are the basic unit of CPU scheduling and dispatch, Python's multithreading cannot leverage multiple CPU cores because of the GIL.
  • The GIL is released when a thread performs I/O, and is also handed off periodically after a switch interval during long-running bytecode. This makes Python's multithreading suitable for I/O-bound tasks but not for CPU-bound operations.
  • While the GIL ensures that individual bytecode instructions are executed without interference from other threads, thread switches can occur between any bytecode instructions.
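The effect on CPU-bound work can be illustrated with a rough timing sketch (the durations vary by machine; `count` is just an illustrative CPU-bound function, so no expected numbers are shown):

```python
import threading
import time

def count(n):
    # Pure-Python busy loop: holds the GIL for almost its entire runtime
    while n:
        n -= 1

N = 3_000_000

# Run the work twice in a single thread
start = time.perf_counter()
count(N)
count(N)
serial = time.perf_counter() - start

# Run the same work in two threads: the GIL serializes bytecode
# execution, so there is little or no speedup on multiple cores
start = time.perf_counter()
t1 = threading.Thread(target=count, args=(N,))
t2 = threading.Thread(target=count, args=(N,))
t1.start(); t2.start()
t1.join(); t2.join()
threaded = time.perf_counter() - start

print(f'serial: {serial:.2f}s  threaded: {threaded:.2f}s')
```

On CPython the threaded run typically takes about as long as the serial run, or longer due to lock contention.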

Parallelism vs Concurrency

Most modern operating systems support running multiple tasks simultaneously. Each task typically corresponds to a program, and each running program is a process. In other words, a process is an instance of a program in execution.

Parallelism

Parallelism refers to the simultaneous execution of multiple instructions on multiple processors at the same moment.

Concurrency

Concurrency involves executing only one instruction at a time, but rapidly switching between multiple processes, creating the illusion of parallel execution from a macro perspective.

Threads and Processes

The relationship between processes and threads can be summarized as follows: An operating system can execute multiple tasks simultaneously, with each task being a process. Each process can, in turn, execute multiple tasks, with each task being a thread.

Threads

A thread is the smallest unit of program execution, consisting of a sequence of executable instructions. It's a component of a process, and a single process can contain multiple threads. In multithreaded programs, one thread serves as the main thread, handling the entire execution from start to finish, while other threads are created or terminated during the main thread's execution.

Thread Safety: Thread safety refers to ensuring data consistency and integrity when multiple threads compete for the same resources. This is typically achieved through locking mechanisms that provide mutual exclusion at critical data points.

Thread Safety Issues: These usually arise when multiple threads in an environment have write access to shared variables (global or static), potentially leading to data inconsistency or corruption. To prevent such issues, locking mechanisms are commonly implemented.
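A minimal sketch of such a race on a shared global variable (the final value is nondeterministic: depending on the interpreter version and thread scheduling, some increments may be lost):

```python
import threading

counter = 0

def unsafe_increment():
    global counter
    for _ in range(100_000):
        counter += 1  # read-modify-write: not atomic across threads

# Four threads all mutate the same global without any lock
threads = [threading.Thread(target=unsafe_increment) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Without a lock the result may be less than the expected 400000
print(counter)
```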

Processes

A process is the smallest unit of resource allocation, serving as the fundamental unit for resource allocation (CPU, memory, disk, network) and scheduling by the operating system. Each running program corresponds to a process.

Multithreading in Python

Using the `threading` Module


import time
import threading

def task_one(param1, param2):
    print(param1)
    time.sleep(2)
    print(param2)

def task_two():
    print('Second task - part 1')
    time.sleep(4)
    print('Second task - part 2')

if __name__ == '__main__':
    worker1 = threading.Thread(target=task_one, args=('Task 1 - Part 1', 'Task 1 - Part 2',))
    worker2 = threading.Thread(target=task_two)

    # Mark as a daemon thread - it is killed when the main thread exits
    worker2.daemon = True  # setDaemon() is deprecated since Python 3.10

    worker1.start()
    worker2.start()

    start_time = time.time()
    # Block main thread until worker1 completes
    worker1.join()
    end_time = time.time()
    print('Program completed. Thread blocking time: {}'.format(round(end_time-start_time, 2)))

"""
Output:
Task 1 - Part 1
Second task - part 1
Task 1 - Part 2
Program completed. Thread blocking time: 2.02
"""
  • Since worker2 is a daemon thread, it is killed when the main thread exits after roughly 2 seconds, before its 4-second sleep finishes, so "Second task - part 2" is never printed.
  • Because worker1 is joined, the main thread blocks for approximately 2.02 seconds until worker1 completes.

"""
Alternative threading approach: Inheriting from Thread class
---
Suitable for more complex threading implementations
"""
import time
import threading

class CustomThread1(threading.Thread):
    def __init__(self, params, name):
        super().__init__(name=name)  # Set thread name
        self.param1 = params[0]
        self.param2 = params[1]

    def run(self):
        print(self.param1)
        time.sleep(2)
        print(self.param2)

class CustomThread2(threading.Thread):
    def run(self):
        print('Alternative thread - part 1')
        time.sleep(4)
        print('Alternative thread - part 2')

if __name__ == '__main__':
    worker1 = CustomThread1(('First parameter', 'Second parameter'), 'custom_thread_1')
    worker2 = CustomThread2()

    # Mark as a daemon thread - it is killed when the main thread exits
    worker2.daemon = True  # setDaemon() is deprecated since Python 3.10

    worker1.start()
    worker2.start()

    start_time = time.time()
    # Block main thread until worker1 completes
    worker1.join()
    end_time = time.time()
    print('Program completed. Thread blocking time: {}'.format(round(end_time - start_time, 2)))

"""
First parameter
Alternative thread - part 1
Second parameter
Program completed. Thread blocking time: 2.0
"""

Thread Communication

Methods for communication between threads:

  • Shared variables: Setting a global shared variable (not thread-safe during communication)
  • Queue: Using `queue.Queue()` with common methods including:
    • `Queue.qsize()`: Returns the queue size
    • `Queue.empty()`: Returns True if the queue is empty, False otherwise
    • `Queue.full()`: Returns True if the queue is full, False otherwise
    • `Queue.get([block[, timeout]])`: Retrieves an item from the queue with optional timeout
    • `Queue.get_nowait()`: Equivalent to `Queue.get(False)`
    • `Queue.put(item[, block[, timeout]])`: Adds an item to the queue with optional blocking and timeout
    • `Queue.put_nowait(item)`: Equivalent to `Queue.put(item, False)`
    • `Queue.task_done()`: Signals that a task is completed
    • `Queue.join()`: Blocks until all items in the queue are processed

import queue
import threading
import time

termination_flag = 0

class WorkerThread(threading.Thread):
    def __init__(self, thread_id, name, task_queue):
        threading.Thread.__init__(self)
        self.thread_id = thread_id
        self.name = name
        self.task_queue = task_queue
        
    def run(self):
        print("Starting thread: " + self.name)
        process_tasks(self.name, self.task_queue)
        print("Exiting thread: " + self.name)

def process_tasks(thread_name, task_queue):
    while not termination_flag:
        queue_lock.acquire()
        if not task_queue.empty():
            data = task_queue.get()
            queue_lock.release()
            print("%s processing %s" % (thread_name, data))
        else:
            queue_lock.release()
        time.sleep(1)

thread_list = ["Worker-1", "Worker-2", "Worker-3"]
task_list = ["Alpha", "Beta", "Gamma", "Delta", "Epsilon"]
queue_lock = threading.Lock()
task_queue = queue.Queue(10)
threads = []
thread_id = 1

# Create new threads
for t_name in thread_list:
    thread = WorkerThread(thread_id, t_name, task_queue)
    thread.start()
    threads.append(thread)
    thread_id += 1

# Populate queue
queue_lock.acquire()
for item in task_list:
    task_queue.put(item)
queue_lock.release()

# Busy-wait until the queue is empty (a short sleep here would be kinder to the CPU)
while not task_queue.empty():
    pass

# Signal threads to exit
termination_flag = 1

# Wait for all threads to complete
for t in threads:
    t.join()
print("Main thread exiting")

"""
Output:
Starting thread: Worker-1
Starting thread: Worker-2
Starting thread: Worker-3
Worker-2 processing Alpha
Worker-3 processing Beta
Worker-1 processing Gamma
Worker-1 processing DeltaWorker-3 processing Epsilon

Exiting thread: Worker-3Exiting thread: Worker-1
Exiting thread: Worker-2
"""

Thread Synchronization

Thread Lock (`Lock`)
  • Locks impact performance; once a lock is acquired by a thread, it must be released, otherwise other threads will wait indefinitely.
  • Locks can cause deadlocks (resources not released, circular waiting).

"""
Locking ensures that a computation and assignment operation completes before yielding resources
---
Thread locking is conceptually similar to transaction locks in databases, making potentially unsafe operations atomic
"""
import threading
lock = threading.Lock()  # Create a lock
counter = 0

def increment():
    global counter
    for i in range(1000000):
        lock.acquire()  # Acquire lock
        counter += 1
        lock.release()  # Release lock

def decrement():
    global counter
    for i in range(1000000):
        lock.acquire()
        counter -= 1
        lock.release()

if __name__ == '__main__':
    print('Starting')
    t1 = threading.Thread(target=increment)
    t2 = threading.Thread(target=decrement)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(counter)
    print('Completed')

Reentrant Lock (`RLock`)

A reentrant lock can be acquired multiple times within the same thread, but each acquisition must be matched with a corresponding release.


from threading import Thread, RLock
from time import sleep

lock = RLock()
containers = [0 for _ in range(10)]  # 10 containers

def fill_container():
    # Fill the first empty container; return its index, or None if all are full
    for x in range(10):
        if containers[x] == 0:
            sleep(0.1)  # Filling a container takes time
            containers[x] += 1
            return x
    return None

def fill_with_lock(L):
    while True:
        with L:  # Acquire the lock; it is released automatically on exit
            if fill_container() is None:
                break  # All containers filled - exit instead of spinning forever

threads = [Thread(target=fill_with_lock, args=[lock]) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
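
The reentrancy itself can be shown more directly. In this sketch the same thread acquires the lock twice without deadlocking; with a plain `Lock`, the nested acquire would hang:

```python
from threading import RLock

lock = RLock()

def inner():
    with lock:            # second acquisition by the same thread succeeds
        return 'inner reached'

def outer():
    with lock:            # first acquisition
        return inner()    # a plain Lock would deadlock on this nested acquire

print(outer())  # inner reached
```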

Condition Variables

Condition variables implement `__enter__` and `__exit__` methods, allowing them to be used with context managers via the `with` statement.
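A minimal wait/notify sketch using a `Condition` as a context manager (the names `producer` and `consumer` are illustrative):

```python
import threading

condition = threading.Condition()
items = []
consumed = []

def consumer():
    with condition:              # acquires the underlying lock
        while not items:         # re-check the predicate after each wakeup
            condition.wait()     # releases the lock while waiting
        consumed.append(items.pop())

def producer():
    with condition:
        items.append('data')
        condition.notify()       # wake one waiting thread

c = threading.Thread(target=consumer)
p = threading.Thread(target=producer)
c.start()
p.start()
c.join()
p.join()
print(consumed)  # ['data']
```

The `while not items` guard is the standard pattern: a woken thread must re-verify the condition it was waiting for before proceeding.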

Semaphores

Semaphores are used to control the number of threads that can access a resource simultaneously, limiting concurrency.
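A sketch limiting concurrency to three threads at a time (`peak` tracks the observed maximum; the sleep duration is arbitrary):

```python
import threading
import time

sem = threading.Semaphore(3)   # allow at most 3 threads in the section at once
counter_lock = threading.Lock()
active = 0
peak = 0

def worker():
    global active, peak
    with sem:                  # blocks while 3 threads hold the semaphore
        with counter_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)       # simulate work
        with counter_lock:
            active -= 1

threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(peak)  # never exceeds 3
```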

Thread Pools

Thread pools can be implemented using `ThreadPoolExecutor` from the `concurrent.futures` module.
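A minimal sketch with `ThreadPoolExecutor.map`, which distributes calls across the pool and preserves input order (`square` is an illustrative function):

```python
from concurrent.futures import ThreadPoolExecutor

def square(n):
    return n * n

# The executor is shut down automatically when the with-block exits
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(square, range(5)))

print(results)  # [0, 1, 4, 9, 16]
```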

Multiprocessing in Python

Using the `multiprocessing` Module

Process Pools

Process pools can be created using `ProcessPoolExecutor` from `concurrent.futures` or the `Pool` class from the `multiprocessing` module.

Inter-Process Communication
  • Data between processes is isolated and independent.
  • `Queue`: Standard `queue.Queue` cannot be used with `multiprocessing.Process()`. Instead, `multiprocessing.Queue` should be used.
  • `multiprocessing.Queue` cannot be used with process pools. For communication within pools, use `Queue` from `multiprocessing.Manager`.

from queue import Queue              # thread-safe, but only within one process
from multiprocessing import Queue    # for use with multiprocessing.Process
from multiprocessing import Manager
pool_queue = Manager().Queue()       # for use with process pools

Pipe Communication

Pipes can only be used for communication between two processes.


from multiprocessing import Process, Pipe

def producer(pipe):
    pipe.send("Sample data")

def consumer(pipe):
    print(pipe.recv())

if __name__ == '__main__':
    receive_pipe, send_pipe = Pipe()
    prod_process = Process(target=producer, args=(send_pipe,))
    cons_process = Process(target=consumer, args=(receive_pipe,))
    prod_process.start()
    cons_process.start()
    prod_process.join()
    cons_process.join()

"""
Output:
Sample data
"""

Shared Memory through Manager

The `Manager` class can be used to maintain shared memory between processes.


from multiprocessing import Process, Manager

def update_shared(shared_dict, key, value):
    shared_dict[key] = value

if __name__ == '__main__':
    shared_dict = Manager().dict()
    process1 = Process(target=update_shared, args=(shared_dict, 1, 'First value'))
    process2 = Process(target=update_shared, args=(shared_dict, 2, 'Second value'))

    process1.start()
    process2.start()
    process1.join()
    process2.join()

    print(shared_dict)

"""
Output:
{1: 'First value', 2: 'Second value'}
"""

Types of Variables Managed by Manager
  • `BoundedSemaphore(value)`
  • `Condition(lock)`
  • `Event()`
  • `Lock()`
  • `Namespace()`
  • `Queue(maxsize)`
  • `RLock()`
  • `Semaphore(value)`
  • `Array(typecode, sequence)`
  • `Value(typecode, value)`
  • `dict(sequence)`
  • `list(sequence)`
