S3 JSONL Pipeline
Process JSONL files from S3 and publish records to Kinesis. Combines pipelines, semaphores (capped downloads), and a worker pool (batched publishing).
package main
import (
"context"
"fmt"
"strings"
"sync"
"time"
)
// Simulated S3 file — in real code, use s3.GetObject
// downloadFile simulates fetching one JSONL object from S3 and returning its
// lines. In real code this would call s3.GetObject and stream the body; here
// the ctx parameter is accepted for interface parity but not consulted.
func downloadFile(ctx context.Context, key string) ([]string, error) {
	// Pretend the network round-trip takes 200ms.
	time.Sleep(200 * time.Millisecond)
	records := []string{
		`{"event":"click","user":"alice"}`,
		`{"event":"view","user":"bob"}`,
		`{"event":"purchase","user":"charlie"}`,
	}
	return records, nil
}
// Simulated Kinesis publish — in real code, use kinesis.PutRecords
// publishBatch simulates one Kinesis PutRecords call for a batch of records.
// In real code this would call kinesis.PutRecords and return its error.
func publishBatch(records []string) error {
	const apiLatency = 100 * time.Millisecond
	time.Sleep(apiLatency) // pretend the Kinesis round-trip takes this long
	fmt.Printf("published %d records\n", len(records))
	return nil
}
// Stage 1: Download files from S3 (semaphore — max 5 concurrent downloads)
// downloadStage fans out S3 downloads with at most 5 in flight at once
// (semaphore pattern). Each file's lines are emitted as one []string on the
// returned channel, which is closed after every download goroutine finishes.
//
// Fix: the semaphore acquire now selects against ctx.Done(). Previously a
// plain `sem <- struct{}{}` send could leave goroutines blocked on the
// semaphore after cancellation, leaking them until slots freed up.
func downloadStage(ctx context.Context, files []string) <-chan []string {
	// Buffered to len(files) so a successful download never blocks on send.
	out := make(chan []string, len(files))
	sem := make(chan struct{}, 5) // max 5 concurrent downloads
	var wg sync.WaitGroup
	for _, file := range files {
		wg.Add(1)
		go func(key string) {
			defer wg.Done()
			// Acquire a slot, but bail out if the pipeline is cancelled
			// first — otherwise this goroutine could block here forever.
			select {
			case sem <- struct{}{}:
			case <-ctx.Done():
				return
			}
			defer func() { <-sem }() // release slot
			lines, err := downloadFile(ctx, key)
			if err != nil {
				fmt.Printf("download error %s: %v\n", key, err)
				return
			}
			select {
			case out <- lines:
			case <-ctx.Done():
			}
		}(file)
	}
	// Close out once all downloads are accounted for so downstream
	// range loops terminate.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}
// Stage 2: Read lines from downloaded files, emit one line at a time
func readLinesStage(ctx context.Context, in <-chan []string) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for lines := range in {
for _, line := range lines {
line = strings.TrimSpace(line)
if line == "" {
continue
}
select {
case out <- line:
case <-ctx.Done():
return
}
}
}
}()
return out
}
// Stage 3: Batch and publish to Kinesis (worker pool — 3 publishers)
// publishStage accumulates incoming lines into batches of batchSize and hands
// them to a pool of 3 publisher workers. It returns once the input channel is
// drained (flushing any final partial batch) or the context is cancelled; in
// both cases the workers are shut down and waited for before returning.
func publishStage(ctx context.Context, in <-chan string, batchSize int) {
	jobs := make(chan []string)
	var wg sync.WaitGroup

	// Worker pool: 3 long-lived publishers pulling batches off jobs.
	const numPublishers = 3
	for w := 0; w < numPublishers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for b := range jobs {
				if err := publishBatch(b); err != nil {
					fmt.Printf("publish error: %v\n", err)
				}
			}
		}()
	}

	// On every exit path: close jobs first (LIFO defer order), then wait
	// for the workers to drain and stop.
	defer wg.Wait()
	defer close(jobs)

	pending := make([]string, 0, batchSize)
	for line := range in {
		pending = append(pending, line)
		if len(pending) < batchSize {
			continue
		}
		select {
		case jobs <- pending:
			pending = make([]string, 0, batchSize)
		case <-ctx.Done():
			return
		}
	}
	// Flush the final partial batch; workers are still alive because jobs
	// is only closed on return.
	if len(pending) > 0 {
		jobs <- pending
	}
}
// main wires the three stages into one pipeline and runs it to completion,
// or until the 30-second timeout cancels every stage via the shared context.
func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	// Eight input files — more than the 5-slot download semaphore, so the
	// cap in stage 1 is actually exercised.
	files := []string{
		"data/2024-01-01.jsonl",
		"data/2024-01-02.jsonl",
		"data/2024-01-03.jsonl",
		"data/2024-01-04.jsonl",
		"data/2024-01-05.jsonl",
		"data/2024-01-06.jsonl",
		"data/2024-01-07.jsonl",
		"data/2024-01-08.jsonl",
	}
	// Pipeline: download → read lines → batch & publish
	downloaded := downloadStage(ctx, files)
	lines := readLinesStage(ctx, downloaded)
	// publishStage blocks until all records are published or ctx expires.
	publishStage(ctx, lines, 5) // batch 5 records per Kinesis call
	fmt.Println("pipeline complete")
}

Patterns used:
- Pipeline — three stages connected by channels: download → read → publish
- Semaphore (stage 1) — caps S3 downloads at 5 concurrent. Many goroutines, only 5 active at a time
- Worker pool (stage 3) — 3 long-lived publisher goroutines pulling batches from a job channel
- Bounded channels — backpressure flows naturally between stages 2 and 3: if publishers are slow, batching blocks, which blocks line reading. Downloads, however, write into a channel buffered to len(files), so completed downloads are never blocked by slow publishers
- Context — timeout cancels the entire pipeline