Python Threading

Threading allows Python programs to run multiple operations concurrently. The threading module provides tools for creating and managing threads, enabling parallel execution and better resource utilization.

Threading Basics

Threads are lightweight units of execution that run concurrently within a single process, sharing that process's memory.

Creating Threads

import threading
import time

def worker(thread_name, delay):
    """Thread target: print start/finish messages around a delay-second sleep."""
    start_message = f"{thread_name} starting"
    end_message = f"{thread_name} finished"
    print(start_message)
    time.sleep(delay)
    print(end_message)

# Create and start threads
# Each Thread object runs `worker` with its own (name, delay) arguments.
thread1 = threading.Thread(target=worker, args=("Thread-1", 2))
thread2 = threading.Thread(target=worker, args=("Thread-2", 3))

print("Main thread: Starting threads")
# start() returns immediately; both workers now run concurrently.
thread1.start()
thread2.start()

# Wait for threads to complete
# join() blocks the calling (main) thread until the target thread finishes.
thread1.join()
thread2.join()

print("Main thread: All threads completed")

threading.Thread() creates a thread. start() begins execution, join() waits for completion.

Thread Subclassing

import threading
import time

class WorkerThread(threading.Thread):
    """Thread subclass that announces itself, sleeps `delay` seconds, and exits.

    Demonstrates customizing a thread by overriding run() instead of
    passing a target callable.
    """

    def __init__(self, name, delay):
        # Hand the name to the Thread base class rather than assigning
        # self.name afterwards; Thread manages its own `name` attribute.
        super().__init__(name=name)
        self.delay = delay  # seconds to sleep inside run()

    def run(self):
        """Executed in the new thread when start() is called."""
        print(f"{self.name} starting")
        time.sleep(self.delay)
        print(f"{self.name} finished")

# Create and start threads
threads = []
for i in range(3):
    # Workers get staggered delays of 1, 2 and 3 seconds.
    t = WorkerThread(f"Worker-{i+1}", i + 1)
    threads.append(t)
    t.start()

# Wait for all threads
for t in threads:
    t.join()

print("All worker threads completed")

Subclass Thread and override run() for custom thread behavior.

Synchronization

Prevent race conditions with synchronization primitives.

Locks

import threading
import time

counter = 0  # shared mutable state, updated concurrently by several threads
lock = threading.Lock()  # guards every read-modify-write of `counter`

def increment_counter(thread_id):
    """Increment the shared global `counter` 100000 times, one locked step each."""
    global counter

    total_updates = 100000
    for _ in range(total_updates):
        # Holding the lock makes the read-increment-write sequence atomic
        # with respect to the other threads.
        with lock:
            counter += 1

    print(f"Thread {thread_id} completed")

# Create threads
threads = []
for i in range(3):
    t = threading.Thread(target=increment_counter, args=(i,))
    threads.append(t)
    t.start()

# Wait for completion
for t in threads:
    t.join()

# 3 threads x 100000 locked increments each.
print(f"Final counter value: {counter}")  # Should be 300000

Use Lock to synchronize access to shared resources. with statement automatically acquires and releases.

Race Condition Example

import threading

# Shared resource
shared_list = []  # appended to by every thread without any synchronization

def add_to_list(thread_id):
    """Append 100 labelled items to the module-level shared_list."""
    # No synchronization - race condition!
    for i in range(100):
        shared_list.append(f"Thread-{thread_id}-Item-{i}")

