Reusing Code from Batching: transform()

As you may have noticed, the functions you called on a DStream
have the same names as those you called on an RDD in the batch example,
but they are not the same methods, so it may not be clear how to reuse
the code from the batch examples. In this section, we refactor the code
from the batch examples and show how to reuse it here.

DStreams have transform functions that let you apply any
RDD-to-RDD function to each RDD in the DStream. The
transform functions are perfect for reusing any RDD-to-RDD functions
that you have written in batch code and want to port over to
streaming. Let’s look at some code to illustrate this point.

Let’s say we have separated out a function, responseCodeCount, from our
batch example that computes the response code counts given an RDD of
Apache access logs:

  public static JavaPairRDD<Integer, Long> responseCodeCount(
      JavaRDD<ApacheAccessLog> accessLogRDD) {
    return accessLogRDD
        .mapToPair(s -> new Tuple2<>(s.getResponseCode(), 1L))
        .reduceByKey(SUM_REDUCER);
  }

The responseCodeCountDStream can be created by calling transformToPair on the accessLogDStream with the responseCodeCount function.
Then, you can finish up by calling updateStateByKey to keep a running count of the response codes for all of time,
and use foreachRDD to print the values out:

  // Compute Response Code to Count.
  // Notice the use of transformToPair to produce a DStream of
  // response code counts, and then updateStateByKey to accumulate
  // the response code counts for all of time.
  JavaPairDStream<Integer, Long> responseCodeCountDStream = accessLogDStream
      .transformToPair(LogAnalyzerStreamingTotalRefactored::responseCodeCount);
  JavaPairDStream<Integer, Long> cumulativeResponseCodeCountDStream =
      responseCodeCountDStream.updateStateByKey(COMPUTE_RUNNING_SUM);
  cumulativeResponseCodeCountDStream.foreachRDD(rdd -> {
    System.out.println("Response code counts: " + rdd.take(100));
  });
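
COMPUTE_RUNNING_SUM is not shown in this section. As a rough sketch of the idea only, the update logic for one key can be modeled in plain Java: it takes the counts that arrived in the current batch plus the total carried over from earlier batches, and returns the new total. The class name RunningSumSketch, the method computeRunningSum, and the use of java.util.Optional are assumptions made for illustration; the real function is written against Spark's own function and Optional types.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical plain-Java model of a running-sum update function.
// It does not use Spark; it only illustrates the per-key state update.
class RunningSumSketch {

    // Adds the counts from the current batch to the total carried over
    // from previous batches (0 if this key has not been seen before).
    static Optional<Long> computeRunningSum(List<Long> newCounts,
                                            Optional<Long> previousTotal) {
        long sum = previousTotal.orElse(0L);
        for (long count : newCounts) {
            sum += count;
        }
        return Optional.of(sum);
    }

    public static void main(String[] args) {
        // Simulate a single key (for example, response code 200)
        // across three consecutive batches.
        Optional<Long> state = Optional.empty();

        state = computeRunningSum(Arrays.asList(3L, 2L), state);
        System.out.println("After batch 1: " + state.get()); // After batch 1: 5

        // A batch with no new values for the key keeps the total unchanged.
        state = computeRunningSum(Collections.<Long>emptyList(), state);
        System.out.println("After batch 2: " + state.get()); // After batch 2: 5

        state = computeRunningSum(Arrays.asList(4L), state);
        System.out.println("After batch 3: " + state.get()); // After batch 3: 9
    }
}
```

Because the state is carried from batch to batch, a key that receives no new values in a batch still reports its previous total.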

It is possible to combine transform functions before and after an
updateStateByKey as well:

  // A DStream of ipAddresses accessed > 10 times.
  JavaDStream<String> ipAddressesDStream = accessLogDStream
      .transformToPair(LogAnalyzerStreamingTotalRefactored::ipAddressCount)
      .updateStateByKey(COMPUTE_RUNNING_SUM)
      .transform(LogAnalyzerStreamingTotalRefactored::filterIPAddress);
  ipAddressesDStream.foreachRDD(rdd -> {
    List<String> ipAddresses = rdd.take(100);
    System.out.println("All IPAddresses > 10 times: " + ipAddresses);
  });
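
The body of filterIPAddress is not shown here. As a minimal sketch of the final step, assuming it keeps the addresses whose cumulative count exceeds 10 and discards the counts, the logic can be modeled in plain Java over a map of running totals. The class FilterIpSketch, the method filterIpAddresses, and the sample addresses are hypothetical; the real function operates on RDDs rather than on a Map.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of the filter step that follows
// updateStateByKey. It does not use Spark.
class FilterIpSketch {

    // Keeps the addresses whose cumulative count is strictly greater
    // than the threshold and drops the counts themselves.
    static List<String> filterIpAddresses(Map<String, Long> cumulativeCounts,
                                          long threshold) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Long> entry : cumulativeCounts.entrySet()) {
            if (entry.getValue() > threshold) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Made-up running totals, as they might look after several batches.
        Map<String, Long> counts = new LinkedHashMap<>();
        counts.put("10.0.0.1", 12L);
        counts.put("10.0.0.2", 10L); // exactly 10 is not "more than 10"
        counts.put("10.0.0.3", 25L);

        System.out.println(filterIpAddresses(counts, 10)); // [10.0.0.1, 10.0.0.3]
    }
}
```

The filter runs on the cumulative totals rather than on a single batch, which is why it sits after updateStateByKey in the chain: an address can cross the threshold even if it never appears more than 10 times in any one batch.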

Take a closer look at LogAnalyzerStreamingTotalRefactored.java
now to see how that code has been refactored to reuse code from the batch example.