Concurrency in Go

Concurrency in Go

Go was designed with concurrency in mind. It provides built-in support for concurrent programming through goroutines and channels, making it easy to write programs that do multiple things at once.

Goroutines

A goroutine is a lightweight thread managed by the Go runtime. They’re cheap to create and have a small memory footprint.

Basic Goroutine

package main

import (
    "fmt"
    "time"
)

func sayHello(name string) {
    for i := 0; i < 3; i++ {
        fmt.Printf("Hello, %s! (%d)\n", name, i+1)
        time.Sleep(100 * time.Millisecond)
    }
}

func main() {
    // Start a goroutine
    go sayHello("Alice")
    
    // Main goroutine continues immediately
    sayHello("Bob")
    
    // Wait a bit to let the other goroutine finish
    time.Sleep(500 * time.Millisecond)
}

Multiple Goroutines

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done() // Signal that this goroutine is done
    
    fmt.Printf("Worker %d starting\n", id)
    time.Sleep(time.Duration(id) * 100 * time.Millisecond)
    fmt.Printf("Worker %d done\n", id)
}

func main() {
    var wg sync.WaitGroup
    
    // Launch multiple goroutines
    for i := 1; i <= 5; i++ {
        wg.Add(1) // Increment wait group counter
        go worker(i, &wg)
    }
    
    // Wait for all workers to complete
    wg.Wait()
    fmt.Println("All workers completed")
}

Channels

Channels provide a way for goroutines to communicate with each other and synchronize their execution.

Basic Channel Operations

package main

import (
    "fmt"
    "time"
)

func producer(ch chan<- string) {
    for i := 0; i < 5; i++ {
        msg := fmt.Sprintf("Message %d", i+1)
        ch <- msg // Send message to channel
        fmt.Printf("Sent: %s\n", msg)
        time.Sleep(200 * time.Millisecond)
    }
    close(ch) // Close the channel
}

func consumer(ch <-chan string) {
    for msg := range ch {
        fmt.Printf("Received: %s\n", msg)
        time.Sleep(300 * time.Millisecond)
    }
    fmt.Println("Channel closed, consumer exiting")
}

func main() {
    messageChannel := make(chan string)
    
    // Start producer and consumer goroutines
    go producer(messageChannel)
    go consumer(messageChannel)
    
    // Wait for both to finish
    time.Sleep(2 * time.Second)
    fmt.Println("Main function exiting")
}

Select Statement

The select statement allows a goroutine to wait on multiple communication operations:

package main

import (
    "fmt"
    "time"
)

func fibonacci(ch chan int, quit chan bool) {
    x, y := 0, 1
    
    for {
        select {
        case ch <- x:
            x, y = y, x+y
        case <-quit:
            fmt.Println("Fibonacci generator quitting")
            return
        default:
            fmt.Println("No communication ready")
            time.Sleep(100 * time.Millisecond)
        }
    }
}

func main() {
    ch := make(chan int)
    quit := make(chan bool)
    
    // Consumer goroutine
    go func() {
        for i := 0; i < 10; i++ {
            fmt.Printf("Fibonacci: %d\n", <-ch)
            time.Sleep(200 * time.Millisecond)
        }
        quit <- true
    }()
    
    fibonacci(ch, quit)
}

Channel Patterns

Fan-Out, Fan-In Pattern

package main

import (
    "fmt"
    "sync"
    "time"
)

// Worker function
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    
    for job := range jobs {
        fmt.Printf("Worker %d processing job %d\n", id, job)
        
        // Simulate work
        time.Sleep(100 * time.Millisecond)
        result := job * 2
        
        results <- result
        fmt.Printf("Worker %d completed job %d with result %d\n", id, job, result)
    }
}

func main() {
    numJobs := 10
    numWorkers := 3
    
    jobs := make(chan int, numJobs)
    results := make(chan int, numJobs)
    var wg sync.WaitGroup
    
    // Start workers
    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go worker(i, jobs, results, &wg)
    }
    
    // Send jobs
    go func() {
        for j := 1; j <= numJobs; j++ {
            jobs <- j
        }
        close(jobs)
    }()
    
    // Wait for all workers to finish
    go func() {
        wg.Wait()
        close(results)
    }()
    
    // Collect results
    fmt.Println("Results:")
    for result := range results {
        fmt.Printf("Result: %d\n", result)
    }
}

Buffered Channels

package main

import (
    "fmt"
    "time"
)

