Python Threading
Threading allows Python programs to run multiple operations concurrently. The threading module provides tools for creating and managing threads, enabling parallel execution and better resource utilization.
Threading Basics
Threads are lightweight processes that run concurrently within a single process.
Creating Threads
import threading
import time


def worker(thread_name, delay):
    """Print a start message, sleep for `delay` seconds, then print a finish message.

    Executed as the target of a Thread; demonstrates concurrent execution.
    """
    print(f"{thread_name} starting")
    time.sleep(delay)
    print(f"{thread_name} finished")


# Create and start threads
thread1 = threading.Thread(target=worker, args=("Thread-1", 2))
thread2 = threading.Thread(target=worker, args=("Thread-2", 3))

print("Main thread: Starting threads")
thread1.start()
thread2.start()

# Wait for threads to complete
thread1.join()
thread2.join()
print("Main thread: All threads completed")

threading.Thread() creates a thread. start() begins execution, and join() waits for completion.
Thread Subclassing
import threading
import time


class WorkerThread(threading.Thread):
    """Thread subclass whose work is defined by overriding run()."""

    def __init__(self, name, delay):
        super().__init__()
        # Thread exposes `name` as a property with a setter, so this is valid.
        self.name = name
        self.delay = delay

    def run(self):
        """Override run method: announce start, sleep `delay` seconds, announce finish."""
        print(f"{self.name} starting")
        time.sleep(self.delay)
        print(f"{self.name} finished")


# Create and start threads
threads = []
for i in range(3):
    t = WorkerThread(f"Worker-{i+1}", i + 1)
    threads.append(t)
    t.start()

# Wait for all threads
for t in threads:
    t.join()
print("All worker threads completed")

Subclass Thread and override run() for custom thread behavior.
Synchronization
Prevent race conditions with synchronization primitives.
Locks
import threading
import time

# Shared mutable state protected by `lock`.
counter = 0
lock = threading.Lock()


def increment_counter(thread_id):
    """Add 100000 to the shared counter, one locked increment at a time.

    `counter += 1` is a read-modify-write and is not atomic, so the lock
    is required to avoid lost updates.
    """
    global counter
    for _ in range(100000):
        # Acquire lock before modifying shared resource;
        # the with statement releases it even if an exception occurs.
        with lock:
            counter += 1
    print(f"Thread {thread_id} completed")


# Create threads
threads = []
for i in range(3):
    t = threading.Thread(target=increment_counter, args=(i,))
    threads.append(t)
    t.start()

# Wait for completion
for t in threads:
    t.join()
print(f"Final counter value: {counter}")  # Should be 300000

Use Lock to synchronize access to shared resources. The with statement automatically acquires and releases the lock.
Race Condition Example
import threading

# Shared resource accessed by several threads at once.
shared_list = []


def add_to_list(thread_id):
    """Append 100 labelled items to the shared list with no locking.

    NOTE: in CPython, list.append itself is atomic under the GIL, so items
    are not lost here — but compound operations (read-modify-write) on
    shared state would race without a lock.
    """
    for i in range(100):
        # No synchronization - race condition!
        shared_list.append(f"Thread-{thread_id}-Item-{i}")


# Without synchronization
threads = []
for i in range(3):
    t = threading.Thread(target=add_to_list, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"List length: {len(shared_list)}")  # May not be 300!
print("Items may be interleaved or missing due to race condition")

Without synchronization, concurrent access can corrupt shared data.
Other Synchronization Tools
import threading
import time

# Semaphore - limits concurrent access
semaphore = threading.Semaphore(2)  # Allow 2 threads at once


def limited_resource(thread_id):
    """Hold the semaphore for one second; at most two holders at a time."""
    with semaphore:
        print(f"Thread {thread_id} acquired resource")
        time.sleep(1)
        print(f"Thread {thread_id} released resource")


# Event - signal between threads
event = threading.Event()


def waiter():
    """Block until another thread sets the event."""
    print("Waiting for event...")
    event.wait()  # Block until event is set
    print("Event received!")


def setter():
    """Set the event after a two-second delay, releasing all waiters."""
    time.sleep(2)
    print("Setting event")
    event.set()  # Unblock waiting threads


