Python Async Programming

Python Async Programming

Async programming in Python allows you to write concurrent code that can handle many operations simultaneously without blocking. This is especially useful for I/O-bound tasks like web requests, database operations, and file operations.

Understanding Async Programming

What is Async Programming?

Async programming is a programming paradigm that allows multiple operations to run concurrently without blocking each other. Instead of waiting for one operation to complete before starting another, async code can switch between tasks efficiently.

Synchronous vs Asynchronous

# Synchronous code (blocking)
import requests
import time

def fetch_url_sync(url):
    start_time = time.time()
    response = requests.get(url)
    end_time = time.time()
    print(f"Downloaded {url} in {end_time - start_time:.2f} seconds")
    return response.text

urls = ["https://example.com", "https://example.org", "https://example.net"]
for url in urls:
    fetch_url_sync(url)

# Each request waits for the previous one to complete
# Asynchronous code (non-blocking)
import aiohttp
import asyncio
import time

async def fetch_url_async(session, url):
    start_time = time.time()
    async with session.get(url) as response:
        text = await response.text()
        end_time = time.time()
        print(f"Downloaded {url} in {end_time - start_time:.2f} seconds")
        return text

async def main():
    urls = ["https://example.com", "https://example.org", "https://example.net"]
    
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url_async(session, url) for url in urls]
        await asyncio.gather(*tasks)

asyncio.run(main())

# All requests run concurrently

Async/Await Syntax

Basic Async Function

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"Started at {time.strftime('%X')}")
    
    await say_after(1, 'hello')
    await say_after(2, 'world')
    
    print(f"Finished at {time.strftime('%X')}")

asyncio.run(main())

Running Multiple Tasks Concurrently

async def main():
    task1 = asyncio.create_task(say_after(1, 'hello'))
    task2 = asyncio.create_task(say_after(2, 'world'))
    
    await task1
    await task2

# Or using gather
async def main_gather():
    await asyncio.gather(
        say_after(1, 'hello'),
        say_after(2, 'world')
    )

Async Context Managers

Using Async With

import aiofiles

async def read_file_async(filename):
    async with aiofiles.open(filename, 'r') as file:
        content = await file.read()
        return content

async def write_file_async(filename, content):
    async with aiofiles.open(filename, 'w') as file:
        await file.write(content)

async def main():
    await write_file_async('test.txt', 'Hello, async world!')
    content = await read_file_async('test.txt')
    print(content)

asyncio.run(main())

Custom Async Context Manager

from contextlib import asynccontextmanager

@asynccontextmanager
async def web_page(url):
    print(f"Loading {url}...")
    # Simulate loading time
    await asyncio.sleep(1)
    yield f"Content of {url}"
    print(f"Closed {url}")

async def main():
    async with web_page("https://example.com") as page:
        print(f"Reading: {page}")

asyncio.run(main())

HTTP Requests with Async

Using aiohttp

import aiohttp
import asyncio

async def fetch_single_url(session, url):
    try:
        async with session.get(url) as response:
            if response.status == 200:
                return await response.text()
            else:
                return f"Error: {response.status}"
    except Exception as e:
        return f"Exception: {e}"

async def fetch_multiple_urls(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_single_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results

async def main():
    urls = [
        "https://example.com",
        "https://example.org", 
        "https://example.net"
    ]
    
    results = await fetch_multiple_urls(urls)
    
    for url, result in zip(urls, results):
        print(f"{url}: {len(str(result))} characters")

asyncio.run(main())

Advanced HTTP Example with Rate Limiting

import aiohttp
import asyncio
from datetime import datetime

class AsyncRateLimiter:
    def __init__(self, rate_limit):
        self.rate_limit = rate_limit
        self.tokens = rate_limit
        self.last_update = datetime.now()
    
    async def acquire(self):
        while self.tokens <= 0:
            await asyncio.sleep(0.1)
            self._update_tokens()
        
        self.tokens -= 1
    
    def _update_tokens(self):
        now = datetime.now()
        time_passed = (now - self.last_update).total_seconds()
        self.tokens = min(self.rate_limit, self.tokens + time_passed)
        self.last_update = now

async def fetch_with_rate_limit(session, url, limiter):
    await limiter.acquire()
    try:
        async with session.get(url) as response:
            return await response.text()
    except Exception as e:
        return f"Error fetching {url}: {e}"

async def main():
    urls = [f"https://jsonplaceholder.typicode.com/posts/{i}" for i in range(1, 11)]
    limiter = AsyncRateLimiter(rate_limit=2)  # 2 requests per second
    
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_rate_limit(session, url, limiter) for url in urls]
        results = await asyncio.gather(*tasks)
    
    for i, result in enumerate(results, 1):
        print(f"Post {i}: {len(result)} characters")

