05 - Worker Pools

📋 Jump to Takeaways

A worker pool is a fixed number of goroutines pulling jobs from a shared queue. Unlike fan-out where workers die when input is exhausted, pool workers stay alive and wait for more work.

This is one of the most common patterns in production Go.

Why Worker Pools

You could spin up a goroutine for every job. With 10,000 jobs, that's 10,000 goroutines. Goroutines are cheap, but each one might hold a database connection, an HTTP client, or a file handle. Those aren't cheap. A worker pool gives you controlled parallelism — fixed resource usage, predictable behavior, and backpressure when work piles up.

Use worker pools when:

  • You're processing a queue of jobs (image resizing, email sending, API calls)
  • You need to limit concurrent access to a resource (DB connections, file handles)
  • Jobs arrive continuously and workers should stay alive between jobs

Basic Worker Pool

package main

func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for job := range jobs {
        fmt.Printf("worker %d processing job %d\n", id, job)
        time.Sleep(500 * time.Millisecond) // simulate work
        results <- job * 2
    }
}

func main() {
    const numJobs = 10
    const numWorkers = 3

    jobs := make(chan int, numJobs)
    results := make(chan int, numJobs)
    var wg sync.WaitGroup

    // Start workers
    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go worker(i, jobs, results, &wg)
    }

    // Send jobs
    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }
    close(jobs)

    // Wait for workers, then close results
    go func() {
        wg.Wait()
        close(results)
    }()

    // Collect results
    for r := range results {
        fmt.Println("result:", r)
    }
}

Three workers, ten jobs. Workers pull from the jobs channel until it's closed. Results flow into a buffered channel. The closing goroutine waits for all workers to finish before closing results.

Notice who closes what: main closes jobs because it's the producer of jobs. But workers are the producers of results — so who closes results? No single worker can, because they all share it. The wg.Wait() goroutine acts as a coordinator: it waits for all producers to finish, then closes the channel. This is the standard pattern when multiple goroutines write to one channel.

Worker Pool with Context

Add cancellation so the pool can be stopped mid-flight.

func worker(ctx context.Context, id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        select {
        case <-ctx.Done():
            fmt.Printf("worker %d cancelled\n", id)
            return
        case job, ok := <-jobs:
            if !ok {
                return // channel closed, no more jobs
            }
            fmt.Printf("worker %d processing job %d\n", id, job)
            time.Sleep(500 * time.Millisecond) // simulate work

            select {
            case results <- job * 2:
            case <-ctx.Done():
                return
            }
        }
    }
}

The double select is important.

  1. First select: worker is idle, waiting for a job or cancellation. If the context is cancelled while waiting, the worker exits instead of picking up more work.

  2. Second select: worker finished the job and wants to send the result. But if nobody is reading from results anymore (context was cancelled), results <- job * 2 would block forever. The second select prevents that deadlock.

Graceful Shutdown

There are two ways to stop a worker pool:

Graceful — close the jobs channel. Workers finish whatever they're processing, drain remaining jobs, then exit.

func main() {
    jobs := make(chan int, 100)
    results := make(chan int, 100)
    var wg sync.WaitGroup

    for i := 0; i < 3; i++ {
        wg.Add(1)
        go worker(i, jobs, results, &wg)
    }

    for j := 0; j < 20; j++ {
        jobs <- j
    }
    close(jobs) // no more jobs — workers drain what's left and exit

    wg.Wait()      // wait for all workers to finish
    close(results)  // safe to close — all producers are done

    for r := range results {
        fmt.Println("result:", r)
    }
}

Emergency — cancel the context. Workers drop everything and exit immediately, even if jobs remain in the queue.

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    jobs := make(chan int, 100)
    results := make(chan int, 100)
    var wg sync.WaitGroup

    for i := 0; i < 3; i++ {
        wg.Add(1)
        go worker(ctx, i, jobs, results, &wg)
    }

    for j := 0; j < 20; j++ {
        jobs <- j
    }

    cancel() // stop now — workers exit even if jobs remain

    wg.Wait()
    close(results)

    for r := range results {
        fmt.Println("result:", r)
    }
}

Use graceful shutdown by default. Use context cancellation when something goes wrong — a timeout, a signal from the OS, or an unrecoverable error.

Struct-Based Worker Pool

