Streaming Ordered Results

Fan-out/fan-in with a reordering buffer that emits results in original input order as they arrive — no need to collect everything first.

Why not just collect and sort?

The slice + sort approach from the lesson works, but it waits for ALL results before you can process any. With 1000 jobs where most finish in 10ms but one takes 5 seconds, you're blocked for 5 seconds doing nothing.

The streaming approach lets you process results as soon as their turn comes. Job 0 finishes? Emit it immediately. Job 2 finishes before job 1? Hold it in a buffer. Job 1 arrives? Flush both 1 and 2.

How it works

Each job carries an Index (its original position). The reorder function uses a min-heap keyed by index. It tracks nextIndex (starting at 0) and flushes consecutive results from the heap whenever the next expected index is at the top.

  • Best case: results arrive roughly in order, heap stays small, results stream out immediately
  • Worst case: the first job is the slowest, heap fills up with everything else, then flushes all at once when job 0 arrives
  • Typical case with N workers: heap holds ~N items at any time

The ordering is by position, not by value — it preserves the original input sequence.

package main

import (
	"container/heap"
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// Job and Result are the records that flow through the pipeline. Both carry
// the job's original position so a later stage can restore input order.
type (
	// Job is one unit of work handed to a worker.
	Job struct {
		Index int // position of the job in the original input sequence
		Value int // payload the worker operates on
	}

	// Result is what a worker produces for a single Job.
	Result struct {
		Index int // Index of the Job this result was computed from
		Value int // computed output for that Job
	}
)

// --- Min-heap ordered by Index ---

// resultHeap is a min-heap of Results ordered by Index. It implements
// heap.Interface from container/heap; use heap.Push and heap.Pop to keep the
// heap ordering intact rather than calling the Push and Pop methods directly.
type resultHeap []Result

// Len reports how many results are currently stored.
func (h resultHeap) Len() int {
	return len(h)
}

// Less reports whether element i has a smaller Index than element j, which
// puts the lowest Index at the top of the heap.
func (h resultHeap) Less(i, j int) bool {
	return h[i].Index < h[j].Index
}

// Swap exchanges the elements at positions i and j.
func (h resultHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
}

// Push appends x, which must be a Result, to the end of the slice.
func (h *resultHeap) Push(x any) {
	*h = append(*h, x.(Result))
}

// Pop removes the last element of the slice and returns it.
func (h *resultHeap) Pop() any {
	items := *h
	last := len(items) - 1
	popped := items[last]
	*h = items[:last]
	return popped
}

// --- Worker ---

// worker starts a goroutine that reads jobs from in, waits a random simulated
// processing delay of less than 100ms, and sends a Result holding the job's
// Index and the square of its Value on the returned channel.
//
// The goroutine exits, and the returned channel is closed, when in has been
// closed and drained or when ctx is cancelled. Cancellation is checked at
// every blocking point (waiting for a job, the simulated delay, delivering the
// result), so a cancelled worker does not linger on an idle or never-closed
// input channel.
//
// id is accepted for the caller's bookkeeping; the current implementation
// does not use it.
func worker(ctx context.Context, id int, in <-chan Job) <-chan Result {
	out := make(chan Result)
	go func() {
		defer close(out)
		for {
			var job Job
			select {
			case <-ctx.Done():
				return
			case j, ok := <-in:
				if !ok {
					// Input closed and drained: no more work.
					return
				}
				job = j
			}

			// Simulate variable processing time. A timer is used instead of
			// time.Sleep so the wait can be abandoned on cancellation.
			delay := time.NewTimer(time.Duration(rand.Intn(100)) * time.Millisecond)
			select {
			case <-delay.C:
			case <-ctx.Done():
				delay.Stop()
				return
			}

			select {
			case out <- Result{Index: job.Index, Value: job.Value * job.Value}:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// --- Fan-in ---

// fanIn merges all of the given channels into one. It starts one forwarding
// goroutine per input channel; each copies results to the returned channel
// until its input is closed or ctx is cancelled. The returned channel is
// closed once every forwarder has finished, which happens immediately when no
// channels are supplied.
//
// Results from different inputs are interleaved in whatever order they
// arrive; no ordering across inputs is imposed here.
//
// Cancellation is observed both while waiting for input and while delivering
// output, so a forwarder never stays blocked on an upstream channel that
// fails to close after ctx is cancelled.
func fanIn(ctx context.Context, channels ...<-chan Result) <-chan Result {
	out := make(chan Result)
	var wg sync.WaitGroup
	wg.Add(len(channels))
	for _, ch := range channels {
		go func(c <-chan Result) {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case r, ok := <-c:
					if !ok {
						// This input is exhausted.
						return
					}
					select {
					case out <- r:
					case <-ctx.Done():
						return
					}
				}
			}
		}(ch)
	}
	// Close out only after every forwarder is done so no send can hit a
	// closed channel.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// --- Reorder buffer: emits results in index order as they become available ---

// reorder turns a stream of results arriving in arbitrary order into a stream
// ordered by Index. It expects indices to start at 0 and buffers out-of-turn
// results in a min-heap; whenever the lowest buffered Index equals the next
// expected one, it emits that result and any consecutive successors.
//
// The returned channel is closed after in is closed and everything buffered
// has been emitted. Sends on the returned channel block until the consumer
// receives, so the consumer must keep reading until the channel closes.
func reorder(in <-chan Result) <-chan Result {
	out := make(chan Result)
	go func() {
		defer close(out)

		var pending resultHeap // results that arrived ahead of their turn
		want := 0              // Index of the next result allowed out

		for res := range in {
			heap.Push(&pending, res)
			// Release the run of consecutive indices that is now complete.
			for {
				if pending.Len() == 0 || pending[0].Index != want {
					break
				}
				out <- heap.Pop(&pending).(Result)
				want++
			}
		}

		// Input is exhausted. Anything still buffered was waiting on an index
		// that never arrived; emit it anyway, lowest Index first.
		for left := pending.Len(); left > 0; left-- {
			out <- heap.Pop(&pending).(Result)
		}
	}()
	return out
}

// main wires the pipeline together: it queues the jobs, fans them out to four
// workers, merges the worker outputs, restores input order, and prints each
// result as soon as its turn comes.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	const (
		numJobs    = 15
		numWorkers = 4
	)

	// Queue every job up front. The channel is buffered to numJobs, so this
	// never blocks, and closing it lets the workers finish once it is drained.
	jobs := make(chan Job, numJobs)
	for idx := 0; idx < numJobs; idx++ {
		jobs <- Job{Index: idx, Value: idx + 1}
	}
	close(jobs)

	// Fan-out: every worker pulls from the same jobs channel.
	outputs := make([]<-chan Result, 0, numWorkers)
	for id := 0; id < numWorkers; id++ {
		outputs = append(outputs, worker(ctx, id, jobs))
	}

	// Fan-in, then put the merged stream back into input order.
	ordered := reorder(fanIn(ctx, outputs...))

	// Stream results — they arrive in order
	for res := range ordered {
		fmt.Printf("job %2d: %d\n", res.Index, res.Value)
	}
}
▶ Open Go Playground

Copy the code above and paste it into the Go Playground to run it.

© 2026 ByteLearn.dev. Free courses for developers. · Privacy