Java gRPC streaming
Java

Java gRPC Streaming: A Deep Dive

Introduction to gRPC Streaming in Java

What is gRPC?

Java gRPC Streaming, short for Java Google Remote Procedure Call, is a modern open source framework for RPC (Remote Procedure Call) developed by Google. It enables applications to communicate with each other as easily as if they were making local calls, but across different computers and environments. At its core, gRPC uses HTTP/2 for transport, Protocol Buffers as Interface Definition Language (IDL) and provides features such as authentication, load balancing and tracing. This combination results in a highly efficient and powerful communication mechanism that is particularly suitable for microservices architectures.

Why streaming?

While traditional unary RPC (where a single request results in a single response) is perfectly adequate for many scenarios, it is insufficient for certain types of interactions that require a continuous flow of data or real-time updates. In this case, streaming is invaluable. Think of situations where:

Real-time data feeds

Applications need to receive a continuous stream of updates, such as live stock quotes, sensor readings or news, without constantly polling the server.

Large file transfers

Sending or receiving very large files can be inefficient with non-ary calls, as the entire file must be cached before transmission or processing. With streaming, data can be sent and processed in chunks.

Interactive applications

For conversational interfaces, collaboration tools or online games, continuous bi-directional communication is essential for a smooth user experience.

Advantages of gRPC streaming

The integrated gRPC support for streaming, which is supported by HTTP/2, offers several significant advantages over traditional RPC or other communication paradigms:

Persistent connections (HTTP/2)

Unlike HTTP/1.1, which typically closes connections after each request-response cycle, HTTP/2 allows long-lived, persistent connections. This reduces the effort required to establish new connections for each interaction, resulting in lower latency and better resource utilization.

Multiplexing

The multiplexing capability of HTTP/2 means that multiple requests and responses can be sent simultaneously over a single TCP connection. This avoids blocking at the beginning ofthe line and improves the overall throughput, especially in scenarios with high parallelism.

Lower latency

By enabling a continuous flow of data and reducing connection overhead, gRPC streaming significantly reduces latency** for real-time applications. Data can be sent and received as soon as it is available instead of waiting for an entire message to be formed or a new connection to be established.

Understanding gRPC streaming concepts

The power of protocol buffers (.proto)

At the heart of gRPC, and consequently gRPC streaming, are protocol buffers (protobuf). This language- and platform-neutral, extensible mechanism for serializing structured data is used to define the service contract between the client and the server. Think of a “proto” file as a blueprint that precisely outlines the data structures (messages) to be exchanged and the methods (services) that can be called. This strict contract ensures that both the client and the server understand the format of the data, regardless of the programming language in which it is written. For streaming, Protobuf defines the message types that flow within a stream, ensuring data consistency and interoperability.

stream keyword in .proto

The magic behind gRPC streaming in the .proto file is the stream keyword. This keyword explicitly tells the protobuf compiler that a particular method involves a continuous flow of messages and not just a single request-response pair.

How stream is used:

  • Server-Side Streaming: When the server sends a stream of messages back to the client, the stream keyword is applied to the response type of the RPC method. For example:
 service MyService {
 rpc GetUpdates(Request) returns (stream Response);
 }

Here the client sends a “request” and the server can return several “response” messages.

  • Client-Side Streaming: When the client sends a stream of messages to the server, the keyword “stream” is applied to the request type of the RPC method. For example:
 service MyService {
 rpc UploadData(stream DataChunk) returns (UploadSummary);
 }

In this case, the client sends multiple DataChunk messages and the server returns a single UploadSummary after processing all chunks.

  • Bidirectional streaming: If both the client and the server exchange message streams simultaneously and independently, the keyword “stream” is applied to both request and response types. For example:
 service MyService {
 rpc Chat(stream ChatMessage) returns (stream ChatMessage);
 }

This structure enables a continuous, two-way conversation in which both sides can send messages at any time.

StreamObserver

In Java, the StreamObserver interface is the cornerstone for handling gRPC streaming operations. It acts as an event-driven callback mechanism that allows your application to respond to messages as they arrive in a stream. Both the client and server implementations of gRPC streaming methods rely heavily on StreamObserver to manage the data flow.

