Utilities

The module jug.utils has a few functions which are meant to be used in writing jugfiles.

Identity

This is simply implemented as:

  @TaskGenerator
  def identity(x):
      return x

This might seem like the most pointless function, but it can be helpful in speeding things up. Consider the following case:

  from glob import glob
  from jug import TaskGenerator

  def load(fname):
      # Read all lines of one data file
      return open(fname).readlines()

  @TaskGenerator
  def process(inputs, parameter):
      ...

  inputs = []
  for f in glob('*.data'):
      inputs.extend(load(f))
  # inputs is a large list

  results = {}
  for p in range(1000):
      results[p] = process(inputs, p)

How is this processed? Every time process is called, a new jug.Task is generated. This task has two arguments: inputs and an integer. When the hash of the task is computed, both of its arguments are analysed. Since inputs is a large list of strings, hashing it is slow, and it is hashed again for each of the 1000 tasks. Computing all of the hashes therefore takes a very long time.

Consider the variation:

  from jug.utils import identity

  # ...
  # same as above

  inputs = identity(inputs)
  results = {}
  for p in range(1000):
      results[p] = process(inputs, p)

Now, the long list is only hashed once! It is transformed into a Task (we reuse the name inputs to keep things clear) and each process call can now compute its hash very fast.
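The saving can be illustrated without Jug at all. The sketch below is a toy model, not Jug's actual hashing code: the `digest` helper and the use of `pickle` plus SHA-1 are assumptions made purely for illustration. It contrasts re-hashing the large list for every task with hashing it once and reusing the short digest.

```python
import hashlib
import pickle


def digest(obj):
    """Hash any picklable object (a stand-in for hashing task arguments)."""
    return hashlib.sha1(pickle.dumps(obj)).hexdigest()


# A large list of strings, similar in spirit to `inputs` above.
inputs = [f"line {i}\n" for i in range(100_000)]

# Without identity: each task hash walks the whole list again.
naive_hashes = [digest((inputs, p)) for p in range(10)]

# With identity: the list is hashed once, and each task hash only
# combines that short digest with the parameter.
inputs_digest = digest(inputs)
cheap_hashes = [digest((inputs_digest, p)) for p in range(10)]
```

In both cases every task still gets a distinct hash; only the amount of data fed to the hash function per task differs.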

Using identity to induce dependencies

identity can also be used to introduce dependencies. One can define a helper function:

  def value_after(val, token):
      from jug.utils import identity
      return identity([val, token])[0]

This function will always return its first argument, but will only run once its second argument is available. Here is a typical use case:

  1. Function process takes an output file name
  2. Function postprocess takes as input the output filename of process

Now, you want to run process and then postprocess, but since communication is done with files, Jug does not see that these functions depend on each other. value_after is the solution:

  token = process(input, ofile='output.txt')
  postprocess(value_after('output.txt', token))

This works independently of whatever process returns (even if it is None).
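To see why bundling the value with a token forces the ordering, here is a toy model in plain Python. It does not use Jug: the `Deferred` class, `first_of`, and the bodies of `process` and `postprocess` are hypothetical stand-ins that only mimic the idea of a deferred call whose arguments must be resolved first.

```python
class Deferred:
    """Toy stand-in for a task: a function call that is evaluated later."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args

    def run(self, log):
        # Any Deferred argument is a dependency and must be run first.
        resolved = [a.run(log) if isinstance(a, Deferred) else a
                    for a in self.args]
        log.append(self.func.__name__)
        return self.func(*resolved)


def first_of(val, token):
    # Return the value; the token only exists to create the dependency.
    return val


def value_after(val, token):
    return Deferred(first_of, val, token)


def process(ofile):
    return None  # communicates through the file, returns nothing useful


def postprocess(ifile):
    return ifile.upper()


log = []
token = Deferred(process, "output.txt")
result = Deferred(postprocess, value_after("output.txt", token)).run(log)
```

After this runs, `log` records `process` before `postprocess`, even though `postprocess` only ever receives the file name.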

jug_execute

This is a simple wrapper around subprocess.call(). It adds two important pieces of functionality:

  1. It checks the exit code and raises an exception if it is not zero (this can be disabled by passing check_exit=False).

  2. It takes an argument called run_after which is ignored but can be used to declare dependencies between tasks. Thus, it can be used to ensure that a specific process only runs after something else has run:

    from jug.utils import jug_execute
    from jug import TaskGenerator

    @TaskGenerator
    def my_computation(input, output_filename):
        ...

    token = my_computation(input, 'output.txt')
    # We want to run gzip, but **only after** `my_computation` has run:
    jug_execute(['gzip', 'output.txt'], run_after=token)
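The two behaviours described above can be sketched with the standard library alone. This is not the code of jug_execute: `run_checked` is a hypothetical name, the choice of `RuntimeError` is arbitrary, and the child command is just a Python process that exits with status 3.

```python
import subprocess
import sys


def run_checked(args, check_exit=True, run_after=None):
    """Run a command and optionally fail on a non-zero exit code."""
    # run_after is deliberately unused. In Jug, passing a task here is what
    # declares the dependency; this plain function only mirrors the signature.
    ret = subprocess.call(args)
    if check_exit and ret != 0:
        raise RuntimeError(f"command {args!r} exited with code {ret}")
    return ret


# Hypothetical failing command: a child Python process that exits with 3.
failing = [sys.executable, "-c", "import sys; sys.exit(3)"]

# With check_exit=False the exit code is simply returned.
code = run_checked(failing, check_exit=False)

# With the default check_exit=True a non-zero exit code raises.
try:
    run_checked(failing)
    raised = False
except RuntimeError:
    raised = True
```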

cached_glob

cached_glob is a simple utility to perform the following common operation:

  from glob import glob
  from jug import CachedFunction

  files = CachedFunction(glob, pattern)
  files.sort()

Here, pattern is a glob pattern. With cached_glob, the same operation can be written simply as:

  from jug.utils import cached_glob
  files = cached_glob(pattern)
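The sorting step matters because glob.glob makes no promise about the order of its results. A plausible reason for sorting (an assumption, not something stated above) is that a stable order keeps anything derived from the file list stable too. The following standard-library sketch shows only the sorting half; `sorted_glob` is a hypothetical helper and no caching is involved.

```python
import os
import tempfile
from glob import glob


def sorted_glob(pattern):
    """Return the matches for `pattern` in a deterministic (sorted) order."""
    return sorted(glob(pattern))


with tempfile.TemporaryDirectory() as tmpdir:
    # Create a few empty files in a deliberately unsorted order.
    for name in ("b.data", "a.data", "c.data", "notes.txt"):
        with open(os.path.join(tmpdir, name), "w"):
            pass
    matches = sorted_glob(os.path.join(tmpdir, "*.data"))
    files = [os.path.basename(f) for f in matches]
```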

jug.utils.timed_path(path)

Returns an object that returns path when passed to a jug Task, with the exception that it uses the path's mtime (modification time) and the file size in the hash. Thus, if the file is touched or changes size, this triggers an invalidation of the results (which propagates to all dependent tasks).

Parameters:
  • ipath : str

    A filesystem path

Returns:
  • opath : str

    A task equivalent to (lambda: ipath).
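The invalidation behaviour can be demonstrated with a standalone sketch that hashes the path together with its modification time and size. It uses hashlib rather than Jug's own hashing helpers, and `mtime_size_digest` is a hypothetical name chosen for this illustration.

```python
import hashlib
import os
import tempfile


def mtime_size_digest(path):
    """Digest of (path, mtime, size); the file contents are never read."""
    st = os.stat(path)
    key = f"{path}|{st.st_mtime_ns}|{st.st_size}".encode("utf-8")
    return hashlib.sha1(key).digest()


with tempfile.TemporaryDirectory() as tmpdir:
    path = os.path.join(tmpdir, "input.data")
    with open(path, "w") as f:
        f.write("first version\n")

    before = mtime_size_digest(path)
    unchanged = mtime_size_digest(path)  # nothing touched the file

    # Appending data changes the size, so the digest must change too.
    with open(path, "a") as f:
        f.write("more data\n")
    after = mtime_size_digest(path)
```

Because only metadata is hashed, this is cheap even for very large files, at the cost of missing edits that preserve both the size and the modification time.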

jug.utils.identity(x)

    identity implements the identity function as a Task (i.e., value(identity(x)) == x)

    This seems pointless, but if x is, for example, a very large list, then using this function might speed up some computations. Consider:

      large = list(range(100000))
      large = jug.utils.identity(large)
      for i in range(100):
          Task(process, large, i)

    This way the list large is going to get hashed just once. Without the call to jug.utils.identity, it would get hashed at each loop iteration.

    https://jug.readthedocs.io/en/latest/utilities.html#identity

    Parameters:
      • x : any object

    Returns:
      • x : x

class jug.utils.CustomHash(obj, hash_function)

    Set a custom hash function

    This is an advanced feature and you can shoot yourself in the foot with it. Make sure you know what you are doing. In particular, hash_function should be a strong hash: hash_function(obj0) == hash_function(obj1) is taken to imply that obj0 == obj1. The hash function should return a bytes object.

    You can use the helpers in the jug.hash module (in particular hash_one) to help you. The implementation of timed_path is a good example of how to use a CustomHash:

      def hash_with_mtime_size(path):
          from .hash import hash_one
          st = os.stat_result(os.stat(path))
          mtime = st.st_mtime
          size = st.st_size
          return hash_one((path, mtime, size))

      def timed_path(path):
          return CustomHash(path, hash_with_mtime_size)

    The path object (a string or bytes) is wrapped with a hashing function which checks the file's modification time and size.

    Parameters:
      • obj : any object
      • hash_function : function

        This should take your object and return its hash as a bytes object.

jug.utils.sync_move(src, dst)

    Sync the file and move it

    This ensures that the move is truly atomic

    Parameters:
      • src : filename

        Source file

      • dst : filename

        Destination file
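One common way to get a "sync, then move" behaviour with the standard library is sketched below. This is an assumption about the general technique, not a copy of Jug's implementation: `sync_then_move` is a hypothetical name, and the rename is only atomic on POSIX systems when source and destination are on the same filesystem.

```python
import os
import tempfile


def sync_then_move(src, dst):
    """Flush `src` to disk, then rename it to `dst`."""
    # Force the file's data out of OS buffers before the rename, so that
    # the destination never refers to a partially written file.
    with open(src, "rb+") as f:
        os.fsync(f.fileno())
    # os.replace overwrites dst if it already exists.
    os.replace(src, dst)


with tempfile.TemporaryDirectory() as tmpdir:
    src = os.path.join(tmpdir, "result.tmp")
    dst = os.path.join(tmpdir, "result.final")
    with open(src, "w") as f:
        f.write("payload")

    sync_then_move(src, dst)

    src_exists = os.path.exists(src)
    with open(dst) as f:
        moved_content = f.read()
```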

jug.utils.cached_glob(pat)

    A short-hand for

      from jug import CachedFunction
      from glob import glob
      CachedFunction(glob, pattern)

    with the extra bonus that results are returned sorted.

    Parameters:
      • pat : same as glob.glob

    Returns:
      • files : list of str