The database verticle

Connecting to a database using JDBC requires of course a driver and configuration, which we had hard-coded in the first iteration.

Configurable SQL queries

While the verticle will turn the previously hard-coded values to configuration parameters, we will also go a step further by loading the SQL queries from a properties file.

The queries will be loaded from a file passed as a configuration parameter or from a default resource if none is being provided. The advantage of this approach is that the verticle can adapt both to different JDBC drivers and SQL dialects.

The verticle class preamble consists mainly of configuration key definitions:

  1. public class WikiDatabaseVerticle extends AbstractVerticle {
  2. public static final String CONFIG_WIKIDB_JDBC_URL = "wikidb.jdbc.url";
  3. public static final String CONFIG_WIKIDB_JDBC_DRIVER_CLASS = "wikidb.jdbc.driver_class";
  4. public static final String CONFIG_WIKIDB_JDBC_MAX_POOL_SIZE = "wikidb.jdbc.max_pool_size";
  5. public static final String CONFIG_WIKIDB_SQL_QUERIES_RESOURCE_FILE = "wikidb.sqlqueries.resource.file";
  6. public static final String CONFIG_WIKIDB_QUEUE = "wikidb.queue";
  7. private static final Logger LOGGER = LoggerFactory.getLogger(WikiDatabaseVerticle.class);
  8. // (...)

SQL queries are being stored in a properties file, with the default ones for HSQLDB being located in src/main/resources/db-queries.properties:

  1. create-pages-table=create table if not exists Pages (Id integer identity primary key, Name varchar(255) unique, Content clob)
  2. get-page=select Id, Content from Pages where Name = ?
  3. create-page=insert into Pages values (NULL, ?, ?)
  4. save-page=update Pages set Content = ? where Id = ?
  5. all-pages=select Name from Pages
  6. delete-page=delete from Pages where Id = ?

The following code from the WikiDatabaseVerticle class loads the SQL queries from a file, and make them available from a map:

  1. private enum SqlQuery {
  2. CREATE_PAGES_TABLE,
  3. ALL_PAGES,
  4. GET_PAGE,
  5. CREATE_PAGE,
  6. SAVE_PAGE,
  7. DELETE_PAGE
  8. }
  9. private final HashMap<SqlQuery, String> sqlQueries = new HashMap<>();
  10. private void loadSqlQueries() throws IOException {
  11. String queriesFile = config().getString(CONFIG_WIKIDB_SQL_QUERIES_RESOURCE_FILE);
  12. InputStream queriesInputStream;
  13. if (queriesFile != null) {
  14. queriesInputStream = new FileInputStream(queriesFile);
  15. } else {
  16. queriesInputStream = getClass().getResourceAsStream("/db-queries.properties");
  17. }
  18. Properties queriesProps = new Properties();
  19. queriesProps.load(queriesInputStream);
  20. queriesInputStream.close();
  21. sqlQueries.put(SqlQuery.CREATE_PAGES_TABLE, queriesProps.getProperty("create-pages-table"));
  22. sqlQueries.put(SqlQuery.ALL_PAGES, queriesProps.getProperty("all-pages"));
  23. sqlQueries.put(SqlQuery.GET_PAGE, queriesProps.getProperty("get-page"));
  24. sqlQueries.put(SqlQuery.CREATE_PAGE, queriesProps.getProperty("create-page"));
  25. sqlQueries.put(SqlQuery.SAVE_PAGE, queriesProps.getProperty("save-page"));
  26. sqlQueries.put(SqlQuery.DELETE_PAGE, queriesProps.getProperty("delete-page"));
  27. }

We use the SqlQuery enumeration type to avoid string constants later in the code. The code of the verticle start method is the following:

  1. private JDBCClient dbClient;
  2. @Override
  3. public void start(Promise<Void> promise) throws Exception {
  4. /*
  5. * Note: this uses blocking APIs, but data is small...
  6. */
  7. loadSqlQueries(); (1)
  8. dbClient = JDBCClient.createShared(vertx, new JsonObject()
  9. .put("url", config().getString(CONFIG_WIKIDB_JDBC_URL, "jdbc:hsqldb:file:db/wiki"))
  10. .put("driver_class", config().getString(CONFIG_WIKIDB_JDBC_DRIVER_CLASS, "org.hsqldb.jdbcDriver"))
  11. .put("max_pool_size", config().getInteger(CONFIG_WIKIDB_JDBC_MAX_POOL_SIZE, 30)));
  12. dbClient.getConnection(ar -> {
  13. if (ar.failed()) {
  14. LOGGER.error("Could not open a database connection", ar.cause());
  15. promise.fail(ar.cause());
  16. } else {
  17. SQLConnection connection = ar.result();
  18. connection.execute(sqlQueries.get(SqlQuery.CREATE_PAGES_TABLE), create -> { (2)
  19. connection.close();
  20. if (create.failed()) {
  21. LOGGER.error("Database preparation error", create.cause());
  22. promise.fail(create.cause());
  23. } else {
  24. vertx.eventBus().consumer(config().getString(CONFIG_WIKIDB_QUEUE, "wikidb.queue"), this::onMessage); (3)
  25. promise.complete();
  26. }
  27. });
  28. }
  29. });
  30. }
  1. Interestingly we break an important principle in Vert.x which is to avoid blocking APIs, but since there are no asynchronous APIs for accessing resources on the classpath our options are limited. We could use the Vert.x executeBlocking method to offload the blocking I/O operations from the event loop to a worker thread, but since the data is very small there is no obvious benefit in doing so.

  2. Here is an example of using SQL queries.

  3. The consumer method registers an event bus destination handler.

Dispatching requests

The event bus message handler is the onMessage method:

  1. public enum ErrorCodes {
  2. NO_ACTION_SPECIFIED,
  3. BAD_ACTION,
  4. DB_ERROR
  5. }
  6. public void onMessage(Message<JsonObject> message) {
  7. if (!message.headers().contains("action")) {
  8. LOGGER.error("No action header specified for message with headers {} and body {}",
  9. message.headers(), message.body().encodePrettily());
  10. message.fail(ErrorCodes.NO_ACTION_SPECIFIED.ordinal(), "No action header specified");
  11. return;
  12. }
  13. String action = message.headers().get("action");
  14. switch (action) {
  15. case "all-pages":
  16. fetchAllPages(message);
  17. break;
  18. case "get-page":
  19. fetchPage(message);
  20. break;
  21. case "create-page":
  22. createPage(message);
  23. break;
  24. case "save-page":
  25. savePage(message);
  26. break;
  27. case "delete-page":
  28. deletePage(message);
  29. break;
  30. default:
  31. message.fail(ErrorCodes.BAD_ACTION.ordinal(), "Bad action: " + action);
  32. }
  33. }

We defined a ErrorCodes enumeration for errors, which we use to report back to the message sender. To do so, the fail method of the Message class provides a convenient shortcut to reply with an error, and the original message sender gets a failed AsyncResult.

Reducing the JDBC client boilerplate

So far we have seen the complete interaction to perform a SQL query:

  1. retrieve a connection,

  2. perform requests,

  3. release the connection.

This leads to code where lots of error processing needs to happen for each asynchronous operation, as in:

  1. dbClient.getConnection(car -> {
  2. if (car.succeeded()) {
  3. SQLConnection connection = car.result();
  4. connection.query(sqlQueries.get(SqlQuery.ALL_PAGES), res -> {
  5. connection.close();
  6. if (res.succeeded()) {
  7. List<String> pages = res.result()
  8. .getResults()
  9. .stream()
  10. .map(json -> json.getString(0))
  11. .sorted()
  12. .collect(Collectors.toList());
  13. message.reply(new JsonObject().put("pages", new JsonArray(pages)));
  14. } else {
  15. reportQueryError(message, res.cause());
  16. }
  17. });
  18. } else {
  19. reportQueryError(message, car.cause());
  20. }
  21. });

Starting from Vert.x 3.5.0, the JDBC client now supports one-shot operations where a connection is being acquired to do a SQL operation, then released internally. The same code as above now reduces to:

  1. dbClient.query(sqlQueries.get(SqlQuery.ALL_PAGES), res -> {
  2. if (res.succeeded()) {
  3. List<String> pages = res.result()
  4. .getResults()
  5. .stream()
  6. .map(json -> json.getString(0))
  7. .sorted()
  8. .collect(Collectors.toList());
  9. message.reply(new JsonObject().put("pages", new JsonArray(pages)));
  10. } else {
  11. reportQueryError(message, res.cause());
  12. }
  13. });

This is very useful for cases where the connection is being acquired for a single operation. Performance-wise it is important to note that re-using a connection for chained SQL operations is better.

The rest of the class consists of private methods called when onMessage dispatches incoming messages:

  1. private void fetchAllPages(Message<JsonObject> message) {
  2. dbClient.query(sqlQueries.get(SqlQuery.ALL_PAGES), res -> {
  3. if (res.succeeded()) {
  4. List<String> pages = res.result()
  5. .getResults()
  6. .stream()
  7. .map(json -> json.getString(0))
  8. .sorted()
  9. .collect(Collectors.toList());
  10. message.reply(new JsonObject().put("pages", new JsonArray(pages)));
  11. } else {
  12. reportQueryError(message, res.cause());
  13. }
  14. });
  15. }
  16. private void fetchPage(Message<JsonObject> message) {
  17. String requestedPage = message.body().getString("page");
  18. JsonArray params = new JsonArray().add(requestedPage);
  19. dbClient.queryWithParams(sqlQueries.get(SqlQuery.GET_PAGE), params, fetch -> {
  20. if (fetch.succeeded()) {
  21. JsonObject response = new JsonObject();
  22. ResultSet resultSet = fetch.result();
  23. if (resultSet.getNumRows() == 0) {
  24. response.put("found", false);
  25. } else {
  26. response.put("found", true);
  27. JsonArray row = resultSet.getResults().get(0);
  28. response.put("id", row.getInteger(0));
  29. response.put("rawContent", row.getString(1));
  30. }
  31. message.reply(response);
  32. } else {
  33. reportQueryError(message, fetch.cause());
  34. }
  35. });
  36. }
  37. private void createPage(Message<JsonObject> message) {
  38. JsonObject request = message.body();
  39. JsonArray data = new JsonArray()
  40. .add(request.getString("title"))
  41. .add(request.getString("markdown"));
  42. dbClient.updateWithParams(sqlQueries.get(SqlQuery.CREATE_PAGE), data, res -> {
  43. if (res.succeeded()) {
  44. message.reply("ok");
  45. } else {
  46. reportQueryError(message, res.cause());
  47. }
  48. });
  49. }
  50. private void savePage(Message<JsonObject> message) {
  51. JsonObject request = message.body();
  52. JsonArray data = new JsonArray()
  53. .add(request.getString("markdown"))
  54. .add(request.getString("id"));
  55. dbClient.updateWithParams(sqlQueries.get(SqlQuery.SAVE_PAGE), data, res -> {
  56. if (res.succeeded()) {
  57. message.reply("ok");
  58. } else {
  59. reportQueryError(message, res.cause());
  60. }
  61. });
  62. }
  63. private void deletePage(Message<JsonObject> message) {
  64. JsonArray data = new JsonArray().add(message.body().getString("id"));
  65. dbClient.updateWithParams(sqlQueries.get(SqlQuery.DELETE_PAGE), data, res -> {
  66. if (res.succeeded()) {
  67. message.reply("ok");
  68. } else {
  69. reportQueryError(message, res.cause());
  70. }
  71. });
  72. }
  73. private void reportQueryError(Message<JsonObject> message, Throwable cause) {
  74. LOGGER.error("Database query error", cause);
  75. message.fail(ErrorCodes.DB_ERROR.ordinal(), cause.getMessage());
  76. }