Important methods of StreamObserver:

  • void onNext(T value): This method is called each time a new message (value of type T) is received from the stream. For a server, this method is called when the client sends a message. For a client, this method is called when the server sends a message.
  • void onError(Throwable t): This method is called when an error occurs during stream processing. The throwable t provides details about the error. It signals that the stream was aborted due to an error.
  • void onCompleted(): This method is called when the stream has been successfully completed and no further messages are expected. On a server, this method is called when the client signals that it has finished sending messages. For a client, this method is called when the server signals that it has finished sending messages.

Understanding StreamObserver is critical as it provides the asynchronous, non-blocking paradigm for handling the continuous flow of data in gRPC streaming, enabling efficient and responsive applications.

Server-side streaming (server to client)

Concept and use cases

In server-side streaming, the client sends a single request message to the server, and the server responds by sending back a stream of multiple messages. Think of it as if you were subscribing to a live news feed. You send a request to subscribe, and the news server continuously sends you new articles as they become available.

This pattern is ideal for scenarios where the server needs to send a continuous or potentially large amount of data to the client without the need for multiple individual requests. Common use cases are:

  • Real-time stock updates: A client requests data for a specific stock and the server continuously transmits price changes.
  • News feeds: A client subscribes to a news category and the server transmits new articles as they are published.
  • Large data reports: Instead of sending a large single response that may take too long or consume too much disk space, the server can transmit the report in smaller, more manageable packets.
  • Live sensor readings: A client requests data from a sensor and the server streams a continuous stream of readings.

Java implementation

Defining the RPC in .proto

To define a server-side streaming method in your Protocol Buffers .proto file, use the stream keyword for the response type. For example:

service StockService {
 rpc GetStockUpdates(StockRequest) returns (stream StockUpdate);
}

message StockRequest {
 string symbol = 1;
}

message StockUpdate {
 string symbol = 1;
 double price = 2;
 int64 timestamp = 3;
}

Here GetStockUpdates takes a single StockRequest, but returns a stream StockUpdate, indicating that the server will send multiple StockUpdate messages.

Server-side logic

On the server, your service implementation receives the single client request and then uses the provided StreamObserver to send multiple responses.

public class StockServiceImpl extends StockServiceGrpc.StockServiceImplBase {

 @Override
 public void getStockUpdates(StockRequest request, StreamObserver<StockUpdate> responseObserver) {
 String symbol = request.getSymbol();
 System.out.println("Client requested updates for: " + symbol);

 // Simulate the sending of continuous updates
 for (int i = 0; i < 5; i++) {
 StockUpdate update = StockUpdate.newBuilder()
 .setSymbol(Symbol)
 .setPrice(100.0 + i * 0.5) // Example price
 .setTimestamp(System.currentTimeMillis())
 .build();
 responseObserver.onNext(update); // Send a single update
 try {
 Thread.sleep(1000); // Wait for one second
 } catch (InterruptedException e) {
 Thread.currentThread().interrupt();
 responseObserver.onError(e);
 back;
 }
 }
 responseObserver.onCompleted(); // Signal that the stream has ended
 System.out.println("Finished sending updates for: " + symbol);
 }
}

The most important methods here are:

  • responseObserver.onNext(message): This method is called repeatedly to send individual messages to the client.
  • responseObserver.onCompleted(): This method must be called to signal to the client that the server has finished sending all messages in the stream. If this method is not called, the client waits indefinitely.
  • responseObserver.onError(throwable): If an error occurs during the stream, this method is used to inform the client of the error.

Client-side logic

On the client, you will normally use an asynchronous stub to initiate the call. You will also need to implement a “StreamObserver” to process the incoming stream of responses from the server.

public class StockClient {

 public static void main(String[] args) throws InterruptedException {
 ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
 .usePlaintext()
 .build();

 StockServiceGrpc.StockServiceStub asyncStub = StockServiceGrpc.newStub(channel);

 StockRequest request = StockRequest.newBuilder().setSymbol("GOOG").build();

 asyncStub.getStockUpdates(request, new StreamObserver<StockUpdate>() {
 @Override
 public void onNext(StockUpdate update) {
 System.out.println("Received stock update: " + update.getSymbol() + " - " + update.getPrice());
 }

 @Override
 public void onError(Throwable t) {
 System.err.println("Error receiving stock update: " + t.getMessage());
 }

 @Override
 public void onCompleted() {
 System.out.println("Stock update stream completed.");
 }
 });

// Keep the client alive to receive updates. In a real application, you may have
 // a more sophisticated way to manage the channel and client lifecycle.
 Thread.sleep(10000); // wait for 10 seconds to receive updates
 channel.shutdown();
 }
}

