Map/Reduce

Mapping, reducing and all that Jazz

Jug is not a map/reduce framework. However, it is sometimes still useful to frame problems in those terms. And applying the same function to a large collection of elements (in a word, mapping) is exactly the sort of embarrassingly parallel problem that Jug excels at.

Naïve Solution

Let’s say you want to double all numbers between 0 and 1023. You could do this:

    from jug import TaskGenerator

    @TaskGenerator
    def double(x):
        return 2*x

    numbers = range(1024)
    result = [double(n) for n in numbers]

This might work well for this problem. However, if instead of 1,024 numbers, you had 1 million and each computation was very fast, then this would actually be very inefficient: you are generating one task per computation. As a rule of thumb, you want your computations to last at least a few seconds, otherwise, the overhead of maintaining the infrastructure becomes too large.

Grouping computations

You can use jug.mapreduce.map to achieve a better result:

    from jug import mapreduce

    result = mapreduce.map(double, numbers, map_step=32)

The map_step argument defines how many calls to double will be performed in a single Task.
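As a back-of-the-envelope check (plain Python, no jug involved), grouping 1,024 calls with map_step=32 yields 32 Tasks instead of 1,024:

```python
import math

n_elements = 1024
map_step = 32

# each Task handles map_step calls, so the number of Tasks shrinks accordingly
n_tasks = math.ceil(n_elements / map_step)
print(n_tasks)  # 32
```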

You can also include a reduce step:

    @TaskGenerator
    def add(a, b):
        return a + b

    final = mapreduce.mapreduce(add, double, numbers, map_step=32)

This is roughly equivalent to:

    from functools import reduce

    final = reduce(add, map(double, numbers))

except that the order in which the reduction is done is not from left to right! In fact, this only works well if the reduction function is associative.
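A quick plain-Python illustration of why associativity matters: with subtraction, which is not associative, regrouping the reduction changes the answer, while with addition any grouping agrees:

```python
from functools import reduce

nums = [1, 2, 3, 4]

# left-to-right reduction: ((1 - 2) - 3) - 4
left_to_right = reduce(lambda a, b: a - b, nums)
# a different grouping, as a parallel reducer might produce: (1 - 2) - (3 - 4)
regrouped = (1 - 2) - (3 - 4)
print(left_to_right, regrouped)  # -8 0

# addition is associative, so the grouping does not matter
assert (1 + 2) + (3 + 4) == ((1 + 2) + 3) + 4
```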

Curried mapping

The above is fine, but sometimes you need to pass multiple arguments to the function you want to loop over:

    @TaskGenerator
    def distance_to(x, ref):
        return abs(x - ref)

    ref = 34.34
    inputs = range(1024)
    result = [distance_to(x, ref) for x in inputs]

This works, but we are back to where we were: too many small Tasks!

currymap to the rescue:

    result = mapreduce.currymap(distance_to, [(x, ref) for x in inputs])

Arguably this function should have been called uncurrymap (as it is equivalent to the Haskell expression map . uncurry), but that just doesn’t sound right (I also like to think it’s the programming equivalent to the Currywurst, a culinary concept which almost makes me chuckle).
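Ignoring the grouping into Tasks, what currymap computes can be sketched in plain Python: each tuple of arguments is unpacked ("uncurried") into the function:

```python
def distance_to(x, ref):
    return abs(x - ref)

ref = 34.34
inputs = range(4)

# plain-Python equivalent of currymap, minus the task machinery:
# unpack each argument tuple into the function
result = [distance_to(*args) for args in [(x, ref) for x in inputs]]
print(result)
```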

Example

The canonical example for map/reduce is counting words in files. Here, we will do the same with some very small files:

    inputs = [
        "banana apple apple banana",
        "apple pear football",
        "pear",
        "banana apple apple",
        "football banana",
        "apple pear",
        "waldorf salad",
    ]

The mapper function will output a dictionary of counts:

    def count1(text):
        from collections import defaultdict
        counts = defaultdict(int)
        for word in text.split():
            counts[word] += 1
        return counts

(We used the very useful collections.defaultdict).
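A quick sanity check of the mapper on the first input string (count1 repeated here so the snippet is self-contained):

```python
from collections import defaultdict

def count1(text):
    counts = defaultdict(int)
    for word in text.split():
        counts[word] += 1
    return counts

print(dict(count1("banana apple apple banana")))  # {'banana': 2, 'apple': 2}
```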

While the reducer adds two dictionaries together:

    def merge_dicts(rhs, lhs):
        # Note that we SHOULDN'T modify arguments, so we work on a copy.
        # Copying a defaultdict keeps its default factory, so keys that
        # appear only in lhs can still be added with +=.
        rhs = rhs.copy()
        for k, v in lhs.items():
            rhs[k] += v
        return rhs

We can now use jug.mapreduce.mapreduce to put these together:

    final_counts = jug.mapreduce.mapreduce(
            merge_dicts,
            count1,
            inputs,
            map_step=1)
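Outside jug, the same final value can be computed with the builtin functools.reduce; jug only changes how the work is split into Tasks, not the result. A self-contained plain-Python sketch:

```python
from collections import defaultdict
from functools import reduce

inputs = [
    "banana apple apple banana",
    "apple pear football",
    "pear",
    "banana apple apple",
    "football banana",
    "apple pear",
    "waldorf salad",
]

def count1(text):
    # map step: per-string word counts
    counts = defaultdict(int)
    for word in text.split():
        counts[word] += 1
    return counts

def merge_dicts(rhs, lhs):
    # reduce step: add two count dictionaries without modifying the arguments
    rhs = rhs.copy()
    for k, v in lhs.items():
        rhs[k] += v
    return rhs

final_counts = reduce(merge_dicts, map(count1, inputs))
print(dict(final_counts))
```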

Running jug status shows the structure of our problem:

    Task name                                  Waiting       Ready    Finished     Running
    --------------------------------------------------------------------------------------
    jug.mapreduce._jug_map_reduce                    0           6           0           0
    jug.mapreduce._jug_reduce                        1           0           0           0
    ......................................................................................
    Total:                                           1           6           0           0

If we had more than just 6 “files”, the values in the table would be much larger. Let’s also assume that this is part of some much larger programme that computes counts and then does some further processing with them.

Once that task is done, we might not care anymore about the break up into 6 units. So, we can wrap the whole thing into a compound task:

    final_counts = CompoundTask(jug.mapreduce.mapreduce,
            merge_dicts,
            count1,
            inputs,
            map_step=1)

At first, this does not do much. The status output is almost the same, with only an extra entry for the compound task:

    Task name                                  Waiting       Ready    Finished     Running
    --------------------------------------------------------------------------------------
    jug.compound.compound_task_execute               1           0           0           0
    jug.mapreduce._jug_map_reduce                    0           6           0           0
    jug.mapreduce._jug_reduce                        1           0           0           0
    ......................................................................................
    Total:                                           2           6           0           0

But if we execute the tasks and re-check the status:

    Task name                                  Waiting       Ready    Finished     Running
    --------------------------------------------------------------------------------------
    jug.mapreduce.mapreduce                          0           0           1           0
    ......................................................................................
    Total:                                           0           0           1           0

Now, jug status reports a single task (the mapreduce task) and it is Finished.

Compound tasks not only lower the cognitive load, but they also make operations such as jug status much faster.

Full example source code

We left out the imports above, but other than that, it is a fully functional example:

    import jug.mapreduce
    from jug.compound import CompoundTask

    inputs = [
        "banana apple apple banana",
        "apple pear football",
        "pear",
        "banana apple apple",
        "football banana",
        "apple pear",
        "waldorf salad",
    ]

    def count1(text):
        from collections import defaultdict
        counts = defaultdict(int)
        for word in text.split():
            counts[word] += 1
        return counts

    def merge_dicts(rhs, lhs):
        # Note that we SHOULDN'T modify arguments, so we work on a copy
        rhs = rhs.copy()
        for k, v in lhs.items():
            rhs[k] += v
        return rhs

    # Direct (non-compound) version:
    # final_counts = jug.mapreduce.mapreduce(
    #         merge_dicts,
    #         count1,
    #         inputs,
    #         map_step=1)

    final_counts = CompoundTask(jug.mapreduce.mapreduce,
            merge_dicts,
            count1,
            inputs,
            map_step=1)