func main() {
    // Buffered channel with capacity 3
    bufferedChannel := make(chan string, 3)
    
    // Send without blocking (up to capacity)
    bufferedChannel <- "First"
    bufferedChannel <- "Second"
    bufferedChannel <- "Third"
    
    fmt.Println("All messages sent to buffered channel")
    
    // Receive messages
    fmt.Println(<-bufferedChannel)
    fmt.Println(<-bufferedChannel)
    fmt.Println(<-bufferedChannel)
    
    // Demonstrate blocking when buffer is full
    fmt.Println("\nDemonstrating blocking behavior:")
    
    ch := make(chan int, 2)
    ch <- 1
    ch <- 2
    
    fmt.Println("Buffer full, attempting to send third value...")
    go func() {
        time.Sleep(1 * time.Second)
        <-ch // Receive one value
        fmt.Println("Received one value, buffer has space")
    }()
    
    ch <- 3 // This will block until space is available
    fmt.Println("Third value sent successfully")
}

Synchronization Primitives

Mutex

package main

import (
    "fmt"
    "sync"
    "time"
)

type Counter struct {
    mu    sync.Mutex
    value int
}

func (c *Counter) Increment() {
    c.mu.Lock()
    defer c.mu.Unlock()
    
    c.value++
    fmt.Printf("Counter incremented to %d\n", c.value)
}

func (c *Counter) GetValue() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    
    return c.value
}

func main() {
    counter := &Counter{}
    var wg sync.WaitGroup
    
    // Launch multiple goroutines to increment counter
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for j := 0; j < 3; j++ {
                counter.Increment()
                time.Sleep(50 * time.Millisecond)
            }
            fmt.Printf("Goroutine %d finished\n", id)
        }(i)
    }
    
    wg.Wait()
    fmt.Printf("Final counter value: %d\n", counter.GetValue())
}

RWMutex (Read-Write Mutex)

package main

import (
    "fmt"
    "sync"
    "time"
)

type DataStore struct {
    mu    sync.RWMutex
    data  map[string]int
}

func NewDataStore() *DataStore {
    return &DataStore{
        data: make(map[string]int),
    }
}

func (ds *DataStore) Write(key string, value int) {
    ds.mu.Lock()
    defer ds.mu.Unlock()
    
    ds.data[key] = value
    fmt.Printf("Wrote: %s = %d\n", key, value)
}

func (ds *DataStore) Read(key string) (int, bool) {
    ds.mu.RLock()
    defer ds.mu.RUnlock()
    
    value, exists := ds.data[key]
    return value, exists
}

func (ds *DataStore) ReadAll() map[string]int {
    ds.mu.RLock()
    defer ds.mu.RUnlock()
    
    // Return a copy to avoid race conditions
    result := make(map[string]int)
    for k, v := range ds.data {
        result[k] = v
    }
    return result
}

func main() {
    store := NewDataStore()
    var wg sync.WaitGroup
    
    // Writer goroutine
    wg.Add(1)
    go func() {
        defer wg.Done()
        for i := 0; i < 5; i++ {
            key := fmt.Sprintf("key%d", i)
            store.Write(key, i*10)
            time.Sleep(100 * time.Millisecond)
        }
    }()
    
    // Reader goroutines
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(readerID int) {
            defer wg.Done()
            for j := 0; j < 5; j++ {
                data := store.ReadAll()
                fmt.Printf("Reader %d sees %d items: %v\n", readerID, len(data), data)
                time.Sleep(150 * time.Millisecond)
            }
        }(i)
    }
    
    wg.Wait()
    fmt.Println("All operations completed")
}

Once

Use sync.Once to ensure a function is executed only once:

package main

import (
    "fmt"
    "sync"
)

type Database struct {
    connected bool
}

var (
    db  *Database
    once sync.Once
)

func getInstance() *Database {
    once.Do(func() {
        fmt.Println("Initializing database connection...")
        db = &Database{connected: true}
        fmt.Println("Database connection established")
    })
    return db
}

func main() {
    // Multiple goroutines trying to get instance
    for i := 0; i < 5; i++ {
        go func(id int) {
            instance := getInstance()
            fmt.Printf("Goroutine %d got instance: %+v\n", id, instance)
        }(i)
    }
    
    // Give time for goroutines to complete
    fmt.Scanln()
}

Context Package

The context package is essential for managing deadlines, cancellations, and timeouts in concurrent operations.

package main

import (
    "context"
    "fmt"
    "time"
)

func task(ctx context.Context, id int) {
    for i := 0; i < 5; i++ {
        select {
        case <-ctx.Done():
            fmt.Printf("Task %d canceled: %v\n", id, ctx.Err())
            return
        default:
            fmt.Printf("Task %d working... %d\n", id, i+1)
            time.Sleep(200 * time.Millisecond)
        }
    }
    fmt.Printf("Task %d completed\n", id)
}

