6.12 Reactive HTTP Request Processing
As mentioned previously, Micronaut is built on Netty, which is designed around an event loop model and non-blocking I/O. Micronaut executes code defined in @Controller beans on the same thread as the request (an event loop thread).
It is therefore critical that any blocking I/O operations (for example, interactions with Hibernate/JPA or JDBC) are offloaded to a separate thread pool so that they do not block the event loop.
For example, the following configuration sets up the I/O thread pool as a fixed thread pool with 75 threads (similar to what a traditional blocking server such as Tomcat uses in the thread-per-request model):
Configuring the IO thread pool
micronaut:
  executors:
    io:
      type: fixed
      nThreads: 75
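To make the idea of offloading concrete outside of Micronaut, here is a minimal plain-Java sketch that uses only the JDK. It assumes a fixed pool of 75 threads, roughly analogous to the configuration above, and hands a blocking call to that pool so the calling thread returns immediately. The class name OffloadSketch and the method blockingFindByName are hypothetical stand-ins, not part of Micronaut.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OffloadSketch {

    // Hypothetical stand-in for a blocking lookup (JDBC, JPA, ...).
    public static String blockingFindByName(String name) {
        try {
            Thread.sleep(50); // simulate blocking I/O
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return name + " loaded on " + Thread.currentThread().getName();
    }

    // Schedules the blocking call on the given pool; the caller is not blocked.
    public static CompletableFuture<String> findByName(String name, ExecutorService ioPool) {
        return CompletableFuture.supplyAsync(() -> blockingFindByName(name), ioPool);
    }

    public static void main(String[] args) {
        // Assumption: a fixed pool of 75 threads, mirroring the YAML above.
        ExecutorService ioPool = Executors.newFixedThreadPool(75);
        try {
            CompletableFuture<String> result = findByName("Fred", ioPool);
            System.out.println("caller thread: " + Thread.currentThread().getName());
            System.out.println(result.join()); // wait only for demonstration purposes
        } finally {
            ioPool.shutdown();
        }
    }
}
```

The string returned by the future names a pool thread rather than the caller's thread, which is the property that matters for keeping an event loop free. In a Micronaut application the framework manages the executor for you, so this sketch is only an illustration of the mechanism.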
To use this thread pool in a @Controller bean you have a number of options. The simplest is the @ExecuteOn annotation, which can be declared at the type or method level to indicate which configured thread pool the controller's method or methods should run on:
Using @ExecuteOn (Java)
import io.micronaut.http.annotation.*;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;

@Controller("/executeOn/people")
public class PersonController {

    private final PersonService personService;

    PersonController(
            PersonService personService) {
        this.personService = personService;
    }

    @Get("/{name}")
    @ExecuteOn(TaskExecutors.IO) // (1)
    Person byName(String name) {
        return personService.findByName(name);
    }
}
Using @ExecuteOn (Groovy)
import io.micronaut.http.annotation.*
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn

@Controller("/executeOn/people")
class PersonController {

    private final PersonService personService

    PersonController(
            PersonService personService) {
        this.personService = personService
    }

    @Get("/{name}")
    @ExecuteOn(TaskExecutors.IO) // (1)
    Person byName(String name) {
        return personService.findByName(name)
    }
}
Using @ExecuteOn (Kotlin)
import io.micronaut.http.annotation.*
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn

@Controller("/executeOn/people")
class PersonController(private val personService: PersonService) {

    @Get("/{name}")
    @ExecuteOn(TaskExecutors.IO) // (1)
    fun byName(name: String): Person {
        return personService.findByName(name)
    }
}
(1) The @ExecuteOn annotation is used to execute the operation on the I/O thread pool.
The value of the @ExecuteOn annotation can be any named executor defined under micronaut.executors.
Generally speaking, for database operations you will want a thread pool whose size matches the maximum number of connections you have specified in the database connection pool.
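The sizing advice above can be illustrated with a small JDK-only simulation. This is a sketch under stated assumptions, not a model of any real connection pool: a Semaphore stands in for the database connection pool, and the class name PoolSizingSketch and method countStarvedTasks are hypothetical. When the thread pool is no larger than the number of permits, no task ever starts without a free "connection".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolSizingSketch {

    // Returns how many tasks found no free "connection" at the moment they started.
    public static int countStarvedTasks(int threads, int maxConnections, int tasks) {
        Semaphore connections = new Semaphore(maxConnections); // stand-in for a connection pool
        AtomicInteger starved = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                futures.add(CompletableFuture.runAsync(() -> {
                    if (!connections.tryAcquire()) {
                        starved.incrementAndGet(); // a real pool would block or time out here
                        return;
                    }
                    try {
                        Thread.sleep(20); // simulate a blocking query
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        connections.release();
                    }
                }, pool));
            }
            // Wait for every task to finish before reading the counter.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        } finally {
            pool.shutdown();
        }
        return starved.get();
    }

    public static void main(String[] args) {
        // Thread pool size equals the number of simulated connections.
        System.out.println("matched pool, starved tasks: " + countStarvedTasks(10, 10, 40));
    }
}
```

With matching sizes, at most ten tasks run at once and each holds at most one permit, so the counter stays at zero. A thread pool larger than the connection pool can leave threads waiting on connections, and a smaller one leaves connections idle.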
An alternative to the @ExecuteOn annotation is to use the facility provided by the reactive library you have chosen. RxJava, for example, features a subscribeOn method, which allows you to alter which thread executes user code. For example:
RxJava subscribeOn Example (Java)
import io.micronaut.http.annotation.*;
import io.micronaut.scheduling.TaskExecutors;
import io.reactivex.*;
import io.reactivex.schedulers.Schedulers;
import javax.inject.Named;
import java.util.concurrent.ExecutorService;

@Controller("/subscribeOn/people")
public class PersonController {

    private final Scheduler scheduler;
    private final PersonService personService;

    PersonController(
            @Named(TaskExecutors.IO) ExecutorService executorService, // (1)
            PersonService personService) {
        this.scheduler = Schedulers.from(executorService);
        this.personService = personService;
    }

    @Get("/{name}")
    Single<Person> byName(String name) {
        return Single.fromCallable(() ->
                personService.findByName(name) // (2)
        ).subscribeOn(scheduler); // (3)
    }
}
RxJava subscribeOn Example (Groovy)
import io.micronaut.http.annotation.*
import io.micronaut.scheduling.TaskExecutors
import io.reactivex.*
import io.reactivex.schedulers.Schedulers
import javax.inject.Named
import java.util.concurrent.ExecutorService

@Controller("/subscribeOn/people")
class PersonController {

    private final Scheduler scheduler
    private final PersonService personService

    PersonController(
            @Named(TaskExecutors.IO) ExecutorService executorService, // (1)
            PersonService personService) {
        this.scheduler = Schedulers.from(executorService)
        this.personService = personService
    }

    @Get("/{name}")
    Single<Person> byName(String name) {
        return Single.fromCallable({ -> // (2)
            personService.findByName(name)
        }).subscribeOn(scheduler) // (3)
    }
}
RxJava subscribeOn Example (Kotlin)
import io.micronaut.http.annotation.*
import io.micronaut.scheduling.TaskExecutors
import io.reactivex.*
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.ExecutorService
import javax.inject.Named

@Controller("/subscribeOn/people")
class PersonController internal constructor(
    @Named(TaskExecutors.IO) executorService: ExecutorService, // (1)
    val personService: PersonService) {

    private val scheduler: Scheduler

    init {
        scheduler = Schedulers.from(executorService)
    }

    @Get("/{name}")
    fun byName(name: String): Single<Person> {
        return Single.fromCallable { personService.findByName(name) } // (2)
            .subscribeOn(scheduler) // (3)
    }
}
(1) The configured I/O executor service is injected.
(2) RxJava’s fromCallable method is used to wrap the blocking operation.
(3) RxJava’s subscribeOn method is used to schedule the operation on the I/O thread pool.