09 - Server Streaming
📋 Jump to TakeawaysUnary RPCs are one request, one response. Server streaming is one request, many responses. The client sends a single request and the server sends back a stream of messages. Think of it like subscribing to a feed.
When to Use Server Streaming
Use it when the response is naturally a sequence: listing items, tailing logs, watching for changes, sending progress updates. Instead of loading everything into memory and sending one giant response, you stream items one at a time.
For our link shortener, we'll add a ListLinks RPC that streams all links to the client.
Define the RPC
Update the proto file:
service LinkService {
rpc CreateLink(CreateLinkRequest) returns (CreateLinkResponse);
rpc GetLink(GetLinkRequest) returns (GetLinkResponse);
rpc DeleteLink(DeleteLinkRequest) returns (DeleteLinkResponse);
rpc ListLinks(ListLinksRequest) returns (stream Link);
}
message ListLinksRequest {}The stream keyword before Link in the return type makes this a server streaming RPC. The request is a regular message (even if empty).
Regenerate:
make protoGenerated Stream Interface
After regenerating, the LinkServiceServer interface now includes:
ListLinks(*ListLinksRequest, LinkService_ListLinksServer) errorAnd a new stream interface is generated:
type LinkService_ListLinksServer interface {
Send(*Link) error
grpc.ServerStream
}This is the stream object you'll use to send messages to the client. Send() pushes one message, and grpc.ServerStream gives you access to Context() and metadata methods.
Server Implementation
func (s *linkServer) ListLinks(req *pb.ListLinksRequest, stream pb.LinkService_ListLinksServer) error {
s.mu.RLock()
defer s.mu.RUnlock()
for _, link := range s.links {
if err := stream.Send(link); err != nil {
return err
}
}
return nil
}Notice the signature is different from unary RPCs:
- No
context.Contextparameter — the context is available throughstream.Context()instead. gRPC embeds it in the stream because streaming methods need the stream object anyway. - No response return value — you don't return a response message. Instead, you call
stream.Send()for each message you want to send. - Returns
erroronly — returnnilto close the stream successfully. Return an error to close with a failure status.
When the function returns nil, gRPC closes the stream and the client knows there are no more messages.
Client Implementation
On the client side, you call the method and get a stream object. Then you read messages in a loop until the stream ends:
import "io" // for io.EOF
func listLinks(client pb.LinkServiceClient) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stream, err := client.ListLinks(ctx, &pb.ListLinksRequest{})
if err != nil {
log.Fatalf("ListLinks failed: %v", err)
}
for {
link, err := stream.Recv()
if err == io.EOF {
break // server closed the stream, no more messages
}
if err != nil {
log.Fatalf("error receiving: %v", err)
}
fmt.Printf("link: %s -> %s (clicks: %d)\n", link.GetShortCode(), link.GetUrl(), link.GetClicks())
}
}stream.Recv() blocks until the next message arrives. When the server returns nil (closes the stream), Recv() returns io.EOF — this is Go's standard "end of data" signal (same as reading to the end of a file). Any other error means something went wrong.
Streaming with Delays
Server streaming is especially useful for real-time data. Here's a conceptual example that simulates watching for new links by sending updates every 2 seconds:
// This would require adding to the proto file:
// rpc WatchLinks(WatchLinksRequest) returns (stream Link);
// message WatchLinksRequest {}
func (s *linkServer) WatchLinks(req *pb.WatchLinksRequest, stream pb.LinkService_WatchLinksServer) error {
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
for {
select {
case <-stream.Context().Done():
// Client disconnected or deadline expired
return stream.Context().Err()
case <-ticker.C:
s.mu.RLock()
for _, link := range s.links {
if err := stream.Send(link); err != nil {
s.mu.RUnlock()
return err
}
}
s.mu.RUnlock()
}
}
}stream.Context() carries the client's deadline and cancellation. When the client disconnects or the timeout expires, the context is cancelled and the server stops sending. This is how you avoid leaking goroutines in long-lived streams.
Testing with grpcurl
# Create some links first
grpcurl -plaintext -d '{"url": "https://example.com/first-article"}' localhost:50051 shortener.LinkService/CreateLink
grpcurl -plaintext -d '{"url": "https://example.com/second-article"}' localhost:50051 shortener.LinkService/CreateLink
# Stream all links
grpcurl -plaintext localhost:50051 shortener.LinkService/ListLinksEach link appears as a separate JSON object in the output:
{
"id": "1",
"url": "https://example.com/first-article",
"shortCode": "k7m2px"
}
{
"id": "2",
"url": "https://example.com/second-article",
"shortCode": "a3f9wq"
}Flow Control
gRPC handles flow control automatically. If the client reads slowly, the server's Send() calls will eventually block — this is called backpressure. It prevents the server from flooding a slow client with data it can't process.
Think of it like a pipe: if the receiving end is clogged, the sending end can't push more water through. You don't implement this yourself — just call Send() and gRPC handles the buffering and blocking.
Key Takeaways
- Server streaming: one request, many responses — use the
streamkeyword in the return type - The generated stream interface (
LinkService_ListLinksServer) hasSend()andContext()methods - Server calls
stream.Send()for each message, returnsnilto close the stream - Client calls
stream.Recv()in a loop, checks forio.EOFto detect the end - No
context.Contextparameter in streaming methods — usestream.Context()instead - gRPC handles backpressure automatically — slow clients won't be overwhelmed
- Good for listing items, tailing logs, watching for changes, progress updates