Log Processing Pipeline
Read → parse → filter errors. A practical pipeline with context cancellation.
package main
import (
"context"
"fmt"
"strings"
"time"
)
type LogEntry struct {
Timestamp time.Time
Level string
Message string
}
func readLines(ctx context.Context, lines []string) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for _, line := range lines {
select {
case out <- line:
case <-ctx.Done():
return
}
}
}()
return out
}
func parse(ctx context.Context, in <-chan string) <-chan LogEntry {
out := make(chan LogEntry)
go func() {
defer close(out)
for line := range in {
parts := strings.SplitN(line, " ", 3)
if len(parts) < 3 {
continue
}
ts, err := time.Parse(time.RFC3339, parts[0])
if err != nil {
continue
}
select {
case out <- LogEntry{Timestamp: ts, Level: parts[1], Message: parts[2]}:
case <-ctx.Done():
return
}
}
}()
return out
}
func filterErrors(ctx context.Context, in <-chan LogEntry) <-chan LogEntry {
out := make(chan LogEntry)
go func() {
defer close(out)
for entry := range in {
if entry.Level == "ERROR" {
select {
case out <- entry:
case <-ctx.Done():
return
}
}
}
}()
return out
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logs := []string{
"2026-04-10T10:00:00Z INFO Server started",
"2026-04-10T10:00:05Z ERROR Connection refused",
"2026-04-10T10:00:10Z INFO Request handled",
"bad line",
"2026-04-10T10:00:15Z ERROR Timeout exceeded",
}
lines := readLines(ctx, logs)
parsed := parse(ctx, lines)
errors := filterErrors(ctx, parsed)
fmt.Printf("Processing %d log lines...\n\n", len(logs))
count := 0
for entry := range errors {
count++
fmt.Printf(" ❌ [%s] %s: %s\n", entry.Timestamp.Format("15:04:05"), entry.Level, entry.Message)
}
fmt.Printf("\nFound %d errors out of %d lines (malformed lines skipped)\n", count, len(logs))
}