7.1.4 Streaming JSON over HTTP
Micronaut’s HTTP client supports streaming data over HTTP through the ReactorStreamingHttpClient interface, which adds streaming-specific methods, including:
Method | Description |
---|---|
dataStream(HttpRequest<I> request) | Returns a stream of data as a Flux of ByteBuffer |
exchangeStream(HttpRequest<I> request) | Returns the HttpResponse wrapping a Flux of ByteBuffer |
jsonStream(HttpRequest<I> request) | Returns a non-blocking stream of JSON objects |
To use JSON streaming, declare a controller method on the server that produces application/x-json-stream
and returns a reactive stream of JSON objects. For example:
Streaming JSON on the Server (Java)
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
@Get(value = "/headlines", processes = MediaType.APPLICATION_JSON_STREAM) (1)
Publisher<Headline> streamHeadlines() {
return Mono.fromCallable(() -> { (2)
Headline headline = new Headline();
headline.setText("Latest Headline at " + ZonedDateTime.now());
return headline;
}).repeat(100) (3)
.delayElements(Duration.of(1, ChronoUnit.SECONDS)); (4)
}
Streaming JSON on the Server (Groovy)
import io.micronaut.http.MediaType
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.time.Duration
import java.time.ZonedDateTime
import java.time.temporal.ChronoUnit
@Get(value = "/headlines", processes = MediaType.APPLICATION_JSON_STREAM) (1)
Flux<Headline> streamHeadlines() {
Mono.fromCallable({ (2)
new Headline(text: "Latest Headline at ${ZonedDateTime.now()}")
}).repeat(100) (3)
.delayElements(Duration.of(1, ChronoUnit.SECONDS)) (4)
}
Streaming JSON on the Server (Kotlin)
import io.micronaut.http.MediaType.APPLICATION_JSON_STREAM
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.time.Duration
import java.time.ZonedDateTime
import java.time.temporal.ChronoUnit
@Get(value = "/headlines", processes = [APPLICATION_JSON_STREAM]) (1)
internal fun streamHeadlines(): Flux<Headline> {
return Mono.fromCallable { (2)
val headline = Headline()
headline.text = "Latest Headline at ${ZonedDateTime.now()}"
headline
}.repeat(100) (3)
.delayElements(Duration.of(1, ChronoUnit.SECONDS)) (4)
}
1 | The streamHeadlines method produces application/x-json-stream |
2 | A Mono is created from a Callable (no blocking occurs within the function, so this is fine; otherwise you should subscribeOn an I/O thread pool) |
3 | The Mono is repeated 100 times, which turns it into a Flux |
4 | The Flux emits items with a delay of one second between each |
The server does not have to be written in Micronaut; any server that supports JSON streaming will do.
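To illustrate the point that the server need not be a Micronaut application, here is a minimal sketch that uses only the JDK's built-in com.sun.net.httpserver.HttpServer. The class name PlainJsonStreamServer, its helper methods, and the choice to write one JSON object per line are assumptions made for this sketch; they are not part of Micronaut, and the framing your client expects should be checked separately. The readObjects helper uses the JDK's java.net.http client purely to show what goes over the wire.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical stand-in for a non-Micronaut server; not part of any framework.
class PlainJsonStreamServer {

    /** Starts a server on a free port that streams `count` headline objects per request. */
    static HttpServer start(int count) {
        try {
            HttpServer server = HttpServer.create(new InetSocketAddress(0), 0);
            server.createContext("/streaming/headlines", exchange -> {
                exchange.getResponseHeaders().add("Content-Type", "application/x-json-stream");
                // A response length of 0 selects chunked transfer encoding.
                exchange.sendResponseHeaders(200, 0);
                try (OutputStream body = exchange.getResponseBody()) {
                    for (int i = 0; i < count; i++) {
                        // Assumption of this sketch: one JSON object per line.
                        String json = "{\"text\":\"Latest Headline at " + ZonedDateTime.now() + "\"}\n";
                        body.write(json.getBytes(StandardCharsets.UTF_8));
                        body.flush(); // send each object as soon as it is ready
                    }
                }
            });
            server.start();
            return server;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads the streamed response line by line using the JDK HTTP client. */
    static List<String> readObjects(HttpServer server) {
        URI uri = URI.create("http://localhost:" + server.getAddress().getPort() + "/streaming/headlines");
        HttpClient client = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build();
        HttpRequest request = HttpRequest.newBuilder(uri).timeout(Duration.ofSeconds(10)).build();
        try (Stream<String> lines = client.send(request, HttpResponse.BodyHandlers.ofLines()).body()) {
            return lines.collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        HttpServer server = start(3);
        try {
            readObjects(server).forEach(System.out::println);
        } finally {
            server.stop(0);
        }
    }
}
```

The sketch writes the objects back to back without the one-second delay used in the Micronaut controller; a real server would flush each object whenever it becomes available.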
Then, on the client, subscribe to the stream using jsonStream.
Each time the server emits a JSON object, the client decodes and consumes it:
Streaming JSON on the Client (Java)
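import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import java.util.concurrent.CompletableFuture;
import static io.micronaut.http.HttpRequest.GET;
Flux<Headline> headlineStream = Flux.from(client.jsonStream(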
GET("/streaming/headlines"), Headline.class)); (1)
CompletableFuture<Headline> future = new CompletableFuture<>(); (2)
headlineStream.subscribe(new Subscriber<Headline>() {
@Override
public void onSubscribe(Subscription s) {
s.request(1); (3)
}
@Override
public void onNext(Headline headline) {
System.out.println("Received Headline = " + headline.getText());
future.complete(headline); (4)
}
@Override
public void onError(Throwable t) {
future.completeExceptionally(t); (5)
}
@Override
public void onComplete() {
// no-op (6)
}
});
Streaming JSON on the Client (Groovy)
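import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import reactor.core.publisher.Flux
import java.util.concurrent.CompletableFuture
import static io.micronaut.http.HttpRequest.GET
Flux<Headline> headlineStream = Flux.from(client.jsonStream(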
GET("/streaming/headlines"), Headline)) (1)
CompletableFuture<Headline> future = new CompletableFuture<>() (2)
headlineStream.subscribe(new Subscriber<Headline>() {
@Override
void onSubscribe(Subscription s) {
s.request(1) (3)
}
@Override
void onNext(Headline headline) {
println "Received Headline = $headline.text"
future.complete(headline) (4)
}
@Override
void onError(Throwable t) {
future.completeExceptionally(t) (5)
}
@Override
void onComplete() {
// no-op (6)
}
})
Streaming JSON on the Client (Kotlin)
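import io.micronaut.http.HttpRequest.GET
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import java.util.concurrent.CompletableFuture
val headlineStream = client.jsonStream(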
GET<Any>("/streaming/headlines"), Headline::class.java) (1)
val future = CompletableFuture<Headline>() (2)
headlineStream.subscribe(object : Subscriber<Headline> {
override fun onSubscribe(s: Subscription) {
s.request(1) (3)
}
override fun onNext(headline: Headline) {
println("Received Headline = ${headline.text!!}")
future.complete(headline) (4)
}
override fun onError(t: Throwable) {
future.completeExceptionally(t) (5)
}
override fun onComplete() {
// no-op (6)
}
})
1 | The jsonStream method returns a Flux |
2 | A CompletableFuture is used to receive a value, but what you do with each emitted item is application-specific |
3 | The Subscription requests a single item. You can use the Subscription to regulate back pressure and demand. |
4 | The onNext method is called when an item is emitted |
5 | The onError method is called when an error occurs |
6 | The onComplete method is called when all Headline instances have been emitted |
Note that neither the server nor the client in the example above performs any blocking I/O.
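The client example requests a single item and then stops. To show how the Subscription can regulate demand over a whole stream, the following minimal sketch requests the next item only after the previous one has been handled. So that it runs without extra dependencies, it uses the JDK's java.util.concurrent.Flow interfaces and SubmissionPublisher as stand-ins for the Reactive Streams Subscriber and the Flux returned by jsonStream; the names OneAtATimeDemo, OneAtATimeSubscriber and consume are hypothetical and exist only for this illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the JDK Flow API stands in for org.reactivestreams here.
class OneAtATimeDemo {

    /** Subscriber that never has more than one requested item outstanding. */
    static final class OneAtATimeSubscriber<T> implements Flow.Subscriber<T> {
        final List<T> received = new CopyOnWriteArrayList<>();
        final CountDownLatch finished = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1); // initial demand: a single item
        }

        @Override
        public void onNext(T item) {
            received.add(item);      // handle the item
            subscription.request(1); // only then ask for the next one
        }

        @Override
        public void onError(Throwable t) {
            finished.countDown();
        }

        @Override
        public void onComplete() {
            finished.countDown();
        }
    }

    /** Publishes the given items and returns them in the order the subscriber saw them. */
    static <T> List<T> consume(List<T> items) {
        OneAtATimeSubscriber<T> subscriber = new OneAtATimeSubscriber<>();
        try (SubmissionPublisher<T> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        } // close() signals onComplete once the buffered items have been consumed
        try {
            if (!subscriber.finished.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("stream did not complete in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(consume(List.of("first", "second", "third")));
    }
}
```

With a Reactive Streams Subscriber the same pattern applies: keep a reference to the Subscription from onSubscribe and call request from onNext. The blocking await in consume exists only so the demo can return a result; it is not something the subscriber itself needs.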