7.2.2 Streaming with @Client

The @Client annotation can also handle streaming HTTP responses.

Streaming JSON with @Client

For example, to write a client that streams data from the controller defined in the JSON Streaming section of the documentation, define a client method that returns an unbounded Publisher, such as an RxJava Flowable or a Reactor Flux:

HeadlineClient.java

    import io.micronaut.http.MediaType;
    import io.micronaut.http.annotation.Get;
    import io.micronaut.http.client.annotation.Client;
    import io.reactivex.Flowable;
    import reactor.core.publisher.Flux;

    @Client("/streaming")
    public interface HeadlineClient {

        @Get(value = "/headlines", processes = MediaType.APPLICATION_JSON_STREAM) // (1)
        Flowable<Headline> streamHeadlines(); // (2)
    }

HeadlineClient.groovy

    import io.micronaut.http.MediaType
    import io.micronaut.http.annotation.Get
    import io.micronaut.http.client.annotation.Client
    import io.reactivex.Flowable
    import reactor.core.publisher.Flux

    @Client("/streaming")
    interface HeadlineClient {

        @Get(value = "/headlines", processes = MediaType.APPLICATION_JSON_STREAM) // (1)
        Flowable<Headline> streamHeadlines() // (2)
    }

HeadlineClient.kt

    import io.micronaut.http.MediaType
    import io.micronaut.http.annotation.Get
    import io.micronaut.http.client.annotation.Client
    import io.reactivex.Flowable
    import reactor.core.publisher.Flux

    @Client("/streaming")
    interface HeadlineClient {

        @Get(value = "/headlines", processes = [MediaType.APPLICATION_JSON_STREAM]) // (1)
        fun streamHeadlines(): Flowable<Headline> // (2)
    }

(1) The @Get method is defined as processing responses of type APPLICATION_JSON_STREAM.
(2) A Flowable is used as the return type.

The following example shows how the previously defined HeadlineClient can be invoked from a JUnit test:

Streaming HeadlineClient (Java)

    @Test
    public void testClientAnnotationStreaming() throws Exception {
        try (EmbeddedServer embeddedServer = ApplicationContext.run(EmbeddedServer.class)) {
            HeadlineClient headlineClient = embeddedServer
                    .getApplicationContext()
                    .getBean(HeadlineClient.class); // (1)

            Maybe<Headline> firstHeadline = headlineClient.streamHeadlines().firstElement(); // (2)
            Headline headline = firstHeadline.blockingGet(); // (3)

            assertNotNull(headline);
            assertTrue(headline.getText().startsWith("Latest Headline"));
        }
    }

Streaming HeadlineClient (Groovy)

    void "test client annotation streaming"() throws Exception {
        when:
        HeadlineClient headlineClient = embeddedServer.getApplicationContext()
                .getBean(HeadlineClient.class) // (1)
        Maybe<Headline> firstHeadline = headlineClient.streamHeadlines().firstElement() // (2)
        Headline headline = firstHeadline.blockingGet() // (3)

        then:
        null != headline
        headline.getText().startsWith("Latest Headline")
    }

Streaming HeadlineClient (Kotlin)

    "test client annotation streaming" {
        val headlineClient = embeddedServer
                .applicationContext
                .getBean(HeadlineClient::class.java) // (1)

        val firstHeadline = headlineClient.streamHeadlines().firstElement() // (2)
        val headline = firstHeadline.blockingGet() // (3)

        headline shouldNotBe null
        headline.text shouldStartWith "Latest Headline"
    }

(1) The client is retrieved from the ApplicationContext.
(2) The firstElement method is used to return the first emitted item from the Flowable as a Maybe.
(3) The blockingGet() method is used in the test to retrieve the result.
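
To make the "take the first item, then stop" behaviour of the test more concrete, here is a minimal sketch that uses only the JDK's java.util.concurrent.Flow API instead of RxJava. It is an illustration under assumptions, not how Flowable.firstElement() is implemented: the class name FirstElementSketch and the helper firstElement are hypothetical, and a SubmissionPublisher stands in for the streaming client.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

/** Hypothetical JDK-only analogue of "first element of a stream"; not part of Micronaut or RxJava. */
final class FirstElementSketch {

    /** Requests exactly one item, completes the future with it, and cancels the subscription. */
    static <T> CompletableFuture<T> firstElement(Flow.Publisher<T> publisher) {
        CompletableFuture<T> result = new CompletableFuture<>();
        publisher.subscribe(new Flow.Subscriber<T>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1); // backpressure: ask for a single item only
            }

            @Override
            public void onNext(T item) {
                result.complete(item);
                subscription.cancel(); // no further items are needed
            }

            @Override
            public void onError(Throwable t) {
                result.completeExceptionally(t);
            }

            @Override
            public void onComplete() {
                result.complete(null); // the stream ended without emitting anything
            }
        });
        return result;
    }

    public static void main(String[] args) throws Exception {
        // The publisher stands in for the stream of headlines sent by the server.
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            CompletableFuture<String> first = firstElement(publisher);
            publisher.submit("Latest Headline 1");
            publisher.submit("Latest Headline 2");
            // Blocking here plays the role of blockingGet() in the test.
            System.out.println(first.get(5, TimeUnit.SECONDS));
        }
    }
}
```

Blocking on the result is convenient in a test, as in the examples above; application code would normally stay non-blocking and react to the item when it arrives.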

Streaming Clients and Response Types

The example defined in the previous section expects the server to respond with a stream of JSON objects and the content type to be application/x-json-stream. For example:

A JSON Stream

    {"title":"The Stand"}
    {"title":"The Shining"}

The reason is simple: a sequence of JSON objects is not, in fact, valid JSON, so the response content type cannot be application/json. For the JSON to be valid, the server would have to return an array:

A JSON Array

    [
      {"title":"The Stand"},
      {"title":"The Shining"}
    ]

Micronaut’s client does, however, support streaming both individual JSON objects sent as application/x-json-stream and JSON arrays sent as application/json.

If the server returns application/json and the client method returns a non-single Publisher (such as a Flowable or a Reactor Flux), then the client will stream the array elements as they become available.
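
The difference between the two wire formats can be sketched in plain Java. The example below is a simplified illustration, not Micronaut's actual JSON parser: the class JsonObjectSplitter and its split method are hypothetical names, and the code only tracks brace depth to find where each top-level object ends. It shows that the same element-by-element emission is possible whether the objects arrive one after another or wrapped in an array.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of Micronaut: extracts the top-level JSON objects from a body. */
final class JsonObjectSplitter {

    static List<String> split(String body) {
        List<String> objects = new ArrayList<>();
        int depth = 0;            // nesting level of '{' ... '}'
        int start = -1;           // index where the current top-level object began
        boolean inString = false; // true while inside a JSON string literal
        boolean escaped = false;  // true right after a backslash inside a string

        for (int i = 0; i < body.length(); i++) {
            char c = body.charAt(i);
            if (inString) {
                // Braces inside string literals are data, not structure.
                if (escaped) {
                    escaped = false;
                } else if (c == '\\') {
                    escaped = true;
                } else if (c == '"') {
                    inString = false;
                }
                continue;
            }
            if (c == '"') {
                inString = true;
            } else if (c == '{') {
                if (depth == 0) {
                    start = i;
                }
                depth++;
            } else if (c == '}' && depth > 0) {
                depth--;
                if (depth == 0) {
                    // A complete object is available; a streaming client could emit it now.
                    objects.add(body.substring(start, i + 1));
                }
            }
        }
        return objects;
    }

    public static void main(String[] args) {
        String jsonStream = "{\"title\":\"The Stand\"}\n{\"title\":\"The Shining\"}\n";
        String jsonArray = "[{\"title\":\"The Stand\"},{\"title\":\"The Shining\"}]";
        // Both bodies yield the same two objects.
        System.out.println(split(jsonStream));
        System.out.println(split(jsonArray));
    }
}
```

A real client also has to cope with input that arrives in chunks and with arrays of non-object values, which this sketch deliberately ignores.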

Streaming Clients and Read Timeout

When streaming responses from servers, the underlying HTTP client will not apply the default readTimeout setting (which defaults to 10 seconds) of the HttpClientConfiguration since the delay between reads for streaming responses may differ from normal reads.

Instead, the read-idle-timeout setting (which defaults to 60 seconds) dictates when a connection should be closed after becoming idle.

If you are streaming data from a server that defines a longer delay than 60 seconds between items being sent to the client you should adjust the readIdleTimeout. The following configuration in application.yml demonstrates how:

Adjusting the readIdleTimeout

    micronaut:
      http:
        client:
          read-idle-timeout: 5m

The above example sets the readIdleTimeout to 5 minutes.
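
The effect of the idle timeout can be illustrated with a small JDK-only sketch. It models the rule described above (close the connection when nothing has been read for longer than the configured duration) and is not the client's actual implementation; the class IdleTimeoutTracker, its methods, and the 90-second gap between items are all hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical model of an idle timeout; it does not reflect the real client internals. */
final class IdleTimeoutTracker {

    private final Duration idleTimeout;
    private Instant lastRead;

    IdleTimeoutTracker(Duration idleTimeout, Instant connectedAt) {
        this.idleTimeout = idleTimeout;
        this.lastRead = connectedAt;
    }

    /** Every chunk read from the server resets the idle clock. */
    void onRead(Instant now) {
        lastRead = now;
    }

    /** The connection counts as idle once the gap since the last read exceeds the timeout. */
    boolean shouldClose(Instant now) {
        return Duration.between(lastRead, now).compareTo(idleTimeout) > 0;
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2024-01-01T00:00:00Z");
        Instant nextItemAt = start.plus(Duration.ofSeconds(90)); // assumed pause between items

        IdleTimeoutTracker defaults = new IdleTimeoutTracker(Duration.ofSeconds(60), start);
        IdleTimeoutTracker adjusted = new IdleTimeoutTracker(Duration.ofMinutes(5), start);

        System.out.println(defaults.shouldClose(nextItemAt)); // true: 90 s exceeds 60 s
        System.out.println(adjusted.shouldClose(nextItemAt)); // false: 90 s is within 5 minutes
    }
}
```

In this model a server that pauses 90 seconds between items would lose its connection under the 60-second default but not under the 5-minute setting.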

Streaming Server Sent Events

Micronaut features a native client for Server Sent Events (SSE) defined by the interface SseClient.

You can use this client to stream SSE events from any server that emits them.

Although SSE streams are typically consumed by a browser EventSource, there are a few cases where you may wish to consume an SSE stream via SseClient, such as in unit testing or when a Micronaut service acts as a gateway for another service.

The @Client annotation also supports consuming SSE streams. For example, consider the following controller method that produces a stream of SSE events:

SSE Controller (Java)

    @Get(value = "/headlines", processes = MediaType.TEXT_EVENT_STREAM) // (1)
    Flux<Event<Headline>> streamHeadlines() {
        return Flux.<Event<Headline>>create((emitter) -> { // (2)
            Headline headline = new Headline();
            headline.setText("Latest Headline at " + ZonedDateTime.now());
            emitter.next(Event.of(headline));
            emitter.complete();
        }).repeat(100) // (3)
          .delayElements(Duration.ofSeconds(1)); // (4)
    }

SSE Controller (Groovy)

    @Get(value = "/headlines", processes = MediaType.TEXT_EVENT_STREAM) // (1)
    Flux<Event<Headline>> streamHeadlines() {
        Flux.<Event<Headline>>create { emitter -> // (2)
            Headline headline = new Headline()
            headline.setText("Latest Headline at " + ZonedDateTime.now())
            emitter.next(Event.of(headline))
            emitter.complete()
        }.repeat(100) // (3)
         .delayElements(Duration.ofSeconds(1)) // (4)
    }

SSE Controller (Kotlin)

    @Get(value = "/headlines", processes = [MediaType.TEXT_EVENT_STREAM]) // (1)
    internal fun streamHeadlines(): Flux<Event<Headline>> {
        return Flux.create<Event<Headline>> { emitter -> // (2)
            val headline = Headline()
            headline.text = "Latest Headline at " + ZonedDateTime.now()
            emitter.next(Event.of(headline))
            emitter.complete()
        }.repeat(100) // (3)
         .delayElements(Duration.ofSeconds(1)) // (4)
    }

(1) The controller method is annotated with @Get and produces MediaType.TEXT_EVENT_STREAM.
(2) The method itself uses Reactor to emit a hypothetical Headline object.
(3) The repeat method is used to repeat the emission 100 times.
(4) The delayElements method adds a delay of 1 second between emitted items.

Notice that the Flux returned by the controller emits Event objects and that the Event.of method is used to create the events streamed to the client.

To define a client that consumes the events you simply have to define a method that processes MediaType.TEXT_EVENT_STREAM:

SSE Client (Java)

    @Client("/streaming/sse")
    public interface HeadlineClient {

        @Get(value = "/headlines", processes = MediaType.TEXT_EVENT_STREAM)
        Flux<Event<Headline>> streamHeadlines();
    }

SSE Client (Groovy)

    @Client("/streaming/sse")
    interface HeadlineClient {

        @Get(value = "/headlines", processes = MediaType.TEXT_EVENT_STREAM)
        Flux<Event<Headline>> streamHeadlines()
    }

SSE Client (Kotlin)

    @Client("/streaming/sse")
    interface HeadlineClient {

        @Get(value = "/headlines", processes = [MediaType.TEXT_EVENT_STREAM])
        fun streamHeadlines(): Flux<Event<Headline>>
    }

The generic type of the Flux or Flowable can be either an Event, in which case you will receive the full event object, or a POJO, in which case you will receive only the data contained within the event converted from JSON.
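
To show what "the full event object" contains compared with "only the data", the following JDK-only sketch parses the text/event-stream wire format by hand. It is a simplified illustration and not how SseClient works internally: the names SseEventParser and ParsedEvent are hypothetical, and the parser resets the event id after each event, whereas the SSE specification keeps the last event ID across events.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified parser for the text/event-stream format; not Micronaut's SseClient. */
final class SseEventParser {

    /** Stand-in for a full event: metadata (id, name) plus the data payload. */
    static final class ParsedEvent {
        final String id;
        final String name;
        final String data;

        ParsedEvent(String id, String name, String data) {
            this.id = id;
            this.name = name;
            this.data = data;
        }
    }

    static List<ParsedEvent> parse(String body) {
        List<ParsedEvent> events = new ArrayList<>();
        String id = null;
        String name = null;
        StringBuilder data = new StringBuilder();
        boolean hasData = false;

        for (String line : body.split("\r?\n", -1)) {
            if (line.isEmpty()) {
                // A blank line terminates the current event.
                if (hasData) {
                    events.add(new ParsedEvent(id, name, data.toString()));
                }
                id = null; // simplification: the specification keeps the last event ID
                name = null;
                data.setLength(0);
                hasData = false;
                continue;
            }
            if (line.startsWith(":")) {
                continue; // comment line, often used as a keep-alive
            }
            int colon = line.indexOf(':');
            String field = colon < 0 ? line : line.substring(0, colon);
            String value = colon < 0 ? "" : line.substring(colon + 1);
            if (value.startsWith(" ")) {
                value = value.substring(1); // a single leading space is not part of the value
            }
            switch (field) {
                case "id":
                    id = value;
                    break;
                case "event":
                    name = value;
                    break;
                case "data":
                    if (hasData) {
                        data.append('\n'); // multiple data lines are joined with newlines
                    }
                    data.append(value);
                    hasData = true;
                    break;
                default:
                    break; // "retry" and unknown fields are ignored in this sketch
            }
        }
        return events;
    }

    public static void main(String[] args) {
        String body = "id: 1\nevent: headline\ndata: {\"text\":\"Latest Headline\"}\n\n";
        for (ParsedEvent event : parse(body)) {
            // The data field is the JSON a client would convert when the generic type is a POJO.
            System.out.println(event.id + " " + event.name + " " + event.data);
        }
    }
}
```

Choosing Event as the generic type corresponds to keeping the whole ParsedEvent here, while choosing a POJO corresponds to keeping only its data field after JSON conversion.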