Building a real-time video AI service with Google Gemini
In a recent customer project with a Top 3 Global Service Provider, we built an agentic AI service that receives and processes real-time video and feeds the data stream into Google Gemini via the Gemini Multimodal Live API, which streams live input combining audio, video, and text.
Approaching the challenge
We built, deployed, and scaled this service to 1000s of TPS—many orders of magnitude higher than the customer required. We developed the streaming video ingest, video augmentation, and conversational storage with Akka’s SDK and deployed it into a private Akka operating environment, provisioned within 2 hours in a Google VPC.
We made it into production in 2.5 days, though we were held back by the Google Gemini API client. The only available client provided by Google that supports this part of the Gemini APIs is Python-based, blocking, and synchronous (i.e., very inefficient!). So, after reverse-engineering the Python client’s behavior, we built our own reactive client in a day using Akka streams and remoting libraries.
Getting started
After some reading and digging, we found that the protocol uses JSON objects sent over a WebSocket. Following an initial message to set up the session, we must wait for an acknowledgement before streaming chunks of audio, video frames, and text to the API. In return, we receive a stream of either audio or text.
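For illustration, the handshake looks roughly like this (the setup and setupComplete field names match the protocol records modeled below; the model name here is just an example, not necessarily what the project used):
{"setup": {"model": "models/gemini-2.0-flash-exp"}}
{"setupComplete": {}}
Only once the second message has arrived may chunks of audio, video, and text be streamed.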
Streaming setup
The Akka HTTP library provides a streaming WebSocket API, making it easy to interact with the service from Java. From the Python client, we determined the exact WebSocket URL to use:
wss://generativelanguage.googleapis.com/ws/google.ai.generativelanguage.v1alpha.GenerativeService.BidiGenerateContent?key=[GEMINI_API_KEY]
We then modeled the protocol using Java records. The protocol generally follows a tree structure of possible message types, where only one field can hold a value in a given instance:
record LiveClientMessage(
Optional<GenerateContentSetup> setup,
Optional<LiveClientContent> clientContent,
Optional<LiveClientRealtimeInput> realtimeInput,
Optional<LiveClientToolResponse> toolResponse
) {}
record LiveClientContent(List<Content> turns, boolean turnComplete) {}
record Content(List<Part> parts, Optional<String> role) {}
record Part(
Optional<Blob> inlineData,
Optional<String> text,
… long list of possible fields omitted …
) { }
… and so on for all of the protocol's inbound and outbound message structures.
Serialization
We use Jackson to serialize and deserialize record instances to and from JSON. Some protocol-specific customization is needed:
- Byte array payloads for video and audio must be base64-encoded strings
- Empty fields should be omitted
- Field names should follow camel case
Jackson allows easy customization through custom StdSerializer and StdDeserializer implementations, as well as configuration of the ObjectMapper.
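For instance, a minimal sketch of such a configuration (assuming the jackson-datatype-jdk8 module is available for the Optional record components; Jackson base64-encodes byte arrays and uses the camel-case record component names by default):
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;

var objectMapper = new ObjectMapper()
    // adds support for the Optional<...> record components
    .registerModule(new Jdk8Module())
    // omit empty Optionals and nulls, so unused branches of the message tree disappear
    .setSerializationInclusion(JsonInclude.Include.NON_ABSENT);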
The Akka HTTP WebSocket API represents the bidirectional stream of WebSocket messages as a Flow<Message, Message, CompletionStage<WebSocketUpgradeResponse>>, where we send Message instances into it by attaching a Source<Message, ?> in front and handle incoming messages with a Sink<Message, ?>.
The materialized value, CompletionStage<WebSocketUpgradeResponse>, lets us inspect the connection result, such as checking the HTTP response code:
var wsFlow = http.webSocketClientFlow(WebSocketRequest.create(url));
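For example, with hypothetical outgoingMessages and incomingMessageSink stages attached, running the flow yields the upgrade response, whose status we can verify:
var upgradeResponse =
    outgoingMessages                          // Source<Message, ?> of messages to send
        .viaMat(wsFlow, Keep.right())         // keep the upgrade CompletionStage
        .toMat(incomingMessageSink, Keep.left())
        .run(system);

upgradeResponse.thenAccept(upgrade -> {
  // anything other than 101 Switching Protocols means the connection failed
  if (!upgrade.response().status().equals(StatusCodes.SWITCHING_PROTOCOLS)) {
    throw new IllegalStateException("WebSocket upgrade failed: " + upgrade.response().status());
  }
});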
Transform
On the outgoing side, transforming objects into WebSocket messages is straightforward. We use a stream map with the Jackson object mapper to convert each object into a JSON string, then construct a WebSocket text message from it:
var wsInput = fullInput.map(liveClientMessage ->
TextMessage.create(objectMapper.writeValueAsString(liveClientMessage))
);
The incoming direction is a bit trickier. WebSocket messages can be either string or binary, and each type may arrive as a single “strict” message or as a stream of parts. We must collect these parts into memory as they arrive until the full message is received.
First, we aggregate both binary and text messages into complete strings in a single mapAsync stream step. Once assembled, we pass the string to Jackson for deserialization in a regular map:
var TIMEOUT_MS = 3000;
var webSocketToOutput = Flow.<Message>create()
    // collect possibly-streamed message parts into one complete UTF-8 string
    .mapAsync(1, wsMessage -> {
      if (wsMessage instanceof TextMessage textMessage) {
        if (textMessage.isStrict())
          return CompletableFuture.completedFuture(textMessage.getStrictText());
        else
          return textMessage.toStrict(TIMEOUT_MS, materializer).thenApply(strict -> strict.getStrictText());
      } else if (wsMessage instanceof BinaryMessage binaryMessage) {
        if (binaryMessage.isStrict())
          return CompletableFuture.completedFuture(binaryMessage.getStrictData().utf8String());
        else
          return binaryMessage.toStrict(TIMEOUT_MS, materializer).thenApply(strict -> strict.getStrictData().utf8String());
      } else throw new IllegalArgumentException("Unexpected WebSocket message type: " + wsMessage.getClass());
    })
    // deserialize the complete JSON string into our protocol records
    .map(jsonString -> objectMapper.readValue(jsonString, LiveServerMessage.class));
Initial message
One final challenge remains: sending the initial setup message and waiting for a response before passing any other streamed input over the WebSocket. This can be done by defining an incomplete CompletableFuture<Done> and completing it once the setup acknowledgement is received:
var setupResponseSeen = new CompletableFuture<Done>();
…
var outputCheckWhenSetup = webSocketToOutput.map((message) -> {
if (message.setupComplete().isPresent()) {
setupResponseSeen.complete(done());
}
return message;
});
We can then delay the incoming stream of commands using Source.fromSourceCompletionStage:
var delayedInput = Source.fromSourceCompletionStage(setupResponseSeen.thenApply(ignored -> input));
Since a stream can be materialized multiple times, it's important to ensure that a shared CompletableFuture is used only for a single materialization. This is assured by wrapping the entire client call with Source.fromMaterializer:
Source<LiveServerMessage, NotUsed> connect(GenerateContentSetup setup, Source<LiveClientMessage, ?> input) {
  return Source.fromMaterializer((materializer, attributes) -> {
    var setupResponseSeen = new CompletableFuture<Done>();
    … set up websocket flow, and in and out streams …
    // the setup, wrapped in the top-level protocol message, goes out first
    var setupMessage = new LiveClientMessage(Optional.of(setup), Optional.empty(), Optional.empty(), Optional.empty());
    var fullInput = Source.single(setupMessage).concat(delayedInput);
    return wsInput.via(wsFlow).via(outputCheckWhenSetup);
  })
  // drop the CompletionStage materialized value that fromMaterializer introduces
  .mapMaterializedValue(ignored -> NotUsed.getInstance());
}
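A hypothetical usage sketch, assuming an ActorSystem named system and an input source of client messages:
connect(setup, input)
    // print each LiveServerMessage (audio or text) as it streams back from Gemini
    .runForeach(serverMessage -> System.out.println(serverMessage), system);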
To sum things up, interacting with a third-party WebSocket API without native JVM client support is entirely feasible using the Akka HTTP and Streams APIs, with just a day or two of work.
This specific example enables any service built with the Akka SDK or libraries to perform live video, audio, and text interactions with the Google Gemini LLM.