
Java gRPC Streaming: A Deep Dive
Introduction to gRPC Streaming in Java
What is gRPC?
gRPC is a modern, open-source Remote Procedure Call (RPC) framework originally developed by Google. It enables applications to communicate with each other as easily as if they were making local calls, even across different machines and environments. At its core, gRPC uses HTTP/2 for transport and Protocol Buffers as its Interface Definition Language (IDL), and it provides features such as authentication, load balancing, and tracing. This combination results in a highly efficient communication mechanism that is particularly well suited to microservices architectures.
Why streaming?
While traditional unary RPC (a single request yields a single response) is perfectly adequate for many scenarios, it falls short for interactions that require a continuous flow of data or real-time updates. This is where streaming becomes invaluable. Think of situations such as:
Real-time data feeds
Applications need to receive a continuous stream of updates, such as live stock quotes, sensor readings or news, without constantly polling the server.
Large file transfers
Sending or receiving very large files is inefficient with unary calls, because the entire file must be buffered in memory before transmission or processing. With streaming, data can be sent and processed in chunks.
Interactive applications
For conversational interfaces, collaboration tools or online games, continuous bi-directional communication is essential for a smooth user experience.
Advantages of gRPC streaming
gRPC's built-in support for streaming, powered by HTTP/2, offers several significant advantages over traditional RPC and other communication paradigms:
Persistent connections (HTTP/2)
Unlike HTTP/1.1, which typically closes connections after each request-response cycle, HTTP/2 allows long-lived, persistent connections. This reduces the effort required to establish new connections for each interaction, resulting in lower latency and better resource utilization.
Multiplexing
HTTP/2's multiplexing capability means that multiple requests and responses can be in flight simultaneously over a single TCP connection. This avoids head-of-line blocking and improves overall throughput, especially in highly parallel scenarios.
Lower latency
By enabling a continuous flow of data and reducing connection overhead, gRPC streaming significantly lowers latency for real-time applications. Data can be sent and received as soon as it is available, instead of waiting for an entire message to be assembled or a new connection to be established.
Understanding gRPC streaming concepts
The power of protocol buffers (.proto)
At the heart of gRPC, and consequently gRPC streaming, are Protocol Buffers (Protobuf). This language- and platform-neutral, extensible mechanism for serializing structured data is used to define the service contract between the client and the server. Think of a .proto file as a blueprint that precisely outlines the data structures (messages) to be exchanged and the methods (services) that can be called. This strict contract ensures that both the client and the server understand the format of the data, regardless of the programming language in which each is written. For streaming, Protobuf defines the message types that flow within a stream, ensuring data consistency and interoperability.
The stream keyword in .proto
The magic behind gRPC streaming in the .proto file is the stream keyword. It explicitly tells the Protobuf compiler that a particular method involves a continuous flow of messages rather than a single request-response pair.
How stream is used:
- Server-Side Streaming: When the server sends a stream of messages back to the client, the stream keyword is applied to the response type of the RPC method. For example:
service MyService {
    rpc GetUpdates(Request) returns (stream Response);
}
Here the client sends a single Request and the server can return multiple Response messages.
- Client-Side Streaming: When the client sends a stream of messages to the server, the stream keyword is applied to the request type of the RPC method. For example:
service MyService {
    rpc UploadData(stream DataChunk) returns (UploadSummary);
}
In this case, the client sends multiple DataChunk messages, and the server returns a single UploadSummary after processing all chunks.
- Bidirectional Streaming: When both the client and the server exchange message streams simultaneously and independently, the stream keyword is applied to both the request and the response types. For example:
service MyService {
    rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}
This structure enables a continuous, two-way conversation in which both sides can send messages at any time.
StreamObserver
In Java, the StreamObserver interface is the cornerstone for handling gRPC streaming operations. It acts as an event-driven callback mechanism that allows your application to respond to messages as they arrive in a stream. Both the client and server implementations of gRPC streaming methods rely heavily on StreamObserver to manage the data flow.
Important methods of StreamObserver:
- void onNext(T value): Called each time a new message (value of type T) is received from the stream. On a server, this method fires when the client sends a message; on a client, it fires when the server sends a message.
- void onError(Throwable t): Called when an error occurs during stream processing. The Throwable t provides details about the error and signals that the stream was aborted.
- void onCompleted(): Called when the stream has completed successfully and no further messages are expected. On a server, this fires when the client signals that it has finished sending messages; on a client, it fires when the server signals that it has finished sending messages.
Understanding StreamObserver is critical, as it provides the asynchronous, non-blocking paradigm for handling the continuous flow of data in gRPC streaming, enabling efficient and responsive applications.
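The callback contract can be illustrated without any gRPC runtime. In the following self-contained sketch, the Observer interface is a local stand-in for io.grpc.stub.StreamObserver (not the real class), re-created here only to show the order in which a producer drives the three callbacks:

```java
import java.util.ArrayList;
import java.util.List;

public class ObserverDemo {
    // Toy stand-in for io.grpc.stub.StreamObserver<T>; defined locally so the
    // sketch runs without a gRPC dependency. The real interface has the same shape.
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Observer<String> observer = new Observer<String>() {
            @Override public void onNext(String value) { log.add("next:" + value); }
            @Override public void onError(Throwable t) { log.add("error:" + t.getMessage()); }
            @Override public void onCompleted()        { log.add("completed"); }
        };
        // A producer (the gRPC runtime, in practice) drives the callbacks:
        observer.onNext("msg-1");
        observer.onNext("msg-2");
        observer.onCompleted(); // terminal event: no onNext() calls may follow
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [next:msg-1, next:msg-2, completed]
    }
}
```

Note that onCompleted() and onError() are terminal: exactly one of them ends every stream, and no further onNext() calls are allowed afterwards.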
Server-side streaming (server to client)
Concept and use cases
In server-side streaming, the client sends a single request message to the server, and the server responds by sending back a stream of multiple messages. Think of it as if you were subscribing to a live news feed. You send a request to subscribe, and the news server continuously sends you new articles as they become available.
This pattern is ideal for scenarios where the server needs to send a continuous or potentially large amount of data to the client without the need for multiple individual requests. Common use cases are:
- Real-time stock updates: A client requests data for a specific stock and the server continuously transmits price changes.
- News feeds: A client subscribes to a news category and the server transmits new articles as they are published.
- Large data reports: Instead of sending one huge response that may take too long to build or consume too much memory, the server can transmit the report in smaller, more manageable chunks.
- Live sensor readings: A client requests data from a sensor and the server streams readings back continuously.
Java implementation
Defining the RPC in .proto
To define a server-side streaming method in your Protocol Buffers .proto file, use the stream keyword on the response type. For example:
service StockService {
    rpc GetStockUpdates(StockRequest) returns (stream StockUpdate);
}

message StockRequest {
    string symbol = 1;
}

message StockUpdate {
    string symbol = 1;
    double price = 2;
    int64 timestamp = 3;
}
Here GetStockUpdates takes a single StockRequest but returns stream StockUpdate, indicating that the server will send multiple StockUpdate messages.
Server-side logic
On the server, your service implementation receives the single client request and then uses the provided StreamObserver to send multiple responses.
public class StockServiceImpl extends StockServiceGrpc.StockServiceImplBase {
    @Override
    public void getStockUpdates(StockRequest request, StreamObserver<StockUpdate> responseObserver) {
        String symbol = request.getSymbol();
        System.out.println("Client requested updates for: " + symbol);
        // Simulate sending continuous updates
        for (int i = 0; i < 5; i++) {
            StockUpdate update = StockUpdate.newBuilder()
                    .setSymbol(symbol)
                    .setPrice(100.0 + i * 0.5) // Example price
                    .setTimestamp(System.currentTimeMillis())
                    .build();
            responseObserver.onNext(update); // Send a single update
            try {
                Thread.sleep(1000); // Wait one second
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                responseObserver.onError(e);
                return;
            }
        }
        responseObserver.onCompleted(); // Signal that the stream has ended
        System.out.println("Finished sending updates for: " + symbol);
    }
}
The most important methods here are:
- responseObserver.onNext(message): Called repeatedly to send individual messages to the client.
- responseObserver.onCompleted(): Must be called to signal to the client that the server has finished sending all messages in the stream. If it is never called, the client waits indefinitely.
- responseObserver.onError(throwable): If an error occurs during the stream, this method informs the client of the failure.
Client-side logic
On the client, you normally use an asynchronous stub to initiate the call and implement a StreamObserver to process the incoming stream of responses from the server.
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;

public class StockClient {
    public static void main(String[] args) throws InterruptedException {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
                .usePlaintext()
                .build();
        StockServiceGrpc.StockServiceStub asyncStub = StockServiceGrpc.newStub(channel);
        StockRequest request = StockRequest.newBuilder().setSymbol("GOOG").build();
        asyncStub.getStockUpdates(request, new StreamObserver<StockUpdate>() {
            @Override
            public void onNext(StockUpdate update) {
                System.out.println("Received stock update: " + update.getSymbol() + " - " + update.getPrice());
            }

            @Override
            public void onError(Throwable t) {
                System.err.println("Error receiving stock update: " + t.getMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("Stock update stream completed.");
            }
        });
        // Keep the client alive to receive updates. In a real application, you would
        // manage the channel and client lifecycle more carefully.
        Thread.sleep(10000); // Wait 10 seconds to receive updates
        channel.shutdown();
    }
}
On the client side, your StreamObserver implementation works as follows:
- onNext(message): Called by the gRPC framework every time a new message arrives from the server.
- onError(throwable): Called when an error occurs on the server or during network transmission.
- onCompleted(): Called when the server has finished sending all messages in the stream and has called onCompleted() on its responseObserver.
Client-side streaming (client to server)
Concept and use cases
In client-side streaming, the client sends a stream of messages to the server, and the server responds with a single, final message once it has processed all incoming data. This is particularly useful when the client needs to transfer a large amount of data incrementally or when the data is generated over time.
Examples of the use of client-side streaming include:
- Uploading large files in chunks: Instead of sending an entire file at once, the client can stream smaller chunks, allowing for better troubleshooting and progress tracking.
- Sending a series of sensor readings: An IoT device could continuously send temperature or pressure readings to a central server for analysis.
- Live transcription: A client could stream audio segments to a server for real-time speech-to-text conversion.
- Batch processing: The client sends multiple records or transactions that are processed by the server in a single operation.
Java implementation
Server side
On the server, define the RPC method in your .proto file with the stream keyword on the request and a regular (unary) response:
service MyService {
    rpc UploadData(stream DataChunk) returns (UploadSummary);
}
The server-side implementation receives a StreamObserver for the incoming DataChunk messages and must implement onNext(), onError(), and onCompleted().
- onNext(DataChunk chunk): Called repeatedly as the client sends each DataChunk. The server processes each chunk as it arrives.
- onError(Throwable t): Called when an error occurs while the client is streaming.
- onCompleted(): Called when the client has finished sending all its messages. Typically the server performs its final processing here and then sends its single UploadSummary response via the response StreamObserver.
Client side
On the client, use an asynchronous stub to initiate the streaming call. The call returns a StreamObserver that you use to send messages to the server.
The .proto definition in this scenario looks like this:
service MyService {
    rpc UploadData(stream DataChunk) returns (UploadSummary);
}
The client-side code then:
- Retrieves an asynchronous stub for the service.
- Calls the streaming method on the stub, providing a StreamObserver to handle the single response from the server.
- Uses the StreamObserver returned by the stub call to send multiple DataChunk messages via its onNext() method.
- After all chunks have been sent, calls onCompleted() on that StreamObserver to signal to the server that the stream is finished.
- Implements the onNext(), onError(), and onCompleted() methods of the StreamObserver passed to the stub, which processes the single UploadSummary response from the server.
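As a sketch of how these steps fit together, the following self-contained example simulates the client-streaming round trip in plain Java. The Observer interface, the UploadSummary class, and the serverHandler method are local stand-ins invented for illustration only; in a real project, the message types and stubs come from protoc-generated code and io.grpc.stub.StreamObserver:

```java
import java.util.ArrayList;
import java.util.List;

public class ClientStreamingSketch {
    // Local stand-in for io.grpc.stub.StreamObserver<T> (illustrative only).
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    // Stand-in for the single response the server sends after the stream ends.
    static class UploadSummary {
        final int chunkCount;
        final int totalBytes;
        UploadSummary(int chunkCount, int totalBytes) {
            this.chunkCount = chunkCount;
            this.totalBytes = totalBytes;
        }
    }

    // Server side: accumulate chunks, answer exactly once when the client completes.
    static Observer<byte[]> serverHandler(Observer<UploadSummary> responseObserver) {
        return new Observer<byte[]>() {
            private int chunks = 0;
            private int bytes = 0;
            @Override public void onNext(byte[] chunk) { chunks++; bytes += chunk.length; }
            @Override public void onError(Throwable t) { /* stream aborted: send no summary */ }
            @Override public void onCompleted() {
                responseObserver.onNext(new UploadSummary(chunks, bytes)); // the single response
                responseObserver.onCompleted();
            }
        };
    }

    // Client side: stream the chunks, then signal completion.
    static UploadSummary upload(List<byte[]> chunks) {
        List<UploadSummary> result = new ArrayList<>();
        Observer<byte[]> requestObserver = serverHandler(new Observer<UploadSummary>() {
            @Override public void onNext(UploadSummary s) { result.add(s); }
            @Override public void onError(Throwable t) { }
            @Override public void onCompleted() { }
        });
        for (byte[] c : chunks) {
            requestObserver.onNext(c); // send each chunk
        }
        requestObserver.onCompleted(); // tell the server the stream is done
        return result.get(0);
    }

    public static void main(String[] args) {
        UploadSummary s = upload(List.of("ab".getBytes(), "cde".getBytes()));
        System.out.println(s.chunkCount + " chunks, " + s.totalBytes + " bytes"); // 2 chunks, 5 bytes
    }
}
```

Note how the server sends its single response only from onCompleted(), after the entire inbound stream has been consumed; that is the defining shape of client-side streaming.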
Bidirectional streaming (client-server and server-client)
Concept
With bidirectional streaming, both the client and the server send and receive message streams independently and simultaneously over a single, long-lived HTTP/2 connection. This enables continuous, real-time data exchange in which either side can send a message at any time without waiting for a response from the other. It's like a two-way conversation where both parties can speak and listen at the same time.
Use Cases
Bidirectional streaming is ideal for scenarios that require interactive communication in real time.
Chat applications
Imagine a chat application where multiple users exchange messages. Each user’s client can stream their outgoing messages to the server, and the server can stream incoming messages from other users back to each client. This creates a fluid conversation in real time.
Games in real time
In online multiplayer games, client-side actions (such as player movements or spells) can be streamed to the server, and server-side updates (such as opponent positions or game state changes) can be streamed back to the clients. This ensures low-latency synchronization and a smooth gaming experience.
Live navigation updates
A navigation application could transmit a user’s current location to a server, and the server could continuously transmit back updated traffic information, alternate routes or points of interest based on the user’s progress.
Java implementation
Defining the RPC in .proto
To define a bidirectional streaming method in your .proto file, use the stream keyword for both the request and the response type:
service MyService {
    rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}
Here, ChatMessage is a Protocol Buffer message that contains the actual chat content.
Implementation of the server-side logic
On the server, your service implementation receives a StreamObserver for sending messages back to the client and returns a StreamObserver through which it handles the incoming client messages.
public class MyServiceImpl extends MyServiceGrpc.MyServiceImplBase {
    @Override
    public StreamObserver<ChatMessage> chat(StreamObserver<ChatMessage> responseObserver) {
        return new StreamObserver<ChatMessage>() {
            @Override
            public void onNext(ChatMessage chatMessage) {
                // Process the incoming client message
                System.out.println("Server received: " + chatMessage.getMessage());
                // Send a response back to the client
                responseObserver.onNext(ChatMessage.newBuilder()
                        .setMessage("Server echo: " + chatMessage.getMessage())
                        .build());
            }

            @Override
            public void onError(Throwable t) {
                System.err.println("Server error: " + t.getMessage());
            }

            @Override
            public void onCompleted() {
                // The client has finished sending messages
                System.out.println("The client has finished chatting.");
                responseObserver.onCompleted(); // Signal completion to the client
            }
        };
    }
}
Implementation of the client-side logic
On the client, the stub call returns a StreamObserver for sending your outgoing messages, and you provide a StreamObserver to process the incoming messages from the server.
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;

import java.util.concurrent.TimeUnit;

public class MyClient {
    public static void main(String[] args) throws InterruptedException {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
                .usePlaintext()
                .build();
        MyServiceGrpc.MyServiceStub asyncStub = MyServiceGrpc.newStub(channel);
        StreamObserver<ChatMessage> requestObserver = asyncStub.chat(new StreamObserver<ChatMessage>() {
            @Override
            public void onNext(ChatMessage chatMessage) {
                // Process the incoming server message
                System.out.println("Client received: " + chatMessage.getMessage());
            }

            @Override
            public void onError(Throwable t) {
                System.err.println("Client error: " + t.getMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("The server has ended the chat.");
            }
        });
        // Send messages from the client
        requestObserver.onNext(ChatMessage.newBuilder().setMessage("Hello from client 1").build());
        Thread.sleep(1000); // Simulate a delay
        requestObserver.onNext(ChatMessage.newBuilder().setMessage("Hello from client 2").build());
        Thread.sleep(1000);
        requestObserver.onCompleted(); // Signal completion to the server
        channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
    }
}
Project setup: Dependencies and proto compilation
Maven/Gradle dependencies
To get started with gRPC in your Java project, you need to include the necessary dependencies in your build file: for Maven, typically in pom.xml, and for Gradle in build.gradle.
Core gRPC and protocol buffer libraries
These are the basic libraries required for any gRPC application:
- grpc-netty-shaded: Provides the Netty transport for gRPC, which handles the underlying network communication. It is "shaded" to avoid dependency conflicts with other libraries that may use different versions of Netty.
- grpc-protobuf: Contains the Protocol Buffers runtime for gRPC, enabling serialization and deserialization of the messages you define.
- grpc-stub: Provides the gRPC stub generation utilities that let you create client stubs and implement server services based on your .proto definitions.
Protobuf plugin for code generation
You also need a plugin that automatically generates Java source code from your .proto files during the build.
- Maven: The protobuf-maven-plugin is commonly used. You configure it in your pom.xml to specify the location of your .proto files and the output directory for the generated Java code.
- Gradle: The com.google.protobuf plugin handles Protocol Buffers compilation. You apply it in your build.gradle and configure it to point at your .proto files.
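As a concrete sketch, a Maven configuration covering both the dependencies above and the code-generation plugin might look like the following. The version numbers are illustrative only, and the os-maven-plugin extension is one common way to resolve the platform-specific protoc binary; check the current grpc-java releases for up-to-date coordinates:

```xml
<dependencies>
  <dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-netty-shaded</artifactId>
    <version>1.58.0</version> <!-- illustrative version -->
  </dependency>
  <dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-protobuf</artifactId>
    <version>1.58.0</version>
  </dependency>
  <dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-stub</artifactId>
    <version>1.58.0</version>
  </dependency>
</dependencies>

<build>
  <extensions>
    <!-- Detects the OS so the matching protoc binary can be downloaded -->
    <extension>
      <groupId>kr.motd.maven</groupId>
      <artifactId>os-maven-plugin</artifactId>
      <version>1.7.1</version>
    </extension>
  </extensions>
  <plugins>
    <plugin>
      <groupId>org.xolstice.maven.plugins</groupId>
      <artifactId>protobuf-maven-plugin</artifactId>
      <version>0.6.1</version>
      <configuration>
        <protocArtifact>com.google.protobuf:protoc:3.24.0:exe:${os.detected.classifier}</protocArtifact>
        <pluginId>grpc-java</pluginId>
        <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.58.0:exe:${os.detected.classifier}</pluginArtifact>
      </configuration>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>compile-custom</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
```

The compile goal generates the message classes, while compile-custom runs the grpc-java plugin that generates the service stubs.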
Protobuf Compiler (protoc)
The Protocol Buffers compiler, protoc, is the tool that translates your human-readable .proto files into source code in various programming languages, including Java. Although the build plugins (Maven/Gradle) usually run protoc behind the scenes, it is useful to understand its role.
How protoc works
When you build your project, the configured Protobuf plugin invokes protoc. It reads your .proto files, which define your service contracts and message structures, and generates the corresponding Java classes. These generated classes provide:
- Message classes: Java classes that represent your defined Protocol Buffer messages, with methods for setting and retrieving fields, serialization, and deserialization.
- Service interfaces/abstract classes: For your gRPC services, protoc generates stubs (for clients) and abstract base classes (for server implementations) that expose the methods declared in your .proto service.
Overview of the generated code
After successfully compiling your .proto files, you will find new Java source files in your configured output directory (often target/generated-sources/protobuf/java for Maven or build/generated/source/proto/main/java for Gradle).
Key components of the generated code
The generated code typically contains:
- Message classes: A Java class is generated for each message defined in your .proto file (e.g. Request, Response). These classes are immutable and contain a nested Builder class for creating instances, following the builder pattern. They also provide methods such as newBuilder(), getDefaultInstance(), parseFrom(), and toByteString().
- Service stubs: For each defined gRPC service (e.g. MyService), protoc creates several classes:
  - Blocking stub (MyServiceGrpc.MyServiceBlockingStub): Provides synchronous (blocking) methods for gRPC calls; the client waits for the server's response before continuing.
  - Asynchronous stub (MyServiceGrpc.MyServiceStub): Provides asynchronous methods that work with a StreamObserver to handle non-blocking and streaming interactions. This stub is required for all types of gRPC streaming.
  - Abstract service base class (MyServiceGrpc.MyServiceImplBase): On the server side, you extend this abstract class and implement the service methods defined in your .proto file. This is where your actual server-side logic lives.
Practical code examples (snippets for each type)
Server-side streaming example
.proto definition
syntax = "proto3";

option java_multiple_files = true;
option java_package = "com.example.grpc.streaming";
option java_outer_classname = "StockProto";

service StockService {
    rpc GetStockPrices(StockRequest) returns (stream StockPrice);
}

message StockRequest {
    string symbol = 1;
}

message StockPrice {
    string symbol = 1;
    double price = 2;
    int64 timestamp = 3;
}
Server implementation
package com.example.grpc.streaming;

import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class StockServiceServerImpl extends StockServiceGrpc.StockServiceImplBase {
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    @Override
    public void getStockPrices(StockRequest request, StreamObserver<StockPrice> responseObserver) {
        String symbol = request.getSymbol();
        System.out.println("Client requested stock prices for: " + symbol);
        // isCancelled() is not part of StreamObserver itself; it is available on
        // ServerCallStreamObserver, the concrete type gRPC passes to server implementations.
        ServerCallStreamObserver<StockPrice> serverObserver =
                (ServerCallStreamObserver<StockPrice>) responseObserver;
        // Simulate sending stock quotes every 2 seconds
        scheduler.scheduleAtFixedRate(() -> {
            if (!serverObserver.isCancelled()) {
                double price = 100.0 + (Math.random() * 10.0); // Simulate price fluctuations
                StockPrice stockPrice = StockPrice.newBuilder()
                        .setSymbol(symbol)
                        .setPrice(price)
                        .setTimestamp(System.currentTimeMillis())
                        .build();
                serverObserver.onNext(stockPrice);
                System.out.println("Sending price for " + symbol + ": " + price);
            } else {
                System.out.println("Client has cancelled the stream for " + symbol);
                scheduler.shutdown(); // Stop sending when the client cancels
            }
        }, 0, 2, TimeUnit.SECONDS);
        // In a real application, some condition would eventually call onCompleted().
        // For demonstration purposes, this stream runs until the client cancels or the server stops.
    }
}
Client implementation
package com.example.grpc.streaming;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class StockServiceClient {
    public static void main(String[] args) throws InterruptedException {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080)
                .usePlaintext()
                .build();
        StockServiceGrpc.StockServiceStub asyncStub = StockServiceGrpc.newStub(channel);
        CountDownLatch latch = new CountDownLatch(1);
        System.out.println("Requesting stock prices for GOOG");
        asyncStub.getStockPrices(StockRequest.newBuilder().setSymbol("GOOG").build(), new StreamObserver<StockPrice>() {
            @Override
            public void onNext(StockPrice stockPrice) {
                System.out.println("Received: " + stockPrice.getSymbol() + " - Price: " + stockPrice.getPrice() + " at " + stockPrice.getTimestamp());
            }

            @Override
            public void onError(Throwable t) {
                System.err.println("Error receiving stock prices: " + t.getMessage());
                latch.countDown();
            }

            @Override
            public void onCompleted() {
                System.out.println("Stock price stream completed.");
                latch.countDown();
            }
        });
        // Run the client for a while to receive updates
        latch.await(10, TimeUnit.SECONDS);
        System.out.println("Client is shutting down.");
        channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
    }
}
Client-side streaming example
.proto definition
syntax = "proto3";

option java_multiple_files = true;
option java_package = "com.example.grpc.streaming";
option java_outer_classname = "FileProto";

service FileUploadService {
    rpc UploadFile(stream Chunk) returns (UploadStatus);
}

message Chunk {
    bytes data = 1;
    int32 sequence = 2;
}

message UploadStatus {
    string message = 1;
    bool success = 2;
}
Server implementation
package com.example.grpc.streaming;

import io.grpc.stub.StreamObserver;

public class FileUploadServiceServerImpl extends FileUploadServiceGrpc.FileUploadServiceImplBase {
    @Override
    public StreamObserver<Chunk> uploadFile(StreamObserver<UploadStatus> responseObserver) {
        return new StreamObserver<Chunk>() {
            private int receivedChunks = 0;
            private final StringBuilder fileContent = new StringBuilder(); // For demonstration purposes

            @Override
            public void onNext(Chunk chunk) {
                receivedChunks++;
                fileContent.append(chunk.getData().toStringUtf8()); // Simulate processing of the chunk data
                System.out.println("Received chunk " + chunk.getSequence() + ", total received: " + receivedChunks);
            }

            @Override
            public void onError(Throwable t) {
                // The call has already been aborted at this point, so no response can be sent.
                System.err.println("Error uploading the file: " + t.getMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("The client has finished sending chunks. Total chunks received: " + receivedChunks);
                System.out.println("Simulated file content: "
                        + fileContent.substring(0, Math.min(fileContent.length(), 50)) + "..."); // Show the beginning
                responseObserver.onNext(UploadStatus.newBuilder()
                        .setMessage("File upload completed successfully, number of chunks: " + receivedChunks)
                        .setSuccess(true)
                        .build());
                responseObserver.onCompleted();
            }
        };
    }
}
Client implementation
package com.example.grpc.streaming;

import com.google.protobuf.ByteString;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class FileUploadServiceClient {
    public static void main(String[] args) throws InterruptedException {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080)
                .usePlaintext()
                .build();
        FileUploadServiceGrpc.FileUploadServiceStub asyncStub = FileUploadServiceGrpc.newStub(channel);
        CountDownLatch latch = new CountDownLatch(1);
        StreamObserver<Chunk> requestObserver = asyncStub.uploadFile(new StreamObserver<UploadStatus>() {
            @Override
            public void onNext(UploadStatus status) {
                System.out.println("Upload status: " + status.getMessage() + " (success: " + status.getSuccess() + ")");
            }

            @Override
            public void onError(Throwable t) {
                System.err.println("Error uploading: " + t.getMessage());
                latch.countDown();
            }

            @Override
            public void onCompleted() {
                System.out.println("The server has confirmed completion of the file upload.");
                latch.countDown();
            }
        });
        try {
            // Simulate sending the file in chunks
            for (int i = 0; i < 5; i++) {
                String data = "This is chunk " + i + " of the file content.";
                Chunk chunk = Chunk.newBuilder()
                        .setData(ByteString.copyFromUtf8(data))
                        .setSequence(i)
                        .build();
                requestObserver.onNext(chunk);
                System.out.println("Sent chunk: " + i);
                Thread.sleep(500); // Simulate network delay
            }
            requestObserver.onCompleted(); // Signal completion to the server
        } catch (RuntimeException | InterruptedException e) {
            // Abort the stream; onCompleted() must not be called after onError()
            requestObserver.onError(e);
            throw e;
        }
        latch.await(10, TimeUnit.SECONDS);
        System.out.println("Client is shutting down.");
        channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
    }
}
Bidirectional streaming example
.proto definition
syntax = "proto3";

option java_multiple_files = true;
option java_package = "com.example.grpc.streaming";
option java_outer_classname = "ChatProto";

service ChatService {
    rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}

message ChatMessage {
    string sender = 1;
    string message = 2;
    int64 timestamp = 3;
}
Server implementation
package com.example.grpc.streaming;

import io.grpc.stub.StreamObserver;

public class ChatServiceServerImpl extends ChatServiceGrpc.ChatServiceImplBase {
    @Override
    public StreamObserver<ChatMessage> chat(StreamObserver<ChatMessage> responseObserver) {
        return new StreamObserver<ChatMessage>() {
            @Override
            public void onNext(ChatMessage chatMessage) {
                System.out.println("Server received message from " + chatMessage.getSender() + ": " + chatMessage.getMessage());
                // Echo the message back to the client (or broadcast it to other clients)
                ChatMessage serverResponse = ChatMessage.newBuilder()
                        .setSender("Server")
                        .setMessage("Echo: " + chatMessage.getMessage())
                        .setTimestamp(System.currentTimeMillis())
                        .build();
                responseObserver.onNext(serverResponse);
            }

            @Override
            public void onError(Throwable t) {
                System.err.println("Error in chat stream: " + t.getMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("Client has ended the chat stream.");
                responseObserver.onCompleted(); // Signal completion to the client
            }
        };
    }
}
Client implementation
package com.example.grpc.streaming;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ChatServiceClient {
    public static void main(String[] args) throws Exception {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080)
                .usePlaintext()
                .build();
        ChatServiceGrpc.ChatServiceStub asyncStub = ChatServiceGrpc.newStub(channel);
        CountDownLatch latch = new CountDownLatch(1);
        StreamObserver<ChatMessage> requestObserver = asyncStub.chat(new StreamObserver<ChatMessage>() {
            @Override
            public void onNext(ChatMessage chatMessage) {
                System.out.println("Received from " + chatMessage.getSender() + ": " + chatMessage.getMessage());
            }

            @Override
            public void onError(Throwable t) {
                System.err.println("Error in chat: " + t.getMessage());
                latch.countDown();
            }

            @Override
            public void onCompleted() {
                System.out.println("Chat session ended.");
                latch.countDown();
            }
        });
        try {
            BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
            String line;
            System.out.println("Enter messages (type 'exit' to quit):");
            while ((line = reader.readLine()) != null && !line.equalsIgnoreCase("exit")) {
                ChatMessage message = ChatMessage.newBuilder()
                        .setSender("ClientUser")
                        .setMessage(line)
                        .setTimestamp(System.currentTimeMillis())
                        .build();
                requestObserver.onNext(message);
            }
            requestObserver.onCompleted(); // Signal completion to the server
        } catch (Exception e) {
            // Abort the stream on failure; onCompleted() must not follow onError()
            requestObserver.onError(e);
        }
        latch.await(5, TimeUnit.SECONDS); // Wait briefly for final responses
        System.out.println("Client is shutting down.");
        channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
    }
}
Error handling in gRPC streaming
Standard gRPC status codes
gRPC uses a comprehensive set of status codes to indicate the result of an RPC call, including streaming calls. These codes provide a standardized way for clients and servers to understand why a request failed or succeeded. For example, “UNAVAILABLE” can mean that the server is unreachable, “UNAUTHENTICATED” indicates missing or invalid credentials, and “INTERNAL” indicates a server-side error. If an error occurs in a stream, the Status object, which contains the code and an optional description, is crucial for diagnosis.
StreamObserver.onError()
The “StreamObserver” interface, which is fundamental to gRPC streaming in Java, provides an “onError(Throwable t)” method. This method is the primary mechanism for handling errors in both client-side and server-side streaming.
Client-side error handling
If an error occurs on the server side or in the transport during a stream, gRPC delivers an onError signal to the client’s StreamObserver. Your client-side code should implement this method to gracefully handle issues such as network interruptions, server-side exceptions, or application-specific errors propagated by the server. This allows the client to respond appropriately, for example by logging the error, displaying a message to the user, or retrying the operation.
Server-side error handling
Conversely, on the server side, if your application logic encounters an error while processing a client’s stream or preparing responses for a server-side stream, you should call onError() on the StreamObserver provided by gRPC. This signals to the client that the stream was aborted due to an error. It is important to pass a meaningful Status with the onError call to give the client enough context to understand the problem.
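As a minimal sketch (the class and helper name here are hypothetical, not from the example above), a server-side handler might abort a stream with a descriptive status like this:

```java
import io.grpc.Status;
import io.grpc.stub.StreamObserver;

// Hypothetical helper: abort a stream with a descriptive gRPC status
// when an incoming message fails validation.
public class StreamErrorExample {
    static void rejectIfEmpty(String text, StreamObserver<?> responseObserver) {
        if (text == null || text.isEmpty()) {
            // asRuntimeException() wraps the Status so gRPC transmits
            // the code and description to the client.
            responseObserver.onError(
                    Status.INVALID_ARGUMENT
                            .withDescription("Message text must not be empty")
                            .asRuntimeException());
        }
    }
}
```

On the client side, the same Status can be recovered inside onError via Status.fromThrowable(t).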
Custom error details
While gRPC status codes provide a good initial understanding of an error, sometimes more specific details need to be conveyed at the application level. gRPC allows user-defined error details to be included alongside the Status. This is usually done by embedding protobuf messages in the details field of the richer com.google.rpc.Status message.
Using protobuf messages for details
By defining a special protobuf message for your error details, you can structure extensive, type-safe error information. For example, you can include fields for an application-specific error code, a detailed error message, or even a stack trace (although passing raw stack traces to clients should be done with caution, especially in production environments). This allows clients to analyze these custom details and provide more granular error handling or feedback to end users.
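A hedged sketch of this pattern, assuming the grpc-protobuf and common-protos dependencies are on the classpath (the reason and domain values are illustrative):

```java
import com.google.protobuf.Any;
import com.google.rpc.Code;
import com.google.rpc.ErrorInfo;
import io.grpc.protobuf.StatusProto;
import io.grpc.stub.StreamObserver;

// Sketch: build a rich status carrying a structured ErrorInfo detail
// and signal it to the client over the stream.
public class RichErrorExample {
    static void abortWithDetails(StreamObserver<?> responseObserver) {
        com.google.rpc.Status richStatus = com.google.rpc.Status.newBuilder()
                .setCode(Code.FAILED_PRECONDITION_VALUE)
                .setMessage("Chat room is closed")
                .addDetails(Any.pack(ErrorInfo.newBuilder()
                        .setReason("ROOM_CLOSED")       // application-specific error code
                        .setDomain("chat.example.com")  // hypothetical domain
                        .build()))
                .build();
        // Converts the proto Status into a StatusRuntimeException whose
        // trailers carry the serialized details to the client.
        responseObserver.onError(StatusProto.toStatusRuntimeException(richStatus));
    }
}
```

Clients can recover the structured details with StatusProto.fromThrowable(t) and unpack the Any back into an ErrorInfo.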
Performance considerations and best practices
Connection management
- Channel lifecycle: Manage the lifecycle of gRPC Channels. A channel represents a long-lived connection to a gRPC server. Creating and tearing down channels for each RPC call is inefficient due to the overhead of establishing HTTP/2 connections. It is best to reuse channels across multiple RPC calls and possibly across different services on the same server.
- Channel pooling: For applications that handle a high volume of requests or communicate with multiple gRPC servers, consider implementing a channel pool. This enables efficient reuse of channels and avoids exhaustion of resources.
- Shutdown: Close channels when they are no longer needed to free up resources. The shutdown() method initiates a graceful shutdown, while shutdownNow() attempts to terminate active RPCs and connections immediately.
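The channel lifecycle guidance above can be sketched as follows (the commented-out stub line assumes the ChatService from earlier; the timeout value is an arbitrary choice):

```java
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.concurrent.TimeUnit;

// Sketch: create one channel, reuse it for many stubs and calls,
// then shut it down gracefully with a hard-stop fallback.
public class ChannelLifecycleExample {
    public static void main(String[] args) throws InterruptedException {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080)
                .usePlaintext()
                .build();
        try {
            // Create as many stubs as needed from this one channel;
            // stubs are cheap to create, channels are not.
            // ChatServiceGrpc.ChatServiceStub stub = ChatServiceGrpc.newStub(channel);
        } finally {
            channel.shutdown(); // stop accepting new calls, let active ones finish
            if (!channel.awaitTermination(5, TimeUnit.SECONDS)) {
                channel.shutdownNow(); // force-cancel anything still running
            }
        }
    }
}
```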
Flow control
- Automatic handling: gRPC, which is built on HTTP/2, inherently provides flow control mechanisms. This means that both the client and the server can regulate the speed at which data is sent to prevent overloading the receiver. For basic scenarios, you generally do not need to implement explicit flow control logic in your application code.
- Backpressure: If a receiver cannot process data as fast as it is sent, gRPC’s flow control applies backpressure, slowing down the sender. This prevents errors caused by memory overruns and ensures reliable data transmission.
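For advanced cases where the automatic behavior is not enough, grpc-java exposes manual flow control hooks. A hedged sketch (the helper name is hypothetical; the cast assumes the observer was supplied by a server-streaming call):

```java
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.util.Iterator;

// Sketch: manual flow control on the server side of a streaming call.
// Instead of blindly calling onNext() in a loop, send only while the
// transport reports it is ready to accept more data.
public class FlowControlExample {
    @SuppressWarnings("unchecked")
    static <T> void streamChunks(StreamObserver<T> rawObserver, Iterator<T> chunks) {
        ServerCallStreamObserver<T> observer = (ServerCallStreamObserver<T>) rawObserver;
        // The onReadyHandler runs whenever the outbound buffer drains.
        observer.setOnReadyHandler(() -> {
            while (observer.isReady() && chunks.hasNext()) {
                observer.onNext(chunks.next()); // send until backpressure kicks in
            }
            if (!chunks.hasNext()) {
                observer.onCompleted();
            }
        });
    }
}
```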
Resource management
- StreamObserver lifecycle: For streaming RPCs, make sure that StreamObserver instances are properly managed. In particular, onCompleted() or onError() should always be called to signal the end of a stream and release the associated resources on both the client and server side. Failing to do so can lead to resource leaks and hanging connections.
- Thread management: Pay attention to thread usage in your gRPC applications. Both client and server typically use thread pools for processing RPC calls. Configure these pools according to the expected load and available system resources. Avoid blocking operations on gRPC-managed threads, as this can lead to thread starvation.
When to use which streaming type?
- Server-side streaming: Ideal for scenarios where a client requests data and the server needs to send a continuous stream of updates or a large data set. Examples include real-time data feeds (e.g. stock quotes, sensor data) or downloading large files in chunks.
- Client-side streaming: Suitable when a client needs to send a large amount of data to the server in chunks or a series of independent messages that the server processes together to achieve a single result. Examples include uploading large log files, continuous voice input for transcription or sending sensor readings in batches.
- Bidirectional streaming: Best suited for real-time interactive communication where both client and server can send and receive message streams independently. Common applications include chat applications, online gaming, live video conferencing or collaborative editing tools. This type offers the greatest flexibility for complex real-time interactions.
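In the .proto definition, these three types differ only in where the stream keyword appears. An illustrative sketch (service and message names are made up for this example):

```proto
// Illustrative service definition; the message types are assumed to exist.
service DataService {
  // Server-side streaming: one request, a stream of responses.
  rpc Subscribe(SubscribeRequest) returns (stream Update);
  // Client-side streaming: a stream of requests, one response.
  rpc Upload(stream Chunk) returns (UploadSummary);
  // Bidirectional streaming: both sides stream independently.
  rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}
```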
Further reading and next steps
Dive deeper into gRPC concepts
Advanced gRPC topics
- Interceptors: Learn how to implement and use interceptors for cross-cutting concerns such as logging, authentication, and monitoring.
- Metadata: Learn how to send and receive custom metadata with your gRPC calls for additional contextual information.
- Load Balancing: Understand how to configure gRPC clients for effective load balancing across multiple service instances.
- Error Handling Strategies: Explore more robust error handling patterns, including custom error details and retry mechanisms.
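As a taste of the interceptor topic, here is a hedged sketch of a client-side logging interceptor (the class name is made up; it logs the method name and each outgoing message of a streaming call):

```java
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.MethodDescriptor;

// Sketch: a client interceptor that logs every outgoing call and,
// for streaming calls, every message sent on it.
public class LoggingInterceptor implements ClientInterceptor {
    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
            MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
        System.out.println("Calling: " + method.getFullMethodName());
        return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
                next.newCall(method, callOptions)) {
            @Override
            public void sendMessage(ReqT message) {
                System.out.println("Sending message on " + method.getFullMethodName());
                super.sendMessage(message);
            }
        };
    }
}
```

It would be installed when building the channel, e.g. via ManagedChannelBuilder’s intercept() method.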
gRPC ecosystem
- gRPC-Web: Discover how gRPC-Web enables browser-based clients to interact with gRPC services.
- Proxies and Gateways: Learn about tools like Envoy or gRPC-Gateway that connect RESTful APIs to gRPC services.
- Observability: Learn how to integrate gRPC services with tracing (e.g. OpenTracing, OpenTelemetry) and metrics (e.g. Prometheus) for better monitoring.
Official documentation and community resources
Important documentation
- Official gRPC website: The main resource for everything related to gRPC, including detailed documentation, tutorials and examples for different languages.
- Protocol Buffer Documentation: A comprehensive guide to defining and using Protocol Buffers, the interface definition language for gRPC.
Community and Open Source
- gRPC GitHub Repository: Access source code, contribute to the project, and explore examples.
- Community forums and mailing lists: Participate in the gRPC community to ask questions, share knowledge, and stay up to date on developments.
