10 - Client & Bidirectional Streaming
Server streaming sends many responses for one request. Client streaming flips that: the client sends many requests and the server responds once. Bidirectional streaming lets both sides send messages at the same time.
Client Streaming
Use client streaming when the client has a batch of data to send. For our shortener, we'll add a BatchCreateLinks RPC that accepts a stream of URLs and returns a summary.
Proto Definition
Add to proto/link.proto:
message BatchCreateLinksResponse {
int64 created_count = 1;
repeated Link links = 2;
}
service LinkService {
// ... existing RPCs
rpc BatchCreateLinks(stream CreateLinkRequest) returns (BatchCreateLinksResponse);
}
The stream keyword is on the request side now. Regenerate after updating:
make proto
Generated Stream Types
The generated code includes a new stream interface:
type LinkService_BatchCreateLinksServer interface {
SendAndClose(*BatchCreateLinksResponse) error // send final response and close
Recv() (*CreateLinkRequest, error) // read next client message
grpc.ServerStream
}
This is the stream object the server uses to read client messages and send the final response.
Note: With protoc-gen-go-grpc v1.5+ (generic stream types were an opt-in experiment in v1.4), you'll see a generic type alias instead of an explicit interface:
type LinkService_BatchCreateLinksServer = grpc.ClientStreamingServer[CreateLinkRequest, BatchCreateLinksResponse]
The methods (Recv, SendAndClose) are the same; they're just defined on grpc.ClientStreamingServer rather than spelled out in the generated file. Usage is identical.
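To see why the two generated styles are interchangeable, here is a minimal stdlib-only sketch. Every type in it is a hypothetical stand-in: the message structs are trimmed down, and ClientStreamingServer is only loosely modelled on the real grpc.ClientStreamingServer (which has more methods). It shows a function written against the explicit interface accepting a value typed with the generic alias.

```go
package main

import (
	"fmt"
	"io"
)

// Hypothetical stand-ins for the generated message types.
type CreateLinkRequest struct{ Url string }
type BatchCreateLinksResponse struct{ CreatedCount int64 }

// Older style: the generator spells out one interface per RPC.
type ExplicitBatchServer interface {
	SendAndClose(*BatchCreateLinksResponse) error
	Recv() (*CreateLinkRequest, error)
}

// Newer style: one generic interface plus a per-RPC type alias.
type ClientStreamingServer[Req, Res any] interface {
	Recv() (*Req, error)
	SendAndClose(*Res) error
}

type AliasBatchServer = ClientStreamingServer[CreateLinkRequest, BatchCreateLinksResponse]

// memStream is a toy in-memory stream used only for this demonstration.
type memStream struct {
	pending []string
	final   *BatchCreateLinksResponse
}

func (m *memStream) Recv() (*CreateLinkRequest, error) {
	if len(m.pending) == 0 {
		return nil, io.EOF // no more client messages
	}
	url := m.pending[0]
	m.pending = m.pending[1:]
	return &CreateLinkRequest{Url: url}, nil
}

func (m *memStream) SendAndClose(r *BatchCreateLinksResponse) error {
	m.final = r
	return nil
}

// countLinks is written against the explicit interface.
func countLinks(stream ExplicitBatchServer) (int64, error) {
	var n int64
	for {
		if _, err := stream.Recv(); err == io.EOF {
			break
		} else if err != nil {
			return 0, err
		}
		n++
	}
	return n, stream.SendAndClose(&BatchCreateLinksResponse{CreatedCount: n})
}

func main() {
	// A value typed with the alias is accepted because the method sets match.
	var stream AliasBatchServer = &memStream{pending: []string{"a", "b", "c"}}
	n, err := countLinks(stream)
	fmt.Println(n, err) // 3 <nil>
}
```

Handler code only calls Recv and SendAndClose, so it compiles unchanged against either form.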
Server: BatchCreateLinks
Add this method to your linkServer (the server struct from lesson 07) in main.go (before func main()). You'll also need to add "io" to your imports:
func (s *linkServer) BatchCreateLinks(stream pb.LinkService_BatchCreateLinksServer) error {
var created []*pb.Link
for {
req, err := stream.Recv()
if err == io.EOF {
// Client is done sending — return the final response
return stream.SendAndClose(&pb.BatchCreateLinksResponse{
CreatedCount: int64(len(created)),
Links: created,
})
}
if err != nil {
return err
}
id := s.nextID.Add(1)
link := &pb.Link{
Id: id,
Url: req.GetUrl(),
ShortCode: shortCode(),
}
s.mu.Lock()
s.links[link.ShortCode] = link
s.mu.Unlock()
created = append(created, link)
}
}
The server calls stream.Recv() in a loop to read client messages one at a time. When the client signals it's done, Recv() returns io.EOF. The server then calls stream.SendAndClose(), which sends the one and only response and closes the stream.
Like server streaming, there's no context.Context parameter. Use stream.Context() if you need it.
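The sketch below illustrates the two points above without pulling in gRPC: the Recv loop that ends on io.EOF, and passing stream.Context() to per-message work. The batchStream interface, fakeStream, saveLink, and cancelledContext are all hypothetical stand-ins invented for this example; a real handler would use the generated stream type and your own storage code.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
)

// batchStream is a hypothetical stand-in for the generated server stream,
// reduced to the three methods this sketch uses and to plain Go types.
type batchStream interface {
	Context() context.Context
	Recv() (url string, err error) // io.EOF once the client is done sending
	SendAndClose(created int) error
}

// fakeStream feeds URLs from a slice and records the final response.
type fakeStream struct {
	ctx     context.Context // nil means context.Background()
	urls    []string
	created int
	closed  bool
}

func (f *fakeStream) Context() context.Context {
	if f.ctx == nil {
		return context.Background()
	}
	return f.ctx
}

func (f *fakeStream) Recv() (string, error) {
	if len(f.urls) == 0 {
		return "", io.EOF
	}
	url := f.urls[0]
	f.urls = f.urls[1:]
	return url, nil
}

func (f *fakeStream) SendAndClose(created int) error {
	f.created, f.closed = created, true
	return nil
}

// saveLink stands in for storage work that honours cancellation.
func saveLink(ctx context.Context, url string) error {
	return ctx.Err() // nil unless the RPC was cancelled or timed out
}

func handleBatch(stream batchStream) error {
	ctx := stream.Context() // the per-RPC context lives on the stream
	created := 0
	for {
		url, err := stream.Recv()
		if errors.Is(err, io.EOF) {
			return stream.SendAndClose(created) // the single response
		}
		if err != nil {
			return err
		}
		if err := saveLink(ctx, url); err != nil {
			return err // abort the RPC without sending a response
		}
		created++
	}
}

// cancelledContext returns a context that is already cancelled.
func cancelledContext() context.Context {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return ctx
}

func main() {
	ok := &fakeStream{urls: []string{"a", "b"}}
	err := handleBatch(ok)
	fmt.Println(err, ok.created) // <nil> 2

	gone := &fakeStream{ctx: cancelledContext(), urls: []string{"a"}}
	err = handleBatch(gone)
	fmt.Println(err, gone.closed) // context canceled false
}
```

Returning an error from the handler ends the RPC with that error instead of a response, which is why the cancelled case never reaches SendAndClose.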
Client: BatchCreateLinks
Add to cmd/client/main.go:
func batchCreate(client pb.LinkServiceClient, urls []string) []*pb.Link {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
stream, err := client.BatchCreateLinks(ctx)
if err != nil {
log.Fatal(err)
}
// Send all URLs one by one
for _, url := range urls {
if err := stream.Send(&pb.CreateLinkRequest{Url: url}); err != nil {
log.Fatal(err)
}
}
// Signal we're done sending and get the response
resp, err := stream.CloseAndRecv()
if err != nil {
log.Fatal(err)
}
fmt.Printf("created %d links\n", resp.GetCreatedCount())
for _, link := range resp.GetLinks() {
fmt.Printf(" %s -> %s\n", link.GetShortCode(), link.GetUrl())
}
return resp.GetLinks()
}
The client calls stream.Send() for each message, then stream.CloseAndRecv() to signal it's done and get the response.
SendAndClose and CloseAndRecv are two ends of the same handshake:
- Server calls SendAndClose(): sends the response and closes its side
- Client calls CloseAndRecv(): closes its sending side and waits for the response
Running BatchCreateLinks
Update main() in cmd/client/main.go to call batchCreate:
func main() {
conn, err := grpc.NewClient("localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
client := pb.NewLinkServiceClient(conn)
batchCreate(client, []string{
"https://example.com/article-one",
"https://example.com/article-two",
"https://example.com/article-three",
})
}
created 3 links
k7m2px -> https://example.com/article-one
a3f9wq -> https://example.com/article-two
zt04bn -> https://example.com/article-three
Bidirectional Streaming
Both sides send and receive at the same time. This is the most flexible pattern but also the most complex. Use it for chat systems, real-time sync, or any protocol where both sides need to talk independently.
Proto Definition
Add to proto/link.proto:
message SyncRequest {
string short_code = 1;
int64 clicks = 2;
}
message SyncResponse {
string short_code = 1;
int64 total_clicks = 2;
bool acknowledged = 3;
}
service LinkService {
// ... existing RPCs
rpc SyncClicks(stream SyncRequest) returns (stream SyncResponse);
}
Both request and response have the stream keyword. Regenerate:
make proto
Server: SyncClicks
Add this method to your linkServer in main.go (after BatchCreateLinks, before func main()):
func (s *linkServer) SyncClicks(stream pb.LinkService_SyncClicksServer) error {
for {
req, err := stream.Recv()
if err == io.EOF {
return nil // client is done, close the stream
}
if err != nil {
return err
}
s.mu.Lock()
link, ok := s.links[req.GetShortCode()]
var total int64
if ok {
link.Clicks += req.GetClicks()
total = link.Clicks
}
s.mu.Unlock()
if err := stream.Send(&pb.SyncResponse{
ShortCode: req.GetShortCode(),
TotalClicks: total,
Acknowledged: ok,
}); err != nil {
return err
}
}
}
The server reads and writes on the same stream. It doesn't have to alternate: it could read three messages then send one response, or send a response for every message. The protocol is up to you. Here we respond to each message immediately.
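As an illustration of the "read three messages, then send one response" alternative, here is a minimal sketch using a hypothetical syncStream interface with plain integers in place of the generated SyncRequest and SyncResponse types. The batch size of three and the final flush on io.EOF are assumptions made for the example, not part of the lesson's service.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// syncStream is a hypothetical stand-in for the generated bidirectional
// server stream, reduced to plain ints so the sketch needs no gRPC packages.
type syncStream interface {
	Recv() (clicks int64, err error) // io.EOF once the client closed its send side
	Send(total int64) error
}

const batchSize = 3 // assumption: acknowledge every third message

// syncBatched reads batchSize messages before sending one running total,
// then flushes whatever is left when the client finishes.
func syncBatched(stream syncStream) error {
	var total int64
	pending := 0
	for {
		clicks, err := stream.Recv()
		if errors.Is(err, io.EOF) {
			if pending > 0 {
				return stream.Send(total) // flush the partial batch
			}
			return nil
		}
		if err != nil {
			return err
		}
		total += clicks
		pending++
		if pending == batchSize {
			if err := stream.Send(total); err != nil {
				return err
			}
			pending = 0
		}
	}
}

// memStream is an in-memory fake: requests come from a slice,
// responses are recorded in order.
type memStream struct {
	in  []int64
	out []int64
}

func (m *memStream) Recv() (int64, error) {
	if len(m.in) == 0 {
		return 0, io.EOF
	}
	v := m.in[0]
	m.in = m.in[1:]
	return v, nil
}

func (m *memStream) Send(total int64) error {
	m.out = append(m.out, total)
	return nil
}

func main() {
	s := &memStream{in: []int64{10, 10, 10, 10, 10}}
	if err := syncBatched(s); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(s.out) // [30 50]: one response after three messages, one final flush
}
```

Whatever cadence you pick, the client has to expect it too: a client that waits for one reply per request would stall against this handler.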
Client: SyncClicks
Add to cmd/client/main.go:
func syncClicks(client pb.LinkServiceClient, codes []string) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
stream, err := client.SyncClicks(ctx)
if err != nil {
log.Fatal(err)
}
// Send in a goroutine
go func() {
for _, code := range codes {
err := stream.Send(&pb.SyncRequest{
ShortCode: code,
Clicks: 10,
})
if err != nil {
log.Printf("send error: %v", err)
return
}
}
stream.CloseSend() // IMPORTANT: signals server we're done sending
}()
// Receive in the main goroutine
for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatal(err)
}
fmt.Printf("synced %s: total=%d ack=%v\n",
resp.GetShortCode(), resp.GetTotalClicks(), resp.GetAcknowledged())
}
}
Why the goroutine? Both sides send and receive simultaneously. If you send all messages first and only then start receiving, it works for small payloads. With large streams, though, the responses you haven't read pile up until flow control blocks the server's Send(). The server then stops calling Recv(), your own Send() blocks in turn, and neither side can make progress. That's a deadlock. Using a goroutine lets sending and receiving happen concurrently.
CloseSend() is critical. It tells the server the client is done sending. Without it, the server's Recv() never gets io.EOF, and the RPC hangs until the context deadline (30 seconds here) expires or the call is cancelled.
Note: after CloseSend(), the client can still receive — it only closes the sending direction.
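The same shape can be reproduced with plain channels, which makes the deadlock easy to reason about. This is an analogy under stated assumptions, not gRPC code: echoServer and syncAll are hypothetical names, the unbuffered channels stand in for a flow-control window of size zero, and closing the request channel plays the role of CloseSend().

```go
package main

import (
	"fmt"
	"sync"
)

// echoServer models a bidirectional handler: it answers every request and
// stops when the request channel is closed (the analogue of seeing io.EOF).
func echoServer(requests <-chan string, responses chan<- string) {
	defer close(responses) // handler returned: the client sees end-of-stream
	for code := range requests {
		responses <- "ack:" + code
	}
}

// syncAll sends from a goroutine and receives on the caller's goroutine.
// The channels are unbuffered on purpose: with more than one code, sending
// everything first and only then receiving would block forever, because the
// server is stuck delivering the first reply while the client is stuck
// sending the second request.
func syncAll(codes []string) []string {
	requests := make(chan string)
	responses := make(chan string)

	go echoServer(requests, responses)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(requests) // analogue of CloseSend()
		for _, code := range codes {
			requests <- code
		}
	}()

	var got []string
	for r := range responses { // ends once the server closes its side
		got = append(got, r)
	}
	wg.Wait()
	return got
}

func main() {
	fmt.Println(syncAll([]string{"k7m2px", "a3f9wq", "zt04bn"}))
	// [ack:k7m2px ack:a3f9wq ack:zt04bn]
}
```

Removing the close(requests) line reproduces the other failure mode described above: the server loop never ends, so the receive loop never sees end-of-stream.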
Running SyncClicks
Update main() in cmd/client/main.go to call both batchCreate and syncClicks:
func main() {
conn, err := grpc.NewClient("localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
client := pb.NewLinkServiceClient(conn)
// First batch-create some links
created := batchCreate(client, []string{
"https://example.com/article-one",
"https://example.com/article-two",
"https://example.com/article-three",
})
// Then sync clicks using their short codes
var codes []string
for _, link := range created {
codes = append(codes, link.GetShortCode())
}
syncClicks(client, codes)
}
synced k7m2px: total=10 ack=true
synced a3f9wq: total=10 ack=true
synced zt04bn: total=10 ack=true
If you pass a short code that doesn't exist, you'll get ack=false:
synced nonexistent: total=0 ack=false
When to Use Each Pattern
| Pattern | Client sends | Server sends | Use case |
|---|---|---|---|
| Unary | 1 message | 1 message | Most RPCs |
| Server streaming | 1 message | many messages | Lists, feeds, progress |
| Client streaming | many messages | 1 message | Batch uploads, log ingestion |
| Bidirectional | many messages | many messages | Chat, real-time sync |
Start with unary. Only use streaming when you have a real reason. Most services work fine with unary RPCs and the occasional server stream.
Key Takeaways
- Client streaming: client sends many messages, server responds once with SendAndClose()
- SendAndClose() (server) and CloseAndRecv() (client) are two ends of the same handshake
- Bidirectional streaming: both sides send and receive independently on the same stream
- Use a goroutine on the client for concurrent send/receive to avoid deadlock
- CloseSend() is critical: without it the server hangs waiting for more messages
- The server doesn't have to respond to every message; the protocol is yours to design
- Start with unary RPCs, add streaming only when the use case demands it