Merge Subscriptions
Merge multiple subscription streams into one, with clean shutdown. Based on Sameer Ajmani's Advanced Go Concurrency Patterns talk.
The naive approach has a goroutine leak — if the receiver stops reading, the forwarding goroutines block forever on send. The fix: a quit channel checked on both receive and send.
package main
import (
"fmt"
"math/rand"
"time"
)
type Item struct{ Title, Source string }
type Subscription interface {
Updates() <-chan Item
Close() error
}
// --- Naive Merge (buggy) ---
// Goroutines block forever on m.updates if receiver stops reading.
type naiveMerge struct {
subs []Subscription
updates chan Item
}
func NaiveMerge(subs ...Subscription) Subscription {
m := &naiveMerge{subs: subs, updates: make(chan Item)}
for _, sub := range subs {
go func(s Subscription) {
for it := range s.Updates() {
m.updates <- it // BUG: blocks if receiver stops
}
}(sub)
}
return m
}
func (m *naiveMerge) Updates() <-chan Item { return m.updates }
func (m *naiveMerge) Close() (err error) {
for _, sub := range m.subs {
if e := sub.Close(); err == nil && e != nil {
err = e
}
}
close(m.updates) // BUG: forwarding goroutines may still be running
return
}
// --- Correct Merge ---
// Quit channel checked on both receive and send — no goroutine leaks.
type merge struct {
subs []Subscription
updates chan Item
quit chan struct{}
errs chan error
}
func Merge(subs ...Subscription) Subscription {
m := &merge{
subs: subs,
updates: make(chan Item),
quit: make(chan struct{}),
errs: make(chan error),
}
for _, sub := range subs {
go func(s Subscription) {
for {
var it Item
select {
case it = <-s.Updates():
case <-m.quit:
m.errs <- s.Close()
return
}
select {
case m.updates <- it:
case <-m.quit:
m.errs <- s.Close()
return
}
}
}(sub)
}
return m
}
func (m *merge) Updates() <-chan Item { return m.updates }
func (m *merge) Close() (err error) {
close(m.quit) // signal all forwarding goroutines to stop
for range m.subs {
if e := <-m.errs; e != nil {
err = e
}
}
close(m.updates)
return
}
// --- Fake subscription for demo ---
type fakeSub struct {
source string
updates chan Item
closing chan chan error
}
func FakeSubscribe(source string) Subscription {
s := &fakeSub{source: source, updates: make(chan Item), closing: make(chan chan error)}
go s.loop()
return s
}
func (s *fakeSub) Updates() <-chan Item { return s.updates }
func (s *fakeSub) Close() error {
errc := make(chan error)
s.closing <- errc
return <-errc
}
func (s *fakeSub) loop() {
for i := 0; ; i++ {
select {
case s.updates <- Item{Title: fmt.Sprintf("Item %d", i), Source: s.source}:
time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
case errc := <-s.closing:
close(s.updates)
errc <- nil
return
}
}
}
func main() {
merged := Merge(
FakeSubscribe("blog.golang.org"),
FakeSubscribe("news.ycombinator.com"),
)
time.AfterFunc(2*time.Second, func() {
fmt.Println("closed:", merged.Close())
})
for it := range merged.Updates() {
fmt.Printf("[%s] %s\n", it.Source, it.Title)
}
fmt.Println("done")
}