7.3.2 Streaming with @Client
The @Client annotation can also handle streaming HTTP responses.
Streaming JSON with @Client
For example to write a client that streams data from the controller defined in the JSON Streaming section of the documentation you can simply define a client that returns an unbound Publisher such as a RxJava Flowable or Reactor Flux
:
HeadlineClient.java
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.client.annotation.Client;
import io.reactivex.Flowable;
@Client("/streaming")
public interface HeadlineClient {
@Get(value = "/headlines", processes = MediaType.APPLICATION_JSON_STREAM) (1)
Flowable<Headline> streamHeadlines(); (2)
HeadlineClient.java
import io.micronaut.http.MediaType
import io.micronaut.http.annotation.Get
import io.micronaut.http.client.annotation.Client
import io.reactivex.Flowable
@Client("/streaming")
interface HeadlineClient {
@Get(value = "/headlines", processes = MediaType.APPLICATION_JSON_STREAM) (1)
Flowable<Headline> streamHeadlines() (2)
}
HeadlineClient.java
import io.micronaut.http.MediaType
import io.micronaut.http.annotation.Get
import io.micronaut.http.client.annotation.Client
import io.reactivex.Flowable
@Client("/streaming")
interface HeadlineClient {
@Get(value = "/headlines", processes = [MediaType.APPLICATION_JSON_STREAM]) (1)
fun streamHeadlines(): Flowable<Headline> (2)
1 | The @Get method is defined as processing responses of type APPLICATION_JSON_STREAM |
2 | A Flowable is used as the return type |
The following example shows how the previously defined HeadlineClient
can be invoked from a JUnit test:
Streaming HeadlineClient
@Test
public void testClientAnnotationStreaming() throws Exception {
try( EmbeddedServer embeddedServer = ApplicationContext.run(EmbeddedServer.class) ) {
HeadlineClient headlineClient = embeddedServer
.getApplicationContext()
.getBean(HeadlineClient.class); (1)
Maybe<Headline> firstHeadline = headlineClient.streamHeadlines().firstElement(); (2)
Headline headline = firstHeadline.blockingGet(); (3)
assertNotNull( headline );
assertTrue( headline.getText().startsWith("Latest Headline") );
}
}
Streaming HeadlineClient
void "test client annotation streaming"() throws Exception {
when:
HeadlineClient headlineClient = embeddedServer.getApplicationContext()
.getBean(HeadlineClient.class) (1)
Maybe<Headline> firstHeadline = headlineClient.streamHeadlines().firstElement() (2)
Headline headline = firstHeadline.blockingGet() (3)
then:
null != headline
headline.getText().startsWith("Latest Headline")
}
Streaming HeadlineClient
"test client annotation streaming" {
val headlineClient = embeddedServer
.applicationContext
.getBean(HeadlineClient::class.java) (1)
val firstHeadline = headlineClient.streamHeadlines().firstElement() (2)
val headline = firstHeadline.blockingGet() (3)
headline shouldNotBe null
headline.text shouldStartWith "Latest Headline"
}
1 | The client is retrieved from the ApplicationContext |
2 | The firstElement method is used to return the first emitted item from the Flowable as a Maybe. |
3 | The blockingGet() is used in the test to retrieve the result. |
Streaming Clients and Response Types
The example defined in the previous section expects the server to respond with a stream of JSON objects and the content type to be application/x-json-stream
. For example:
A JSON Stream
{"title":"The Stand"}
{"title":"The Shining"}
The reason for this is simple, a sequence of JSON object is not, in fact, valid JSON and hence the response content type cannot be application/json
. For the JSON to be valid it would have to return an array:
A JSON Array
[
{"title":"The Stand"},
{"title":"The Shining"}
]
Micronaut’s client does however support streaming of both individual JSON objects via application/x-json-stream
and also JSON arrays defined with application/json
.
If the server returns application/json
and a non-single Publisher is returned (such as an Flowable or a Reactor Flux
) then the client with stream the array elements as they become available.
Streaming Clients and Read Timeout
When streaming responses from servers, the underlying HTTP client will not apply the default readTimeout
setting (which defaults to 10 seconds) of the HttpClientConfiguration since the delay between reads for streaming responses may differ from normal reads.
Instead the read-idle-timeout
setting (which defaults to 60 seconds) is used to dictate when a connection should be closed after becoming idle.
If you are streaming data from a server that defines a longer delay than 60 seconds between items being sent to the client you should adjust the readIdleTimeout
. The following configuration in application.yml
demonstrates how:
Adjusting the readIdleTimeout
micronaut:
http:
client:
read-idle-timeout: 5m
The above example sets the readIdleTimeout
to 5 minutes.
Streaming Server Sent Events
Micronaut features a native client for Server Sent Events (SSE) defined by the interface SseClient.
You can use this client to stream SSE events from any server that emits them.
Although SSE streams are typically consumed by a browser EventSource , there are a few cases where you may wish to consume a SSE stream via SseClient such as in unit testing or when a Micronaut service acts as a gateway for another service. |
The @Client annotation also supports consuming SSE streams. For example, consider the following controller method that produces a stream of SSE events:
SSE Controller
@Get(value = "/headlines", processes = MediaType.TEXT_EVENT_STREAM) (1)
Flowable<Event<Headline>> streamHeadlines() {
return Flowable.<Event<Headline>>create((emitter) -> { (2)
Headline headline = new Headline();
headline.setText("Latest Headline at " + ZonedDateTime.now());
emitter.onNext(Event.of(headline));
emitter.onComplete();
}, BackpressureStrategy.BUFFER).repeat(100) (3)
.delay(1, TimeUnit.SECONDS); (4)
}
SSE Controller
@Get(value = "/headlines", processes = MediaType.TEXT_EVENT_STREAM) (1)
Flowable<Event<Headline>> streamHeadlines() {
Flowable.<Event<Headline>>create( { emitter -> (2)
Headline headline = new Headline()
headline.setText("Latest Headline at " + ZonedDateTime.now())
emitter.onNext(Event.of(headline))
emitter.onComplete()
}, BackpressureStrategy.BUFFER).repeat(100) (3)
.delay(1, TimeUnit.SECONDS) (4)
}
SSE Controller
@Get(value = "/headlines", processes = [MediaType.TEXT_EVENT_STREAM]) (1)
internal fun streamHeadlines(): Flowable<Event<Headline>> {
return Flowable.create<Event<Headline>>( { (2)
emitter ->
val headline = Headline()
headline.text = "Latest Headline at " + ZonedDateTime.now()
emitter.onNext(Event.of(headline))
emitter.onComplete()
}, BackpressureStrategy.BUFFER).repeat(100) (3)
.delay(1, TimeUnit.SECONDS) (4)
}
1 | The controller defines a @Get annotation that produces a MediaType.TEXT_EVENT_STREAM |
2 | The method itself uses Reactor to emit a hypothetical Headline object |
3 | The repeat method is used to repeat the emission 100 times |
4 | With a delay of 1 second between each item emitted. |
Notice that the return type of the controller is also Event and that the Event.of
method is used to create events to stream to the client.
To define a client that consumes the events you simply have to define a method that processes MediaType.TEXT_EVENT_STREAM
:
SSE Client
@Client("/streaming/sse")
public interface HeadlineClient {
@Get(value = "/headlines", processes = MediaType.TEXT_EVENT_STREAM)
Flowable<Event<Headline>> streamHeadlines();
}
SSE Client
@Client("/streaming/sse")
interface HeadlineClient {
@Get(value = "/headlines", processes = MediaType.TEXT_EVENT_STREAM)
Flowable<Event<Headline>> streamHeadlines()
}
SSE Client
@Client("/streaming/sse")
interface HeadlineClient {
@Get(value = "/headlines", processes = [MediaType.TEXT_EVENT_STREAM])
fun streamHeadlines(): Flowable<Event<Headline>>
}
The generic type of the Flux
or Flowable
can be either an Event, in which case you will receive the full event object, or a POJO, in which case you will receive only the data contained within the event converted from JSON.