asyncio.run(main())

Database Operations with Async

Async SQLite

import aiosqlite
import asyncio

async def create_database():
    async with aiosqlite.connect("example.db") as db:
        await db.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL,
                email TEXT UNIQUE
            )
        """)
        await db.commit()

async def add_user(name, email):
    async with aiosqlite.connect("example.db") as db:
        await db.execute(
            "INSERT INTO users (name, email) VALUES (?, ?)",
            (name, email)
        )
        await db.commit()

async def get_user(user_id):
    async with aiosqlite.connect("example.db") as db:
        cursor = await db.execute(
            "SELECT * FROM users WHERE id = ?",
            (user_id,)
        )
        return await cursor.fetchone()

async def get_all_users():
    async with aiosqlite.connect("example.db") as db:
        db.row_factory = aiosqlite.Row
        cursor = await db.execute("SELECT * FROM users")
        return await cursor.fetchall()

async def main():
    await create_database()
    
    # Add users
    await add_user("Alice", "[email protected]")
    await add_user("Bob", "[email protected]")
    await add_user("Charlie", "[email protected]")
    
    # Get all users
    users = await get_all_users()
    for user in users:
        print(f"ID: {user['id']}, Name: {user['name']}, Email: {user['email']}")

asyncio.run(main())

Async PostgreSQL with asyncpg

import asyncpg
import asyncio

async def setup_database():
    conn = await asyncpg.connect(
        user="user",
        password="password",
        database="testdb",
        host="localhost"
    )
    
    await conn.execute("""
        CREATE TABLE IF NOT EXISTS products (
            id SERIAL PRIMARY KEY,
            name VARCHAR(100) NOT NULL,
            price DECIMAL(10, 2),
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)
    
    await conn.close()

async def add_product(name, price):
    conn = await asyncpg.connect(
        user="user",
        password="password", 
        database="testdb",
        host="localhost"
    )
    
    await conn.execute(
        "INSERT INTO products (name, price) VALUES ($1, $2)",
        name, price
    )
    
    await conn.close()

async def get_products():
    conn = await asyncpg.connect(
        user="user",
        password="password",
        database="testdb", 
        host="localhost"
    )
    
    rows = await conn.fetch("SELECT * FROM products ORDER BY created_at DESC")
    await conn.close()
    return rows

async def main():
    await setup_database()
    
    # Add products
    await add_product("Laptop", 999.99)
    await add_product("Mouse", 29.99)
    await add_product("Keyboard", 79.99)
    
    # Get products
    products = await get_products()
    for product in products:
        print(f"Product: {product['name']}, Price: ${product['price']}")

asyncio.run(main())

Web Servers with Async

Async Web Server with aiohttp

from aiohttp import web
import asyncio
import json

async def handle_hello(request):
    return web.Response(text="Hello, World!")

async def handle_users(request):
    users = [
        {"id": 1, "name": "Alice"},
        {"id": 2, "name": "Bob"},
        {"id": 3, "name": "Charlie"}
    ]
    return web.json_response(users)

async def handle_user(request):
    user_id = int(request.match_info['id'])
    users = {
        1: {"id": 1, "name": "Alice"},
        2: {"id": 2, "name": "Bob"},
        3: {"id": 3, "name": "Charlie"}
    }
    
    if user_id in users:
        return web.json_response(users[user_id])
    else:
        return web.json_response({"error": "User not found"}, status=404)

async def handle_slow(request):
    # Simulate slow operation
    await asyncio.sleep(2)
    return web.Response(text="This took 2 seconds to load!")

async def init_app():
    app = web.Application()
    
    app.router.add_get('/', handle_hello)
    app.router.add_get('/users', handle_users)
    app.router.add_get('/users/{id}', handle_user)
    app.router.add_get('/slow', handle_slow)
    
    return app

if __name__ == '__main__':
    app = init_app()
    web.run_app(app, host='localhost', port=8080)

Async Web Framework with FastAPI

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import asyncio
import random

app = FastAPI()

class Item(BaseModel):
    id: int
    name: str
    price: float

# Simulated database
items_db = []

@app.get("/")
async def read_root():
    return {"message": "Welcome to Async FastAPI"}

@app.get("/items/", response_model=list[Item])
async def read_items():
    # Simulate async database query
    await asyncio.sleep(0.1)
    return items_db

@app.get("/items/{item_id}", response_model=Item)
async def read_item(item_id: int):
    # Simulate async lookup
    await asyncio.sleep(0.05)
    
    item = next((item for item in items_db if item.id == item_id), None)
    if item is None:
        raise HTTPException(status_code=404, detail="Item not found")
    return item

@app.post("/items/", response_model=Item)
async def create_item(item: Item):
    # Simulate async database insert
    await asyncio.sleep(0.1)
    items_db.append(item)
    return item

