6.25 Server Sent Events
The Micronaut HTTP server supports emitting Server Sent Events (SSE) using the Event API.
To emit events from the server, return a Reactive Streams Publisher that emits objects of type Event.
The Publisher itself could publish events from a background task, via an event system, etc.
As an example, imagine an event stream of news headlines. You might define a data class as follows:
Headline (Java)
public class Headline {

    private String title;
    private String description;

    public Headline() {}

    public Headline(String title, String description) {
        this.title = title;
        this.description = description;
    }

    public String getTitle() {
        return title;
    }

    public String getDescription() {
        return description;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public void setDescription(String description) {
        this.description = description;
    }
}
Headline (Groovy)
class Headline {

    String title
    String description

    Headline() {}

    Headline(String title, String description) {
        this.title = title
        this.description = description
    }
}
Headline (Kotlin)
class Headline {

    var title: String? = null
    var description: String? = null

    constructor()

    constructor(title: String, description: String) {
        this.title = title
        this.description = description
    }
}
To emit news headline events, write a controller that returns a Publisher of Event instances using whichever reactive library you prefer. The example below uses Project Reactor's Flux via the generate method:
Publishing Server Sent Events from a Controller (Java)
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.sse.Event;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

@Controller("/headlines")
public class HeadlineController {

    @ExecuteOn(TaskExecutors.IO)
    @Get(produces = MediaType.TEXT_EVENT_STREAM)
    public Publisher<Event<Headline>> index() { // (1)
        String[] versions = {"1.0", "2.0"}; // (2)
        return Flux.generate(() -> 0, (i, emitter) -> { // (3)
            if (i < versions.length) {
                emitter.next( // (4)
                    Event.of(new Headline("Micronaut " + versions[i] + " Released", "Come and get it"))
                );
            } else {
                emitter.complete(); // (5)
            }
            return ++i;
        });
    }
}
Publishing Server Sent Events from a Controller (Groovy)
import io.micronaut.http.MediaType
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.http.sse.Event
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux

@Controller("/headlines")
class HeadlineController {

    @ExecuteOn(TaskExecutors.IO)
    @Get(produces = MediaType.TEXT_EVENT_STREAM)
    Publisher<Event<Headline>> index() { // (1)
        String[] versions = ["1.0", "2.0"] // (2)
        Flux.generate(() -> 0, (i, emitter) -> { // (3)
            if (i < versions.length) {
                emitter.next( // (4)
                    Event.of(new Headline("Micronaut ${versions[i]} Released", "Come and get it"))
                )
            } else {
                emitter.complete() // (5)
            }
            return i + 1
        })
    }
}
Publishing Server Sent Events from a Controller (Kotlin)
import io.micronaut.http.MediaType
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.http.sse.Event
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import reactor.core.publisher.SynchronousSink
import java.util.concurrent.Callable
import java.util.function.BiFunction

@Controller("/headlines")
class HeadlineController {

    @ExecuteOn(TaskExecutors.IO)
    @Get(produces = [MediaType.TEXT_EVENT_STREAM])
    fun index(): Publisher<Event<Headline>> { // (1)
        val versions = arrayOf("1.0", "2.0") // (2)
        return Flux.generate(
            { 0 },
            BiFunction { i: Int, emitter: SynchronousSink<Event<Headline>> -> // (3)
                if (i < versions.size) {
                    emitter.next( // (4)
                        Event.of(
                            Headline(
                                "Micronaut " + versions[i] + " Released", "Come and get it"
                            )
                        )
                    )
                } else {
                    emitter.complete() // (5)
                }
                return@BiFunction i + 1
            })
    }
}
(1) The controller method returns a Publisher of Event.
(2) A headline is emitted for each version of Micronaut.
(3) The generate method of Reactor's Flux type creates a Publisher. It accepts a supplier of the initial state and a lambda that receives the current state and an emitter (a SynchronousSink) and returns the next state. Note that this example executes on the same thread as the controller action, but you could use subscribeOn or map an existing "hot" Flux.
(4) The emitter's next method emits objects of type Event. The Event.of(ET) factory method constructs the event.
(5) The emitter's complete method signals that no more server sent events will be sent.
You typically want to schedule SSE event streams on a separate executor. The previous example uses @ExecuteOn to execute the stream on the I/O executor.
The above example sends back a response of type text/event-stream. For each Event emitted, the Headline type defined previously is converted to JSON, resulting in responses such as:
Server Sent Event Response Output
data: {"title":"Micronaut 1.0 Released","description":"Come and get it"}
data: {"title":"Micronaut 2.0 Released","description":"Come and get it"}
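To make the response format concrete, here is a minimal sketch of how a client might recover the JSON payloads from such a body. It relies only on the general text/event-stream rules (each event ends with a blank line, and consecutive data lines are joined with newlines). The class name SseDataParser and its parseData method are hypothetical helpers for illustration, not part of Micronaut.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not a Micronaut class: extracts the data payload of each event.
public class SseDataParser {

    /** Splits a text/event-stream body into the data payload of each complete event. */
    public static List<String> parseData(String body) {
        List<String> events = new ArrayList<>();
        StringBuilder data = new StringBuilder();
        boolean hasData = false;
        for (String line : body.split("\r\n|\n|\r", -1)) {
            if (line.isEmpty()) {
                // A blank line terminates the current event.
                if (hasData) {
                    events.add(data.toString());
                }
                data.setLength(0);
                hasData = false;
            } else if (line.startsWith("data:")) {
                String value = line.substring(5);
                if (value.startsWith(" ")) {
                    value = value.substring(1); // one leading space is not part of the value
                }
                if (hasData) {
                    data.append('\n'); // multiple data lines are joined with newlines
                }
                data.append(value);
                hasData = true;
            }
            // Other fields (id, event, retry) and ":" comment lines are ignored in this sketch.
        }
        return events;
    }

    public static void main(String[] args) {
        String body =
            "data: {\"title\":\"Micronaut 1.0 Released\",\"description\":\"Come and get it\"}\n\n"
            + "data: {\"title\":\"Micronaut 2.0 Released\",\"description\":\"Come and get it\"}\n\n";
        // Prints one JSON document per line.
        parseData(body).forEach(System.out::println);
    }
}
```

In practice a browser's EventSource or an SSE-aware HTTP client performs this parsing; the sketch only illustrates how the data lines map back to individual Headline documents.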
You can use the methods of the Event interface to customize the Server Sent Event data sent back, including associating event ids, comments, retry timeouts, etc.
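As a rough illustration of what those customizations mean on the wire, the sketch below renders a single event with a comment, an id, an event name, and a retry timeout in text/event-stream format. It is a standalone approximation based on the general SSE format, not Micronaut's encoder. The class SseFrameSketch and its render method are hypothetical, and the field order Micronaut actually emits may differ.

```java
import java.time.Duration;

// Hypothetical helper, not Micronaut's encoder: shows how event metadata appears in the stream.
public class SseFrameSketch {

    /** Renders one event in text/event-stream format; null metadata fields are omitted. */
    public static String render(String comment, String id, String name, Duration retry, String data) {
        StringBuilder sb = new StringBuilder();
        if (comment != null) {
            sb.append(": ").append(comment).append('\n'); // comment lines start with a colon
        }
        if (id != null) {
            sb.append("id: ").append(id).append('\n');
        }
        if (name != null) {
            sb.append("event: ").append(name).append('\n');
        }
        if (retry != null) {
            sb.append("retry: ").append(retry.toMillis()).append('\n'); // reconnection time in milliseconds
        }
        // Each line of the payload gets its own data field.
        for (String line : data.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        return sb.append('\n').toString(); // a blank line ends the event
    }

    public static void main(String[] args) {
        System.out.print(render("breaking news", "42", "headline", Duration.ofSeconds(5),
            "{\"title\":\"Micronaut 2.0 Released\",\"description\":\"Come and get it\"}"));
    }
}
```

A client that reconnects after a dropped connection can send the last id it received, and the retry value suggests how long it should wait before reconnecting.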