Writing a Filter
Consider a hypothetical use case in which you want to trace each request to the Micronaut “Hello World” example using some external system. The external system could be a database or a distributed tracing service, and it may require I/O operations.
What you don’t want to do is block the underlying Netty event loop within your filter. Instead, you want the filter to proceed with execution once any I/O is complete.
As an example, consider the following TraceService, which uses RxJava to compose an I/O operation:
A TraceService Example using RxJava
import io.micronaut.http.HttpRequest;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Singleton;

@Singleton
public class TraceService {

    private static final Logger LOG = LoggerFactory.getLogger(TraceService.class);

    Flowable<Boolean> trace(HttpRequest<?> request) {
        return Flowable.fromCallable(() -> { (1)
            if (LOG.isDebugEnabled()) {
                LOG.debug("Tracing request: " + request.getUri());
            }
            // trace logic here, potentially performing I/O (2)
            return true;
        }).subscribeOn(Schedulers.io()); (3)
    }
}
A TraceService Example using RxJava
import io.micronaut.http.HttpRequest
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import javax.inject.Singleton

@Singleton
class TraceService {

    private static final Logger LOG = LoggerFactory.getLogger(TraceService.class)

    Flowable<Boolean> trace(HttpRequest<?> request) {
        Flowable.fromCallable({ -> (1)
            if (LOG.isDebugEnabled()) {
                LOG.debug("Tracing request: " + request.getUri())
            }
            // trace logic here, potentially performing I/O (2)
            return true
        }).subscribeOn(Schedulers.io()) (3)
    }
}
A TraceService Example using RxJava
import io.micronaut.http.HttpRequest
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers
import org.slf4j.LoggerFactory
import javax.inject.Singleton

@Singleton
class TraceService {

    private val LOG = LoggerFactory.getLogger(TraceService::class.java)

    internal fun trace(request: HttpRequest<*>): Flowable<Boolean> {
        return Flowable.fromCallable { (1)
            if (LOG.isDebugEnabled) {
                LOG.debug("Tracing request: " + request.uri)
            }
            // trace logic here, potentially performing I/O (2)
            true
        }.subscribeOn(Schedulers.io()) (3)
    }
}
1 | The Flowable type is used to create logic that executes potentially blocking operations to write the trace data from the request |
2 | Since this is just an example, the logic does nothing and a placeholder comment is used |
3 | The RxJava I/O scheduler is used to execute the logic |
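The idea behind callout 3, moving potentially blocking work off the calling thread, can also be sketched with the JDK alone. The following is a minimal illustration under stated assumptions, not Micronaut or RxJava code: OffloadSketch, IO_POOL and traceAsync are hypothetical names, and a cached thread pool stands in for the RxJava I/O scheduler.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OffloadSketch {

    // Hypothetical stand-in for Schedulers.io(): daemon threads reserved for blocking work
    public static final ExecutorService IO_POOL = Executors.newCachedThreadPool(runnable -> {
        Thread thread = new Thread(runnable, "io-sketch");
        thread.setDaemon(true);
        return thread;
    });

    // Schedules the (potentially blocking) trace logic on IO_POOL and returns immediately.
    // The future completes with true if the logic ran on one of the pool's threads.
    public static CompletableFuture<Boolean> traceAsync(String uri) {
        return CompletableFuture.supplyAsync(() -> {
            // trace logic here, potentially performing I/O
            return Thread.currentThread().getName().startsWith("io-sketch");
        }, IO_POOL);
    }

    public static void main(String[] args) {
        // join() blocks the caller; it is used here only so the demo can print a result
        boolean ranOnIoPool = traceAsync("/hello").join();
        System.out.println("ran on I/O pool: " + ranOnIoPool);
    }
}
```

In a real filter you would not call join() on the event loop; you would compose further steps onto the returned future instead. The rest of this section continues with the RxJava-based TraceService shown above.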
You can then inject this implementation into your filter definition:
An Example HttpServerFilter
import io.micronaut.http.*;
import io.micronaut.http.annotation.Filter;
import io.micronaut.http.filter.*;
import org.reactivestreams.Publisher;

@Filter("/hello/**") (1)
public class TraceFilter implements HttpServerFilter { (2)

    private final TraceService traceService;

    public TraceFilter(TraceService traceService) { (3)
        this.traceService = traceService;
    }
}
An Example HttpServerFilter
import io.micronaut.http.HttpRequest
import io.micronaut.http.MutableHttpResponse
import io.micronaut.http.annotation.Filter
import io.micronaut.http.filter.HttpServerFilter
import io.micronaut.http.filter.ServerFilterChain
import org.reactivestreams.Publisher

@Filter("/hello/**") (1)
class TraceFilter implements HttpServerFilter { (2)

    private final TraceService traceService

    TraceFilter(TraceService traceService) { (3)
        this.traceService = traceService
    }
}
An Example HttpServerFilter
import io.micronaut.http.HttpRequest
import io.micronaut.http.MutableHttpResponse
import io.micronaut.http.annotation.Filter
import io.micronaut.http.filter.HttpServerFilter
import io.micronaut.http.filter.ServerFilterChain
import org.reactivestreams.Publisher

@Filter("/hello/**") (1)
class TraceFilter( (2)
    private val traceService: TraceService) (3)
    : HttpServerFilter {
}
1 | The Filter annotation is used to define the URI patterns the filter matches |
2 | The class implements the HttpServerFilter interface |
3 | The previously defined TraceService is injected via a constructor argument |
The final step is to write the doFilter implementation of the HttpServerFilter interface.
The doFilter implementation
@Override
public Publisher<MutableHttpResponse<?>> doFilter(HttpRequest<?> request, ServerFilterChain chain) {
    return traceService.trace(request) (1)
                       .switchMap(aBoolean -> chain.proceed(request)) (2)
                       .doOnNext(res -> (3)
                           res.getHeaders().add("X-Trace-Enabled", "true")
                       );
}
The doFilter implementation
@Override
Publisher<MutableHttpResponse<?>> doFilter(HttpRequest<?> request, ServerFilterChain chain) {
    traceService.trace(request) (1)
                .switchMap({ aBoolean -> chain.proceed(request) }) (2)
                .doOnNext({ res -> (3)
                    res.getHeaders().add("X-Trace-Enabled", "true")
                })
}
The doFilter implementation
override fun doFilter(request: HttpRequest<*>, chain: ServerFilterChain): Publisher<MutableHttpResponse<*>> {
    return traceService.trace(request) (1)
            .switchMap { aBoolean -> chain.proceed(request) } (2)
            .doOnNext { res -> (3)
                res.headers.add("X-Trace-Enabled", "true")
            }
}
1 | The previously defined TraceService is called to trace the request |
2 | If the trace call succeeds, the filter resumes request processing using RxJava’s switchMap method, which invokes the proceed method of the ServerFilterChain |
3 | Finally, RxJava’s doOnNext method is used to add a header called X-Trace-Enabled to the response. |
The previous example demonstrates some key concepts, such as executing logic in a non-blocking manner before proceeding with the request, and modifying the outgoing response.
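The shape of that pipeline (trace, then proceed, then decorate the response) can be sketched in isolation with plain JDK types. This is only an illustration under stated assumptions: FilterPipelineSketch, Response and the Function standing in for the filter chain are hypothetical, CompletableFuture replaces RxJava’s Flowable, and thenCompose and thenApply play the roles that switchMap and doOnNext play in doFilter.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class FilterPipelineSketch {

    // Hypothetical stand-in for a mutable HTTP response: just a header map
    public static class Response {
        public final Map<String, String> headers = new HashMap<>();
    }

    // Hypothetical stand-in for TraceService.trace: completes with true on another thread
    public static CompletableFuture<Boolean> trace(String uri) {
        return CompletableFuture.supplyAsync(() -> true);
    }

    // Mirrors the three steps of doFilter without blocking the calling thread
    public static CompletableFuture<Response> doFilter(
            String uri, Function<String, CompletableFuture<Response>> chain) {
        return trace(uri)                                  // (1) trace the request
            .thenCompose(traced -> chain.apply(uri))       // (2) then proceed with the chain
            .thenApply(res -> {                            // (3) then add the header
                res.headers.put("X-Trace-Enabled", "true");
                return res;
            });
    }

    public static void main(String[] args) {
        // The lambda plays the role of the rest of the chain and returns an empty response
        Response res = doFilter("/hello",
            uri -> CompletableFuture.completedFuture(new Response())).join();
        System.out.println(res.headers); // prints {X-Trace-Enabled=true}
    }
}
```

If the trace step fails, the returned future completes exceptionally and the chain function is never invoked, which matches the "if the trace call succeeds" condition described in callout 2.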
The examples use RxJava; however, you can use any reactive framework that supports the Reactive Streams specification.
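To make the Reactive Streams contract concrete, here is a minimal sketch of a publisher that emits a single value on demand and then completes. It rests on several assumptions: it uses java.util.concurrent.Flow, the JDK 9+ mirror of the Reactive Streams interfaces, whereas the filter shown above works with org.reactivestreams.Publisher; SingleValueSketch, just and collect are hypothetical names; and the code is neither thread-safe nor a fully specification-compliant implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class SingleValueSketch {

    // Hypothetical minimal publisher: emits one value on the first request, then completes
    public static <T> Flow.Publisher<T> just(T value) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                if (done) {
                    return; // already emitted, failed or cancelled
                }
                done = true;
                if (n <= 0) {
                    subscriber.onError(new IllegalArgumentException("request must be positive"));
                    return;
                }
                subscriber.onNext(value);
                subscriber.onComplete();
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Subscribes, requests one item and records every signal it receives, in order
    public static <T> List<String> collect(Flow.Publisher<T> publisher) {
        List<String> events = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription subscription) { subscription.request(1); }
            @Override public void onNext(T item) { events.add("next:" + item); }
            @Override public void onError(Throwable error) { events.add("error:" + error.getMessage()); }
            @Override public void onComplete() { events.add("complete"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(collect(just(true))); // prints [next:true, complete]
    }
}
```

Nothing is emitted until the subscriber calls request, which is the back-pressure mechanism that every framework implementing the specification shares.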