Python Async Programming
Async programming in Python allows you to write concurrent code that can handle many operations simultaneously without blocking. This is especially useful for I/O-bound tasks like web requests, database operations, and file operations.
Understanding Async Programming
What is Async Programming?
Async programming is a programming paradigm that allows multiple operations to run concurrently without blocking each other. Instead of waiting for one operation to complete before starting another, async code can switch between tasks efficiently.
Synchronous vs Asynchronous
# Synchronous code (blocking)
import requests
import time
def fetch_url_sync(url):
start_time = time.time()
response = requests.get(url)
end_time = time.time()
print(f"Downloaded {url} in {end_time - start_time:.2f} seconds")
return response.text
urls = ["https://example.com", "https://example.org", "https://example.net"]
for url in urls:
fetch_url_sync(url)
# Each request waits for the previous one to complete# Asynchronous code (non-blocking)
import aiohttp
import asyncio
import time
async def fetch_url_async(session, url):
start_time = time.time()
async with session.get(url) as response:
text = await response.text()
end_time = time.time()
print(f"Downloaded {url} in {end_time - start_time:.2f} seconds")
return text
async def main():
urls = ["https://example.com", "https://example.org", "https://example.net"]
async with aiohttp.ClientSession() as session:
tasks = [fetch_url_async(session, url) for url in urls]
await asyncio.gather(*tasks)
asyncio.run(main())
# All requests run concurrentlyAsync/Await Syntax
Basic Async Function
import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
async def main():
print(f"Started at {time.strftime('%X')}")
await say_after(1, 'hello')
await say_after(2, 'world')
print(f"Finished at {time.strftime('%X')}")
asyncio.run(main())Running Multiple Tasks Concurrently
async def main():
task1 = asyncio.create_task(say_after(1, 'hello'))
task2 = asyncio.create_task(say_after(2, 'world'))
await task1
await task2
# Or using gather
async def main_gather():
await asyncio.gather(
say_after(1, 'hello'),
say_after(2, 'world')
)Async Context Managers
Using Async With
import aiofiles
async def read_file_async(filename):
async with aiofiles.open(filename, 'r') as file:
content = await file.read()
return content
async def write_file_async(filename, content):
async with aiofiles.open(filename, 'w') as file:
await file.write(content)
async def main():
await write_file_async('test.txt', 'Hello, async world!')
content = await read_file_async('test.txt')
print(content)
asyncio.run(main())Custom Async Context Manager
from contextlib import asynccontextmanager
@asynccontextmanager
async def web_page(url):
print(f"Loading {url}...")
# Simulate loading time
await asyncio.sleep(1)
yield f"Content of {url}"
print(f"Closed {url}")
async def main():
async with web_page("https://example.com") as page:
print(f"Reading: {page}")
asyncio.run(main())HTTP Requests with Async
Using aiohttp
import aiohttp
import asyncio
async def fetch_single_url(session, url):
try:
async with session.get(url) as response:
if response.status == 200:
return await response.text()
else:
return f"Error: {response.status}"
except Exception as e:
return f"Exception: {e}"
async def fetch_multiple_urls(urls):
async with aiohttp.ClientSession() as session:
tasks = [fetch_single_url(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
async def main():
urls = [
"https://example.com",
"https://example.org",
"https://example.net"
]
results = await fetch_multiple_urls(urls)
for url, result in zip(urls, results):
print(f"{url}: {len(str(result))} characters")
asyncio.run(main())Advanced HTTP Example with Rate Limiting
import aiohttp
import asyncio
from datetime import datetime
class AsyncRateLimiter:
def __init__(self, rate_limit):
self.rate_limit = rate_limit
self.tokens = rate_limit
self.last_update = datetime.now()
async def acquire(self):
while self.tokens <= 0:
await asyncio.sleep(0.1)
self._update_tokens()
self.tokens -= 1
def _update_tokens(self):
now = datetime.now()
time_passed = (now - self.last_update).total_seconds()
self.tokens = min(self.rate_limit, self.tokens + time_passed)
self.last_update = now
async def fetch_with_rate_limit(session, url, limiter):
await limiter.acquire()
try:
async with session.get(url) as response:
return await response.text()
except Exception as e:
return f"Error fetching {url}: {e}"
async def main():
urls = [f"https://jsonplaceholder.typicode.com/posts/{i}" for i in range(1, 11)]
limiter = AsyncRateLimiter(rate_limit=2) # 2 requests per second
async with aiohttp.ClientSession() as session:
tasks = [fetch_with_rate_limit(session, url, limiter) for url in urls]
results = await asyncio.gather(*tasks)
for i, result in enumerate(results, 1):
print(f"Post {i}: {len(result)} characters")
asyncio.run(main())Database Operations with Async
Async SQLite
import aiosqlite
import asyncio
async def create_database():
async with aiosqlite.connect("example.db") as db:
await db.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
email TEXT UNIQUE
)
""")
await db.commit()
async def add_user(name, email):
async with aiosqlite.connect("example.db") as db:
await db.execute(
"INSERT INTO users (name, email) VALUES (?, ?)",
(name, email)
)
await db.commit()
async def get_user(user_id):
async with aiosqlite.connect("example.db") as db:
cursor = await db.execute(
"SELECT * FROM users WHERE id = ?",
(user_id,)
)
return await cursor.fetchone()
async def get_all_users():
async with aiosqlite.connect("example.db") as db:
db.row_factory = aiosqlite.Row
cursor = await db.execute("SELECT * FROM users")
return await cursor.fetchall()
async def main():
await create_database()
# Add users
await add_user("Alice", "[email protected]")
await add_user("Bob", "[email protected]")
await add_user("Charlie", "[email protected]")
# Get all users
users = await get_all_users()
for user in users:
print(f"ID: {user['id']}, Name: {user['name']}, Email: {user['email']}")
asyncio.run(main())Async PostgreSQL with asyncpg
import asyncpg
import asyncio
async def setup_database():
conn = await asyncpg.connect(
user="user",
password="password",
database="testdb",
host="localhost"
)
await conn.execute("""
CREATE TABLE IF NOT EXISTS products (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
price DECIMAL(10, 2),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
await conn.close()
async def add_product(name, price):
conn = await asyncpg.connect(
user="user",
password="password",
database="testdb",
host="localhost"
)
await conn.execute(
"INSERT INTO products (name, price) VALUES ($1, $2)",
name, price
)
await conn.close()
async def get_products():
conn = await asyncpg.connect(
user="user",
password="password",
database="testdb",
host="localhost"
)
rows = await conn.fetch("SELECT * FROM products ORDER BY created_at DESC")
await conn.close()
return rows
async def main():
await setup_database()
# Add products
await add_product("Laptop", 999.99)
await add_product("Mouse", 29.99)
await add_product("Keyboard", 79.99)
# Get products
products = await get_products()
for product in products:
print(f"Product: {product['name']}, Price: ${product['price']}")
asyncio.run(main())Web Servers with Async
Async Web Server with aiohttp
from aiohttp import web
import asyncio
import json
async def handle_hello(request):
return web.Response(text="Hello, World!")
async def handle_users(request):
users = [
{"id": 1, "name": "Alice"},
{"id": 2, "name": "Bob"},
{"id": 3, "name": "Charlie"}
]
return web.json_response(users)
async def handle_user(request):
user_id = int(request.match_info['id'])
users = {
1: {"id": 1, "name": "Alice"},
2: {"id": 2, "name": "Bob"},
3: {"id": 3, "name": "Charlie"}
}
if user_id in users:
return web.json_response(users[user_id])
else:
return web.json_response({"error": "User not found"}, status=404)
async def handle_slow(request):
# Simulate slow operation
await asyncio.sleep(2)
return web.Response(text="This took 2 seconds to load!")
async def init_app():
app = web.Application()
app.router.add_get('/', handle_hello)
app.router.add_get('/users', handle_users)
app.router.add_get('/users/{id}', handle_user)
app.router.add_get('/slow', handle_slow)
return app
if __name__ == '__main__':
app = init_app()
web.run_app(app, host='localhost', port=8080)Async Web Framework with FastAPI
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import asyncio
import random
app = FastAPI()
class Item(BaseModel):
id: int
name: str
price: float
# Simulated database
items_db = []
@app.get("/")
async def read_root():
return {"message": "Welcome to Async FastAPI"}
@app.get("/items/", response_model=list[Item])
async def read_items():
# Simulate async database query
await asyncio.sleep(0.1)
return items_db
@app.get("/items/{item_id}", response_model=Item)
async def read_item(item_id: int):
# Simulate async lookup
await asyncio.sleep(0.05)
item = next((item for item in items_db if item.id == item_id), None)
if item is None:
raise HTTPException(status_code=404, detail="Item not found")
return item
@app.post("/items/", response_model=Item)
async def create_item(item: Item):
# Simulate async database insert
await asyncio.sleep(0.1)
items_db.append(item)
return item
@app.get("/random-delay")
async def random_delay():
delay = random.uniform(0.1, 2.0)
await asyncio.sleep(delay)
return {"delay": delay, "message": f"Waited {delay:.2f} seconds"}Error Handling in Async
Try-Catch with Async
import asyncio
import aiohttp
async def fetch_with_retry(url, max_retries=3):
for attempt in range(max_retries):
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
response.raise_for_status()
return await response.text()
except Exception as e:
if attempt == max_retries - 1:
raise
wait_time = 2 ** attempt # Exponential backoff
print(f"Attempt {attempt + 1} failed, retrying in {wait_time}s...")
await asyncio.sleep(wait_time)
async def main():
try:
content = await fetch_with_retry("https://example.com")
print(f"Success: {len(content)} characters")
except Exception as e:
print(f"Failed after retries: {e}")
asyncio.run(main())Handling Multiple Exceptions
async def risky_operation(name):
if name == "fail":
raise ValueError(f"Operation {name} failed")
await asyncio.sleep(0.1)
return f"Success: {name}"
async def main():
tasks = [
risky_operation("task1"),
risky_operation("fail"),
risky_operation("task3")
]
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
if isinstance(result, Exception):
print(f"Error: {result}")
else:
print(f"Result: {result}")
asyncio.run(main())Performance and Best Practices
Event Loop Management
import asyncio
async def long_running_task():
print("Starting long task...")
await asyncio.sleep(5)
print("Long task completed!")
return "Done"
async def main():
# Set custom event loop policy if needed
# asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
# Run with timeout
try:
result = await asyncio.wait_for(long_running_task(), timeout=3)
print(result)
except asyncio.TimeoutError:
print("Task timed out!")
# Cancel tasks
task = asyncio.create_task(long_running_task())
await asyncio.sleep(1)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("Task was cancelled!")
asyncio.run(main())Async Iterators
class AsyncCounter:
def __init__(self, limit):
self.limit = limit
self.count = 0
def __aiter__(self):
return self
async def __anext__(self):
if self.count < self.limit:
await asyncio.sleep(0.1) # Simulate async work
self.count += 1
return self.count
else:
raise StopAsyncIteration
async def main():
async for number in AsyncCounter(10):
print(f"Count: {number}")
asyncio.run(main())Async Generators
async def async_generator():
for i in range(5):
await asyncio.sleep(0.5)
yield f"Item {i}"
async def main():
async for item in async_generator():
print(item)
asyncio.run(main())
# Collect all items
async def collect_items():
items = [item async for item in async_generator()]
return items
asyncio.run(collect_items())Testing Async Code
Unit Testing Async Functions
import unittest
import asyncio
from unittest.mock import AsyncMock, patch
async def fetch_data(url):
# Simulate async data fetching
await asyncio.sleep(0.1)
return f"Data from {url}"
class TestAsyncFunctions(unittest.TestCase):
def test_fetch_data(self):
async def run_test():
result = await fetch_data("https://example.com")
self.assertEqual(result, "Data from https://example.com")
asyncio.run(run_test())
@patch('aiohttp.ClientSession.get')
async def test_with_mock(self, mock_get):
# Mock async response
mock_response = AsyncMock()
mock_response.text.return_value = "Mocked data"
mock_response.status = 200
mock_get.return_value.__aenter__.return_value = mock_response
result = await fetch_data("https://example.com")
self.assertEqual(result, "Data from https://example.com")
if __name__ == '__main__':
unittest.main()When to Use Async Programming
Good Use Cases for Async
- Web scraping - Multiple HTTP requests
- Web servers - Handling many concurrent connections
- Database operations - Multiple database queries
- File I/O - Reading/writing multiple files
- API integrations - Multiple external service calls
When NOT to Use Async
- CPU-intensive tasks - Use multiprocessing instead
- Simple scripts - Overkill for basic operations
- Legacy code - May require significant refactoring
Mixing Sync and Async Code
import asyncio
import time
def sync_heavy_computation(n):
# CPU-intensive operation
result = 0
for i in range(n):
result += i ** 2
return result
async def async_wrapper(func, *args):
# Run sync function in thread pool
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, func, *args)
async def main():
# Run CPU-intensive work without blocking
result = await async_wrapper(sync_heavy_computation, 1000000)
print(f"Computation result: {result}")
asyncio.run(main())External Resources:
Related Tutorials:
Last updated on