Data flows

RxJava is not only great at combining different sources of events, it is also well suited to processing data flows. Unlike a Vert.x or JDK future, a Flowable emits a stream of events, not just a single one, and it comes with an extensive set of data manipulation operators.

We can use a few of them to refactor the fetchAllPages database verticle method:

  public WikiDatabaseService fetchAllPages(Handler<AsyncResult<JsonArray>> resultHandler) {
    dbClient.rxQuery(sqlQueries.get(SqlQuery.ALL_PAGES))
      .flatMapPublisher(res -> { // (1)
        List<JsonArray> results = res.getResults();
        return Flowable.fromIterable(results); // (2)
      })
      .map(json -> json.getString(0)) // (3)
      .sorted() // (4)
      .collect(JsonArray::new, JsonArray::add) // (5)
      .subscribe(SingleHelper.toObserver(resultHandler));
    return this;
  }

  1. With flatMapPublisher we create a Flowable from the item emitted by the Single<ResultSet>.

  2. fromIterable converts the List<JsonArray> of database results into a Flowable that emits one item per database row.

  3. Since we only need the page name, we can map each JsonArray row to its first column.

  4. The client expects the data to be sorted in alphabetical order.

  5. The event bus service reply consists of a single JsonArray. collect creates a new one with JsonArray::new and then adds items to it as they are emitted with JsonArray::add.
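
To see the shape of the operator chain in isolation, here is a minimal synchronous sketch. It deliberately uses java.util.stream instead of RxJava so that it runs without any dependency, and it is an analogue of the pipeline above, not the verticle code itself. The PageNamePipeline class, the pageNames method and the List<String> rows are hypothetical stand-ins for the database verticle and its JsonArray rows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Synchronous analogue of the map / sorted / collect steps, built on
// java.util.stream rather than RxJava. Each List<String> is a hypothetical
// stand-in for one JsonArray row returned by the database client.
final class PageNamePipeline {

  static List<String> pageNames(List<List<String>> rows) {
    ArrayList<String> names = rows.stream()
      .map(row -> row.get(0)) // keep only the first column, the page name
      .sorted()               // natural String ordering
      // supplier, accumulator, and a combiner that Stream.collect requires
      .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
    return names;
  }

  public static void main(String[] args) {
    List<List<String>> rows = Arrays.asList(
      Arrays.asList("Zebra", "last page"),
      Arrays.asList("Apple", "first page"),
      Arrays.asList("Mango", "middle page"));
    System.out.println(pageNames(rows)); // prints [Apple, Mango, Zebra]
  }
}
```

The main difference from the Flowable version is that everything here happens on the calling thread and the result is returned directly, whereas the RxJava chain only starts when subscribe is called and delivers the collected JsonArray to the result handler. Also note that Stream.collect takes a third combiner argument, which the two-argument RxJava collect does not need.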