Merge Subscriptions

Merge multiple subscription streams into one, with clean shutdown. Based on Sameer Ajmani's Advanced Go Concurrency Patterns talk.

The naive approach has a goroutine leak — if the receiver stops reading, the forwarding goroutines block forever on send. The fix: a quit channel checked on both receive and send.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// Item is a single update delivered by a Subscription.
type Item struct {
	Title  string // title of the item
	Source string // name of the feed the item came from
}

// Subscription is a stream of Items that can be shut down.
type Subscription interface {
	// Updates returns the receive-only channel on which Items are delivered.
	Updates() <-chan Item
	// Close shuts the subscription down and returns any error from doing so.
	Close() error
}

// --- Naive Merge (buggy) ---
// Goroutines block forever on m.updates if receiver stops reading.

// naiveMerge is the deliberately flawed merged subscription used to
// demonstrate the leak: it has no way to tell its forwarding goroutines to stop.
type naiveMerge struct {
	subs    []Subscription // the merged source subscriptions
	updates chan Item      // channel that all sources are forwarded onto
}

// NaiveMerge fans every subscription in subs into a single Subscription.
//
// This is the intentionally broken variant kept for comparison with Merge:
// each forwarding goroutine performs an unconditional send, so once the
// consumer stops receiving, the goroutine is stuck on that send forever.
func NaiveMerge(subs ...Subscription) Subscription {
	merged := &naiveMerge{
		subs:    subs,
		updates: make(chan Item),
	}
	// forward copies items from one source until that source's channel closes.
	forward := func(s Subscription) {
		src := s.Updates()
		for {
			it, ok := <-src
			if !ok {
				return
			}
			merged.updates <- it // BUG (intentional): blocks if the receiver stops
		}
	}
	for i := range subs {
		go forward(subs[i])
	}
	return merged
}

// Updates exposes the merged stream as a receive-only channel.
func (m *naiveMerge) Updates() <-chan Item {
	return m.updates
}
// Close closes every source subscription, then closes the merged channel.
// It returns the first non-nil error reported by a source, or nil.
//
// Intentionally flawed, like the rest of the naive variant: nothing waits
// for the forwarding goroutines before the merged channel is closed.
func (m *naiveMerge) Close() (err error) {
	for i := range m.subs {
		e := m.subs[i].Close()
		if err != nil || e == nil {
			continue
		}
		err = e
	}
	close(m.updates) // BUG (intentional): forwarding goroutines may still be running
	return err
}

// --- Correct Merge ---
// Quit channel checked on both receive and send — no goroutine leaks.

// merge is a Subscription that forwards several source subscriptions onto
// one channel and can shut its forwarding goroutines down via quit.
type merge struct {
	subs    []Subscription // the merged source subscriptions
	updates chan Item      // merged output channel
	quit    chan struct{}  // closed by Close to tell forwarding goroutines to stop
	errs    chan error     // carries each source's Close result back to Close
}

// Merge fans every subscription in subs into a single Subscription.
//
// One forwarding goroutine is started per source. Each goroutine watches the
// quit channel both while waiting to receive from its source and while
// waiting to send on the merged channel, so it can always be stopped. On
// quit it closes its source and reports the result on errs exactly once.
//
// Each source's Updates channel is obtained once, when its goroutine starts.
// If a source closes that channel on its own, the goroutine stops receiving
// from it (instead of forwarding zero-value Items in a tight loop) and simply
// waits for quit.
func Merge(subs ...Subscription) Subscription {
	m := &merge{
		subs:    subs,
		updates: make(chan Item),
		quit:    make(chan struct{}),
		errs:    make(chan error),
	}
	for _, sub := range subs {
		go func(s Subscription) {
			updates := s.Updates()
			for {
				var it Item
				select {
				case v, ok := <-updates:
					if !ok {
						// A receive from a nil channel blocks forever, so
						// this disables the case; only quit can fire now.
						updates = nil
						continue
					}
					it = v
				case <-m.quit:
					m.errs <- s.Close()
					return
				}
				select {
				case m.updates <- it:
				case <-m.quit:
					m.errs <- s.Close()
					return
				}
			}
		}(sub)
	}
	return m
}

