6.13 Reactive HTTP Request Processing
As mentioned previously, Micronaut is built on Netty which is designed around an Event loop model and non-blocking I/O. Micronaut executes code defined in @Controller beans in the same thread as the request thread (an Event Loop thread).
This makes it critical that if you do any blocking I/O operations (for example interactions with Hibernate/JPA or JDBC) that you offload those tasks to a separate thread pool that does not block the Event loop.
For example the following configuration configures the I/O thread pool as a fixed thread pool with 75 threads (similar to what a traditional blocking server such as Tomcat uses in the thread-per-request model):
Configuring the IO thread pool
micronaut:
executors:
io:
type: fixed
nThreads: 75
To use this thread pool in a @Controller bean you have a number of options. The simplest is to use the @ExecuteOn annotation, which can be declared at the type or method level to indicate which configured thread pool to run the method(s) of the controller on:
Using @ExecuteOn
import io.micronaut.docs.http.server.reactive.PersonService;
import io.micronaut.docs.ioc.beans.Person;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;
@Controller("/executeOn/people")
public class PersonController {
private final PersonService personService;
PersonController(PersonService personService) {
this.personService = personService;
}
@Get("/{name}")
@ExecuteOn(TaskExecutors.IO) (1)
Person byName(String name) {
return personService.findByName(name);
}
}
Using @ExecuteOn
import io.micronaut.docs.http.server.reactive.PersonService
import io.micronaut.docs.ioc.beans.Person
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn
@Controller("/executeOn/people")
class PersonController {
private final PersonService personService
PersonController(PersonService personService) {
this.personService = personService
}
@Get("/{name}")
@ExecuteOn(TaskExecutors.IO) (1)
Person byName(String name) {
personService.findByName(name)
}
}
Using @ExecuteOn
import io.micronaut.docs.http.server.reactive.PersonService
import io.micronaut.docs.ioc.beans.Person
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn
@Controller("/executeOn/people")
class PersonController (private val personService: PersonService) {
@Get("/{name}")
@ExecuteOn(TaskExecutors.IO) (1)
fun byName(name: String): Person {
return personService.findByName(name)
}
}
1 | The @ExecuteOn annotation is used to execute the operation on the I/O thread pool |
The value of the @ExecuteOn annotation can be any named executor defined under micronaut.executors
.
Generally speaking for database operations you want a thread pool configured that matches the maximum number of connections specified in the database connection pool. |
An alternative to the @ExecuteOn annotation is to use the facility provided by the reactive library you have chosen. Reactive implementations such as Project Reactor or RxJava feature a subscribeOn
method which lets you alter which thread executes user code. For example:
Reactive subscribeOn Example
import io.micronaut.docs.ioc.beans.Person;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.scheduling.TaskExecutors;
import jakarta.inject.Named;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import io.micronaut.core.async.annotation.SingleResult;
import java.util.concurrent.ExecutorService;
@Controller("/subscribeOn/people")
public class PersonController {
private final Scheduler scheduler;
private final PersonService personService;
PersonController(
@Named(TaskExecutors.IO) ExecutorService executorService, (1)
PersonService personService) {
this.scheduler = Schedulers.fromExecutorService(executorService);
this.personService = personService;
}
@Get("/{name}")
@SingleResult
Publisher<Person> byName(String name) {
return Mono
.fromCallable(() -> personService.findByName(name)) (2)
.subscribeOn(scheduler); (3)
}
}
Reactive subscribeOn Example
import io.micronaut.docs.ioc.beans.Person
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.scheduling.TaskExecutors
import jakarta.inject.Named
import reactor.core.publisher.Mono
import reactor.core.scheduler.Scheduler
import reactor.core.scheduler.Schedulers
import java.util.concurrent.ExecutorService
@Controller("/subscribeOn/people")
class PersonController {
private final Scheduler scheduler
private final PersonService personService
PersonController(
@Named(TaskExecutors.IO) ExecutorService executorService, (1)
PersonService personService) {
this.scheduler = Schedulers.fromExecutorService(executorService)
this.personService = personService
}
@Get("/{name}")
Mono<Person> byName(String name) {
return Mono
.fromCallable({ -> personService.findByName(name) }) (2)
.subscribeOn(scheduler) (3)
}
}
Reactive subscribeOn Example
import io.micronaut.docs.ioc.beans.Person
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.scheduling.TaskExecutors
import java.util.concurrent.ExecutorService
import jakarta.inject.Named
import reactor.core.publisher.Mono
import reactor.core.scheduler.Scheduler
import reactor.core.scheduler.Schedulers
@Controller("/subscribeOn/people")
class PersonController internal constructor(
@Named(TaskExecutors.IO) executorService: ExecutorService, (1)
private val personService: PersonService) {
private val scheduler: Scheduler = Schedulers.fromExecutorService(executorService)
@Get("/{name}")
fun byName(name: String): Mono<Person> {
return Mono
.fromCallable { personService.findByName(name) } (2)
.subscribeOn(scheduler) (3)
}
}
1 | The configured I/O executor service is injected |
2 | The Mono::fromCallable method wraps the blocking operation |
3 | The Project Reactor subscribeOn method schedules the operation on the I/O thread pool |