S3 JSONL Pipeline

Process JSONL files from S3 and publish their records to Kinesis. The example combines three patterns: a pipeline, a semaphore (to cap concurrent downloads), and a worker pool (to batch publishing).

package main

import (
	"context"
	"fmt"
	"strings"
	"sync"
	"time"
)

// Simulated S3 download — in real code, use s3.GetObject (see the sketch after main)
func downloadFile(ctx context.Context, key string) ([]string, error) {
	select {
	case <-time.After(200 * time.Millisecond): // simulate S3 download latency
	case <-ctx.Done():
		return nil, ctx.Err() // honor cancellation mid-download
	}
	return []string{
		`{"event":"click","user":"alice"}`,
		`{"event":"view","user":"bob"}`,
		`{"event":"purchase","user":"charlie"}`,
	}, nil
}

// Simulated Kinesis publish — in real code, use kinesis.PutRecords (see the sketch after main)
func publishBatch(records []string) error {
	time.Sleep(100 * time.Millisecond) // simulate Kinesis API call
	fmt.Printf("published %d records\n", len(records))
	return nil
}

// Stage 1: Download files from S3 (semaphore — max 5 concurrent downloads)
func downloadStage(ctx context.Context, files []string) <-chan []string {
	out := make(chan []string, len(files))
	sem := make(chan struct{}, 5) // max 5 concurrent downloads
	var wg sync.WaitGroup

	for _, file := range files {
		wg.Add(1)
		go func(key string) {
			defer wg.Done()
			select {
			case sem <- struct{}{}: // acquire slot, or bail out on cancellation
			case <-ctx.Done():
				return
			}
			defer func() { <-sem }() // release slot

			lines, err := downloadFile(ctx, key)
			if err != nil {
				fmt.Printf("download error %s: %v\n", key, err)
				return
			}
			select {
			case out <- lines:
			case <-ctx.Done():
			}
		}(file)
	}

	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

// Stage 2: Read lines from downloaded files, emit one line at a time
func readLinesStage(ctx context.Context, in <-chan []string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for lines := range in {
			for _, line := range lines {
				line = strings.TrimSpace(line)
				if line == "" {
					continue
				}
				select {
				case out <- line:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return out
}

// Stage 3: Batch and publish to Kinesis (worker pool — 3 publishers)
func publishStage(ctx context.Context, in <-chan string, batchSize int) {
	jobs := make(chan []string)
	var wg sync.WaitGroup

	// Start 3 publisher workers
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for batch := range jobs {
				if err := publishBatch(batch); err != nil {
					fmt.Printf("publish error: %v\n", err)
				}
			}
		}()
	}

	// Accumulate lines into batches
	batch := make([]string, 0, batchSize)
	for line := range in {
		batch = append(batch, line)
		if len(batch) >= batchSize {
			select {
			case jobs <- batch:
			case <-ctx.Done():
				close(jobs)
				wg.Wait()
				return
			}
			batch = make([]string, 0, batchSize)
		}
	}
	// Flush remaining
	if len(batch) > 0 {
		jobs <- batch
	}
	close(jobs)
	wg.Wait()
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	files := []string{
		"data/2024-01-01.jsonl",
		"data/2024-01-02.jsonl",
		"data/2024-01-03.jsonl",
		"data/2024-01-04.jsonl",
		"data/2024-01-05.jsonl",
		"data/2024-01-06.jsonl",
		"data/2024-01-07.jsonl",
		"data/2024-01-08.jsonl",
	}

	// Pipeline: download → read lines → batch & publish
	downloaded := downloadStage(ctx, files)
	lines := readLinesStage(ctx, downloaded)
	publishStage(ctx, lines, 5) // batch 5 records per Kinesis call

	fmt.Println("pipeline complete")
}
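
For the curious, here is roughly what the real download could look like with the AWS SDK for Go v2. This is a sketch, not part of the runnable example above: the client wiring, bucket name, and the downloadFromS3 helper are illustrative assumptions, while GetObject and its input/output types are the SDK's own.

import (
	"bufio"
	"context"
	"fmt"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

// Sketch: fetch one JSONL object and split it into lines.
func downloadFromS3(ctx context.Context, client *s3.Client, bucket, key string) ([]string, error) {
	obj, err := client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		return nil, fmt.Errorf("get s3://%s/%s: %w", bucket, key, err)
	}
	defer obj.Body.Close()

	var lines []string
	scanner := bufio.NewScanner(obj.Body) // streams the body instead of buffering the whole object
	for scanner.Scan() {
		lines = append(lines, scanner.Text())
	}
	return lines, scanner.Err()
}

For large files you could push each scanned line downstream as it arrives rather than materializing the slice; the stage shape (ctx-aware receive, transform, send) stays the same.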

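The real publish side, again a sketch under the same SDK assumption; the stream name and per-record partition key are placeholders. PutRecords caps a batch at 500 records and reports per-record failures through FailedRecordCount, which production code should retry.

import (
	"context"
	"fmt"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/kinesis"
	"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
)

// Sketch: publish one batch (at most 500 records) to a Kinesis stream.
func publishToKinesis(ctx context.Context, client *kinesis.Client, stream string, records []string) error {
	entries := make([]types.PutRecordsRequestEntry, len(records))
	for i, r := range records {
		entries[i] = types.PutRecordsRequestEntry{
			Data:         []byte(r),
			PartitionKey: aws.String(fmt.Sprintf("pk-%d", i)), // placeholder partition key
		}
	}
	out, err := client.PutRecords(ctx, &kinesis.PutRecordsInput{
		StreamName: aws.String(stream),
		Records:    entries,
	})
	if err != nil {
		return err
	}
	if out.FailedRecordCount != nil && *out.FailedRecordCount > 0 {
		return fmt.Errorf("%d of %d records failed", *out.FailedRecordCount, len(records))
	}
	return nil
}
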
Patterns used:

  • Pipeline — three stages connected by channels: download → read → publish
  • Semaphore (stage 1) — caps S3 downloads at 5 concurrent. One goroutine is launched per file, but only 5 download at a time (a library alternative, golang.org/x/sync/semaphore, is sketched after this list)
  • Worker pool (stage 3) — 3 long-lived publisher goroutines pulling batches from a job channel
  • Bounded channels — backpressure flows naturally through the unbuffered lines and jobs channels: if publishers are slow, batching blocks, which blocks line reading. Downloads are throttled by the semaphore instead, since their output channel is buffered to len(files) and never blocks the download goroutines
  • Context — timeout cancels the entire pipeline
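
On the semaphore point: a buffered channel is the classic hand-rolled Go semaphore, and golang.org/x/sync/semaphore packages the same idea with a context-aware Acquire. An equivalent download stage might look like this sketch (reusing downloadFile from above):

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/semaphore"
)

// Same shape as downloadStage, with a weighted semaphore instead of a channel.
func downloadStageWeighted(ctx context.Context, files []string) <-chan []string {
	out := make(chan []string, len(files))
	sem := semaphore.NewWeighted(5)
	var wg sync.WaitGroup

	for _, file := range files {
		wg.Add(1)
		go func(key string) {
			defer wg.Done()
			if err := sem.Acquire(ctx, 1); err != nil {
				return // ctx was cancelled while waiting for a slot
			}
			defer sem.Release(1)

			lines, err := downloadFile(ctx, key)
			if err != nil {
				fmt.Printf("download error %s: %v\n", key, err)
				return
			}
			select {
			case out <- lines:
			case <-ctx.Done():
			}
		}(file)
	}

	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}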

💻 Run locally

Copy the code above and run it on your machine
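
With 8 files of 3 records each (24 lines) and a batch size of 5, you should see four full batches plus a final flush of 4. The order of the published lines may vary, since the three publishers run concurrently:

published 5 records
published 5 records
published 5 records
published 5 records
published 4 records
pipeline complete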
