Streaming Ordered Results
Fan-out/fan-in with a reordering buffer that emits results in original input order as they arrive — no need to collect everything first.
Why not just collect and sort?
The slice + sort approach from the lesson works, but it waits for ALL results before you can process any. With 1000 jobs where most finish in 10ms but one takes 5 seconds, you're blocked for 5 seconds doing nothing.
The streaming approach lets you process results as soon as their turn comes. Job 0 finishes? Emit it immediately. Job 2 finishes before job 1? Hold it in a buffer. Job 1 arrives? Flush both 1 and 2.
How it works
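Each job carries an Index (its original position). The reorder function uses a min-heap keyed by index. It tracks nextIndex (starting at 0) and flushes consecutive results from the heap whenever the next expected index is at the top. This assumes the indices are contiguous and start at 0: if some index never arrives, every result after it waits in the heap until the input channel closes.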
- Best case: results arrive roughly in order, heap stays small, results stream out immediately
- Worst case: the first job is the slowest, heap fills up with everything else, then flushes all at once when job 0 arrives
- Typical case with N workers: heap holds ~N items at any time
The ordering is by position, not by value — it preserves the original input sequence.
package main
import (
"container/heap"
"context"
"fmt"
"math/rand"
"sync"
"time"
)
// Job is one unit of work handed to a worker.
//
// Index is the job's position in the original input sequence; it is copied
// onto the matching Result so output can be put back in input order. Value is
// the payload the worker processes.
type Job struct {
	Index, Value int
}
// Result is the outcome of processing one Job.
//
// Index is the Index of the Job that produced this result and is the key the
// reorder buffer sorts on. Value is the worker's output for that job.
type Result struct {
	Index, Value int
}
// --- Min-heap ordered by Index ---

// resultHeap is a slice of Results that implements the container/heap
// interface so that the Result with the smallest Index is always at position 0.
type resultHeap []Result

// Len reports how many results are currently buffered.
func (h resultHeap) Len() int {
	return len(h)
}

// Less orders elements by ascending Index, which makes this a min-heap.
func (h resultHeap) Less(i, j int) bool {
	return h[j].Index > h[i].Index
}

// Swap exchanges the elements at positions i and j.
func (h resultHeap) Swap(i, j int) {
	tmp := h[i]
	h[i] = h[j]
	h[j] = tmp
}

// Push appends x, which must be a Result, to the end of the slice.
// It is meant to be called through heap.Push, which then restores heap order.
func (h *resultHeap) Push(x any) {
	r := x.(Result)
	*h = append(*h, r)
}

// Pop removes and returns the last element of the slice.
// It is meant to be called through heap.Pop, which first moves the minimum
// element to the end.
func (h *resultHeap) Pop() any {
	s := *h
	last := len(s) - 1
	tail := s[last]
	*h = s[:last]
	return tail
}
// --- Worker ---

// worker starts a goroutine that reads jobs from in and, for each one, sends a
// Result carrying the same Index and the square of the job's Value on the
// returned channel.
//
// The goroutine stops, closing the returned channel, when in is closed and
// drained or when ctx is cancelled. Cancellation is observed at every blocking
// point: while waiting for a job, during the simulated processing delay, and
// while waiting for a receiver. This means a cancelled worker does not linger
// on an idle input channel or sleep out the rest of its delay.
//
// id identifies the worker; it is currently unused.
func worker(ctx context.Context, id int, in <-chan Job) <-chan Result {
	out := make(chan Result)
	go func() {
		defer close(out)
		for {
			// Wait for the next job, but give up as soon as ctx is cancelled so
			// the goroutine cannot leak on an input that is never closed.
			var job Job
			select {
			case <-ctx.Done():
				return
			case j, ok := <-in:
				if !ok {
					return
				}
				job = j
			}

			// Simulate variable processing time with a timer rather than
			// time.Sleep so the wait can be interrupted by cancellation.
			delay := time.NewTimer(time.Duration(rand.Intn(100)) * time.Millisecond)
			select {
			case <-delay.C:
			case <-ctx.Done():
				delay.Stop()
				return
			}

			select {
			case out <- Result{Index: job.Index, Value: job.Value * job.Value}:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}
// --- Fan-in ---

// fanIn merges any number of Result channels into a single channel.
//
// One forwarding goroutine is started per input channel. Each one copies
// results to the merged channel until its input is closed or ctx is cancelled.
// The merged channel is closed once every forwarder has returned. Results from
// different inputs are interleaved in no particular order.
func fanIn(ctx context.Context, channels ...<-chan Result) <-chan Result {
	merged := make(chan Result)

	var pending sync.WaitGroup
	pending.Add(len(channels))

	// forward copies everything from src to merged, giving up on cancellation.
	forward := func(src <-chan Result) {
		defer pending.Done()
		for res := range src {
			select {
			case <-ctx.Done():
				return
			case merged <- res:
			}
		}
	}
	for _, src := range channels {
		go forward(src)
	}

	// Close the merged channel only after all forwarders are done, so no
	// forwarder can send on a closed channel.
	go func() {
		pending.Wait()
		close(merged)
	}()

	return merged
}
// --- Reorder buffer: emits results in index order as they become available ---

// reorder reads results from in, which may arrive in any order, and emits them
// on the returned channel in ascending Index order as soon as each one's turn
// comes.
//
// It expects the indices 0, 1, 2, ... and keeps early arrivals in a min-heap
// until every lower index has been emitted. Two irregular cases are handled:
//
//   - A result whose Index is below the next expected one (a duplicate or a
//     negative index) can never become "next", so it is emitted immediately
//     instead of sitting at the heap root and stalling everything behind it.
//   - If an index never arrives, the results after it stay buffered until in
//     is closed and are then flushed in ascending Index order.
//
// The returned channel is closed after in is closed and the buffer is empty.
// The function takes no context, so the caller must keep receiving until the
// returned channel is closed; otherwise the goroutine stays blocked on a send.
func reorder(in <-chan Result) <-chan Result {
	out := make(chan Result)
	go func() {
		defer close(out)
		h := &resultHeap{} // an empty heap is already valid; no heap.Init needed
		nextIndex := 0
		for r := range in {
			heap.Push(h, r)
			// Flush everything at the root that is due. "<=" rather than "=="
			// lets stale indices through without blocking the stream.
			for h.Len() > 0 && (*h)[0].Index <= nextIndex {
				head := heap.Pop(h).(Result)
				if head.Index == nextIndex {
					nextIndex++
				}
				out <- head
			}
		}
		// The input is finished. Anything still buffered was waiting on an
		// index that never arrived, so emit it in ascending order.
		for h.Len() > 0 {
			out <- heap.Pop(h).(Result)
		}
	}()
	return out
}
// main runs the demo pipeline: it queues 15 jobs, fans them out to 4 workers,
// merges the workers' outputs, restores input order, and prints each result as
// soon as its turn comes.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	const (
		numJobs    = 15
		numWorkers = 4
	)

	// Queue every job up front. The channel is buffered to hold them all, so
	// this loop never blocks, and closing it tells workers when to stop.
	jobs := make(chan Job, numJobs)
	for idx := 0; idx < numJobs; idx++ {
		jobs <- Job{Index: idx, Value: idx + 1}
	}
	close(jobs)

	// Fan-out: every worker pulls from the same jobs channel.
	outputs := make([]<-chan Result, 0, numWorkers)
	for id := 0; id < numWorkers; id++ {
		outputs = append(outputs, worker(ctx, id, jobs))
	}

	// Fan-in, reorder, and stream the results in input order.
	for res := range reorder(fanIn(ctx, outputs...)) {
		fmt.Printf("job %2d: %d\n", res.Index, res.Value)
	}
}