@app.get("/random-delay")
async def random_delay():
    delay = random.uniform(0.1, 2.0)
    await asyncio.sleep(delay)
    return {"delay": delay, "message": f"Waited {delay:.2f} seconds"}

Error Handling in Async

Try-Catch with Async

import asyncio
import aiohttp

async def fetch_with_retry(url, max_retries=3):
    for attempt in range(max_retries):
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    response.raise_for_status()
                    return await response.text()
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            wait_time = 2 ** attempt  # Exponential backoff
            print(f"Attempt {attempt + 1} failed, retrying in {wait_time}s...")
            await asyncio.sleep(wait_time)

async def main():
    try:
        content = await fetch_with_retry("https://example.com")
        print(f"Success: {len(content)} characters")
    except Exception as e:
        print(f"Failed after retries: {e}")

asyncio.run(main())

Handling Multiple Exceptions

async def risky_operation(name):
    if name == "fail":
        raise ValueError(f"Operation {name} failed")
    await asyncio.sleep(0.1)
    return f"Success: {name}"

async def main():
    tasks = [
        risky_operation("task1"),
        risky_operation("fail"),
        risky_operation("task3")
    ]
    
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    for result in results:
        if isinstance(result, Exception):
            print(f"Error: {result}")
        else:
            print(f"Result: {result}")

asyncio.run(main())

Performance and Best Practices

Event Loop Management

import asyncio

async def long_running_task():
    print("Starting long task...")
    await asyncio.sleep(5)
    print("Long task completed!")
    return "Done"

async def main():
    # Set custom event loop policy if needed
    # asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    
    # Run with timeout
    try:
        result = await asyncio.wait_for(long_running_task(), timeout=3)
        print(result)
    except asyncio.TimeoutError:
        print("Task timed out!")
    
    # Cancel tasks
    task = asyncio.create_task(long_running_task())
    await asyncio.sleep(1)
    task.cancel()
    
    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled!")

asyncio.run(main())

Async Iterators

class AsyncCounter:
    def __init__(self, limit):
        self.limit = limit
        self.count = 0
    
    def __aiter__(self):
        return self
    
    async def __anext__(self):
        if self.count < self.limit:
            await asyncio.sleep(0.1)  # Simulate async work
            self.count += 1
            return self.count
        else:
            raise StopAsyncIteration

async def main():
    async for number in AsyncCounter(10):
        print(f"Count: {number}")

asyncio.run(main())

Async Generators

async def async_generator():
    for i in range(5):
        await asyncio.sleep(0.5)
        yield f"Item {i}"

async def main():
    async for item in async_generator():
        print(item)

asyncio.run(main())

# Collect all items
async def collect_items():
    items = [item async for item in async_generator()]
    return items

asyncio.run(collect_items())

Testing Async Code

Unit Testing Async Functions

import unittest
import asyncio
from unittest.mock import AsyncMock, patch

async def fetch_data(url):
    # Simulate async data fetching
    await asyncio.sleep(0.1)
    return f"Data from {url}"

class TestAsyncFunctions(unittest.TestCase):
    def test_fetch_data(self):
        async def run_test():
            result = await fetch_data("https://example.com")
            self.assertEqual(result, "Data from https://example.com")
        
        asyncio.run(run_test())
    
    @patch('aiohttp.ClientSession.get')
    async def test_with_mock(self, mock_get):
        # Mock async response
        mock_response = AsyncMock()
        mock_response.text.return_value = "Mocked data"
        mock_response.status = 200
        mock_get.return_value.__aenter__.return_value = mock_response
        
        result = await fetch_data("https://example.com")
        self.assertEqual(result, "Data from https://example.com")

if __name__ == '__main__':
    unittest.main()

When to Use Async Programming

Good Use Cases for Async

  1. Web scraping - Multiple HTTP requests
  2. Web servers - Handling many concurrent connections
  3. Database operations - Multiple database queries
  4. File I/O - Reading/writing multiple files
  5. API integrations - Multiple external service calls

When NOT to Use Async

  1. CPU-intensive tasks - Use multiprocessing instead
  2. Simple scripts - Overkill for basic operations
  3. Legacy code - May require significant refactoring

Mixing Sync and Async Code

import asyncio
import time

def sync_heavy_computation(n):
    # CPU-intensive operation
    result = 0
    for i in range(n):
        result += i ** 2
    return result

async def async_wrapper(func, *args):
    # Run sync function in thread pool
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(None, func, *args)

async def main():
    # Run CPU-intensive work without blocking
    result = await async_wrapper(sync_heavy_computation, 1000000)
    print(f"Computation result: {result}")

asyncio.run(main())

External Resources:

Related Tutorials:

Last updated on