7.3.2 Streaming with @Client
The @Client annotation can also handle streaming HTTP responses.
Streaming JSON with @Client
For example, to write a client that streams data from the controller defined in the JSON Streaming section of the documentation, you can define a client that returns an unbound Publisher such as Reactor’s reactor:Flux[] or a RxJava’s Flowable:
HeadlineClient.java
import io.micronaut.http.annotation.Get;
import io.micronaut.http.client.annotation.Client;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import static io.micronaut.http.MediaType.APPLICATION_JSON_STREAM;
@Client("/streaming")
public interface HeadlineClient {
@Get(value = "/headlines", processes = APPLICATION_JSON_STREAM) (1)
Publisher<Headline> streamHeadlines(); (2)
HeadlineClient.java
import io.micronaut.http.annotation.Get
import io.micronaut.http.client.annotation.Client
import org.reactivestreams.Publisher
import static io.micronaut.http.MediaType.APPLICATION_JSON_STREAM
@Client("/streaming")
interface HeadlineClient {
@Get(value = "/headlines", processes = APPLICATION_JSON_STREAM) (1)
Publisher<Headline> streamHeadlines() (2)
}
HeadlineClient.java
import io.micronaut.http.MediaType.APPLICATION_JSON_STREAM
import io.micronaut.http.annotation.Get
import io.micronaut.http.client.annotation.Client
import reactor.core.publisher.Flux
@Client("/streaming")
interface HeadlineClient {
@Get(value = "/headlines", processes = [APPLICATION_JSON_STREAM]) (1)
fun streamHeadlines(): Flux<Headline> (2)
1 | The @Get method processes responses of type APPLICATION_JSON_STREAM |
2 | The return type is Publisher |
The following example shows how the previously defined HeadlineClient
can be invoked from a test:
Streaming HeadlineClient
@Test
public void testClientAnnotationStreaming() {
try(EmbeddedServer embeddedServer = ApplicationContext.run(EmbeddedServer.class)) {
HeadlineClient headlineClient = embeddedServer
.getApplicationContext()
.getBean(HeadlineClient.class); (1)
Mono<Headline> firstHeadline = Mono.from(headlineClient.streamHeadlines()); (2)
Headline headline = firstHeadline.block(); (3)
assertNotNull(headline);
assertTrue(headline.getText().startsWith("Latest Headline"));
}
}
Streaming HeadlineClient
void "test client annotation streaming"() throws Exception {
when:
def headlineClient = embeddedServer.applicationContext
.getBean(HeadlineClient) (1)
Mono<Headline> firstHeadline = Mono.from(headlineClient.streamHeadlines()) (2)
Headline headline = firstHeadline.block() (3)
then:
headline
headline.text.startsWith("Latest Headline")
}
Streaming HeadlineClient
"test client annotation streaming" {
val headlineClient = embeddedServer
.applicationContext
.getBean(HeadlineClient::class.java) (1)
val firstHeadline = headlineClient.streamHeadlines().next() (2)
val headline = firstHeadline.block() (3)
headline shouldNotBe null
headline.text shouldStartWith "Latest Headline"
}
1 | The client is retrieved from the ApplicationContext |
2 | The next method emits the first element emmited by the reactor:Flux[] into a reactor:Mono[]. |
3 | The block() method retrieves the result in the test. |
Streaming Clients and Response Types
The example defined in the previous section expects the server to respond with a stream of JSON objects, and the content type to be application/x-json-stream
. For example:
A JSON Stream
{"title":"The Stand"}
{"title":"The Shining"}
The reason for this is simple; a sequence of JSON object is not, in fact, valid JSON and hence the response content type cannot be application/json
. For the JSON to be valid it would have to return an array:
A JSON Array
[
{"title":"The Stand"},
{"title":"The Shining"}
]
Micronaut’s client does however support streaming of both individual JSON objects via application/x-json-stream
and also JSON arrays defined with application/json
.
If the server returns application/json
and a non-single Publisher is returned (such as a Reactor’s reactor:Flux[] or a RxJava’s Flowable), the client streams the array elements as they become available.
Streaming Clients and Read Timeout
When streaming responses from servers, the underlying HTTP client will not apply the default readTimeout
setting (which defaults to 10 seconds) of the HttpClientConfiguration since the delay between reads for streaming responses may differ from normal reads.
Instead, the read-idle-timeout
setting (which defaults to 5 minutes) dictates when to close a connection after it becomes idle.
If you stream data from a server that defines a longer delay than 5 minutes between items, you should adjust readIdleTimeout
. The following configuration in application.yml
demonstrates how:
Adjusting the readIdleTimeout
micronaut:
http:
client:
read-idle-timeout: 10m
The above example sets the readIdleTimeout
to ten minutes.
Streaming Server Sent Events
Micronaut features a native client for Server Sent Events (SSE) defined by the interface SseClient.
You can use this client to stream SSE events from any server that emits them.
Although SSE streams are typically consumed by a browser EventSource , there are cases where you may wish to consume an SSE stream via SseClient, such as in unit tests or when a Micronaut service acts as a gateway for another service. |
The @Client annotation also supports consuming SSE streams. For example, consider the following controller method that produces a stream of SSE events:
SSE Controller
@Get(value = "/headlines", processes = MediaType.TEXT_EVENT_STREAM) (1)
Publisher<Event<Headline>> streamHeadlines() {
return Flux.<Event<Headline>>create((emitter) -> { (2)
Headline headline = new Headline();
headline.setText("Latest Headline at " + ZonedDateTime.now());
emitter.next(Event.of(headline));
emitter.complete();
}, FluxSink.OverflowStrategy.BUFFER)
.repeat(100) (3)
.delayElements(Duration.of(1, ChronoUnit.SECONDS)); (4)
}
SSE Controller
@Get(value = "/headlines", processes = MediaType.TEXT_EVENT_STREAM) (1)
Flux<Event<Headline>> streamHeadlines() {
Flux.<Event<Headline>>create( { emitter -> (2)
Headline headline = new Headline(text: "Latest Headline at ${ZonedDateTime.now()}")
emitter.next(Event.of(headline))
emitter.complete()
}, FluxSink.OverflowStrategy.BUFFER)
.repeat(100) (3)
.delayElements(Duration.of(1, ChronoUnit.SECONDS)) (4)
}
SSE Controller
@Get(value = "/headlines", processes = [TEXT_EVENT_STREAM]) (1)
internal fun streamHeadlines(): Flux<Event<Headline>> {
return Flux.create<Event<Headline>>( { emitter -> (2)
val headline = Headline()
headline.text = "Latest Headline at ${ZonedDateTime.now()}"
emitter.next(Event.of(headline))
emitter.complete()
}, FluxSink.OverflowStrategy.BUFFER)
.repeat(100) (3)
.delayElements(Duration.of(1, ChronoUnit.SECONDS)) (4)
}
1 | The controller defines a @Get annotation that produces a MediaType.TEXT_EVENT_STREAM |
2 | The method uses Reactor to emit a Headline object |
3 | The repeat method repeats the emission 100 times |
4 | With a delay of one second between each |
Notice that the return type of the controller is also Event and that the Event.of
method creates events to stream to the client.
To define a client that consumes the events, define a method that processes MediaType.TEXT_EVENT_STREAM
:
SSE Client
@Client("/streaming/sse")
public interface HeadlineClient {
@Get(value = "/headlines", processes = TEXT_EVENT_STREAM)
Publisher<Event<Headline>> streamHeadlines();
}
SSE Client
@Client("/streaming/sse")
interface HeadlineClient {
@Get(value = "/headlines", processes = TEXT_EVENT_STREAM)
Flux<Event<Headline>> streamHeadlines()
}
SSE Client
@Client("/streaming/sse")
interface HeadlineClient {
@Get(value = "/headlines", processes = [TEXT_EVENT_STREAM])
fun streamHeadlines(): Flux<Event<Headline>>
}
The generic type of the Flux
can be either an Event, in which case you will receive the full event object, or a POJO, in which case you will receive only the data contained within the event converted from JSON.