Naive Subscription Bugs

Three bugs in a naive subscription implementation, and how a for-select loop fixes all of them. Based on Sameer Ajmani's Advanced Go Concurrency Patterns talk.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// Item is a single entry delivered to subscribers.
type Item struct {
	// Title is the only payload an Item carries in this example.
	Title string
}

// Fetcher is the source a subscription polls for new items.
type Fetcher interface {
	// Fetch returns the newly available items together with the earliest
	// time at which the caller should fetch again. The subscription loops in
	// this file treat a non-nil err as a failed fetch and retry later.
	Fetch() (items []Item, next time.Time, err error)
}

// --- Naive implementation: 3 bugs ---

// naiveSub is the deliberately flawed subscription used to demonstrate the
// three bugs. Its loop goroutine and Close both touch closed and err
// directly; nothing in this type (no mutex, atomic, or channel) orders those
// accesses.
type naiveSub struct {
	// fetcher is the source that loop polls.
	fetcher Fetcher
	// updates is the unbuffered channel on which loop sends fetched items.
	updates chan Item
	closed  bool // BUG 1: unsynchronized read/write across goroutines
	err     error // BUG 1: same — data race
}

// NaiveSubscribe builds a naiveSub around fetcher, starts its polling loop in
// a new goroutine, and hands the subscription back to the caller.
func NaiveSubscribe(fetcher Fetcher) *naiveSub {
	s := new(naiveSub)
	s.fetcher = fetcher
	s.updates = make(chan Item) // unbuffered: every send waits for a receiver
	go s.loop()
	return s
}

// Updates exposes the item stream as a receive-only channel.
func (s *naiveSub) Updates() <-chan Item {
	return s.updates
}

// Close sets the closed flag for loop to notice and returns the current
// value of s.err. It returns immediately and does not wait for loop to exit.
// Both field accesses race with the loop goroutine; that race is the
// intentional BUG 1 of this example.
func (s *naiveSub) Close() error {
	s.closed = true // BUG 1: race — loop reads s.closed without sync
	return s.err    // BUG 1: race — loop writes s.err without sync
}

// loop is the naive polling goroutine. Each iteration checks the closed
// flag, fetches, sends every fetched item on s.updates, and then sleeps until
// the time the fetcher asked for. The closed flag is examined only at the top
// of an iteration, so nothing inside the iteration (the sleeps, the blocking
// sends) can be interrupted by Close.
//
// s.err is assigned only on a failed fetch and is never reset, so it keeps
// the most recent error even after later fetches succeed.
func (s *naiveSub) loop() {
	for {
		if s.closed { // BUG 1: unsynchronized read
			close(s.updates)
			return
		}
		items, next, err := s.fetcher.Fetch()
		if err != nil {
			s.err = err                          // BUG 1: unsynchronized write
			time.Sleep(10 * time.Second)         // BUG 2: blocks 10s, ignores Close
			continue
		}
		// Deliver one item at a time on the unbuffered channel.
		for _, item := range items {
			s.updates <- item // BUG 3: blocks forever if receiver stops reading
		}
		// Wait until the next fetch time, if it is still in the future.
		if now := time.Now(); next.After(now) {
			time.Sleep(next.Sub(now)) // BUG 2: blocks, ignores Close
		}
	}
}

// Bug 1: s.closed and s.err accessed from two goroutines without sync → data race
// Bug 2: time.Sleep blocks the loop — Close() has no effect until sleep finishes
// Bug 3: s.updates <- item blocks forever if nobody is reading

// --- Fixed implementation: for-select loop ---

// sub is the corrected subscription. Its fields are set once by Subscribe;
// everything that changes over time is kept in local variables of the loop
// goroutine, and other goroutines reach it only through these channels.
type sub struct {
	// fetcher is the source that loop polls for new items.
	fetcher Fetcher
	// updates is the unbuffered channel on which loop delivers items; loop
	// closes it when it shuts down.
	updates chan Item
	// closing carries Close requests. Each value is a reply channel on which
	// loop sends the error from its most recent fetch.
	closing chan chan error
}

// Subscribe creates a subscription that polls fetcher, launches its loop
// goroutine, and returns it. Call Close to stop the goroutine.
func Subscribe(fetcher Fetcher) *sub {
	s := new(sub)
	s.fetcher = fetcher
	s.updates = make(chan Item)       // unbuffered item stream
	s.closing = make(chan chan error) // unbuffered Close request channel
	go s.loop()
	return s
}

