6.23 Server Sent Events
The Micronaut HTTP server supports emitting Server Sent Events (SSE) using the Event API.
To emit events from the server, return a Reactive Streams Publisher that emits objects of type Event.
The Publisher itself could publish events from a background task, via an event system, or from any other source.
As an example, imagine an event stream of news headlines. You might define a data class as follows:
Headline (Java)
public class Headline {
private String title;
private String description;
public Headline() { }
public Headline(String title, String description) {
this.title = title;
this.description = description;
}
public String getTitle() {
return title;
}
public String getDescription() {
return description;
}
public void setTitle(String title) {
this.title = title;
}
public void setDescription(String description) {
this.description = description;
}
}
Headline (Groovy)
class Headline {
String title;
String description;
Headline() {}
Headline(String title, String description) {
this.title = title;
this.description = description;
}
}
Headline (Kotlin)
class Headline {
var title: String? = null
var description: String? = null
constructor() {}
constructor(title: String, description: String) {
this.title = title
this.description = description
}
}
To emit news headline events, write a controller that returns a Publisher of Event instances using whichever reactive library you prefer. The example below uses RxJava 2’s Flowable via the generate method:
Publishing Server Sent Events from a Controller (Java)
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.*;
import io.micronaut.http.sse.Event;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;
import io.reactivex.Flowable;
import org.reactivestreams.Publisher;
@Controller("/headlines")
public class HeadlineController {
@ExecuteOn(TaskExecutors.IO)
@Get(produces = MediaType.TEXT_EVENT_STREAM)
public Publisher<Event<Headline>> index() { (1)
String[] versions = new String[]{"1.0", "2.0"}; (2)
return Flowable.generate(() -> 0, (i, emitter) -> { (3)
if (i < versions.length) {
emitter.onNext( (4)
Event.of(new Headline("Micronaut " + versions[i] + " Released", "Come and get it"))
);
} else {
emitter.onComplete(); (5)
}
return ++i;
});
}
}
Publishing Server Sent Events from a Controller (Groovy)
import io.micronaut.http.MediaType
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.http.sse.Event
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn
import io.reactivex.Emitter
import io.reactivex.Flowable
import io.reactivex.functions.BiFunction
import org.reactivestreams.Publisher
@Controller("/headlines")
class HeadlineController {
@ExecuteOn(TaskExecutors.IO)
@Get(produces = MediaType.TEXT_EVENT_STREAM)
Publisher<Event<Headline>> index() { (1)
String[] versions = ["1.0", "2.0"] (2)
def initialState = { -> 0 }
def emitterFunction = { Integer i, Emitter emitter -> (3)
if (i < versions.length) {
emitter.onNext( (4)
Event.of(new Headline("Micronaut " + versions[i] + " Released", "Come and get it"))
)
} else {
emitter.onComplete() (5)
}
return ++i
}
return Flowable.generate(initialState, emitterFunction as BiFunction<Integer,Emitter<Event<Headline>>,Integer>)
}
}
Publishing Server Sent Events from a Controller (Kotlin)
import io.micronaut.http.MediaType
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.http.sse.Event
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn
import io.reactivex.Emitter
import io.reactivex.Flowable
import io.reactivex.functions.BiFunction
import org.reactivestreams.Publisher
import java.util.concurrent.Callable
@Controller("/headlines")
class HeadlineController {
@ExecuteOn(TaskExecutors.IO)
@Get(produces = [MediaType.TEXT_EVENT_STREAM])
fun index(): Publisher<Event<Headline>> { (1)
val versions = arrayOf("1.0", "2.0") (2)
return Flowable.generate<Event<Headline>, Int>(Callable<Int>{ 0 }, BiFunction { (3)
i: Int, emitter: Emitter<Event<Headline>> ->
var nextInt: Int = i
if (i < versions.size) {
emitter.onNext( (4)
Event.of<Headline>(Headline("Micronaut " + versions[i] + " Released", "Come and get it"))
)
} else {
emitter.onComplete() (5)
}
++nextInt
})
}
}
1 | The controller method returns a Publisher of Event |
2 | For each version of Micronaut a headline is emitted |
3 | The Flowable type’s generate method is used to generate a Publisher. The generate method accepts an initial value and a lambda that accepts the value and an Emitter. Note that this example executes on the same thread as the controller action, but you could use subscribeOn or map an existing “hot” Flowable. |
4 | The Emitter interface’s onNext method is used to emit objects of type Event. The Event.of(ET) factory method is used to construct the event. |
5 | The Emitter interface’s onComplete method is used to indicate when to finish sending server sent events. |
You typically want to schedule SSE event streams on a separate executor. The previous example uses @ExecuteOn to execute the stream on the I/O executor.
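The stateful loop described in callout 3 can be pictured without any reactive library. The following is a minimal sketch using only the JDK, not RxJava's implementation: SketchEmitter and GenerateSketch are hypothetical names, plain strings stand in for Event<Headline>, and items are collected eagerly into a list instead of being emitted on subscriber demand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Hypothetical stand-in for the two Emitter methods used in the controller.
interface SketchEmitter<T> {
    void onNext(T value);
    void onComplete();
}

class GenerateSketch {

    // Calls the generator repeatedly, feeding back the state it returns,
    // until the generator signals completion. Unlike a real Publisher, this
    // collects everything eagerly and ignores subscriber demand.
    static <S, T> List<T> generate(Supplier<S> initialState,
                                   BiFunction<S, SketchEmitter<T>, S> generator) {
        List<T> emitted = new ArrayList<>();
        boolean[] completed = {false};
        SketchEmitter<T> emitter = new SketchEmitter<T>() {
            @Override public void onNext(T value) { emitted.add(value); }
            @Override public void onComplete() { completed[0] = true; }
        };
        S state = initialState.get();
        while (!completed[0]) {
            state = generator.apply(state, emitter);
        }
        return emitted;
    }

    // Same control flow as the controller's index() method.
    static List<String> headlines() {
        String[] versions = {"1.0", "2.0"};
        return generate(() -> 0, (i, emitter) -> {
            if (i < versions.length) {
                emitter.onNext("Micronaut " + versions[i] + " Released");
            } else {
                emitter.onComplete();
            }
            return i + 1;
        });
    }

    public static void main(String[] args) {
        headlines().forEach(System.out::println);
    }
}
```

The state (here an index into versions) is never shared between calls except through the return value, which is why the generator lambda needs no mutable fields.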
The above example sends back a response of type text/event-stream, and for each Event emitted, the Headline type defined previously is converted to JSON, resulting in responses such as:
Server Sent Event Response Output
data: {"title":"Micronaut 1.0 Released","description":"Come and get it"}
data: {"title":"Micronaut 2.0 Released","description":"Come and get it"}
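In the text/event-stream format, a blank line terminates each event, so a consumer can recover the JSON payloads by collecting data fields until it reaches a blank line. The sketch below shows that idea with the JDK only. EventStreamParseSketch and dataPayloads are hypothetical names, and the parser is deliberately partial: it handles data fields and skips everything else, including comments, id, event, and retry.

```java
import java.util.ArrayList;
import java.util.List;

class EventStreamParseSketch {

    // Collects the data payload of each complete event in a
    // text/event-stream body. Fields other than "data" are ignored.
    static List<String> dataPayloads(String body) {
        List<String> payloads = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean hasData = false;
        for (String line : body.split("\r\n|\n|\r", -1)) {
            if (line.isEmpty()) {
                // A blank line dispatches the event collected so far.
                if (hasData) {
                    payloads.add(current.toString());
                }
                current.setLength(0);
                hasData = false;
            } else if (line.startsWith("data:")) {
                String value = line.substring(5);
                if (value.startsWith(" ")) {
                    value = value.substring(1); // strip the single optional space
                }
                if (hasData) {
                    current.append('\n'); // consecutive data lines are joined by newlines
                }
                current.append(value);
                hasData = true;
            }
        }
        // An event not yet terminated by a blank line is not dispatched.
        return payloads;
    }

    public static void main(String[] args) {
        String body =
            "data: {\"title\":\"Micronaut 1.0 Released\",\"description\":\"Come and get it\"}\n\n"
          + "data: {\"title\":\"Micronaut 2.0 Released\",\"description\":\"Come and get it\"}\n\n";
        dataPayloads(body).forEach(System.out::println);
    }
}
```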
You can use the methods of the Event interface to customize the Server Sent Event data sent back, including associating event ids, comments, retry timeouts, etc.
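To see what such customizations look like on the wire, it can help to build a frame by hand. The following is a rough sketch of the text/event-stream field layout using only the JDK. It is not Micronaut's encoder: SseFrameSketch and frame are hypothetical names, and the field order chosen here is an assumption made for readability.

```java
import java.time.Duration;

class SseFrameSketch {

    // Builds one text/event-stream frame. Null arguments are skipped.
    // Hypothetical helper for illustration only.
    static String frame(String comment, String id, String name, Duration retry, String data) {
        StringBuilder sb = new StringBuilder();
        if (comment != null) sb.append(": ").append(comment).append('\n');
        if (id != null) sb.append("id: ").append(id).append('\n');
        if (name != null) sb.append("event: ").append(name).append('\n');
        // The retry field carries the reconnection time in milliseconds.
        if (retry != null) sb.append("retry: ").append(retry.toMillis()).append('\n');
        // Multi-line data is sent as one "data:" field per line.
        for (String line : data.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        // A blank line terminates the event.
        return sb.append('\n').toString();
    }

    public static void main(String[] args) {
        System.out.print(frame("latest release", "2", "headline", Duration.ofSeconds(5),
                "{\"title\":\"Micronaut 2.0 Released\",\"description\":\"Come and get it\"}"));
    }
}
```

A browser EventSource resends the last received id in the Last-Event-ID header when it reconnects, which is the main reason to assign ids to events.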