Service Task
The Service Task extracts content from an HTTP service.
Example
The following shows a simple example using this task as part of a workflow.
from txtai.workflow import ServiceTask, Workflow
workflow = Workflow([ServiceTask(url="https://service.url/action")])
workflow(["parameter"])
Configuration-driven example
This task can also be created with workflow configuration.
workflow:
  tasks:
    - task: service
      url: https://service.url/action
Methods
Python documentation for the task.
__init__(action=None, select=None, unpack=True, column=None, merge='hstack', initialize=None, finalize=None, concurrency=None, onetomany=True, **kwargs)
Creates a new task. A task defines two things: the type of data it accepts and the action to execute for each data element. An action is a callable function or a list of callable functions.
Parameters:
| Name | Description | Default |
|---|---|---|
| action | action(s) to execute on each data element | None |
| select | filter(s) used to select data to process | None |
| unpack | if data elements should be unpacked or unwrapped from (id, data, tag) tuples | True |
| column | column index to select if element is a tuple, defaults to all | None |
| merge | merge mode for joining multi-action outputs, defaults to hstack | 'hstack' |
| initialize | action to execute before processing | None |
| finalize | action to execute after processing | None |
| concurrency | concurrency method to use when an execute instance is available; valid values: "thread" for thread-based concurrency, "process" for process-based concurrency | None |
| onetomany | if one-to-many data transformations should be enabled, defaults to True | True |
| kwargs | additional keyword arguments | {} |
Source code in txtai/workflow/task/base.py
def __init__(
    self,
    action=None,
    select=None,
    unpack=True,
    column=None,
    merge="hstack",
    initialize=None,
    finalize=None,
    concurrency=None,
    onetomany=True,
    **kwargs,
):
    """
    Creates a new task. A task defines two methods, type of data it accepts and the action to execute
    for each data element. Action is a callable function or list of callable functions.

    Args:
        action: action(s) to execute on each data element
        select: filter(s) used to select data to process
        unpack: if data elements should be unpacked or unwrapped from (id, data, tag) tuples
        column: column index to select if element is a tuple, defaults to all
        merge: merge mode for joining multi-action outputs, defaults to hstack
        initialize: action to execute before processing
        finalize: action to execute after processing
        concurrency: sets concurrency method when execute instance available
                     valid values: "thread" for thread-based concurrency, "process" for process-based concurrency
        onetomany: if one-to-many data transformations should be enabled, defaults to True
        kwargs: additional keyword arguments
    """

    # Standardize into list of actions
    if not action:
        action = []
    elif not isinstance(action, list):
        action = [action]

    self.action = action
    self.select = select
    self.unpack = unpack
    self.column = column
    self.merge = merge
    self.initialize = initialize
    self.finalize = finalize
    self.concurrency = concurrency
    self.onetomany = onetomany

    # Check for custom registration. Adds additional instance members and validates required dependencies available.
    if hasattr(self, "register"):
        self.register(**kwargs)
    elif kwargs:
        # Raise error if additional keyword arguments passed in without register method
        kwargs = ", ".join(f"'{kw}'" for kw in kwargs)
        raise TypeError(f"__init__() got unexpected keyword arguments: {kwargs}")
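The keyword-argument handling at the end of `__init__` can be illustrated with a minimal, self-contained sketch. `BaseTask`, `UrlTask` and `PlainTask` are hypothetical stand-ins, not txtai classes; the sketch only mirrors the action standardization and the `hasattr(self, "register")` check from the source above.

```python
class BaseTask:
    """Hypothetical stand-in that mirrors the kwargs handling shown above."""

    def __init__(self, action=None, **kwargs):
        # Standardize into a list of actions
        if not action:
            action = []
        elif not isinstance(action, list):
            action = [action]
        self.action = action

        # Subclasses that define register() receive the extra keyword arguments
        if hasattr(self, "register"):
            self.register(**kwargs)
        elif kwargs:
            # No register method: extra keyword arguments are an error
            names = ", ".join(f"'{kw}'" for kw in kwargs)
            raise TypeError(f"__init__() got unexpected keyword arguments: {names}")


class UrlTask(BaseTask):
    """Hypothetical subclass with a register method."""

    def register(self, url=None):
        self.url = url


class PlainTask(BaseTask):
    """Hypothetical subclass without a register method."""


# Extra keyword arguments are routed to register()
task = UrlTask(action=str.upper, url="https://service.url/action")

# Without register(), the same keyword argument raises TypeError
try:
    PlainTask(url="https://service.url/action")
    error = None
except TypeError as e:
    error = str(e)
```

Under this pattern, a subclass only needs to define `register` to accept its own parameters, which is presumably why `url`, `method` and the other service options can be passed straight to the `ServiceTask` constructor.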
register(url=None, method=None, params=None, batch=True, extract=None)
Adds service parameters to task. Checks if required dependencies are installed.
Parameters:
| Name | Description | Default |
|---|---|---|
| url | url to connect to | None |
| method | http method, GET or POST | None |
| params | default query parameters | None |
| batch | if True, all elements are passed in a single batch request, otherwise a service call is executed per element | True |
| extract | list of sections to extract from response | None |
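The difference between the two `batch` modes can be pictured with a small sketch. `call_service` and `run` are hypothetical helpers that record calls instead of issuing HTTP requests; how ServiceTask actually builds and sends its requests is not covered here, so treat this as an illustration of the call pattern only.

```python
calls = []


def call_service(payload):
    """Hypothetical stand-in for an HTTP request: records the call and upper-cases the input."""
    calls.append(payload)
    if isinstance(payload, list):
        return [item.upper() for item in payload]
    return payload.upper()


def run(elements, batch=True):
    """Sends all elements in one call when batch is True, otherwise one call per element."""
    if batch:
        return call_service(elements)
    return [call_service(element) for element in elements]


# One request carrying all elements
batched = run(["a", "b", "c"], batch=True)
batch_calls = len(calls)

# One request per element
calls.clear()
single = run(["a", "b", "c"], batch=False)
single_calls = len(calls)
```

Both modes yield the same results in this sketch; what changes is the number of service calls (one versus one per element), which matters when the remote service does not accept multiple inputs per request.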
Source code in txtai/workflow/task/service.py
def register(self, url=None, method=None, params=None, batch=True, extract=None):
    """
    Adds service parameters to task. Checks if required dependencies are installed.

    Args:
        url: url to connect to
        method: http method, GET or POST
        params: default query parameters
        batch: if True, all elements are passed in a single batch request, otherwise a service call is executed per element
        extract: list of sections to extract from response
    """

    if not XML_TO_DICT:
        raise ImportError('ServiceTask is not available - install "workflow" extra to enable')

    # pylint: disable=W0201
    # Save URL, method and parameter defaults
    self.url = url
    self.method = method
    self.params = params

    # If True, all elements are passed in a single batch request, otherwise a service call is executed per element
    self.batch = batch

    # Save sections to extract. Supports both a single string and a hierarchical list of sections.
    self.extract = extract
    if self.extract:
        self.extract = [self.extract] if isinstance(self.extract, str) else self.extract
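As a rough sketch of what a "hierarchical list of sections" might mean in practice, the snippet below normalizes `extract` the same way `register` does and then walks a nested response one section at a time. The traversal in `extract_sections` is an assumption for illustration, as is the sample `response`; the code that applies `extract` to real responses is not shown in this section.

```python
def normalize(extract):
    """Mirrors the normalization in register: a single string becomes a one-element list."""
    if extract:
        return [extract] if isinstance(extract, str) else extract
    return extract


def extract_sections(response, extract):
    """Assumed traversal: descend into nested dictionaries, one section name per level."""
    for section in normalize(extract) or []:
        response = response[section]
    return response


# Hypothetical parsed service response
response = {"data": {"results": ["first", "second"]}, "status": "ok"}

nested = extract_sections(response, ["data", "results"])
flat = extract_sections(response, "status")
```

With this reading, `extract="status"` and `extract=["status"]` behave identically, and a longer list selects a value deeper in the response.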