func main() {
    // Example 1: Context with timeout
    fmt.Println("Example 1: Context with timeout")
    ctx1, cancel1 := context.WithTimeout(context.Background(), 800*time.Millisecond)
    defer cancel1()
    
    go task(ctx1, 1)
    time.Sleep(1 * time.Second)
    
    // Example 2: Manual cancellation
    fmt.Println("\nExample 2: Manual cancellation")
    ctx2, cancel2 := context.WithCancel(context.Background())
    
    go task(ctx2, 2)
    
    // Cancel after 500ms
    time.Sleep(500 * time.Millisecond)
    cancel2()
    
    time.Sleep(1 * time.Second)
    
    // Example 3: Context with deadline
    fmt.Println("\nExample 3: Context with deadline")
    ctx3, cancel3 := context.WithDeadline(context.Background(), time.Now().Add(600*time.Millisecond))
    defer cancel3()
    
    go task(ctx3, 3)
    time.Sleep(1 * time.Second)
}

Worker Pool Pattern

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type Task struct {
    ID       int
    Duration time.Duration
}

func worker(id int, tasks <-chan Task, wg *sync.WaitGroup) {
    defer wg.Done()
    
    fmt.Printf("Worker %d started\n", id)
    
    for task := range tasks {
        fmt.Printf("Worker %d started task %d\n", id, task.ID)
        
        // Simulate work
        time.Sleep(task.Duration)
        
        fmt.Printf("Worker %d completed task %d\n", id, task.ID)
    }
    
    fmt.Printf("Worker %d stopped\n", id)
}

func main() {
    numWorkers := 3
    numTasks := 10
    
    tasks := make(chan Task, numTasks)
    var wg sync.WaitGroup
    
    // Start workers
    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go worker(i, tasks, &wg)
    }
    
    // Generate tasks
    rand.Seed(time.Now().UnixNano())
    for i := 1; i <= numTasks; i++ {
        duration := time.Duration(rand.Intn(500)+100) * time.Millisecond
        tasks <- Task{ID: i, Duration: duration}
    }
    
    close(tasks)
    
    // Wait for all workers to complete
    wg.Wait()
    fmt.Println("All tasks completed")
}

Pipeline Pattern

package main

import (
    "fmt"
    "sync"
)

func generator(numbers []int) <-chan int {
    out := make(chan int)
    
    go func() {
        defer close(out)
        for _, num := range numbers {
            out <- num
        }
    }()
    
    return out
}

func square(in <-chan int) <-chan int {
    out := make(chan int)
    
    go func() {
        defer close(out)
        for num := range in {
            out <- num * num
        }
    }()
    
    return out
}

func filter(in <-chan int, predicate func(int) bool) <-chan int {
    out := make(chan int)
    
    go func() {
        defer close(out)
        for num := range in {
            if predicate(num) {
                out <- num
            }
        }
    }()
    
    return out
}

func main() {
    numbers := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
    
    // Create pipeline
    stage1 := generator(numbers)
    stage2 := square(stage1)
    stage3 := filter(stage2, func(n int) bool { return n > 25 })
    
    // Consume results
    fmt.Println("Squares greater than 25:")
    for result := range stage3 {
        fmt.Println(result)
    }
    
    // Alternative approach using fan-in
    fmt.Println("\nFan-in example:")
    
    inputs := []<-chan int{
        square(generator([]int{1, 2, 3})),
        square(generator([]int{4, 5, 6})),
        square(generator([]int{7, 8, 9})),
    }
    
    out := fanIn(inputs...)
    for result := range out {
        fmt.Println(result)
    }
}

func fanIn(inputs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)
    
    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    
    wg.Add(len(inputs))
    for _, in := range inputs {
        go output(in)
    }
    
    go func() {
        wg.Wait()
        close(out)
    }()
    
    return out
}

Common Pitfalls and Best Practices

1. Race Conditions

// BAD: Race condition
func badCounter() {
    counter := 0
    
    for i := 0; i < 1000; i++ {
        go func() {
            counter++ // Race condition!
        }()
    }
}

// GOOD: Using mutex
func goodCounter() {
    var mu sync.Mutex
    counter := 0
    
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            mu.Lock()
            counter++
            mu.Unlock()
        }()
    }
    wg.Wait()
}

2. Goroutine Leaks

// BAD: Goroutine leak
func badLeak() {
    ch := make(chan int)
    
    go func() {
        <-ch // This goroutine will never exit
    }()
    
    // ch is never sent to, so goroutine leaks
}

// GOOD: Proper cleanup
func goodLeakPrevention() {
    ch := make(chan int)
    done := make(chan struct{})
    
    go func() {
        select {
        case <-ch:
            // Process data
        case <-done:
            // Time to exit
        }
    }()
    
    // Send signal to cleanup when needed
    close(done)
}

Best Practices

  1. Prefer channels over shared memory when possible
  2. Use mutexes only when necessary for protecting shared state
  3. Always close channels when you’re done sending
  4. Be careful with goroutine lifecycles - ensure they can exit
  5. Use context for cancellation and timeouts
  6. Avoid blocking operations in critical sections
  7. Don’t forget to defer WaitGroup.Done()
  8. Use buffered channels when you know the expected load
  9. Handle panic in goroutines if they might occur
  10. Profile your concurrent code to find bottlenecks

External Resources:

Related Tutorials:

Last updated on