Workflow
Workflows are a simple yet powerful construct that takes a callable and returns elements. Workflows operate well with pipelines but can work with any callable object. Workflows are streaming and work on data in batches, allowing large volumes of data to be processed efficiently.
Given that pipelines are callable objects, workflows enable efficient processing of pipeline data. Because Transformers models typically work with smaller batches of data, workflows are well suited to feeding a series of Transformers pipelines.
An example of the most basic workflow:
from txtai.workflow import Task, Workflow

workflow = Workflow([Task(lambda x: [y * 2 for y in x])])
list(workflow([1, 2, 3]))
This example multiplies each input value by 2 and returns transformed elements via a generator.
Since workflows run as generators, output must be consumed for execution to occur. The following snippets show how output can be consumed.
# Small dataset where output fits in memory
list(workflow(elements))

# Large dataset
for output in workflow(elements):
    function(output)

# Large dataset where output is discarded
for _ in workflow(elements):
    pass
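The lazy behavior described above can be seen with a plain Python generator. The sketch below uses a hypothetical `fake_workflow` function as a stand-in for a workflow (it is not part of txtai) and records each element it processes, which shows that no work happens until the output is consumed.

```python
# Records which elements have actually been processed
processed = []

def fake_workflow(elements):
    """Hypothetical stand-in for a workflow: a plain generator function."""
    for element in elements:
        processed.append(element)  # side effect happens only during iteration
        yield element * 2

# Calling the generator function does no work yet
output = fake_workflow([1, 2, 3])
ran_before = list(processed)

# Consuming the generator triggers execution
results = list(output)
ran_after = list(processed)

print(ran_before, results, ran_after)
```

The same reasoning applies to the `for _ in workflow(elements): pass` pattern: the loop exists only to drive the generator to completion.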
Workflows are run with Python or configuration. Examples of both methods are shown below.
Example
A full-featured example is shown below in Python. This workflow transcribes a set of audio files, translates the text into French and indexes the data.
from txtai.embeddings import Embeddings
from txtai.pipeline import Transcription, Translation
from txtai.workflow import FileTask, Task, Workflow
# Embeddings instance
embeddings = Embeddings({
    "path": "sentence-transformers/paraphrase-MiniLM-L3-v2",
    "content": True
})
# Transcription instance
transcribe = Transcription()
# Translation instance
translate = Translation()
tasks = [
    FileTask(transcribe, r"\.wav$"),
    Task(lambda x: translate(x, "fr"))
]

# List of files to process
data = [
    "US_tops_5_million.wav",
    "Canadas_last_fully.wav",
    "Beijing_mobilises.wav",
    "The_National_Park.wav",
    "Maine_man_wins_1_mil.wav",
    "Make_huge_profits.wav"
]

# Workflow that transcribes audio and translates the text to French
workflow = Workflow(tasks)
# Index data
embeddings.index((uid, text, None) for uid, text in enumerate(workflow(data)))
# Search
embeddings.search("wildlife", 1)
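The indexing line above wraps the workflow output in a generator expression, so rows are built as the workflow yields them. As a rough illustration, the sketch below replaces the real workflow with a hypothetical `fake_transcribe_translate` generator (not part of txtai) and shows the shape of the `(uid, text, None)` tuples that result.

```python
def fake_transcribe_translate(files):
    """Hypothetical stand-in for the workflow: yields one text per input file."""
    for name in files:
        yield f"texte pour {name}"

files = ["a.wav", "b.wav"]

# Same generator expression as in the example: (id, text, tags) with no tags
rows = list((uid, text, None) for uid, text in enumerate(fake_transcribe_translate(files)))

print(rows)
```

Here `enumerate` supplies a sequential id for each yielded text, and the third tuple slot is left as `None`.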
Configuration-driven example
Workflows can be defined using Python as shown above but they can also run with YAML configuration.
writable: true
embeddings:
  path: sentence-transformers/paraphrase-MiniLM-L3-v2
  content: true

# Transcribe audio to text
transcription:

# Translate text between languages
translation:

workflow:
  index:
    tasks:
      - action: transcription
        select: "\\.wav$"
        task: file
      - action: translation
        args: ["fr"]
      - action: index
from txtai.app import Application

# Create and run the workflow
app = Application("workflow.yml")
list(app.workflow("index", [
    "US_tops_5_million.wav",
    "Canadas_last_fully.wav",
    "Beijing_mobilises.wav",
    "The_National_Park.wav",
    "Maine_man_wins_1_mil.wav",
    "Make_huge_profits.wav"
]))
# Search
app.search("wildlife")
The code above executes a workflow defined in the file workflow.yml. The API runs the workflow locally, so there is minimal overhead in running workflows this way. Choosing between Python and configuration is a matter of preference.
See the following links for more information.
Methods
Workflows are callable objects. Workflows take an input of iterable data elements and output iterable data elements.
__init__(self, tasks, batch=100, workers=None, name=None, stream=None)
Creates a new workflow. Workflows are lists of tasks to execute.
Parameters:
Name | Description | Default |
---|---|---|
tasks | list of workflow tasks | required |
batch | how many items to process at a time, defaults to 100 | 100 |
workers | number of concurrent workers | None |
name | workflow name | None |
stream | workflow stream processor | None |
Source code in txtai/workflow/base.py
def __init__(self, tasks, batch=100, workers=None, name=None, stream=None):
    """
    Creates a new workflow. Workflows are lists of tasks to execute.

    Args:
        tasks: list of workflow tasks
        batch: how many items to process at a time, defaults to 100
        workers: number of concurrent workers
        name: workflow name
        stream: workflow stream processor
    """

    self.tasks = tasks
    self.batch = batch
    self.workers = workers
    self.name = name
    self.stream = stream

    # Set default number of executor workers to max number of actions in a task
    self.workers = max(len(task.action) for task in self.tasks) if not self.workers else self.workers
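The last line of the constructor picks a default worker count when none is given. The sketch below isolates that expression in a hypothetical `default_workers` helper and uses `SimpleNamespace` objects as stand-ins for tasks. It assumes each task exposes its actions as a list in an `action` attribute, as the source line implies.

```python
from types import SimpleNamespace

# Hypothetical stand-ins for tasks: only the `action` attribute matters here
tasks = [
    SimpleNamespace(action=[str.lower]),
    SimpleNamespace(action=[str.upper, str.title, str.strip]),
]

def default_workers(tasks, workers=None):
    """Mirrors the constructor expression: fall back to the largest action count."""
    return max(len(task.action) for task in tasks) if not workers else workers

auto = default_workers(tasks)                 # no value given, derived from tasks
explicit = default_workers(tasks, workers=8)  # explicit value is kept

print(auto, explicit)
```

Because the check is `not workers`, a value of `0` is treated the same as `None` and replaced by the derived default.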
__call__(self, elements)
Executes a workflow for input elements. This method returns a generator that yields transformed data elements.
Parameters:
Name | Description | Default |
---|---|---|
elements | iterable data elements | required |
Returns:
Description |
---|
generator that yields transformed data elements |
Source code in txtai/workflow/base.py
def __call__(self, elements):
    """
    Executes a workflow for input elements. This method returns a generator that yields transformed
    data elements.

    Args:
        elements: iterable data elements

    Returns:
        generator that yields transformed data elements
    """

    # Create execute instance for this run
    with Execute(self.workers) as executor:
        # Run task initializers
        self.initialize()

        # Process elements with stream processor, if available
        elements = self.stream(elements) if self.stream else elements

        # Process elements in batches
        for batch in self.chunk(elements):
            yield from self.process(batch, executor)

        # Run task finalizers
        self.finalize()
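The initialize, batch, process, finalize sequence above can be sketched in plain Python. The `chunk` and `run` functions below are hypothetical simplifications, not the txtai implementations: there is no executor or stream processor, and batching is assumed to be a simple fixed-size split.

```python
from itertools import islice

def chunk(elements, size):
    """Hypothetical batching helper: yields lists of up to `size` items."""
    iterator = iter(elements)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch

# Records the order in which the steps run
events = []

def run(elements, action, batch=2):
    """Simplified stand-in for Workflow.__call__ without executors or streams."""
    events.append("initialize")
    for items in chunk(elements, batch):
        events.append(f"batch of {len(items)}")
        yield from action(items)
    events.append("finalize")

results = list(run(range(5), lambda x: [y * 2 for y in x]))

print(results)
print(events)
```

Since `run` is a generator, the finalize step is reached only once the caller has consumed every element, which is another reason output must be fully iterated.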
schedule(self, cron, elements, iterations=None)
Schedules a workflow using a cron expression and elements.
Parameters:
Name | Description | Default |
---|---|---|
cron | cron expression | required |
elements | iterable data elements passed to workflow each call | required |
iterations | number of times to run workflow, defaults to run indefinitely | None |
Source code in txtai/workflow/base.py
def schedule(self, cron, elements, iterations=None):
    """
    Schedules a workflow using a cron expression and elements.

    Args:
        cron: cron expression
        elements: iterable data elements passed to workflow each call
        iterations: number of times to run workflow, defaults to run indefinitely
    """

    # Check that croniter is installed
    if not CRONITER:
        raise ImportError('Workflow scheduling is not available - install "workflow" extra to enable')

    logger.info("'%s' scheduler started with schedule %s", self.name, cron)

    maxiterations = iterations
    while iterations is None or iterations > 0:
        # Schedule using localtime
        schedule = croniter(cron, datetime.now().astimezone()).get_next(datetime)
        logger.info("'%s' next run scheduled for %s", self.name, schedule.isoformat())
        time.sleep(schedule.timestamp() - time.time())

        # Run workflow
        # pylint: disable=W0703
        try:
            for _ in self(elements):
                pass
        except Exception:
            logger.error(traceback.format_exc())

        # Decrement iterations remaining, if necessary
        if iterations is not None:
            iterations -= 1

    logger.info("'%s' max iterations (%d) reached", self.name, maxiterations)
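Two details of the loop above are easy to miss: a failed run is logged rather than raised, and a failed run still counts against `iterations`. The sketch below is a simplified stand-in with the cron calculation and sleep removed. `run_scheduled` and `flaky` are hypothetical names introduced for illustration only.

```python
import logging
import traceback

logger = logging.getLogger("schedule-sketch")

def run_scheduled(workflow, elements, iterations=None):
    """Simplified stand-in for schedule(): no cron expression, no sleeping."""
    runs = 0
    while iterations is None or iterations > 0:
        # Run workflow, logging errors instead of stopping the loop
        try:
            for _ in workflow(elements):
                pass
        except Exception:  # pylint: disable=W0703
            logger.error(traceback.format_exc())
        runs += 1

        # Decrement iterations remaining, if necessary
        if iterations is not None:
            iterations -= 1
    return runs

def flaky(elements):
    """Hypothetical workflow that fails after yielding one element."""
    yield elements[0]
    raise ValueError("simulated failure")

runs = run_scheduled(flaky, [1, 2, 3], iterations=3)
print(runs)
```

With `iterations=None` the loop never exits, so a call like this blocks indefinitely, which matches the "defaults to run indefinitely" note in the parameter table.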
More examples
See this link for a full list of workflow examples.