On the client side, your StreamObserver implementation:

  • onNext(message): This method is called by the gRPC framework every time a new message is received from the server.
  • onError(throwable): This method is called when an error occurs on the server or during network transmission.
  • onCompleted(): This method is called when the server has successfully completed sending all messages in the stream and has called onCompleted() on its responseObserver.

Client-side streaming (client to server)

Concept and use cases

In client-side streaming, the client sends a stream of messages to the server, and the server responds with a single, final message once it has processed all incoming data. This is particularly useful when the client needs to transfer a large amount of data incrementally or when the data is generated over time.

Examples of the use of client-side streaming include:

  • Uploading large files in chunks: Instead of sending an entire file at once, the client can stream smaller chunks, allowing for better troubleshooting and progress tracking.
  • Sending a series of sensor readings: An IoT device could continuously send temperature or pressure readings to a central server for analysis.
  • Live transcription: A client could stream audio segments to a server for real-time speech-to-text conversion.
  • Batch processing: The client sends multiple records or transactions that are processed by the server in a single operation.

Java implementation

Server side

On the server, define the RPC method in your .proto file with the keyword stream for the request and a regular (unary) response:

service MyService {
 rpc UploadData(stream DataChunk) returns (UploadSummary);
}

The server-side implementation will receive a StreamObserver for the incoming DataChunk messages. The server must implement the methods onNext(), onError(), and onCompleted().

  • onNext(DataChunk chunk): This method is called repeatedly as the client sends each “DataChunk”. The server processes each chunk as soon as it arrives.
  • onError(Throwable t): This method is called when an error occurs while the client is streaming.
  • onCompleted(): This method is called when the client has finished sending all its messages. Typically, the server performs final processing within this method and then sends its single UploadSummary response using the StreamObserver response.

Client side

On the client, use an asynchronous stub to initiate the streaming call. You will get back a StreamObserver which you can use to send messages to the server.

The .proto definition in this scenario would look like this:

service MyService {
 rpc UploadData(stream DataChunk) returns (UploadSummary);
}

The client-side code becomes:

  1. Retrieve an asynchronous stub for the service.
  2. Call the streaming method on the stub, providing a StreamObserver to handle the single response from the server.
  3. Use the StreamObserver returned by the stub call to send multiple DataChunk messages using its onNext() method.
  4. After all chunks have been sent, call onCompleted() on the StreamObserver to signal the server that the stream is finished.
  5. Implement the onNext(), onError() and onCompleted() methods for the StreamObserver passed to the stub, which will process the single UploadSummary response from the server.

Bidirectional streaming (client-server and server-client)

Concept

With bidirectional streaming, both the client and the server send and receive message streams independently of each other simultaneously via a single, permanent HTTP/2 connection. This enables a continuous exchange of data in real time, where both sides can send a message at any time without waiting for a response from the other side. It’s like a two-way conversation where both parties can speak and listen at the same time.

Use Cases

Bidirectional streaming is ideal for scenarios that require interactive communication in real time.

Chat applications

Imagine a chat application where multiple users exchange messages. Each user’s client can stream their outgoing messages to the server, and the server can stream incoming messages from other users back to each client. This creates a fluid conversation in real time.

Games in real time

In online multiplayer games, client-side actions (such as player movements or spells) can be streamed to the server, and server-side updates (such as opponent positions or game state changes) can be streamed back to the clients. This ensures low-latency synchronization and a smooth gaming experience.

Live navigation updates

A navigation application could transmit a user’s current location to a server, and the server could continuously transmit back updated traffic information, alternate routes or points of interest based on the user’s progress.

Java implementation

