Data flows
RxJava is not only great at combining different sources of events, it is also very helpful with data flows. Unlike a Vert.x or JDK future, a Flowable
emits a stream of events, not just a single one. And it comes with an extensive set of data manipulation operators.
We can use a few of them to refactor the fetchAllPages database verticle method:
public WikiDatabaseService fetchAllPages(Handler<AsyncResult<JsonArray>> resultHandler) {
  dbClient.rxQuery(sqlQueries.get(SqlQuery.ALL_PAGES))
    .flatMapPublisher(res -> {  // (1)
      List<JsonArray> results = res.getResults();
      return Flowable.fromIterable(results);  // (2)
    })
    .map(json -> json.getString(0))  // (3)
    .sorted()  // (4)
    .collect(JsonArray::new, JsonArray::add)  // (5)
    .subscribe(SingleHelper.toObserver(resultHandler));
  return this;
}
(1) With flatMapPublisher we create a Flowable from the item emitted by the Single<ResultSet>.
(2) fromIterable converts the database results Iterable into a Flowable emitting the database row items.
(3) Since we only need the page name, we can map each JsonArray row to its first column.
(4) The client expects the data to be sorted in alphabetical order.
(5) The event bus service reply consists of a single JsonArray. collect creates a new one with JsonArray::new and later adds items as they are emitted with JsonArray::add.
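To see the shape of this pipeline without a database or the RxJava dependency, the same four stages (iterate over rows, map to the first column, sort, collect) can be sketched with plain JDK streams. This is only a rough, synchronous analogue and not RxJava itself: the class PageNamePipeline, the method sortedPageNames, and the use of List<Object> as a stand-in for a JsonArray row are hypothetical names chosen for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class PageNamePipeline {

    // Hypothetical stand-in for a query result: each row is a list of column values,
    // playing the role of a JsonArray row in the verticle code.
    static List<String> sortedPageNames(List<List<Object>> rows) {
        return rows.stream()                      // analogous to Flowable.fromIterable(results)
            .map(row -> (String) row.get(0))      // analogous to map(json -> json.getString(0))
            .sorted()                             // analogous to sorted(): natural String order
            .collect(Collectors.toList());        // analogous to collect(JsonArray::new, JsonArray::add)
    }

    public static void main(String[] args) {
        List<List<Object>> rows = Arrays.asList(
            Arrays.<Object>asList("Zebra", 3),
            Arrays.<Object>asList("Apple", 1),
            Arrays.<Object>asList("Mango", 2));
        System.out.println(sortedPageNames(rows)); // prints [Apple, Mango, Zebra]
    }
}
```

The main difference is that a JDK stream is pulled synchronously from data that already exists, whereas the Flowable only starts emitting once the Single returned by rxQuery has produced its result.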