Context Propagation in Quarkus

Traditional blocking code uses ThreadLocal variables to store contextual objects in order to avoid passing them as parameters everywhere. Many Quarkus extensions require those contextual objects to operate properly: RESTEasy, ArC and Transaction for example.

If you write reactive/async code, you have to cut your work into a pipeline of code blocks that get executed “later”, and in practice after the method you defined them in have returned. As such, try/finally blocks as well as ThreadLocal variables stop working, because your reactive code gets executed in another thread, after the caller ran its finally block.

MicroProfile Context Propagation was made to make those Quarkus extensions work properly in reactive/async settings. It works by capturing those contextual values that used to be in thread-locals, and restoring them when your code is called.

Setting it up

If you are using Mutiny (the quarkus-mutiny extension), you just need to add the the quarkus-smallrye-context-propagation extension to enable context propagation.

In other words, add the following dependencies to your pom.xml:

  1. <dependencies>
  2. <!-- Mutiny and RestEasy support extensions if not already included -->
  3. <dependency>
  4. <groupId>io.quarkus</groupId>
  5. <artifactId>quarkus-mutiny</artifactId>
  6. </dependency>
  7. <dependency>
  8. <groupId>io.quarkus</groupId>
  9. <artifactId>quarkus-resteasy-mutiny</artifactId>
  10. </dependency>
  11. <!-- Context Propagation extension -->
  12. <dependency>
  13. <groupId>io.quarkus</groupId>
  14. <artifactId>quarkus-smallrye-context-propagation</artifactId>
  15. </dependency>
  16. </dependencies>

With this, you will get context propagation for ArC, RESTEasy and transactions, if you are using them.

Usage example with Mutiny

Mutiny

This section uses Mutiny reactive types, if you’re not familiar with them, read the Getting Started with Reactive guide first.

Let’s write a REST endpoint that reads the next 3 items from a Kafka topic, stores them in a database using Hibernate ORM with Panache (all in the same transaction) before returning them to the client, you can do it like this:

  1. // Get the prices stream
  2. @Inject
  3. @Channel("prices") Publisher<Double> prices;
  4. @Transactional
  5. @GET
  6. @Path("/prices")
  7. @Produces(MediaType.SERVER_SENT_EVENTS)
  8. @SseElementType(MediaType.TEXT_PLAIN)
  9. public Publisher<Double> prices() {
  10. // get the next three prices from the price stream
  11. return Multi.createFrom().publisher(prices)
  12. .transform().byTakingFirstItems(3)
  13. .map(price -> {
  14. // store each price before we send them
  15. Price priceEntity = new Price();
  16. priceEntity.value = price;
  17. // here we are all in the same transaction
  18. // thanks to context propagation
  19. priceEntity.persist();
  20. return price;
  21. // the transaction is committed once the stream completes
  22. });
  23. }

Notice that thanks to Mutiny support for context propagation, this works out of the box. The 3 items are persisted using the same transaction and this transaction is committed when the stream completes.

Usage example for CompletionStage

If you are using CompletionStage you need manual context propagation. You can do that by injecting a ThreadContext or ManagedExecutor that will propagate every context. For example, here we use the Vert.x Web Client to get the list of Star Wars people, then store them in the database using Hibernate ORM with Panache (all in the same transaction) before returning them to the client as JSON using JSON-B or Jackson:

  1. @Inject ThreadContext threadContext;
  2. @Inject ManagedExecutor managedExecutor;
  3. @Inject Vertx vertx;
  4. @Transactional
  5. @GET
  6. @Path("/people")
  7. @Produces(MediaType.APPLICATION_JSON)
  8. public CompletionStage<List<Person>> people() throws SystemException {
  9. // Create a REST client to the Star Wars API
  10. WebClient client = WebClient.create(vertx,
  11. new WebClientOptions()
  12. .setDefaultHost("swapi.dev")
  13. .setDefaultPort(443)
  14. .setSsl(true));
  15. // get the list of Star Wars people, with context capture
  16. return threadContext.withContextCapture(client.get("/api/people/").send())
  17. .thenApplyAsync(response -> {
  18. JsonObject json = response.bodyAsJsonObject();
  19. List<Person> persons = new ArrayList<>(json.getInteger("count"));
  20. // Store them in the DB
  21. // Note that we're still in the same transaction as the outer method
  22. for (Object element : json.getJsonArray("results")) {
  23. Person person = new Person();
  24. person.name = ((JsonObject) element).getString("name");
  25. person.persist();
  26. persons.add(person);
  27. }
  28. return persons;
  29. }, managedExecutor);
  30. }

Using ThreadContext or ManagedExecutor you can wrap most useful functional types and CompletionStage in order to get context propagated.

The injected ManagedExecutor uses the Quarkus thread pool.

Adding support for RxJava2

You need to include the following modules to get RxJava2 support:

  1. <dependencies>
  2. <!-- Automatic context propagation for RxJava2 -->
  3. <dependency>
  4. <groupId>io.smallrye</groupId>
  5. <artifactId>smallrye-context-propagation-propagators-rxjava2</artifactId>
  6. </dependency>
  7. <!--
  8. Required if you want transactions extended to the end of methods returning
  9. an RxJava2 type.
  10. -->
  11. <dependency>
  12. <groupId>io.smallrye.reactive</groupId>
  13. <artifactId>smallrye-reactive-converter-rxjava2</artifactId>
  14. </dependency>
  15. <!-- Required if you return RxJava2 types from your REST endpoints -->
  16. <dependency>
  17. <groupId>org.jboss.resteasy</groupId>
  18. <artifactId>resteasy-rxjava2</artifactId>
  19. </dependency>
  20. </dependencies>