The previous examples wire up channels, WaitGroups, and goroutines every time. When you need a pool in multiple places, wrap it in a struct — callers just use Submit and Shutdown without managing internals.

type Pool struct {
    jobs    chan func()
    wg      sync.WaitGroup
}

func NewPool(size int) *Pool {
    p := &Pool{
        jobs: make(chan func(), size*2),
    }
    for i := 0; i < size; i++ {
        p.wg.Add(1)
        go p.run()
    }
    return p
}

func (p *Pool) run() {
    defer p.wg.Done()
    for fn := range p.jobs {
        fn()
    }
}

func (p *Pool) Submit(fn func()) {
    p.jobs <- fn
}

func (p *Pool) Shutdown() {
    close(p.jobs)
    p.wg.Wait()
}

Usage:

func main() {
    pool := NewPool(4)

    for i := 0; i < 20; i++ {
        i := i // not needed in Go 1.22+ (loop vars are per-iteration)
        // before 1.22, all closures shared the same i — without this, every goroutine would print 20
        pool.Submit(func() {
            fmt.Printf("processing %d\n", i)
            time.Sleep(100 * time.Millisecond) // simulate work
        })
    }

    pool.Shutdown()
    fmt.Println("all done")
}

Submit sends a closure to the pool. Shutdown closes the channel and waits. Clean, reusable, and generic — the pool doesn't care what work it's doing.

Output order is non-deterministic — workers run concurrently, so numbers appear in arbitrary order. If it looks ordered on the Go Playground, that's because the Playground runs with GOMAXPROCS=1 and a deterministic scheduler. On a real machine with multiple cores, you'll see interleaving.

Buffered vs Unbuffered Job Channel

Unbuffered Buffered
Submit blocks when All workers are busy Buffer is full AND all workers are busy
Backpressure Immediate — caller waits Delayed — buffer absorbs bursts
Memory Minimal Buffer size × job size

Use buffered for bursty workloads. Use unbuffered when you want the caller to slow down if workers can't keep up.

Practical Example: Concurrent URL Fetcher

type FetchResult struct {
    URL    string
    Status int
    Err    error
}

func fetchWorker(ctx context.Context, urls <-chan string, results chan<- FetchResult, wg *sync.WaitGroup) {
    defer wg.Done()
    client := &http.Client{Timeout: 10 * time.Second}

    for url := range urls {
        select {
        case <-ctx.Done():
            return
        default:
        }

        req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
        if err != nil {
            results <- FetchResult{URL: url, Err: err}
            continue
        }

        resp, err := client.Do(req)
        if err != nil {
            results <- FetchResult{URL: url, Err: err}
            continue
        }
        resp.Body.Close()
        results <- FetchResult{URL: url, Status: resp.StatusCode}
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    urls := []string{
        "https://go.dev",
        "https://pkg.go.dev",
        "https://github.com",
        "https://example.com",
        "https://httpbin.org/get",
    }

    urlCh := make(chan string, len(urls))
    results := make(chan FetchResult, len(urls))
    var wg sync.WaitGroup

    // Start 3 workers
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go fetchWorker(ctx, urlCh, results, &wg)
    }

    // Send URLs
    for _, u := range urls {
        urlCh <- u
    }
    close(urlCh)

    go func() {
        wg.Wait()
        close(results)
    }()

    for r := range results {
        if r.Err != nil {
            fmt.Printf("FAIL %s: %v\n", r.URL, r.Err)
        } else {
            fmt.Printf("OK   %s: %d\n", r.URL, r.Status)
        }
    }
}

Three workers fetch URLs concurrently. Context handles timeout. Results are collected through a single channel. This is the worker pool pattern in action.

Key Takeaways

  • Worker pool: fixed goroutines pulling from a shared job channel
  • Close the job channel for graceful shutdown — workers drain remaining work
  • Use context cancellation for emergency stops
  • Double select pattern: check context on both receive and send
  • Struct-based pools are reusable — accept func() for generic work
  • Buffer the job channel for bursty workloads, unbuffered for backpressure
  • Worker pools are for long-running services; fan-out is for pipeline stages

🚀 Ready to run?

Complete runnable examples for this lesson.

📝 Ready to test your knowledge?

Answer the quiz below to mark this lesson complete.

© 2026 ByteLearn.dev. Free courses for developers. · Privacy