7.1.4 Streaming JSON over HTTP
Micronaut’s HTTP client supports streaming data over HTTP via the RxStreamingHttpClient interface, which adds methods specific to HTTP streaming:
Method | Description |
---|---|
dataStream | Returns a stream of data as a Flowable of ByteBuffer |
exchangeStream | Returns the HttpResponse wrapping a Flowable of ByteBuffer |
jsonStream | Returns a non-blocking stream of JSON objects |
To stream JSON, declare a controller method on the server side that produces an application/x-json-stream
of JSON objects. For example:
Streaming JSON on the Server
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.reactivex.Flowable;
import java.time.ZonedDateTime;
import java.util.concurrent.TimeUnit;
@Get(value = "/headlines", processes = MediaType.APPLICATION_JSON_STREAM) (1)
Flowable<Headline> streamHeadlines() {
    return Flowable.fromCallable(() -> { (2)
        Headline headline = new Headline();
        headline.setText("Latest Headline at " + ZonedDateTime.now());
        return headline;
    }).repeat(100) (3)
      .delay(1, TimeUnit.SECONDS); (4)
}
Streaming JSON on the Server
import io.micronaut.http.MediaType
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.reactivex.Flowable
import java.time.ZonedDateTime
import java.util.concurrent.TimeUnit
@Get(value = "/headlines", processes = MediaType.APPLICATION_JSON_STREAM) (1)
Flowable<Headline> streamHeadlines() {
    Flowable.fromCallable({ (2)
        Headline headline = new Headline()
        headline.setText("Latest Headline at " + ZonedDateTime.now())
        return headline
    }).repeat(100) (3)
      .delay(1, TimeUnit.SECONDS) (4)
}
Streaming JSON on the Server
import io.micronaut.http.MediaType
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.reactivex.Flowable
import java.time.ZonedDateTime
import java.util.concurrent.TimeUnit
@Get(value = "/headlines", processes = [MediaType.APPLICATION_JSON_STREAM]) (1)
internal fun streamHeadlines(): Flowable<Headline> {
    return Flowable.fromCallable { (2)
        val headline = Headline()
        headline.text = "Latest Headline at " + ZonedDateTime.now()
        headline
    }.repeat(100) (3)
        .delay(1, TimeUnit.SECONDS) (4)
}
1 | A method streamHeadlines is defined that produces application/x-json-stream |
2 | A Flowable is created from a Callable function. No blocking occurs within the function, so this is fine; if it did block, you would want to subscribeOn an I/O thread pool. |
3 | The Flowable is set to repeat 100 times |
4 | The Flowable will emit items with a delay of 1 second between each item |
The server does not have to be written in Micronaut; any server that supports JSON streaming will do.
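To make "a stream of JSON objects" more concrete, the following is a minimal sketch of how a consumer might recover complete objects from a body that arrives in arbitrary chunks. It is not Micronaut's implementation: the class name JsonObjectSplitter and the method feed are hypothetical, it uses only the JDK, and it assumes the objects arrive either back to back or separated by whitespace such as newlines. A real client would rely on a proper incremental JSON parser instead.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper, for illustration only: splits a chunked stream of
 * top-level JSON objects into one String per object by tracking brace depth.
 */
final class JsonObjectSplitter {
    private final StringBuilder current = new StringBuilder();
    private int depth = 0;            // nesting level of '{' ... '}'
    private boolean inString = false; // true while inside a JSON string literal
    private boolean escaped = false;  // true right after a backslash in a string

    /** Feeds one chunk and returns every top-level object completed by it. */
    List<String> feed(String chunk) {
        List<String> complete = new ArrayList<>();
        for (int i = 0; i < chunk.length(); i++) {
            char c = chunk.charAt(i);
            if (depth == 0 && c != '{') {
                continue; // skip whitespace or newlines between objects
            }
            current.append(c);
            if (inString) {
                // Braces inside string literals must not affect the depth.
                if (escaped) {
                    escaped = false;
                } else if (c == '\\') {
                    escaped = true;
                } else if (c == '"') {
                    inString = false;
                }
            } else if (c == '"') {
                inString = true;
            } else if (c == '{') {
                depth++;
            } else if (c == '}') {
                depth--;
                if (depth == 0) {
                    complete.add(current.toString());
                    current.setLength(0);
                }
            }
        }
        return complete;
    }

    public static void main(String[] args) {
        JsonObjectSplitter splitter = new JsonObjectSplitter();
        // The second object is split across two chunks on purpose.
        System.out.println(splitter.feed("{\"text\":\"a\"}{\"te"));
        System.out.println(splitter.feed("xt\":\"b\"}\n"));
    }
}
```

The splitter keeps partial input between calls, so an object that spans two network chunks is emitted only once its closing brace arrives.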
Then, on the client, subscribe to the stream using jsonStream.
Every time the server emits a JSON object, the client decodes and consumes it:
Streaming JSON on the Client
Flowable<Headline> headlineStream = client.jsonStream(GET("/streaming/headlines"), Headline.class); (1)
CompletableFuture<Headline> future = new CompletableFuture<>(); (2)
headlineStream.subscribe(new Subscriber<Headline>() {
    @Override
    public void onSubscribe(Subscription s) {
        s.request(1); (3)
    }

    @Override
    public void onNext(Headline headline) {
        System.out.println("Received Headline = " + headline.getText());
        future.complete(headline); (4)
    }

    @Override
    public void onError(Throwable t) {
        future.completeExceptionally(t); (5)
    }

    @Override
    public void onComplete() {
        // no-op (6)
    }
});
Streaming JSON on the Client
Flowable<Headline> headlineStream = client.jsonStream(GET("/streaming/headlines"), Headline.class) (1)
CompletableFuture<Headline> future = new CompletableFuture<>() (2)
headlineStream.subscribe(new Subscriber<Headline>() {
    @Override
    void onSubscribe(Subscription s) {
        s.request(1) (3)
    }

    @Override
    void onNext(Headline headline) {
        System.out.println("Received Headline = " + headline.getText())
        future.complete(headline) (4)
    }

    @Override
    void onError(Throwable t) {
        future.completeExceptionally(t) (5)
    }

    @Override
    void onComplete() {
        // no-op (6)
    }
})
Streaming JSON on the Client
val headlineStream = client.jsonStream(GET<Any>("/streaming/headlines"), Headline::class.java) (1)
val future = CompletableFuture<Headline>() (2)
headlineStream.subscribe(object : Subscriber<Headline> {
    override fun onSubscribe(s: Subscription) {
        s.request(1) (3)
    }

    override fun onNext(headline: Headline) {
        println("Received Headline = " + headline.text!!)
        future.complete(headline) (4)
    }

    override fun onError(t: Throwable) {
        future.completeExceptionally(t) (5)
    }

    override fun onComplete() {
        // no-op (6)
    }
})
1 | The jsonStream method is used to return a Flowable |
2 | A CompletableFuture is used in the example to receive a value, but what you do with each emitted item is application specific |
3 | The Subscription is used to request a single item. You can use the Subscription to regulate back pressure and demand. |
4 | The onNext method is called when an item is emitted |
5 | The onError method is called when an error occurs |
6 | The onComplete method is called when all Headline instances have been emitted |
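The example requests a single item and never asks for more, so at most one Headline is delivered. To keep receiving items while still regulating demand, a subscriber can request the next item from inside onNext. The sketch below illustrates that pattern with the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams Subscriber and Subscription used above, rather than with the Micronaut client itself. The names OneAtATimeDemo, OneAtATimeSubscriber, and consume are hypothetical, and SubmissionPublisher stands in for the remote stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class OneAtATimeDemo {

    /** Hypothetical subscriber that never has more than one item in flight. */
    static final class OneAtATimeSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1); // initial demand: exactly one item
        }

        @Override
        public void onNext(String item) {
            received.add(item);
            subscription.request(1); // ask for the next item only after handling this one
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes the given items locally and returns what the subscriber received. */
    static List<String> consume(List<String> items) {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (String item : items) {
                publisher.submit(item);
            }
        } // close() signals onComplete once the buffered items are delivered
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(consume(List.of("headline 1", "headline 2", "headline 3")));
    }
}
```

Requesting one item per onNext call is the most conservative demand policy; a subscriber that can buffer may request larger batches to reduce signalling overhead.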
Note that neither the server nor the client in the example above performs blocking I/O at any point.