Tasks
Workflows execute tasks. Tasks are callable objects with a number of parameters to control the processing of data at a given step. While similar to pipelines, tasks encapsulate processing and don’t perform signficant transformations on their own. Tasks perform logic to prepare content for the underlying action(s).
A simple task is shown below.
Task(lambda x: [y * 2 for y in x])
The task above executes the function above for all input elements.
Tasks work well with pipelines, since pipelines are callable objects. The example below will summarize each input element.
summary = Summary()
Task(summary)
Tasks can operate independently but work best with workflows, as workflows add large-scale stream processing.
summary = Summary()
task = Task(summary)
task(["Very long text here"])
workflow = Workflow([task])
list(workflow(["Very long text here"]))
Tasks can also be created with configuration as part of a workflow.
workflow:
tasks:
- action: summary
Creates a new task. A task defines two methods, type of data it accepts and the action to execute for each data element. Action is a callable function or list of callable functions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
action | action(s) to execute on each data element | None | |
select | filter(s) used to select data to process | None | |
unpack | if data elements should be unpacked or unwrapped from (id, data, tag) tuples | True | |
column | column index to select if element is a tuple, defaults to all | None | |
merge | merge mode for joining multi-action outputs, defaults to hstack | ‘hstack’ | |
initialize | action to execute before processing | None | |
finalize | action to execute after processing | None | |
concurrency | sets concurrency method when execute instance available valid values: “thread” for thread-based concurrency, “process” for process-based concurrency | None | |
onetomany | if one-to-many data transformations should be enabled, defaults to True | True | |
kwargs | additional keyword arguments | {} |
Source code in txtai/workflow/task/base.py
def __init__(
self,
action=None,
select=None,
unpack=True,
column=None,
merge="hstack",
initialize=None,
finalize=None,
concurrency=None,
onetomany=True,
**kwargs,
):
"""
Creates a new task. A task defines two methods, type of data it accepts and the action to execute
for each data element. Action is a callable function or list of callable functions.
Args:
action: action(s) to execute on each data element
select: filter(s) used to select data to process
unpack: if data elements should be unpacked or unwrapped from (id, data, tag) tuples
column: column index to select if element is a tuple, defaults to all
merge: merge mode for joining multi-action outputs, defaults to hstack
initialize: action to execute before processing
finalize: action to execute after processing
concurrency: sets concurrency method when execute instance available
valid values: "thread" for thread-based concurrency, "process" for process-based concurrency
onetomany: if one-to-many data transformations should be enabled, defaults to True
kwargs: additional keyword arguments
"""
# Standardize into list of actions
if not action:
action = []
elif not isinstance(action, list):
action = [action]
self.action = action
self.select = select
self.unpack = unpack
self.column = column
self.merge = merge
self.initialize = initialize
self.finalize = finalize
self.concurrency = concurrency
self.onetomany = onetomany
# Check for custom registration. Adds additional instance members and validates required dependencies available.
if hasattr(self, "register"):
self.register(**kwargs)
elif kwargs:
# Raise error if additional keyword arguments passed in without register method
kwargs = ", ".join(f"'{kw}'" for kw in kwargs)
raise TypeError(f"__init__() got unexpected keyword arguments: {kwargs}")
Multi-action task concurrency
The default processing mode is to run actions sequentially. Multiprocessing support is already built in at a number of levels. Any of the GPU models will maximize GPU utilization for example and even in CPU mode, concurrency is utilized. But there are still use cases for task action concurrency. For example, if the system has multiple GPUs, the task runs external sequential code, or the task has a large number of I/O tasks.
In addition to sequential processing, multi-action tasks can run either multithreaded or with multiple processes. The advantages of each approach are discussed below.
multithreading - no overhead of creating separate processes or pickling data. But Python can only execute a single thread due the GIL, so this approach won’t help with CPU bound actions. This method works well with I/O bound actions and GPU actions.
multiprocessing - separate subprocesses are created and data is exchanged via pickling. This method can fully utilize all CPU cores since each process runs independently. This method works well with CPU bound actions.
More information on multiprocessing can be found in the Python documentation.
Multi-action task merges
Multi-action tasks will generate parallel outputs for the input data. The task output can be merged together in a couple different ways.
Merges outputs column-wise. Returns a list of tuples which will be interpreted as a one to one transformation.
Column-wise merge example (2 actions)
Inputs: [a, b, c]
Outputs => [[a1, b1, c1], [a2, b2, c2]]
Column Merge => [(a1, a2), (b1, b2), (c1, c2)]
Parameters:
Name | Type | Description | Default |
---|---|---|---|
outputs | task outputs | required |
Returns:
Type | Description |
---|---|
list of aggregated/zipped outputs as tuples (column-wise) |
Source code in txtai/workflow/task/base.py
def hstack(self, outputs):
"""
Merges outputs column-wise. Returns a list of tuples which will be interpreted as a one to one transformation.
Column-wise merge example (2 actions)
Inputs: [a, b, c]
Outputs => [[a1, b1, c1], [a2, b2, c2]]
Column Merge => [(a1, a2), (b1, b2), (c1, c2)]
Args:
outputs: task outputs
Returns:
list of aggregated/zipped outputs as tuples (column-wise)
"""
# If all outputs are numpy arrays, use native method
if all(isinstance(output, np.ndarray) for output in outputs):
return np.stack(outputs, axis=1)
# If all outputs are torch tensors, use native method
# pylint: disable=E1101
if all(torch.is_tensor(output) for output in outputs):
return torch.stack(outputs, axis=1)
return list(zip(*outputs))
Merges outputs row-wise. Returns a list of lists which will be interpreted as a one to many transformation.
Row-wise merge example (2 actions)
Inputs: [a, b, c]
Outputs => [[a1, b1, c1], [a2, b2, c2]]
Row Merge => [[a1, a2], [b1, b2], [c1, c2]] = [a1, a2, b1, b2, c1, c2]
Parameters:
Name | Type | Description | Default |
---|---|---|---|
outputs | task outputs | required |
Returns:
Type | Description |
---|---|
list of aggregated/zipped outputs as one to many transforms (row-wise) |
Source code in txtai/workflow/task/base.py
def vstack(self, outputs):
"""
Merges outputs row-wise. Returns a list of lists which will be interpreted as a one to many transformation.
Row-wise merge example (2 actions)
Inputs: [a, b, c]
Outputs => [[a1, b1, c1], [a2, b2, c2]]
Row Merge => [[a1, a2], [b1, b2], [c1, c2]] = [a1, a2, b1, b2, c1, c2]
Args:
outputs: task outputs
Returns:
list of aggregated/zipped outputs as one to many transforms (row-wise)
"""
# If all outputs are numpy arrays, use native method
if all(isinstance(output, np.ndarray) for output in outputs):
return np.concatenate(np.stack(outputs, axis=1))
# If all outputs are torch tensors, use native method
# pylint: disable=E1101
if all(torch.is_tensor(output) for output in outputs):
return torch.cat(tuple(torch.stack(outputs, axis=1)))
# Flatten into lists of outputs per input row. Wrap as one to many transformation.
merge = []
for x in zip(*outputs):
combine = []
for y in x:
if isinstance(y, list):
combine.extend(y)
else:
combine.append(y)
merge.append(OneToMany(combine))
return merge
Merges outputs column-wise and concats values together into a string. Returns a list of strings.
Concat merge example (2 actions)
Inputs: [a, b, c]
Outputs => [[a1, b1, c1], [a2, b2, c2]]
Concat Merge => [(a1, a2), (b1, b2), (c1, c2)] => [“a1. a2”, “b1. b2”, “c1. c2”]
Parameters:
Name | Type | Description | Default |
---|---|---|---|
outputs | task outputs | required |
Returns:
Type | Description |
---|---|
list of concat outputs |
Source code in txtai/workflow/task/base.py
def concat(self, outputs):
"""
Merges outputs column-wise and concats values together into a string. Returns a list of strings.
Concat merge example (2 actions)
Inputs: [a, b, c]
Outputs => [[a1, b1, c1], [a2, b2, c2]]
Concat Merge => [(a1, a2), (b1, b2), (c1, c2)] => ["a1. a2", "b1. b2", "c1. c2"]
Args:
outputs: task outputs
Returns:
list of concat outputs
"""
return [". ".join([str(y) for y in x if y]) for x in self.hstack(outputs)]
Extract task output columns
With column-wise merging, each output row will be a tuple of output values for each task action. This can be fed as input to a downstream task and that task can have separate tasks work with each element.
A simple example:
workflow = Workflow([Task(lambda x: [y * 3 for y in x], unpack=False, column=0)])
list(workflow([(2, 8)]))
For the example input tuple of (2, 2), the workflow will only select the first element (2) and run the task against that element.
workflow = Workflow([Task([lambda x: [y * 3 for y in x],
lambda x: [y - 1 for y in x]],
unpack=False, column={0:0, 1:1})])
list(workflow([(2, 8)]))
The example above applies a separate action to each input column. This simple construct can help build extremely powerful workflow graphs!