Naive Subscription Bugs
Three bugs in a naive subscription implementation, and how a for-select loop fixes all of them. Based on Sameer Ajmani's Advanced Go Concurrency Patterns talk.
package main
import (
"fmt"
"math/rand"
"time"
)
type Item struct{ Title string }
type Fetcher interface {
Fetch() ([]Item, time.Time, error)
}
// --- Naive implementation: 3 bugs ---
type naiveSub struct {
fetcher Fetcher
updates chan Item
closed bool // BUG 1: unsynchronized read/write across goroutines
err error // BUG 1: same — data race
}
func NaiveSubscribe(fetcher Fetcher) *naiveSub {
s := &naiveSub{fetcher: fetcher, updates: make(chan Item)}
go s.loop()
return s
}
func (s *naiveSub) Updates() <-chan Item { return s.updates }
func (s *naiveSub) Close() error {
s.closed = true // BUG 1: race — loop reads s.closed without sync
return s.err // BUG 1: race — loop writes s.err without sync
}
func (s *naiveSub) loop() {
for {
if s.closed { // BUG 1: unsynchronized read
close(s.updates)
return
}
items, next, err := s.fetcher.Fetch()
if err != nil {
s.err = err // BUG 1: unsynchronized write
time.Sleep(10 * time.Second) // BUG 2: blocks 10s, ignores Close
continue
}
for _, item := range items {
s.updates <- item // BUG 3: blocks forever if receiver stops reading
}
if now := time.Now(); next.After(now) {
time.Sleep(next.Sub(now)) // BUG 2: blocks, ignores Close
}
}
}
// Bug 1: s.closed and s.err accessed from two goroutines without sync → data race
// Bug 2: time.Sleep blocks the loop — Close() has no effect until sleep finishes
// Bug 3: s.updates <- item blocks forever if nobody is reading
// --- Fixed implementation: for-select loop ---
type sub struct {
fetcher Fetcher
updates chan Item
closing chan chan error
}
func Subscribe(fetcher Fetcher) *sub {
s := &sub{
fetcher: fetcher,
updates: make(chan Item),
closing: make(chan chan error),
}
go s.loop()
return s
}
func (s *sub) Updates() <-chan Item { return s.updates }
func (s *sub) Close() error {
errc := make(chan error)
s.closing <- errc // send request to loop
return <-errc // wait for loop to reply
}
func (s *sub) loop() {
var pending []Item
var next time.Time
var err error
for {
var fetchDelay time.Duration
if now := time.Now(); next.After(now) {
fetchDelay = next.Sub(now)
}
startFetch := time.After(fetchDelay)
// Nil channel trick: disable send when nothing pending
var first Item
var updates chan Item
if len(pending) > 0 {
first = pending[0]
updates = s.updates
}
select {
case errc := <-s.closing: // Fix bug 1: no shared state, communicate via channel
errc <- err
close(s.updates)
return
case <-startFetch: // Fix bug 2: timer instead of Sleep, select can still handle Close
var fetched []Item
fetched, next, err = s.fetcher.Fetch()
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
pending = append(pending, fetched...)
case updates <- first: // Fix bug 3: send is a select case, never blocks forever
pending = pending[1:]
}
}
}
// --- Demo ---
type fakeFetcher struct{ n int }
func (f *fakeFetcher) Fetch() ([]Item, time.Time, error) {
f.n++
time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
return []Item{{Title: fmt.Sprintf("Article %d", f.n)}},
time.Now().Add(300 * time.Millisecond), nil
}
func main() {
s := Subscribe(&fakeFetcher{})
// Read a few items
for i := 0; i < 5; i++ {
fmt.Println(<-s.Updates())
}
// Clean shutdown — no goroutine leaks, no races
fmt.Println("closing:", s.Close())
}