# Condition - wait for condition
condition = threading.Condition()
shared_data = []


def producer():
    """Append five items, notifying the condition after each append.

    NOTE(review): the lock is held across the sleeps, so the consumer can
    only run while the producer is inside condition.wait-free sections —
    here the consumer's wait() temporarily releases the lock for us.
    """
    with condition:
        for i in range(5):
            shared_data.append(f"Item-{i}")
            condition.notify()  # Notify waiting consumer
            time.sleep(0.5)


def consumer():
    """Wait until five items have been produced, then consume one item."""
    with condition:
        # Re-check the predicate in a loop: notify() can wake us early.
        while len(shared_data) < 5:
            condition.wait()  # Wait for notification
        if shared_data:
            item = shared_data.pop(0)
            print(f"Consumed: {item}")


# Run examples
print("Semaphore example:")
threads = [threading.Thread(target=limited_resource, args=(i,)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print("\nEvent example:")
threading.Thread(target=waiter).start()
threading.Thread(target=setter).start()

print("\nCondition example:")
threading.Thread(target=producer).start()
threading.Thread(target=consumer).start()
threading.Thread(target=consumer).start()

Use the appropriate synchronization tool for each scenario: Semaphore to cap concurrency, Event for one-shot signaling, Condition for predicate-based waiting.
Thread Communication
Exchange data between threads safely.
Queues
import threading
import queue
import time
import random


def producer(q, thread_id):
    """Produce five labelled items into the queue, then a None sentinel.

    The parameter is named `q` (not `queue`) to avoid shadowing the
    `queue` module imported above.
    """
    for i in range(5):
        item = f"Item-{thread_id}-{i}"
        q.put(item)
        print(f"Produced: {item}")
        time.sleep(random.random())
    q.put(None)  # Signal end of production


def consumer(q, thread_id):
    """Consume items from the queue until the None sentinel is seen."""
    while True:
        item = q.get()
        if item is None:  # Check for end signal
            q.put(None)  # Put back for other consumers
            break
        print(f"Consumed by {thread_id}: {item}")
        time.sleep(random.random())


# Thread-safe queue
item_queue = queue.Queue()

# Start producer and consumers
producer_thread = threading.Thread(target=producer, args=(item_queue, 1))
consumer_threads = [
    threading.Thread(target=consumer, args=(item_queue, i))
    for i in range(2)
]

producer_thread.start()
for t in consumer_threads:
    t.start()

producer_thread.join()
for t in consumer_threads:
    t.join()
print("Producer-consumer example completed")

queue.Queue provides thread-safe communication between threads.
Thread Pools
Manage multiple threads efficiently.
ThreadPoolExecutor
import concurrent.futures
import time
import random


def process_item(item_id):
    """Simulate 0.1-1.0 seconds of work; return a summary string."""
    delay = random.uniform(0.1, 1.0)
    time.sleep(delay)
    return f"Processed item {item_id} in {delay:.2f} seconds"


# Create thread pool; the with block joins all workers on exit.
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Submit tasks
    futures = [executor.submit(process_item, i) for i in range(10)]
    # Collect results as they complete (not in submission order)
    for future in concurrent.futures.as_completed(futures):
        result = future.result()
        print(result)
print("All tasks completed")

ThreadPoolExecutor manages a pool of threads automatically.
Map Function
import concurrent.futures
import math
import time  # BUGFIX: the original example used time.sleep without importing time


def calculate_factorial(n):
    """Calculate factorial of n, with a small artificial delay to simulate work."""
    time.sleep(0.1)  # Simulate work
    return math.factorial(n)


numbers = list(range(1, 11))  # 1! through 10!

with concurrent.futures.ThreadPoolExecutor() as executor:
    # Map function to all items; materialize so results can be reused.
    results = list(executor.map(calculate_factorial, numbers))

print("Factorials:")
for num, fact in zip(numbers, results):
    print(f"{num}! = {fact}")

# executor.map() applies a function to all items concurrently,
# returning results in input order.
Daemon Threads
Background threads that don’t prevent program exit.
import threading
import time


def background_worker():
    """Daemon thread body: loop forever; killed when the main thread exits."""
    while True:
        print("Background work...")
        time.sleep(1)


def main_task():
    """Main thread task: print five messages one second apart."""
    for i in range(5):
        print(f"Main task: {i}")
        time.sleep(1)


# Create daemon thread; daemon=True means it will not keep the process alive.
daemon_thread = threading.Thread(target=background_worker, daemon=True)
daemon_thread.start()

# Main task
main_task()
print("Main thread finished - daemon thread will be terminated")

Daemon threads exit automatically when the main thread ends.
Thread Safety
Ensure code works correctly with multiple threads.
Thread-Local Storage
import threading
import time

# Thread-local storage: each thread sees its own independent attributes.
local_data = threading.local()


def worker(thread_id):
    """Store a per-thread value and print it after a short delay.

    Each thread has its own copy of local_data.value, so concurrent
    writers do not interfere with each other.
    """
    local_data.value = thread_id * 10
    time.sleep(0.1)  # Simulate work
    print(f"Thread {thread_id}: local_data.value = {local_data.value}")


# Create threads
threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()
print("All threads completed")

threading.local() provides thread-specific storage; the main thread never sees the workers' values.
Common Patterns
Producer-Consumer Pattern
import threading
import queue
import time
import random


def producer(q, items_to_produce):
    """Produce `items_to_produce` items, then a None sentinel.

    The parameter is named `q` (not `queue`) to avoid shadowing the
    `queue` module imported above.
    """
    for i in range(items_to_produce):
        item = f"Item-{i}"
        q.put(item)  # Blocks if the bounded queue is full (backpressure)
        print(f"Produced: {item}")
        time.sleep(random.uniform(0.1, 0.5))
    q.put(None)  # Signal end


def consumer(q, consumer_id):
    """Consume items until the None sentinel is seen, re-posting it for peers."""
    while True:
        item = q.get()
        if item is None:
            q.put(None)  # Put back for other consumers
            break
        print(f"Consumer {consumer_id} processing: {item}")
        time.sleep(random.uniform(0.2, 0.8))


# Create shared queue
work_queue = queue.Queue(maxsize=5)  # Limit queue size

# Start producer
producer_thread = threading.Thread(target=producer, args=(work_queue, 10))

# Start consumers
consumer_threads = [
    threading.Thread(target=consumer, args=(work_queue, i))
    for i in range(3)
]

producer_thread.start()
for t in consumer_threads:
    t.start()

producer_thread.join()
for t in consumer_threads:
    t.join()
print("Producer-consumer pattern completed")

This is the classic pattern for dividing work between threads.
Performance Considerations
GIL Limitation
import threading
import time
import multiprocessing


def cpu_intensive_task(n):
    """CPU-bound task: return the sum of i**2 for i in range(n)."""
    result = 0
    for i in range(n):
        result += i ** 2
    return result


# Single-threaded baseline
start = time.time()
result1 = cpu_intensive_task(1000000)
time1 = time.time() - start

# Multi-threaded (limited by GIL: only one thread executes bytecode at a time)
start = time.time()
threads = []
results = []


def thread_worker(n, results_list):
    """Run the CPU-bound task and append its result to the shared list."""
    results_list.append(cpu_intensive_task(n))


for _ in range(4):
    t = threading.Thread(target=thread_worker, args=(250000, results))
    threads.append(t)
    t.start()

for t in threads:
    t.join()
time2 = time.time() - start

print(f"Single-threaded: {time1:.2f}s")
print(f"Multi-threaded: {time2:.2f}s")
print(f"Speedup: {time1/time2:.2f}x (limited by GIL)")
# For CPU-bound tasks, use multiprocessing instead

Python's GIL limits threading performance for CPU-bound tasks.
Best Practices
- Use threading for I/O-bound tasks, multiprocessing for CPU-bound
- Always synchronize access to shared resources
- Use thread pools for managing multiple threads
- Handle exceptions in threads properly
- Use queues for thread communication
- Set appropriate daemon flags
- Avoid global variables when possible
External Resources:
Related Tutorials: