Concurrency in Go
Go was designed with concurrency in mind. It provides built-in support for concurrent programming through goroutines and channels, making it easy to write programs that do multiple things at once.
Goroutines
A goroutine is a lightweight thread managed by the Go runtime. They’re cheap to create and have a small memory footprint.
Basic Goroutine
package main
import (
"fmt"
"time"
)
// sayHello prints a numbered greeting for name three times, pausing
// 100ms between prints so that concurrent callers visibly interleave.
func sayHello(name string) {
	for round := 1; round <= 3; round++ {
		fmt.Printf("Hello, %s! (%d)\n", name, round)
		time.Sleep(100 * time.Millisecond)
	}
}
// Demonstrates starting a goroutine with the go keyword while the
// main goroutine keeps running.
// NOTE(review): the final Sleep is a crude join — sayHello("Alice")
// takes ~300ms so 500ms is normally enough, but sync.WaitGroup (next
// example) is the reliable way to wait.
func main() {
// Start a goroutine
go sayHello("Alice")
// Main goroutine continues immediately
sayHello("Bob")
// Wait a bit to let the other goroutine finish
time.Sleep(500 * time.Millisecond)
}Multiple Goroutines
package main
import (
"fmt"
"sync"
"time"
)
// worker simulates one unit of work: it logs a start message, sleeps
// proportionally to its id (id * 100ms), then logs completion.
// wg.Done is deferred so the WaitGroup is released even if the body panics.
func worker(id int, wg *sync.WaitGroup) {
	defer wg.Done()
	fmt.Printf("Worker %d starting\n", id)
	delay := time.Duration(id) * 100 * time.Millisecond
	time.Sleep(delay)
	fmt.Printf("Worker %d done\n", id)
}
// main launches five workers and blocks on the WaitGroup until every
// one has called Done — the idiomatic replacement for sleeping.
// Note that wg.Add runs before the goroutine starts, never inside it.
func main() {
var wg sync.WaitGroup
// Launch multiple goroutines
for i := 1; i <= 5; i++ {
wg.Add(1) // Increment wait group counter
go worker(i, &wg)
}
// Wait for all workers to complete
wg.Wait()
fmt.Println("All workers completed")
}Channels
Channels provide a way for goroutines to communicate with each other and synchronize their execution.
Basic Channel Operations
package main
import (
"fmt"
"time"
)
// producer sends five numbered messages on ch at 200ms intervals,
// logging each send, then closes ch to tell receivers that no more
// data is coming (only the sending side should close a channel).
func producer(ch chan<- string) {
	for n := 1; n <= 5; n++ {
		msg := fmt.Sprintf("Message %d", n)
		ch <- msg
		fmt.Printf("Sent: %s\n", msg)
		time.Sleep(200 * time.Millisecond)
	}
	close(ch)
}
// consumer drains ch until it is closed, logging each message and
// simulating 300ms of processing per item. The explicit two-value
// receive detects the close and ends the loop.
func consumer(ch <-chan string) {
	for {
		received, open := <-ch
		if !open {
			break
		}
		fmt.Printf("Received: %s\n", received)
		time.Sleep(300 * time.Millisecond)
	}
	fmt.Println("Channel closed, consumer exiting")
}
// main connects a producer and a consumer through an unbuffered channel.
// NOTE(review): the 2s sleep is a crude join for demo purposes; a done
// channel or WaitGroup would wait deterministically.
func main() {
messageChannel := make(chan string)
// Start producer and consumer goroutines
go producer(messageChannel)
go consumer(messageChannel)
// Wait for both to finish
time.Sleep(2 * time.Second)
fmt.Println("Main function exiting")
}Select Statement
The select statement allows a goroutine to wait on multiple communication operations:
package main
import (
"fmt"
"time"
)
// fibonacci streams successive Fibonacci numbers on ch until it
// receives a value on quit. The select races three cases: send the
// next number if a receiver is ready, stop on quit, or — because of
// the default branch — fall through without blocking, print a status
// line, and retry after 100ms. The default case deliberately
// demonstrates non-blocking select; omit it to block until a peer is
// ready instead of polling.
func fibonacci(ch chan int, quit chan bool) {
x, y := 0, 1
for {
select {
case ch <- x:
x, y = y, x+y
case <-quit:
fmt.Println("Fibonacci generator quitting")
return
default:
fmt.Println("No communication ready")
time.Sleep(100 * time.Millisecond)
}
}
}
// main consumes ten Fibonacci numbers in a goroutine, then signals
// the generator to stop via quit. fibonacci runs on the main
// goroutine, so main returns only after the generator sees the quit
// signal and returns.
func main() {
ch := make(chan int)
quit := make(chan bool)
// Consumer goroutine
go func() {
for i := 0; i < 10; i++ {
fmt.Printf("Fibonacci: %d\n", <-ch)
time.Sleep(200 * time.Millisecond)
}
quit <- true
}()
fibonacci(ch, quit)
}Channel Patterns
Fan-Out, Fan-In Pattern
package main
import (
"fmt"
"sync"
"time"
)
// Worker function
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
fmt.Printf("Worker %d processing job %d\n", id, job)
// Simulate work
time.Sleep(100 * time.Millisecond)
result := job * 2
results <- result
fmt.Printf("Worker %d completed job %d with result %d\n", id, job, result)
}
}
// main demonstrates fan-out/fan-in: one jobs channel fanned out to
// three workers whose outputs fan back in on results. A dedicated
// goroutine closes results only after wg.Wait(), so the range loop
// below terminates — the close always happens on the sender side.
func main() {
numJobs := 10
numWorkers := 3
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
var wg sync.WaitGroup
// Start workers
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(i, jobs, results, &wg)
}
// Send jobs
go func() {
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs)
}()
// Wait for all workers to finish
go func() {
wg.Wait()
close(results)
}()
// Collect results
fmt.Println("Results:")
for result := range results {
fmt.Printf("Result: %d\n", result)
}
}Buffered Channels
package main
import (
"fmt"
"time"
)
// main shows buffered-channel semantics: sends succeed without a
// waiting receiver until the buffer is full, after which a send
// blocks until a receiver frees a slot.
func main() {
// Buffered channel with capacity 3
bufferedChannel := make(chan string, 3)
// Send without blocking (up to capacity)
bufferedChannel <- "First"
bufferedChannel <- "Second"
bufferedChannel <- "Third"
fmt.Println("All messages sent to buffered channel")
// Receive messages
fmt.Println(<-bufferedChannel)
fmt.Println(<-bufferedChannel)
fmt.Println(<-bufferedChannel)
// Demonstrate blocking when buffer is full
fmt.Println("\nDemonstrating blocking behavior:")
ch := make(chan int, 2)
ch <- 1
ch <- 2
fmt.Println("Buffer full, attempting to send third value...")
// This goroutine frees one buffer slot after 1s, unblocking the
// send of 3 below.
go func() {
time.Sleep(1 * time.Second)
<-ch // Receive one value
fmt.Println("Received one value, buffer has space")
}()
ch <- 3 // This will block until space is available
fmt.Println("Third value sent successfully")
}Synchronization Primitives
Mutex
package main
import (
"fmt"
"sync"
"time"
)
// Counter is a thread-safe integer counter. The mutex guards value so
// concurrent Increment calls never lose updates; a Counter must not
// be copied after first use.
type Counter struct {
	mu    sync.Mutex
	value int
}

// Increment adds one to the counter while holding the lock and logs
// the new value (printing under the lock keeps the log monotonic).
func (cnt *Counter) Increment() {
	cnt.mu.Lock()
	defer cnt.mu.Unlock()
	cnt.value++
	fmt.Printf("Counter incremented to %d\n", cnt.value)
}

// GetValue returns the current count under the lock.
func (cnt *Counter) GetValue() int {
	cnt.mu.Lock()
	defer cnt.mu.Unlock()
	return cnt.value
}
// main hammers the counter from ten goroutines, three increments
// each; the mutex inside Counter makes the final value exactly 30.
func main() {
counter := &Counter{}
var wg sync.WaitGroup
// Launch multiple goroutines to increment counter
for i := 0; i < 10; i++ {
wg.Add(1)
// i is passed as an argument so each goroutine gets its own copy
// (avoids loop-variable capture in Go versions before 1.22).
go func(id int) {
defer wg.Done()
for j := 0; j < 3; j++ {
counter.Increment()
time.Sleep(50 * time.Millisecond)
}
fmt.Printf("Goroutine %d finished\n", id)
}(i)
}
wg.Wait()
fmt.Printf("Final counter value: %d\n", counter.GetValue())
}RWMutex (Read-Write Mutex)
package main
import (
"fmt"
"sync"
"time"
)
// DataStore maps string keys to int values behind an RWMutex, so many
// readers can proceed in parallel while writes stay exclusive.
type DataStore struct {
	mu   sync.RWMutex
	data map[string]int
}

// NewDataStore returns a ready-to-use store with an initialized map
// (writing to a nil map would panic).
func NewDataStore() *DataStore {
	store := &DataStore{data: make(map[string]int)}
	return store
}

// Write stores value under key while holding the write lock and logs
// the update.
func (ds *DataStore) Write(key string, value int) {
	ds.mu.Lock()
	defer ds.mu.Unlock()
	ds.data[key] = value
	fmt.Printf("Wrote: %s = %d\n", key, value)
}

// Read returns the value stored under key and whether it exists,
// under a shared read lock.
func (ds *DataStore) Read(key string) (int, bool) {
	ds.mu.RLock()
	defer ds.mu.RUnlock()
	v, ok := ds.data[key]
	return v, ok
}

// ReadAll returns a snapshot copy of the whole map, so callers may
// use it after the read lock is released without racing later writes.
func (ds *DataStore) ReadAll() map[string]int {
	ds.mu.RLock()
	defer ds.mu.RUnlock()
	snapshot := make(map[string]int, len(ds.data))
	for k, v := range ds.data {
		snapshot[k] = v
	}
	return snapshot
}
// main runs one writer against three concurrent readers; the RWMutex
// inside DataStore lets the readers overlap with each other while
// each write remains exclusive.
func main() {
store := NewDataStore()
var wg sync.WaitGroup
// Writer goroutine
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 5; i++ {
key := fmt.Sprintf("key%d", i)
store.Write(key, i*10)
time.Sleep(100 * time.Millisecond)
}
}()
// Reader goroutines
for i := 0; i < 3; i++ {
wg.Add(1)
// i is passed by value so each reader keeps its own ID.
go func(readerID int) {
defer wg.Done()
for j := 0; j < 5; j++ {
data := store.ReadAll()
fmt.Printf("Reader %d sees %d items: %v\n", readerID, len(data), data)
time.Sleep(150 * time.Millisecond)
}
}(i)
}
wg.Wait()
fmt.Println("All operations completed")
}Once
Use sync.Once to ensure a function is executed only once:
package main
import (
"fmt"
"sync"
)
// Database stands in for an expensive shared resource that should be
// initialized at most once.
type Database struct {
	connected bool
}

var (
	db   *Database // lazily created singleton instance
	once sync.Once // guarantees one-time initialization across goroutines
)

// getInstance lazily initializes the shared Database exactly once,
// even under concurrent callers, and returns the singleton. Callers
// that arrive during initialization block until once.Do completes.
func getInstance() *Database {
	once.Do(func() {
		fmt.Println("Initializing database connection...")
		instance := &Database{connected: true}
		db = instance
		fmt.Println("Database connection established")
	})
	return db
}
// main calls getInstance from five goroutines; sync.Once guarantees
// the initialization block runs exactly once.
// NOTE(review): fmt.Scanln blocks until the user presses Enter — it
// works interactively but hangs in non-interactive runs; a WaitGroup
// would be the deterministic way to wait for the goroutines.
func main() {
// Multiple goroutines trying to get instance
for i := 0; i < 5; i++ {
go func(id int) {
instance := getInstance()
fmt.Printf("Goroutine %d got instance: %+v\n", id, instance)
}(i)
}
// Give time for goroutines to complete
fmt.Scanln()
}Context Package
The context package is essential for managing deadlines, cancellations, and timeouts in concurrent operations.
package main
import (
"context"
"fmt"
"time"
)
// task performs five 200ms units of simulated work, checking for
// context cancellation before each unit. On cancellation it logs the
// reason from ctx.Err() and returns early; otherwise it logs normal
// completion after all five steps.
func task(ctx context.Context, id int) {
	for step := 1; step <= 5; step++ {
		// Non-blocking cancellation check at the top of each iteration.
		select {
		case <-ctx.Done():
			fmt.Printf("Task %d canceled: %v\n", id, ctx.Err())
			return
		default:
		}
		fmt.Printf("Task %d working... %d\n", id, step)
		time.Sleep(200 * time.Millisecond)
	}
	fmt.Printf("Task %d completed\n", id)
}
// main runs the same task under three cancellation mechanisms: an
// 800ms timeout, an explicit cancel() call after 500ms, and a fixed
// deadline 600ms from now. Each task needs ~1s of work, so all three
// runs are cut short. The deferred cancel calls release the contexts'
// resources even on early return.
func main() {
// Example 1: Context with timeout
fmt.Println("Example 1: Context with timeout")
ctx1, cancel1 := context.WithTimeout(context.Background(), 800*time.Millisecond)
defer cancel1()
go task(ctx1, 1)
time.Sleep(1 * time.Second)
// Example 2: Manual cancellation
fmt.Println("\nExample 2: Manual cancellation")
ctx2, cancel2 := context.WithCancel(context.Background())
go task(ctx2, 2)
// Cancel after 500ms
time.Sleep(500 * time.Millisecond)
cancel2()
time.Sleep(1 * time.Second)
// Example 3: Context with deadline
fmt.Println("\nExample 3: Context with deadline")
ctx3, cancel3 := context.WithDeadline(context.Background(), time.Now().Add(600*time.Millisecond))
defer cancel3()
go task(ctx3, 3)
time.Sleep(1 * time.Second)
}Worker Pool Pattern
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// Task describes one unit of work: an identifier plus how long the
// simulated work takes.
type Task struct {
	ID       int
	Duration time.Duration
}

// worker consumes tasks from the channel until it is closed, sleeping
// for each task's Duration to simulate the work and logging progress.
// The deferred wg.Done lets the caller wait for every worker to stop.
func worker(id int, tasks <-chan Task, wg *sync.WaitGroup) {
	defer wg.Done()
	fmt.Printf("Worker %d started\n", id)
	for tk := range tasks {
		fmt.Printf("Worker %d started task %d\n", id, tk.ID)
		time.Sleep(tk.Duration) // simulate work
		fmt.Printf("Worker %d completed task %d\n", id, tk.ID)
	}
	fmt.Printf("Worker %d stopped\n", id)
}
// main builds a pool of three workers fed from one buffered tasks
// channel; closing the channel tells the workers to exit, and the
// WaitGroup blocks until they all have.
// NOTE(review): rand.Seed is deprecated since Go 1.20 (the global
// source is auto-seeded); math/rand/v2 is preferred in new code.
func main() {
numWorkers := 3
numTasks := 10
tasks := make(chan Task, numTasks)
var wg sync.WaitGroup
// Start workers
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(i, tasks, &wg)
}
// Generate tasks
rand.Seed(time.Now().UnixNano())
for i := 1; i <= numTasks; i++ {
// Random duration in [100ms, 600ms)
duration := time.Duration(rand.Intn(500)+100) * time.Millisecond
tasks <- Task{ID: i, Duration: duration}
}
close(tasks)
// Wait for all workers to complete
wg.Wait()
fmt.Println("All tasks completed")
}Pipeline Pattern
package main
import (
"fmt"
"sync"
)
// generator turns a slice into a channel: a goroutine emits each
// element in order and closes the channel when the slice is
// exhausted, so downstream range loops terminate cleanly.
func generator(numbers []int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; i < len(numbers); i++ {
			out <- numbers[i]
		}
	}()
	return out
}
// square reads every value from in, emits its square on the returned
// channel, and closes the output once in is closed and drained.
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for {
			n, open := <-in
			if !open {
				return
			}
			out <- n * n
		}
	}()
	return out
}
// filter forwards only the values from in that satisfy predicate onto
// the returned channel, closing it when in is exhausted.
func filter(in <-chan int, predicate func(int) bool) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			if !predicate(v) {
				continue
			}
			out <- v
		}
	}()
	return out
}
func main() {
numbers := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
// Create pipeline
stage1 := generator(numbers)
stage2 := square(stage1)
stage3 := filter(stage2, func(n int) bool { return n > 25 })
// Consume results
fmt.Println("Squares greater than 25:")
for result := range stage3 {
fmt.Println(result)
}
// Alternative approach using fan-in
fmt.Println("\nFan-in example:")
inputs := []<-chan int{
square(generator([]int{1, 2, 3})),
square(generator([]int{4, 5, 6})),
square(generator([]int{7, 8, 9})),
}
out := fanIn(inputs...)
for result := range out {
fmt.Println(result)
}
}
// fanIn merges any number of input channels into one output channel.
// One forwarding goroutine is started per input; a final goroutine
// waits for all forwarders to finish and then closes out, so the
// close happens exactly once and on the sending side.
func fanIn(inputs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// output forwards every value from a single input channel to out.
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(inputs))
for _, in := range inputs {
go output(in)
}
// Close out only after every forwarder has drained its input.
go func() {
wg.Wait()
close(out)
}()
return out
}Common Pitfalls and Best Practices
1. Race Conditions
// BAD: Race condition
// badCounter increments a shared local from 1000 goroutines with no
// synchronization: counter++ is a read-modify-write, so concurrent
// updates are lost. The function also returns without waiting, so
// most goroutines may not even have run. Build/test with -race to
// see the detector flag this.
func badCounter() {
counter := 0
for i := 0; i < 1000; i++ {
go func() {
counter++ // Race condition!
}()
}
}
// GOOD: Using mutex
// goodCounter fixes both problems above: the Mutex serializes the
// increments so none are lost, and the WaitGroup blocks until every
// goroutine has finished before the function returns.
func goodCounter() {
var mu sync.Mutex
counter := 0
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
mu.Lock()
counter++
mu.Unlock()
}()
}
wg.Wait()
}2. Goroutine Leaks
// BAD: Goroutine leak
// badLeak starts a goroutine that blocks forever on a receive that
// can never complete: nothing ever sends on ch and it is never
// closed, so the goroutine (and everything it references) is retained
// for the life of the program.
func badLeak() {
ch := make(chan int)
go func() {
<-ch // This goroutine will never exit
}()
// ch is never sent to, so goroutine leaks
}
// GOOD: Proper cleanup
// goodLeakPrevention gives the goroutine a second channel (done) to
// select on, so closing done always lets it exit even if ch never
// delivers a value. context.Context cancellation is the idiomatic
// generalization of this pattern.
func goodLeakPrevention() {
ch := make(chan int)
done := make(chan struct{})
go func() {
select {
case <-ch:
// Process data
case <-done:
// Time to exit
}
}()
// Send signal to cleanup when needed
close(done)
}Best Practices
- Prefer channels over shared memory when possible
- Use mutexes only when necessary for protecting shared state
- Close a channel only from the sender side, and only when receivers need the close signal (for example, to end a range loop)
- Be careful with goroutine lifecycles - ensure they can exit
- Use context for cancellation and timeouts
- Avoid blocking operations in critical sections
- Don’t forget to defer WaitGroup.Done()
- Give a channel a buffer only for a concrete reason, such as a known bound on in-flight items; default to unbuffered
- Handle panic in goroutines if they might occur
- Profile your concurrent code to find bottlenecks
External Resources:
Related Tutorials:
Last updated on