Service Task
The Service Task extracts content from an HTTP service.
Example
The following shows a simple example using this task as part of a workflow.
from txtai.workflow import ServiceTask, Workflow

# Create a workflow with a single ServiceTask pointed at the service URL
workflow = Workflow([ServiceTask(url="https://service.url/action")])

# Run the workflow over a list of input elements
workflow(["parameter"])
Configuration-driven example
This task can also be created with workflow configuration.
workflow:
  tasks:
    - task: service
      url: https://service.url/action
Methods
Python documentation for the task.
__init__(self, action=None, select=None, unpack=True, column=None, merge='hstack', initialize=None, finalize=None, concurrency=None, onetomany=True, **kwargs)
special
Source code in txtai/workflow/task/base.py
def __init__(
    self,
    action=None,
    select=None,
    unpack=True,
    column=None,
    merge="hstack",
    initialize=None,
    finalize=None,
    concurrency=None,
    onetomany=True,
    **kwargs,
):
    """
    Builds a task from its action(s) and processing options.

    The action argument is always stored as a list: a missing/empty value becomes an empty list
    and a single non-list value is wrapped in a one-element list.

    Args:
        action: action(s) to execute on each data element
        select: filter(s) used to select data to process
        unpack: if data elements should be unpacked or unwrapped from (id, data, tag) tuples
        column: column index to select if element is a tuple, defaults to all
        merge: merge mode for joining multi-action outputs, defaults to hstack
        initialize: action to execute before processing
        finalize: action to execute after processing
        concurrency: sets concurrency method when execute instance available
                     valid values: "thread" for thread-based concurrency, "process" for process-based concurrency
        onetomany: if one-to-many data transformations should be enabled, defaults to True
        kwargs: additional keyword arguments, forwarded to self.register when that method exists

    Raises:
        TypeError: if extra keyword arguments are passed and the instance has no register method
    """

    # Normalize the action argument so downstream code always sees a list
    if not action:
        actions = []
    elif isinstance(action, list):
        actions = action
    else:
        actions = [action]

    self.action = actions
    self.select = select
    self.unpack = unpack
    self.column = column
    self.merge = merge
    self.initialize = initialize
    self.finalize = finalize
    self.concurrency = concurrency
    self.onetomany = onetomany

    # Custom registration hook: lets an instance consume the extra keyword arguments itself
    if hasattr(self, "register"):
        self.register(**kwargs)
        return

    # No register hook available, extra keyword arguments have nowhere to go
    if kwargs:
        unexpected = ", ".join(f"'{name}'" for name in kwargs)
        raise TypeError(f"__init__() got unexpected keyword arguments: {unexpected}")
register(self, url=None, method=None, params=None, batch=True, extract=None)
Adds service parameters to task. Checks if required dependencies are installed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| url | | url to connect to | None |
| method | | http method, GET or POST | None |
| params | | default query parameters | None |
| batch | | if True, all elements are passed in a single batch request, otherwise a service call is executed per element | True |
| extract | | list of sections to extract from response | None |
Source code in txtai/workflow/task/service.py
def register(self, url=None, method=None, params=None, batch=True, extract=None):
    """
    Stores the service connection settings on this task, after checking that the optional
    dependencies needed by the task are installed.

    Args:
        url: url to connect to
        method: http method, GET or POST
        params: default query parameters
        batch: if True, all elements are passed in a single batch request, otherwise a service call is executed per element
        extract: list of sections to extract from response, a single string is also accepted

    Raises:
        ImportError: if the XML_TO_DICT flag is not set
    """

    if not XML_TO_DICT:
        raise ImportError('ServiceTask is not available - install "workflow" extra to enable')

    # pylint: disable=W0201
    # Connection defaults: URL, method and query parameters
    self.url, self.method, self.params = url, method, params

    # Batch mode sends all elements in one request, otherwise one service call is made per element
    self.batch = batch

    # Sections to extract: a non-empty single string is wrapped in a list, any other value is stored as-is
    if extract and isinstance(extract, str):
        sections = [extract]
    else:
        sections = extract

    self.extract = sections