Fan-Out Fan-In
Distribute work across multiple workers, then merge their results into a single channel.
package main
import (
"context"
"fmt"
"sync"
)
// gen starts a goroutine that sends each of nums, in order, on the returned
// unbuffered channel. The channel is closed once every value has been sent
// or as soon as ctx is cancelled, whichever happens first.
func gen(ctx context.Context, nums ...int) <-chan int {
	stream := make(chan int)

	emit := func() {
		defer close(stream)
		for i := 0; i < len(nums); i++ {
			select {
			case <-ctx.Done():
				// Stop early; the deferred close tells the receiver we are done.
				return
			case stream <- nums[i]:
			}
		}
	}
	go emit()

	return stream
}
// worker starts a goroutine that reads integers from in, squares each one and
// sends the result on the returned unbuffered channel.
//
// The returned channel is closed when in is closed and drained, or when ctx
// is cancelled. Cancellation is observed both while waiting for input and
// while waiting to deliver a result, so the goroutine exits promptly even if
// the upstream producer stalls or never closes in.
func worker(ctx context.Context, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for {
			select {
			case <-ctx.Done():
				return
			case n, ok := <-in:
				if !ok {
					// Upstream finished: nothing more to process.
					return
				}
				// Deliver the result, but give up if the consumer has
				// gone away and the context is cancelled.
				select {
				case out <- n * n:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return out
}
// fanIn merges every value received from channels onto a single returned
// unbuffered channel. One forwarding goroutine is started per input channel;
// no ordering is guaranteed between values from different inputs.
//
// The returned channel is closed once all forwarding goroutines have exited,
// which happens when every input is closed and drained or when ctx is
// cancelled. Cancellation is observed both while waiting for input and while
// waiting to deliver, so a stalled or never-closed input cannot keep the
// output open after cancel. With no input channels the result is closed
// immediately.
func fanIn(ctx context.Context, channels ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	for _, ch := range channels {
		wg.Add(1)
		// Pass ch as an argument so each goroutine gets its own copy
		// regardless of the Go version's loop-variable semantics.
		go func(c <-chan int) {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case val, ok := <-c:
					if !ok {
						// This input is exhausted.
						return
					}
					select {
					case out <- val:
					case <-ctx.Done():
						return
					}
				}
			}
		}(ch)
	}
	// Close out only after every forwarder is done, so no goroutine can
	// send on a closed channel.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}
// main wires up the pipeline: a generator emitting 1 through 10, three
// workers that all read from that one generator channel, and a fan-in stage
// whose merged output is printed one value per line. Output order depends on
// goroutine scheduling.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	source := gen(ctx, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

	// Fan-out: 3 workers sharing the same input channel.
	const numWorkers = 3
	results := make([]<-chan int, 0, numWorkers)
	for i := 0; i < numWorkers; i++ {
		results = append(results, worker(ctx, source))
	}

	// Fan-in: merge results and print until the merged channel closes.
	merged := fanIn(ctx, results...)
	for squared := range merged {
		fmt.Println(squared)
	}
}