04 - Fan-Out / Fan-In
Fan-out means starting multiple goroutines to read from the same channel. Fan-in means merging multiple channels into one. Together, they distribute work and collect results. Once you see this pattern, you'll recognize it everywhere.
We touched on this in Go Essentials. Now we go deeper — dynamic workers, ordered results, and real-world usage.
Why Fan-Out / Fan-In
A single goroutine processing items one at a time is simple but slow. If each item takes 100ms and you have 100 items, that's 10 seconds. Fan-out to 10 workers and it's 1 second. The work is the same — you're just doing it in parallel.
But now you have 10 goroutines producing results on separate channels. The consumer shouldn't care how many workers there are. Fan-in merges everything back into one channel — the consumer reads from a single stream.
Fan-Out
Multiple goroutines reading from a single channel. The Go runtime handles distribution — whichever goroutine is ready gets the next value.
```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	jobs := make(chan int, 10)

	// Fan-out: 3 workers reading from the same channel
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for job := range jobs {
				fmt.Printf("worker %d got job %d\n", id, job)
				time.Sleep(100 * time.Millisecond) // simulate processing time
			}
		}(i)
	}

	// Send jobs
	for j := 0; j < 10; j++ {
		jobs <- j
	}
	close(jobs)
	wg.Wait()
}
```

Work is distributed automatically. No manual assignment needed.
Fan-In
Merge multiple channels into one. The consumer reads from a single channel regardless of how many producers there are.
```go
func fanIn(channels ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	for _, ch := range channels {
		wg.Add(1)
		go func(c <-chan int) {
			defer wg.Done()
			for val := range c {
				out <- val
			}
		}(ch)
	}
	// Close out only after every producer goroutine has drained its channel.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}
```

This is a reusable function. Pass any number of channels, get one merged channel back.
Fan-Out / Fan-In Combined
Compute squares of 1 to n using 3 concurrent workers. The generator produces numbers, workers square them in parallel, and fanIn merges the results into a single channel.
```go
package main

import (
	"context"
	"fmt"
	"sync"
)

func generator(ctx context.Context, n int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 1; i <= n; i++ {
			select {
			case out <- i:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

func worker(ctx context.Context, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			select {
			case out <- n * n:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

func fanIn(ctx context.Context, channels ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	for _, ch := range channels {
		wg.Add(1)
		go func(c <-chan int) {
			defer wg.Done()
			for val := range c {
				select {
				case out <- val:
				case <-ctx.Done():
					return
				}
			}
		}(ch)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Generate numbers 1 to 10
	nums := generator(ctx, 10)

	// Fan-out: spawn 3 workers reading from the same input
	numWorkers := 3
	workers := make([]<-chan int, numWorkers)
	for i := 0; i < numWorkers; i++ {
		workers[i] = worker(ctx, nums)
	}

	// Fan-in: merge results
	for val := range fanIn(ctx, workers...) {
		fmt.Println(val)
	}
}
```

Important: all three workers read from the same nums channel. Each value goes to exactly one worker — the channel handles distribution.
Ordered Results
Fan-out/fan-in doesn't preserve order. If you need results in the same order as input, wrap values with an index.
```go
package main

import (
	"context"
	"fmt"
	"sort"
	"sync"
)

type Job struct {
	Index int
	Value int
}

type Result struct {
	Index int
	Value int
}

func orderedWorker(ctx context.Context, in <-chan Job) <-chan Result {
	out := make(chan Result)
	go func() {
		defer close(out)
		for job := range in {
			select {
			case out <- Result{Index: job.Index, Value: job.Value * job.Value}:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// fanInResults is the fanIn from earlier, adapted to carry Result values.
func fanInResults(ctx context.Context, channels ...<-chan Result) <-chan Result {
	out := make(chan Result)
	var wg sync.WaitGroup
	for _, ch := range channels {
		wg.Add(1)
		go func(c <-chan Result) {
			defer wg.Done()
			for val := range c {
				select {
				case out <- val:
				case <-ctx.Done():
					return
				}
			}
		}(ch)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	jobs := make(chan Job, 10)
	for i := 0; i < 10; i++ {
		jobs <- Job{Index: i, Value: i + 1}
	}
	close(jobs)

	// Fan-out
	r1 := orderedWorker(ctx, jobs)
	r2 := orderedWorker(ctx, jobs)
	r3 := orderedWorker(ctx, jobs)

	// Fan-in and collect
	merged := fanInResults(ctx, r1, r2, r3)

	// Collect all results, then sort by index
	var all []Result
	for r := range merged {
		all = append(all, r)
	}
	sort.Slice(all, func(i, j int) bool {
		return all[i].Index < all[j].Index
	})
	for _, r := range all {
		fmt.Printf("job %d: %d\n", r.Index, r.Value)
	}
}
```

The trade-off: you collect all results before sorting, which uses more memory. For streaming ordered results, you'd need a more complex reordering buffer.
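One way to sketch that reordering buffer: hold out-of-order results in a map and emit each index as soon as its predecessor has gone out, so memory grows only with how far ahead of the slowest index the workers get. This sketch assumes indices form a gapless 0..n-1 sequence.

```go
package main

import "fmt"

type Result struct {
	Index int
	Value int
}

// reorder re-emits results in index order as soon as each "next" index arrives,
// buffering only the out-of-order ones. Assumes indices are 0..n-1 with no gaps.
func reorder(in <-chan Result) <-chan Result {
	out := make(chan Result)
	go func() {
		defer close(out)
		pending := make(map[int]Result)
		next := 0
		for r := range in {
			pending[r.Index] = r
			// Flush every consecutive result starting at `next`.
			for {
				buffered, ok := pending[next]
				if !ok {
					break
				}
				delete(pending, next)
				out <- buffered
				next++
			}
		}
	}()
	return out
}

func main() {
	in := make(chan Result, 4)
	// Results arrive out of order, as they would from fan-in.
	in <- Result{Index: 2, Value: 9}
	in <- Result{Index: 0, Value: 1}
	in <- Result{Index: 1, Value: 4}
	in <- Result{Index: 3, Value: 16}
	close(in)
	for r := range reorder(in) {
		fmt.Println(r.Index, r.Value) // prints indices 0, 1, 2, 3 in order
	}
}
```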
Dynamic Worker Count
Sometimes you don't know how many workers you need upfront. Scale based on input size or system resources.
```go
// generator, worker, and fanIn are the context-aware versions defined above.
func fanOutDynamic(ctx context.Context, in <-chan int, numWorkers int) <-chan int {
	workers := make([]<-chan int, numWorkers)
	for i := 0; i < numWorkers; i++ {
		workers[i] = worker(ctx, in)
	}
	return fanIn(ctx, workers...)
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	nums := generator(ctx, 10)
	numWorkers := runtime.NumCPU()
	results := fanOutDynamic(ctx, nums, numWorkers)
	for val := range results {
		fmt.Println(val)
	}
}
```

runtime.NumCPU() is a common choice for CPU-bound work. For I/O-bound work (HTTP calls, database queries), you'd use a higher number.
Fan-Out vs Worker Pool
They solve similar problems but differ in structure:
| | Fan-Out | Worker Pool |
|---|---|---|
| Workers | Each gets its own output channel | All share one output channel |
| Lifetime | Workers exit when input is exhausted | Workers run until explicitly stopped |
| Use case | Pipeline stages | Long-running job processing |
Fan-out workers process a batch and exit when done. Worker pool (next lesson) workers stay alive and wait for the next job.
Key Takeaways
- Fan-out: multiple goroutines reading from one channel — work distributes automatically
- Fan-in: merge multiple channels into one — consumers see a single stream
- Always use context for cancellation in every goroutine
- Results arrive in arbitrary order — use index wrapping if order matters
- Scale worker count with runtime.NumCPU() for CPU-bound work, higher for I/O-bound
- Fan-out workers process a batch and exit when done. Worker pool workers stay alive and wait for the next job