// Updates exposes the item stream as a receive-only channel. The loop
// goroutine closes this channel when the subscription is closed.
func (s *sub) Updates() <-chan Item {
	return s.updates
}

// Close asks the loop goroutine to stop and returns the error from its most
// recent fetch (nil if that fetch succeeded or none has run yet).
//
// The request/reply handshake goes through channels, so no state is shared
// with the loop. Close must be called at most once: after the first call the
// loop has returned, so nothing receives from s.closing and a second call
// would block.
func (s *sub) Close() error {
	reply := make(chan error)
	s.closing <- reply // hand the loop a channel to answer on
	lastErr := <-reply // block until the loop has answered
	return lastErr
}

// loop is the subscription's single goroutine. It owns all mutable state
// (pending, next, err) and multiplexes three events in one select: a Close
// request, the fetch timer firing, and delivery of the next pending item.
//
// Fetching is paused while maxPending or more items are waiting to be
// delivered, so a slow or stalled receiver cannot make the queue grow without
// bound. The cap is soft: a single Fetch may push the queue past it by
// returning several items at once.
//
// Fetch is called synchronously from this goroutine, so a Close request that
// arrives while Fetch is running is served only after Fetch returns.
func (s *sub) loop() {
	const (
		// maxPending is the queue length at which fetching is suspended.
		maxPending = 10
		// retryDelay is how long to wait before fetching again after an error.
		retryDelay = 10 * time.Second
	)

	var (
		pending []Item    // fetched but not yet delivered, oldest first
		next    time.Time // earliest time of the next fetch; zero means "now"
		err     error     // result of the most recent Fetch, reported by Close
	)

	for {
		// Nil channel trick: leave startFetch nil (never ready) while the
		// queue is full, so the select cannot start another fetch.
		var startFetch <-chan time.Time
		if len(pending) < maxPending {
			var fetchDelay time.Duration
			if now := time.Now(); next.After(now) {
				fetchDelay = next.Sub(now)
			}
			startFetch = time.After(fetchDelay)
		}

		// Nil channel trick: disable the send case when nothing is pending.
		var first Item
		var updates chan Item
		if len(pending) > 0 {
			first = pending[0]
			updates = s.updates
		}

		select {
		case errc := <-s.closing: // Fix bug 1: no shared state, communicate via channel
			errc <- err
			close(s.updates)
			return

		case <-startFetch: // Fix bug 2: timer instead of Sleep, select can still handle Close
			var fetched []Item
			fetched, next, err = s.fetcher.Fetch()
			if err != nil {
				next = time.Now().Add(retryDelay)
				break // leaves the select only; the for loop continues
			}
			pending = append(pending, fetched...)

		case updates <- first: // Fix bug 3: send is a select case, never blocks forever
			pending = pending[1:]
		}
	}
}

// --- Demo ---

// fakeFetcher is the demo Fetcher; it fabricates one numbered article per call.
type fakeFetcher struct {
	// n counts how many times Fetch has been called.
	n int
}

// Fetch simulates a slow remote call: it bumps the call counter, sleeps for a
// random duration below 200ms, and returns a single item titled
// "Article <n>" along with a next-fetch time 300ms from now. It never
// returns an error.
func (f *fakeFetcher) Fetch() ([]Item, time.Time, error) {
	f.n++

	latency := time.Duration(rand.Intn(200)) * time.Millisecond
	time.Sleep(latency)

	title := fmt.Sprintf("Article %d", f.n)
	batch := []Item{{Title: title}}
	return batch, time.Now().Add(300 * time.Millisecond), nil
}

// main runs the demo: subscribe to the fake feed, print the first five items
// it delivers, then close the subscription and print the error Close reports.
func main() {
	feed := Subscribe(new(fakeFetcher))

	// Read a few items
	for received := 0; received < 5; received++ {
		item := <-feed.Updates()
		fmt.Println(item)
	}

	// Shut down by asking the loop goroutine to stop; Close returns once
	// the loop has replied.
	closeErr := feed.Close()
	fmt.Println("closing:", closeErr)
}
▶ Open Go Playground

Copy the code above and paste it into the Go Playground to run it.

© 2026 ByteLearn.dev. Free courses for developers. · Privacy