# Without synchronization
threads = []
for i in range(3):
    t = threading.Thread(target=add_to_list, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

# NOTE(review): in CPython the GIL makes a single list.append atomic, so the
# length is reliably 300 here; unsynchronized COMPOUND updates are where data
# actually gets corrupted.
print(f"List length: {len(shared_list)}")  # 300 in CPython; not guaranteed by the language
print("Items may be interleaved or missing due to race condition")

Without synchronization, concurrent access can corrupt shared data. (In CPython a single list.append happens to be atomic under the GIL, so this demo usually still reaches 300 items; compound read-modify-write operations are where unsynchronized access truly corrupts state.)

Other Synchronization Tools

import threading
import time

# Semaphore - limits concurrent access
semaphore = threading.Semaphore(2)  # Allow 2 threads at once

def limited_resource(thread_id):
    """Hold the semaphore (at most 2 concurrent holders) for one second."""
    with semaphore:
        print(f"Thread {thread_id} acquired resource")
        time.sleep(1)
        print(f"Thread {thread_id} released resource")

# Event - signal between threads
event = threading.Event()  # starts cleared; set() releases all waiters

def waiter():
    """Block until another thread calls event.set()."""
    print("Waiting for event...")
    event.wait()  # Block until event is set
    print("Event received!")

def setter():
    """Set the event after a two-second delay, unblocking any waiters."""
    time.sleep(2)
    print("Setting event")
    event.set()  # Unblock waiting threads

# Condition - wait for condition
condition = threading.Condition()  # bundles a lock with wait/notify
shared_data = []  # produced items; must only be touched while holding `condition`

def producer():
    """Produce five items, notifying the consumer after each one.

    Fix: the original held the condition lock for the ENTIRE loop
    (including the sleeps), so a waiting consumer could not re-acquire
    the lock until production finished. Acquiring the lock per item and
    sleeping outside the `with` block lets the consumer run between
    iterations.
    """
    for i in range(5):
        with condition:
            shared_data.append(f"Item-{i}")
            condition.notify()  # Notify waiting consumer
        time.sleep(0.5)

def consumer():
    """Consume exactly five items, waiting on the condition as needed.

    Fix: the original looped `while len(shared_data) < 5: wait()`, but it
    also popped items inside that loop — so after the first pop the length
    dropped below 5 again and it went back to wait() with no further
    notifications coming: a deadlock. Counting consumed items, and waiting
    only while the buffer is empty, terminates correctly.
    """
    consumed = 0
    with condition:
        while consumed < 5:
            while not shared_data:
                condition.wait()  # releases the lock while blocked
            item = shared_data.pop(0)
            print(f"Consumed: {item}")
            consumed += 1

# Run examples
print("Semaphore example:")
threads = [threading.Thread(target=limited_resource, args=(i,)) for i in range(5)]
for t in threads: t.start()
for t in threads: t.join()

print("\nEvent example:")
# These threads are not joined; being non-daemon, the interpreter still
# waits for them before exiting, so output may interleave with the next demo.
threading.Thread(target=waiter).start()
threading.Thread(target=setter).start()

print("\nCondition example:")
threading.Thread(target=producer).start()
threading.Thread(target=consumer).start()

Use appropriate synchronization tools for different scenarios.

Thread Communication

Exchange data between threads safely.

Queues

import threading
import queue
import time
import random

def producer(queue, thread_id):
    """Produce five labelled items onto *queue*, then a None sentinel.

    NOTE(review): the parameter name `queue` shadows the stdlib module of
    the same name inside this function body.
    """
    total_items = 5
    for i in range(total_items):
        item = f"Item-{thread_id}-{i}"
        queue.put(item)
        print(f"Produced: {item}")
        time.sleep(random.random())

    queue.put(None)  # Signal end of production

def consumer(queue, thread_id):
    """Drain items from *queue* until the None end-of-stream sentinel appears."""
    # Walrus form: keep looping while real (non-None) items arrive.
    while (item := queue.get()) is not None:
        print(f"Consumed by {thread_id}: {item}")
        time.sleep(random.random())
    queue.put(None)  # re-publish the sentinel so sibling consumers also stop

# Thread-safe queue
item_queue = queue.Queue()

# Start producer and consumers
producer_thread = threading.Thread(target=producer, args=(item_queue, 1))
consumer_threads = [
    threading.Thread(target=consumer, args=(item_queue, i)) 
    for i in range(2)
]

producer_thread.start()
for t in consumer_threads: t.start()

# The producer's trailing None sentinel (re-queued by each consumer that
# sees it) lets both consumers exit, so these joins terminate.
producer_thread.join()
for t in consumer_threads: t.join()

print("Producer-consumer example completed")

queue.Queue provides thread-safe communication between threads.

Thread Pools

Manage multiple threads efficiently.

ThreadPoolExecutor

import concurrent.futures
import time
import random

def process_item(item_id):
    """Sleep a random 0.1-1.0 s to simulate work, then describe what happened."""
    processing_time = random.uniform(0.1, 1.0)
    time.sleep(processing_time)
    return (
        f"Processed item {item_id} "
        f"in {processing_time:.2f} seconds"
    )

# Create thread pool
# The context manager shuts the pool down (waiting for all workers) on exit.
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Submit tasks
    futures = [executor.submit(process_item, i) for i in range(10)]
    
    # Collect results as they complete
    # as_completed yields futures in finish order, not submission order.
    for future in concurrent.futures.as_completed(futures):
        result = future.result()
        print(result)

print("All tasks completed")

ThreadPoolExecutor manages a pool of threads automatically.

Map Function

import concurrent.futures
import math

def calculate_factorial(n):
    """Return n! after a short simulated-work delay.

    Fix: this snippet's top-level imports do not include `time`, so the
    original time.sleep call raised NameError at runtime; import it
    locally here to keep the snippet self-contained.
    """
    import time
    time.sleep(0.1)  # Simulate work
    return math.factorial(n)

numbers = list(range(1, 11))  # 1! through 10!

with concurrent.futures.ThreadPoolExecutor() as executor:
    # Map function to all items
    # map() preserves input order and returns a lazy iterator of results.
    results = executor.map(calculate_factorial, numbers)

print("Factorials:")
# Consuming `results` after the with-block is safe: shutdown already
# waited for every worker, so all results are available.
for num, fact in zip(numbers, results):
    print(f"{num}! = {fact}")

executor.map() applies a function to all items concurrently.

Daemon Threads

Background threads that don’t prevent program exit.

import threading
import time

def background_worker():
    """Daemon-thread loop: emit a heartbeat message once per second, forever."""
    heartbeat_interval = 1
    while True:
        print("Background work...")
        time.sleep(heartbeat_interval)

def main_task():
    """Print five numbered progress messages, pausing one second between them."""
    for i in range(5):
        print(f"Main task: {i}")
        time.sleep(1)

# Create daemon thread
daemon_thread = threading.Thread(target=background_worker)
# The daemon flag must be set before start(); it could also be passed as
# Thread(..., daemon=True).
daemon_thread.daemon = True  # Mark as daemon
daemon_thread.start()

# Main task
main_task()

# The daemon's infinite loop does not keep the interpreter alive.
print("Main thread finished - daemon thread will be terminated")

Daemon threads exit when the main thread ends.

Thread Safety

Ensure code works correctly with multiple threads.

Thread-Local Storage

import threading
import time

# Thread-local storage
# Attributes set on this object are visible only to the thread that set them.
local_data = threading.local()

def worker(thread_id):
    """Stash a per-thread value in local_data, pause, then read it back."""
    # Each thread sees only its own copy of local_data.value.
    per_thread_value = thread_id * 10
    local_data.value = per_thread_value

    time.sleep(0.1)  # Simulate work while other threads set their own values

    print(f"Thread {thread_id}: local_data.value = {local_data.value}")

# Create threads
threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

# Each thread printed its own value (0, 10, ..., 40) despite all of them
# writing to the same `local_data` object.
print("All threads completed")

threading.local() provides thread-specific storage.

Common Patterns

Producer-Consumer Pattern

import threading
import queue
import time
import random

def producer(queue, items_to_produce):
    """Put *items_to_produce* labelled items on *queue*, then a None sentinel.

    NOTE(review): the `queue` parameter shadows the stdlib `queue` module
    within this function body.
    """
    for i in range(items_to_produce):
        item = f"Item-{i}"
        queue.put(item)
        print(f"Produced: {item}")
        time.sleep(random.uniform(0.1, 0.5))

    queue.put(None)  # Signal end

def consumer(queue, consumer_id):
    """Process items from *queue* until the None end-of-work sentinel."""
    # Walrus form: loop while real (non-None) items keep arriving.
    while (item := queue.get()) is not None:
        print(f"Consumer {consumer_id} processing: {item}")
        time.sleep(random.uniform(0.2, 0.8))
    queue.put(None)  # Put back for other consumers

# Create shared queue
# With maxsize set, put() blocks when the queue is full, applying
# backpressure on the producer until a consumer catches up.
work_queue = queue.Queue(maxsize=5)  # Limit queue size

# Start producer
producer_thread = threading.Thread(target=producer, args=(work_queue, 10))

# Start consumers
consumer_threads = [
    threading.Thread(target=consumer, args=(work_queue, i))
    for i in range(3)
]

producer_thread.start()
for t in consumer_threads: t.start()

# The re-queued None sentinel lets all three consumers terminate.
producer_thread.join()
for t in consumer_threads: t.join()

print("Producer-consumer pattern completed")

Classic pattern for dividing work between threads.

Performance Considerations

GIL Limitation

import threading
import time
import multiprocessing

def cpu_intensive_task(n):
    """Return the sum of squares 0**2 + 1**2 + ... + (n-1)**2 (pure CPU work)."""
    return sum(i ** 2 for i in range(n))

# Single-threaded
start = time.time()
result1 = cpu_intensive_task(1000000)
time1 = time.time() - start

# Multi-threaded (limited by GIL)
start = time.time()
threads = []
results = []

def thread_worker(n, results_list):
    """Run the CPU-bound task and append its result to the shared list."""
    results_list.append(cpu_intensive_task(n))

# Split the same total iteration count (4 x 250000) across four threads.
for _ in range(4):
    t = threading.Thread(target=thread_worker, args=(250000, results))
    threads.append(t)
    t.start()

for t in threads: t.join()
time2 = time.time() - start

print(f"Single-threaded: {time1:.2f}s")
print(f"Multi-threaded: {time2:.2f}s")
# NOTE(review): under the GIL only one thread executes Python bytecode at a
# time, so the "speedup" for pure-Python CPU work is typically <= 1x.
print(f"Speedup: {time1/time2:.2f}x (limited by GIL)")

# For CPU-bound tasks, use multiprocessing instead

Python’s GIL limits threading performance for CPU-bound tasks.

Best Practices

  1. Use threading for I/O-bound tasks, multiprocessing for CPU-bound
  2. Always synchronize access to shared resources
  3. Use thread pools for managing multiple threads
  4. Handle exceptions in threads properly
  5. Use queues for thread communication
  6. Set appropriate daemon flags
  7. Avoid global variables when possible

External Resources:

Related Tutorials:

Last updated on