03 - Pipeline Pattern
A pipeline is a series of stages connected by channels. Each stage is a goroutine that receives values, processes them, and sends results to the next stage. Data flows in one direction, like an assembly line. You'll see this pattern everywhere in Go — log processing, data transformation, stream handling.
Why Pipelines
Without pipelines, you'd process data sequentially:
data := getData()
filtered := filter(data)
transformed := transform(filtered)
result := aggregate(transformed)

This works, but each step waits for the previous one to finish. With pipelines, stages run concurrently — stage 2 starts processing as soon as stage 1 produces its first value.
Basic Pipeline
Three stages: generate → square → print.
// Stage 1: Generate numbers
func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

// Stage 2: Square each number
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

func main() {
    // Chain the stages
    nums := gen(2, 3, 4, 5)
    squares := square(nums)

    // Stage 3: Print results
    for val := range squares {
        fmt.Println(val) // 4, 9, 16, 25
    }
}

Notice that gen and square return immediately — the goroutines inside them run in the background. While square is processing the first number, gen is already sending the second. That's the concurrency.
Each stage owns its output channel and closes it when done. The next stage ranges over the input and exits when the channel closes. Clean shutdown propagates automatically.
The Pattern
Every pipeline stage follows the same shape:
- Take a receive-only channel as input (<-chan T)
- Create a new channel for output
- Launch a goroutine that reads from input, processes, writes to output
- Close the output channel when input is exhausted
- Return the output channel
func stage(in <-chan T) <-chan U {
    out := make(chan U)
    go func() {
        defer close(out)
        for val := range in {
            out <- process(val)
        }
    }()
    return out
}

This makes stages composable. Plug them together like Lego.
Adding More Stages
Pipelines shine when you chain multiple transformations.
// Filter: only pass even numbers
func filterEven(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            if n%2 == 0 {
                out <- n
            }
        }
    }()
    return out
}

// Double each value
func double(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * 2
        }
    }()
    return out
}

func main() {
    nums := gen(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    evens := filterEven(nums)
    doubled := double(evens)

    for val := range doubled {
        fmt.Println(val) // 4, 8, 12, 16, 20
    }
}

Each stage runs in its own goroutine. While filterEven is checking the next number, double is already processing the previous one.
Pipeline with Context
Real pipelines need cancellation. If the consumer stops reading, the producer should stop producing.
A common misconception: "the garbage collector will clean up blocked goroutines." It won't. A goroutine stuck on a channel send is still alive from the runtime's perspective — the GC only collects unreachable memory, not blocked goroutines. In a long-running server, leaked goroutines accumulate and eventually eat all your memory. Context gives them an exit path.
func gen(ctx context.Context, nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            select {
            case out <- n:
            case <-ctx.Done():
                return
            }
        }
    }()
    return out
}

func square(ctx context.Context, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n:
            case <-ctx.Done():
                return
            }
        }
    }()
    return out
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    nums := gen(ctx, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    squares := square(ctx, nums)

    // Only take the first 3 results
    for i := 0; i < 3; i++ {
        fmt.Println(<-squares)
    }
    cancel() // stop the pipeline — no goroutine leaks
}

Without context, the gen goroutine would block forever trying to send values that nobody reads. Context cancellation prevents goroutine leaks.
Practical Example: Log Processing
Process log lines through a pipeline: read → parse → filter → format.
type LogEntry struct {
    Timestamp time.Time
    Level     string
    Message   string
}

func readLines(ctx context.Context, lines []string) <-chan string {
    out := make(chan string)
    go func() {
        defer close(out)
        for _, line := range lines {
            select {
            case out <- line:
            case <-ctx.Done():
                return
            }
        }
    }()
    return out
}

func parse(ctx context.Context, in <-chan string) <-chan LogEntry {
    out := make(chan LogEntry)
    go func() {
        defer close(out)
        for line := range in {
            parts := strings.SplitN(line, " ", 3)
            if len(parts) < 3 {
                continue // skip malformed lines
            }
            ts, err := time.Parse(time.RFC3339, parts[0])
            if err != nil {
                continue
            }
            select {
            case out <- LogEntry{Timestamp: ts, Level: parts[1], Message: parts[2]}:
            case <-ctx.Done():
                return
            }
        }
    }()
    return out
}

func filterErrors(ctx context.Context, in <-chan LogEntry) <-chan LogEntry {
    out := make(chan LogEntry)
    go func() {
        defer close(out)
        for entry := range in {
            if entry.Level == "ERROR" {
                select {
                case out <- entry:
                case <-ctx.Done():
                    return
                }
            }
        }
    }()
    return out
}

Chain them: readLines → parse → filterErrors → print. Each stage does one thing. Easy to test, easy to extend.
func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    logs := []string{
        "2026-04-10T10:00:00Z INFO Server started",
        "2026-04-10T10:00:05Z ERROR Connection refused",
        "2026-04-10T10:00:10Z INFO Request handled",
        "bad line",
        "2026-04-10T10:00:15Z ERROR Timeout exceeded",
    }

    lines := readLines(ctx, logs)
    parsed := parse(ctx, lines)
    errors := filterErrors(ctx, parsed)

    for entry := range errors {
        fmt.Printf("[%s] %s: %s\n", entry.Timestamp.Format("15:04:05"), entry.Level, entry.Message)
    }
}

When to Use Pipelines
Pipelines work best when:
- Data flows through multiple transformation steps
- Each step is independent and can run concurrently
- You're processing a stream of values, not a single batch
They're overkill when:
- You have a simple loop that processes items sequentially
- The processing is CPU-bound and there's no I/O to overlap
- You only have two stages — just use a goroutine and a channel
Key Takeaways
- A pipeline is stages connected by channels — each stage is a goroutine
- Every stage: receive from input, process, send to output, close output when done
- Stages are composable — chain them together for complex processing
- Always add context for cancellation to prevent goroutine leaks
- Use select with ctx.Done() on every channel send in a pipeline stage
- Pipelines shine for streaming data with multiple transformation steps