Definition of “rpc” in “proto`

To define a bidirectional streaming method in your .proto file, use the keyword stream for both the request and response type:

service MyService {
 rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}

Here “ChatMessage” is a protocol buffer message that contains the actual chat content.

Implementation of the server-side logic

On the server, your service implementation receives a StreamObserver for the incoming client messages and provides a StreamObserver to send messages back to the client.

public class MyServiceImpl extends MyServiceGrpc.MyServiceImplBase {
 @Override
 public StreamObserver<ChatMessage> chat(StreamObserver<ChatMessage> responseObserver) {
 return new StreamObserver<ChatMessage>() {
 @Override
 public void onNext(ChatMessage chatMessage) {
 // Process incoming client message
 System.out.println("Server received: " + chatMessage.getMessage());
 // Send a response back to the client
 responseObserver.onNext(ChatMessage.newBuilder().setMessage("Server echo: " + chatMessage.getMessage()).build());
 }

 @Override
 public void onError(Throwable t) {
 System.err.println("Server error: " + t.getMessage());
 }

 @Override
 public void onCompleted() {
 // The client has finished sending messages
 System.out.println("The client has finished chatting.");
 responseObserver.onCompleted(); // Signaling the completion to the client
 }
 };
 }
}

Implementation of the client-side logic

On the client, you get a StreamObserver to send your outgoing messages and you provide a StreamObserver to process the incoming messages from the server.

public class MyClient {
 public static void main(String[] args) throws InterruptedException {
 ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
 .usePlaintext()
 .build();

 MyServiceGrpc.MyServiceStub asyncStub = MyServiceGrpc.newStub(channel);

 StreamObserver<ChatMessage> requestObserver = asyncStub.chat(new StreamObserver<ChatMessage>() {
 @Override
 public void onNext(ChatMessage chatMessage) {
 // Process incoming server message
 System.out.println("Client received: " + chatMessage.getMessage());
 }

 @Override
 public void onError(Throwable t) {
 System.err.println("Client error: " + t.getMessage());
 }

 @Override
 public void onCompleted() {
 System.out.println("The server has ended the chat.");
 }
 });

 // Sending messages from the client
 requestObserver.onNext(ChatMessage.newBuilder().setMessage("Hello from client 1").build());
 Thread.sleep(1000); // Simulate a delay
 requestObserver.onNext(ChatMessage.newBuilder().setMessage("Hello from client 2").build());
 Thread.sleep(1000);

 requestObserver.onCompleted(); // signaling completion to the server
 channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
 }
}

Project setup: Dependencies and proto compilation

Maven/Gradle dependencies

To get started with gRPC in your Java project, you need to include the necessary dependencies in your build file. For Maven, these are typically included in pom.xml and for Gradle in build.gradle.

Core gRPC and protocol buffer libraries

These are the basic libraries required for any gRPC application:

  • grpc-netty-shaded: This dependency provides the Netty transport for gRPC, which handles the underlying network communication. It is often “shaded” to avoid dependency conflicts with other libraries that may use different versions of Netty.
  • grpc-protobuf: This library contains the protocol buffer runtime for gRPC, which enables serialization and deserialization of the messages you define.
  • grpc-stub: It provides the gRPC stub generation and utilities that allow you to create client stubs and implement server services based on your .proto definitions.

Protobuf plugin for code generation

You also need a plugin to automatically generate Java source code from your .proto files during the build process.

  • Maven: The protobuf-maven-plugin is commonly used. You configure it in your pom.xml to specify the location of your .proto files and the desired output directory for the generated Java code.
  • Gradle: For Gradle, the com.google.protobuf plugin handles the compilation of the protocol buffers. You use this plugin in your build.gradle and configure it so that it refers to your .proto files.

Protobuf Compiler (protoc)

The Protocol Buffers compiler, protoc, is an important tool that translates your human-readable .proto files into source code in various programming languages, including Java. Although the build plugins (Maven/Gradle) often handle the execution of protoc in the background, it is good to understand its role.

How protoc works

When you build your project, the configured protobuf plugin calls protoc. It reads your .proto files, which define your service contracts and message structures, and then generates corresponding Java classes. These generated classes provide:

  • message classes: Java classes that represent your defined Protocol Buffer messages, with methods for setting and retrieving fields, serialization and deserialization.
  • Service interfaces/abstract classes: For your gRPC services, protoc generates interfaces (for client stubs) and abstract base classes (for server implementations) that define the methods declared in your .proto service.

Overview of the generated code

After successfully compiling your .proto files, you will find new Java source files in your chosen output directory (often target/generated-sources/protobuf/java for Maven or build/generated/source/proto/main/java for Gradle).

Key components of the generated code

The generated code typically contains

  • Message Classes: A Java class is generated for each message defined in your .proto file (e.g. Request, Response). These classes are immutable and contain a nested “builder” class for creating instances, which corresponds to the builder pattern for object construction. They also contain methods such as newBuilder(), getDefaultInstance(), parseFrom(), and toByteString().
  • Service stubs: For each defined gRPC service (e.g. MyService), protoc creates multiple classes:
  • Blocking Stub (MyServiceGrpc.MyServiceBlockingStub): This class provides synchronous (blocking) methods for gRPC calls. The client waits for the response from the server before continuing.
  • Asynchronous Stub (MyServiceGrpc.MyServiceStub): This provides asynchronous methods for gRPC calls. These methods typically return a StreamObserver to handle the asynchronous nature of streaming and non-blocking interactions. This is particularly important for all types of gRPC streaming.
  • Abstract Service Base Class (MyServiceGrpc.MyServiceImplBase): On the server side, you will extend this abstract class and implement the gRPC service methods defined in your .proto file. This is the place where your actual server-side logic is located.

Practical code examples (snippets for each type)

Server-side streaming example

.proto definition

syntax = "proto3";

option java_multiple_files = true;
option java_package = "com.example.grpc.streaming";
option java_outer_classname = "StockProto";

service StockService {
 rpc GetStockPrices(StockRequest) returns (stream StockPrice);
}

message StockRequest {
 string symbol = 1;
}

message StockPrice {
 string symbol = 1;
 double price = 2;
 int64 timestamp = 3;
}

Server implementation

package com.example.grpc.streaming;

import io.grpc.stub.StreamObserver;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class StockServiceServerImpl extends StockServiceGrpc.StockServiceImplBase {

 private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

 @Override
 public void getStockPrices(StockRequest request, StreamObserver<StockPrice> responseObserver) {
 String symbol = request.getSymbol();
 System.out.println("Client requested stock prices for: " + symbol);

 // Simulate sending stock quotes every 2 seconds
 scheduler.scheduleAtFixedRate(() -> {
 if (!responseObserver.isCancelled()) {
 double price = 100.0 + (Math.random() * 10.0); // Simulate price fluctuations
 StockPrice stockPrice = StockPrice.newBuilder()
 .setSymbol(symbol)
 .setPrice(price)
 .setTimestamp(System.currentTimeMillis())
 .build();
 responseObserver.onNext(stockPrice);
 System.out.println("Sending the price for " + symbol + ": " + price);
 } else {
 System.out.println("Client has canceled stream for " + symbol);
 scheduler.shutdown(); // Stop sending when the client cancels
 }
 }, 0, 2, TimeUnit.SECONDS);

 // In a real application, you could have a condition that calls onCompleted()
 // For demonstration purposes, this stream runs indefinitely until the client cancels or the server stops.
 }
}

Client implementation

package com.example.grpc.streaming;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class StockServiceClient {

 public static void main(String[] args) throws InterruptedException {
 ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080)
 .usePlaintext()
 .build();

 StockServiceGrpc.StockServiceStub asyncStub = StockServiceGrpc.newStub(channel);

 CountDownLatch latch = new CountDownLatch(1);

 System.out.println("Requesting stock prices for GOOG");
 asyncStub.getStockPrices(StockRequest.newBuilder().setSymbol("GOOG").build(), new StreamObserver<StockPrice>() {
 @Override
 public void onNext(StockPrice stockPrice) {
 System.out.println("Received: " + stockPrice.getSymbol() + " - Price: " + stockPrice.getPrice() + " at " + stockPrice.getTimestamp());
 }

 @Override
 public void onError(Throwable t) {
 System.err.println("Error receiving stock prices: " + t.getMessage());
 latch.countDown();
 }

 @Override
 public void onCompleted() {
 System.out.println("Share price stream completed.");
 latch.countDown();
 }
 });

 // Run the client for a while to receive updates
 latch.await(10, TimeUnit.SECONDS);
 System.out.println("Client is shutting down.");
 channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
 }
}

Client-side streaming example

.proto Definition

syntax = "proto3";

option java_multiple_files = true;
option java_package = "com.example.grpc.streaming";
option java_outer_classname = "FileProto";

service FileUploadService {
 rpc UploadFile(stream Chunk) returns (UploadStatus);
}

message Chunk {
 bytes data = 1;
 int32 sequence = 2;
}

message UploadStatus {
 string message = 1;
 bool success = 2;
}

Server implementation

package com.example.grpc.streaming;

import io.grpc.stub.StreamObserver;

public class FileUploadServiceServerImpl extends FileUploadServiceGrpc.FileUploadServiceImplBase {

 @Override
 public StreamObserver uploadFile(StreamObserver<UploadStatus> responseObserver) {
 return new StreamObserver() {
 private int receivedChunks = 0;
 private StringBuilder fileContent = new StringBuilder(); // For demonstration purposes

@Override
 public void onNext(Chunk chunk) {
 receivedChunks++;
 fileContent.append(new String(chunk.getData())); // Simulate processing of the chunk data
 System.out.println("Chunk received " + chunk.getSequence() + ", total received: " + receivedChunks);
 }

 @Override
 public void onError(Throwable t) {
 System.err.println("Error uploading the file: " + t.getMessage());
 responseObserver.onNext(UploadStatus.newBuilder()
 .setMessage("Upload failed: " + t.getMessage())
 .setSuccess(false)
 .build());
 responseObserver.onCompleted();
 }

 @Override
 public void onCompleted() {
 System.out.println("The client has finished sending chunks. Total chunks received: " + receivedChunks);
 System.out.println("Simulated file content: " + fileContent.substring(0, Math.min(fileContent.length(), 50)) + "..."); // Show beginning
 responseObserver.onNext(UploadStatus.newBuilder()
 .setMessage("File upload completed successfully, number of chunks: " + receivedChunks)
 .setSuccess(true)
 .build());
 responseObserver.onCompleted();
 }
 };
 }
}

Client implementation

package com.example.grpc.streaming;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import com.google.protobuf.ByteString;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class FileUploadServiceClient {

 public static void main(String[] args) throws InterruptedException {
 ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080)
 .usePlaintext()
 .build();

 FileUploadServiceGrpc.FileUploadServiceStub asyncStub = FileUploadServiceGrpc.newStub(channel);

 CountDownLatch latch = new CountDownLatch(1);

 StreamObserver requestObserver = asyncStub.uploadFile(new StreamObserver<UploadStatus>() {
 @Override
 public void onNext(UploadStatus status) {
 System.out.println("Upload Status: " + status.getMessage() + " (Success: " + status.getSuccess() + ")");
 }

 @Override
 public void onError(Throwable t) {
 System.err.println("Error uploading: " + t.getMessage());
 latch.countDown();
 }

 @Override
 public void onCompleted() {
 System.out.println("The server has confirmed the completion of the file upload.");
 latch.countDown();
 }
 });

 try {
 // Simulate sending the file in blocks
 for (int i = 0; i < 5; i++) {
 String data = "This is chunk " + i + " of the file content.";
 Chunk chunk = Chunk.newBuilder()
 .setData(ByteString.copyFromUtf8(data))
 .setSequence(i)
 .build();
 requestObserver.onNext(chunk);
 System.out.println("Sent chunk: " + i);
 Thread.sleep(500); // Simulate network delay
 }
 } catch (RuntimeException | InterruptedException e) {
 requestObserver.onError(e);
 throw e;
 } finally {
 requestObserver.onCompleted(); // Signal completion to the server
 }

 latch.await(10, TimeUnit.SECONDS);
 System.out.println("Client is shutting down.");
 channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
 }
}

Bidirectional streaming example

.proto Definition

syntax = "proto3";

option java_multiple_files = true;
option java_package = "com.example.grpc.streaming";
option java_outer_classname = "ChatProto";

service ChatService {
 rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}

message ChatMessage {
 string sender = 1;
 string message = 2;
 int64 timestamp = 3;
}

Server implementation

package com.example.grpc.streaming;

import io.grpc.stub.StreamObserver;

public class ChatServiceServerImpl extends ChatServiceGrpc.ChatServiceImplBase {

@Override
 public StreamObserver<ChatMessage> chat(StreamObserver<ChatMessage> responseObserver) {
 return new StreamObserver<ChatMessage>() {
 @Override
 public void onNext(ChatMessage chatMessage) {
 System.out.println("Server received message from " + chatMessage.getSender() + ": " + chatMessage.getMessage());
 // Send message back to the client or send to other clients
 ChatMessage serverResponse = ChatMessage.newBuilder()
 .setSender("Server")
 .setMessage("Echo: " + chatMessage.getMessage())
 .setTimestamp(System.currentTimeMillis())
 .build();
 responseObserver.onNext(serverResponse);
 }

 @Override
 public void onError(Throwable t) {
 System.err.println("Error in chat stream: " + t.getMessage());
 }

 @Override
 public void onCompleted() {
 System.out.println("Client has ended chat stream.");
 responseObserver.onCompleted(); // Signaling the completion to the client
 }
 };
 }
}

Client implementation

package com.example.grpc.streaming;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ChatServiceClient {

 public static void main(String[] args) throws Exception {
 ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080)
 .usePlaintext()
 .build();

 ChatServiceGrpc.ChatServiceStub asyncStub = ChatServiceGrpc.newStub(channel);

 CountDownLatch latch = new CountDownLatch(1);

 StreamObserver<ChatMessage> requestObserver = asyncStub.chat(new StreamObserver<ChatMessage>() {
 @Override
 public void onNext(ChatMessage chatMessage) {
 System.out.println("Received from " + chatMessage.getSender() + ": " + chatMessage.getMessage());
 }

 @Override
 public void onError(Throwable t) {
 System.err.println("Error in chat: " + t.getMessage());
 latch.countDown();
 }

 @Override
 public void onCompleted() {
 System.out.println("Chat session ended.");
 latch.countDown();
 }
 });

 try {
 BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
 String line;
 System.out.println("Enter messages (enter 'exit' to exit):");
 while ((line = reader.readLine()) != null && !line.equalsIgnoreCase("exit")) {
 ChatMessage message = ChatMessage.newBuilder()
 .setSender("ClientUser")
 .setMessage(line)
 .setTimestamp(System.currentTimeMillis())
 .build();
 requestObserver.onNext(message);
 }
 } catch (Exception e) {
 requestObserver.onError(e);
 } finally {
 requestObserver.onCompleted(); // signaling completion to the server
 }

 latch.await(5, TimeUnit.SECONDS); // Wait a short time for final responses
 System.out.println("Client is shutting down.");
 channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
 }
}

Error handling in gRPC streaming

Standard gRPC status codes

gRPC uses a comprehensive set of status codes to indicate the result of an RPC call, including streaming calls. These codes provide a standardized way for clients and servers to understand why a request failed or succeeded. For example, “UNAVAILABLE” can mean that the server is unreachable, “UNAUTHENTICATED” indicates missing or invalid credentials, and “INTERNAL” indicates a server-side error. If an error occurs in a stream, the Status object, which contains the code and an optional description, is crucial for diagnosis.

StreamObserver.onError()

The “StreamObserver” interface, which is fundamental to gRPC streaming in Java, provides an “onError(Throwable t)” method. This method is the primary mechanism for handling errors in both client-side and server-side streaming.

Client-side error handling

If an error occurs on the client side during a stream, the server normally sends an onError signal to the client’s StreamObserver. Your client-side code should implement this method to gracefully handle issues such as network interruptions, server-side exceptions or application-specific errors passed on by the server. This allows the client to respond appropriately, for example by logging the error, displaying a message to the user or retrying the operation.

Server-side error handling

Conversely, on the server side, if your application logic encounters an error while processing a client’s stream or preparing responses for a server-side stream, you should call onError() for the StreamObserver provided by gRPC. This signals to the client that the stream was aborted due to an error. It is important to provide a meaningful Status object with the onError call to give the client enough context to understand the problem.

Custom error details

While gRPC status codes provide a good initial understanding of an error, sometimes more specific details need to be conveyed at the application level. gRPC allows User defined error details to be included in the Status object. This is usually done by embedding a protobuf message in the 'Details field of the ‘Status’.

Using protobuf messages for details

By defining a special protobuf message for your error details, you can structure extensive, type-safe error information. For example, you can include fields for an application-specific error code, a detailed error message, or even a stack trace (although passing raw stack traces to clients should be done with caution, especially in production environments). This allows clients to analyze these custom details and provide more granular error handling or feedback to end users.

Performance considerations and best practices

Connection management

  • Channel lifecycle: Manage the lifecycle of gRPC Channels. A channel represents a long-lived connection to a gRPC server. Creating and tearing down channels for each RPC call is inefficient due to the overhead of establishing HTTP/2 connections. It is best to reuse channels across multiple RPC calls and possibly across different services on the same server.
  • Channel pooling: For applications that handle a high volume of requests or communicate with multiple gRPC servers, consider implementing a channel pool. This enables efficient reuse of channels and avoids exhaustion of resources.
  • Switch off: Closes channels when they are no longer needed to free up resources. The shutdown() method initiates a soft shutdown, while shutdownNow() attempts to terminate active RPCs and connections immediately.

Flow control

  • Automatic handling: gRPC, which is built on HTTP/2, inherently provides flow control mechanisms. This means that both the client and the server can regulate the speed at which data is sent to prevent overloading the receiver. For basic scenarios, you generally do not need to implement explicit flow control logic in your application code.
  • Backpressure: If a receiver cannot process data as fast as it is sent, gRPC’s flow control applies backpressure, slowing down the sender. This prevents errors caused by memory overruns and ensures reliable data transmission.

Resource management

  • Life cycle of the StreamObserver: For streaming RPCs, it must be ensured that the StreamObserver instances are properly managed. In particular, onCompleted() or onError() should always be called to signal the end of a stream and release the associated resources on both the client and server side. If this is not done, it can lead to resource leaks and hanging connections.
  • Thread Management: Pay attention to thread usage in your gRPC applications. Both client and server usually use thread pools for processing RPC calls. Configure these pools according to the expected load and available system resources. Avoid blocking operations on gRPC-managed threads, as this can lead to thread starvation.

When to use which streaming type?

  • Server-side streaming: Ideal for scenarios where a client requests data and the server needs to send a continuous stream of updates or a large data set. Examples include real-time data feeds (e.g. stock quotes, sensor data) or downloading large files in chunks.
  • Client-side streaming: Suitable when a client needs to send a large amount of data to the server in chunks or a series of independent messages that the server processes together to achieve a single result. Examples include uploading large log files, continuous voice input for transcription or sending sensor readings in batches.
  • Bidirectional streaming: Best suited for real-time interactive communication where both client and server can send and receive message streams independently. Common applications include chat applications, online gaming, live video conferencing or collaborative editing tools. This type offers the greatest flexibility for complex real-time interactions.

Further reading and next steps

Dive deeper into gRPC concepts

Advanced gRPC topics

  • Interceptors: Learn how to implement and use interceptors for cross-cutting concerns such as logging, authentication, and monitoring.
  • Metadata: Learn how to send and receive custom metadata with your gRPC calls for additional contextual information.
  • Load Balancing: Understand how to configure gRPC clients for effective load balancing across multiple service instances.
  • Error Handling Strategies: Explore more robust error handling patterns, including custom error details and retry mechanisms.

gRPC ecosystem

  • gRPC-Web: Discover how gRPC-Web enables browser-based clients to interact with gRPC services.
  • Proxies and Gateways: Learn about tools like Envoy or gRPC-Gateway that connect RESTful APIs to gRPC services.
  • Monitorability: Learn how to integrate gRPC services with tracing (e.g. OpenTracing, OpenTelemetry) and metrics (e.g. Prometheus) for better monitoring.

Official documentation and community resources

Important documentation

  • Official gRPC website: The main resource for everything related to gRPC, including detailed documentation, tutorials and examples for different languages.
  • Protocol Buffer Documentation: A comprehensive guide to defining and using Protocol Buffers, the interface definition language for gRPC.

Community and Open Source

  • gRPC GitHub Repository: Access source code, contribute to the project, and explore examples.
  • Community forums and mailing lists: Participate in the gRPC community to ask questions, share knowledge, and stay up to date on developments.