// Updates exposes the merged stream as a receive-only channel.
func (m *merge) Updates() <-chan Item {
	return m.updates
}
// Close stops all forwarding goroutines, collects the result of closing each
// source subscription, and then closes the merged channel.
//
// It returns the first non-nil error received (the same policy as
// naiveMerge.Close), or nil. Calling Close again after it has returned is a
// no-op that returns nil; concurrent calls to Close are not supported.
func (m *merge) Close() (err error) {
	select {
	case <-m.quit:
		// Already closed: closing quit or updates a second time would panic.
		return nil
	default:
	}
	close(m.quit) // signal all forwarding goroutines to stop
	// Each forwarding goroutine sends exactly one Close result; receive all
	// of them so that none is left blocked on errs.
	for range m.subs {
		if e := <-m.errs; e != nil && err == nil {
			err = e
		}
	}
	// Every forwarder has reported and is returning, so no send on updates
	// can race with this close.
	close(m.updates)
	return err
}

// --- Fake subscription for demo ---

// fakeSub is a demo Subscription that generates numbered Items for one source.
type fakeSub struct {
	source  string          // value placed in each Item's Source field
	updates chan Item       // channel the generated Items are sent on
	closing chan chan error // Close sends a reply channel here to request shutdown
}

// FakeSubscribe creates a fake subscription labelled with source, starts its
// generating loop in a new goroutine, and returns it.
func FakeSubscribe(source string) Subscription {
	sub := new(fakeSub)
	sub.source = source
	sub.updates = make(chan Item)
	sub.closing = make(chan chan error)
	go sub.loop()
	return sub
}

// Updates exposes the generated stream as a receive-only channel.
func (s *fakeSub) Updates() <-chan Item {
	return s.updates
}
// Close asks the loop goroutine to shut down and waits for its reply,
// returning the error the loop reports.
func (s *fakeSub) Close() error {
	reply := make(chan error)
	s.closing <- reply
	err := <-reply
	return err
}

// loop generates "Item N" updates for s.source, pausing a random duration
// below 500ms after each one, until a close request arrives on s.closing.
// On a close request it closes s.updates, replies nil, and returns.
//
// The pause is a case of the same select as the close request rather than a
// time.Sleep, so a close request is served immediately even mid-pause.
func (s *fakeSub) loop() {
	var pause <-chan time.Time // non-nil only while pausing between items
	i := 0
	for {
		// A send on a nil channel blocks forever, so leaving out nil
		// disables the send case for the duration of the pause.
		var out chan Item
		if pause == nil {
			out = s.updates
		}
		select {
		case out <- Item{Title: fmt.Sprintf("Item %d", i), Source: s.source}:
			i++
			pause = time.After(time.Duration(rand.Intn(500)) * time.Millisecond)
		case <-pause:
			pause = nil // pause over: re-enable sending on the next iteration
		case errc := <-s.closing:
			close(s.updates)
			errc <- nil
			return
		}
	}
}

// main merges two fake subscriptions, prints their items for two seconds,
// then closes the merged subscription, reports the Close result, and exits.
func main() {
	merged := Merge(
		FakeSubscribe("blog.golang.org"),
		FakeSubscribe("news.ycombinator.com"),
	)

	// closed is closed once the timer callback has printed the Close result.
	closed := make(chan struct{})
	time.AfterFunc(2*time.Second, func() {
		defer close(closed)
		fmt.Println("closed:", merged.Close())
	})

	// The loop ends when Close closes the merged channel.
	for it := range merged.Updates() {
		fmt.Printf("[%s] %s\n", it.Source, it.Title)
	}
	// The callback runs in its own goroutine; without this wait, main could
	// return before the "closed:" line is printed.
	<-closed
	fmt.Println("done")
}
▶ Open Go Playground

Copy the code above and paste to run

© 2026 ByteLearn.dev. Free courses for developers. · Privacy