Extractor
The Extractor pipeline joins a prompt, context data store and generative model together to extract knowledge.
The data store can be an embeddings database or a similarity instance with associated input text. The generative model can be a prompt-driven large language model (LLM), an extractive question-answering model or a custom pipeline. This approach is known as prompt-driven search or retrieval augmented generation (RAG).
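The retrieve-then-generate flow can be sketched in plain Python. This is only an illustration of the idea, not txtai's implementation: `retrieve` is a hypothetical word-overlap scorer standing in for the data store, and `generate` is a placeholder for whatever model would receive the prompt.

```python
import re

def words(text):
    """Lowercased set of word tokens in text."""
    return set(re.findall(r"\w+", text.lower()))

def retrieve(query, texts, topn=3):
    """Hypothetical retriever: ranks texts by the number of words shared with the query."""
    terms = words(query)
    scored = [(len(terms & words(text)), text) for text in texts]
    # Keep only texts sharing at least one word, best matches first
    scored = sorted((row for row in scored if row[0] > 0), key=lambda row: row[0], reverse=True)
    return [text for _, text in scored[:topn]]

def build_prompt(question, context):
    """Joins the question and the retrieved context into a single prompt."""
    return (
        "Answer the following question using the provided context.\n"
        f"Question:\n{question}\n"
        f"Context:\n{context}"
    )

def generate(prompt):
    """Placeholder for a generative model call; returns the prompt unchanged."""
    return prompt

texts = [
    "Maine man wins $1M from $25 lottery ticket",
    "US tops 5 million confirmed virus cases",
]

question = "What did the Maine man win?"
context = " ".join(retrieve(question, texts))
output = generate(build_prompt(question, context))
print(output)
```

A real data store would rank by semantic similarity rather than shared words, which is why a query such as "What was won?" can still find the lottery headline.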
Example
The following shows a simple example using this pipeline.
from txtai.embeddings import Embeddings
from txtai.pipeline import Extractor
# LLM prompt
def prompt(question):
    return f"""
    Answer the following question using the provided context.
    Question:
    {question}
    Context:
    """
# Input data
data = [
    "US tops 5 million confirmed virus cases",
    "Canada's last fully intact ice shelf has suddenly collapsed, " +
    "forming a Manhattan-sized iceberg",
    "Beijing mobilises invasion craft along coast as Taiwan tensions escalate",
    "The National Park Service warns against sacrificing slower friends " +
    "in a bear attack",
    "Maine man wins $1M from $25 lottery ticket",
    "Make huge profits without work, earn up to $100,000 a day"
]
# Build embeddings index
embeddings = Embeddings({"content": True})
embeddings.index([(uid, text, None) for uid, text in enumerate(data)])
# Create and run pipeline
extractor = Extractor(embeddings, "google/flan-t5-base")
extractor([{"query": "What was won?", "question": prompt("What was won?")}])
See the links below for more detailed examples.
Notebook | Description |
---|---|
Prompt-driven search with LLMs | Embeddings-guided and Prompt-driven search with Large Language Models (LLMs) |
Prompt templates and task chains | Build model prompts and connect tasks together with workflows |
Extractive QA with txtai | Introduction to extractive question-answering with txtai |
Extractive QA with Elasticsearch | Run extractive question-answering queries with Elasticsearch |
Extractive QA to build structured data | Build structured datasets using extractive question-answering |
Configuration-driven example
Pipelines are run with Python or configuration. Pipelines can be instantiated in configuration using the lower case name of the pipeline. Configuration-driven pipelines are run with workflows or the API.
config.yml
# Allow documents to be indexed
writable: True

# Content is required for extractor pipeline
embeddings:
  content: True

extractor:
  path: google/flan-t5-base

workflow:
  search:
    tasks:
      - task: extractor
        template: |
          Answer the following question using the provided context.
          Question:
          {text}
          Context:
        action: extractor
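The `template` in this configuration plays the same role as the `prompt` function in the Python example. As a rough sketch, and assuming the task substitutes each input element for `{text}` and passes the original element along as the search query (an assumption about task behavior, not something stated on this page), the expansion looks like this. `expand` is a hypothetical helper.

```python
TEMPLATE = """Answer the following question using the provided context.
Question:
{text}
Context:
"""

def expand(element, template=TEMPLATE):
    """Hypothetical helper: builds an extractor input row from a workflow element."""
    # Assumption: the element is the search query and the filled-in template is the question
    return {"query": element, "question": template.format(text=element)}

row = expand("What was won?")
print(row["question"])
```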
Run with Workflows
Built-in tasks make using the extractor pipeline easier.
from txtai.app import Application
# Create and run pipeline with workflow
app = Application("config.yml")
app.add([
    "US tops 5 million confirmed virus cases",
    "Canada's last fully intact ice shelf has suddenly collapsed, " +
    "forming a Manhattan-sized iceberg",
    "Beijing mobilises invasion craft along coast as Taiwan tensions escalate",
    "The National Park Service warns against sacrificing slower friends " +
    "in a bear attack",
    "Maine man wins $1M from $25 lottery ticket",
    "Make huge profits without work, earn up to $100,000 a day"
])
app.index()
list(app.workflow("search", ["What was won?"]))
Run with API
CONFIG=config.yml uvicorn "txtai.api:app" &
curl \
-X POST "http://localhost:8000/workflow" \
-H "Content-Type: application/json" \
-d '{"name": "search", "elements": ["What was won"]}'
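The same request can be prepared from Python. The sketch below uses only the standard library and assumes the server address from the curl command; `build_request` is a hypothetical helper that constructs the request without sending it.

```python
import json
import urllib.request

def build_request(name, elements, url="http://localhost:8000/workflow"):
    """Builds a POST request equivalent to the curl command; does not send it."""
    payload = json.dumps({"name": name, "elements": elements}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

request = build_request("search", ["What was won"])

# Sending requires the API server started above to be running.
# Assumption: the response body is JSON.
# with urllib.request.urlopen(request) as response:
#     print(json.load(response))
```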
Methods
Python documentation for the pipeline.
__init__(self, similarity, path, quantize=False, gpu=True, model=None, tokenizer=None, minscore=None, mintokens=None, context=None, task=None, output='default', **kwargs)
special
Builds a new extractor.
Parameters:
Name | Description | Default |
---|---|---|
similarity | similarity instance (embeddings or similarity pipeline) | required |
path | path to model, supports Questions, Generator, Sequences or custom pipeline | required |
quantize | True if model should be quantized before inference, False otherwise. | False |
gpu | if gpu inference should be used (only works if GPUs are available) | True |
model | optional existing pipeline model to wrap | None |
tokenizer | Tokenizer class | None |
minscore | minimum score to include context match, defaults to None | None |
mintokens | minimum number of tokens to include context match, defaults to None | None |
context | topn context matches to include, defaults to 3 | None |
task | model task (language-generation, sequence-sequence or question-answering), defaults to auto-detect | None |
output | output format, 'default' returns (name, answer), 'flatten' returns answers and 'reference' returns (name, answer, reference) | 'default' |
kwargs | additional keyword arguments to pass to pipeline model | {} |
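The three `output` values differ only in the shape of each result. The sketch below mimics those shapes with a hypothetical `format_answers` helper and made-up rows, based solely on the descriptions in the table; it is not the pipeline's code, and the reference value is assumed here to be an id of the supporting context match.

```python
def format_answers(rows, output="default"):
    """Hypothetical helper: shapes (name, answer, reference) rows per output format."""
    if output == "flatten":
        # Answers only
        return [answer for _, answer, _ in rows]
    if output == "reference":
        # (name, answer, reference)
        return [(name, answer, reference) for name, answer, reference in rows]
    # 'default': (name, answer)
    return [(name, answer) for name, answer, _ in rows]

# Made-up row: question name, answer text, assumed id of the supporting context match
rows = [("What was won?", "$1M", 4)]

print(format_answers(rows))               # [('What was won?', '$1M')]
print(format_answers(rows, "flatten"))    # ['$1M']
print(format_answers(rows, "reference"))  # [('What was won?', '$1M', 4)]
```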
Source code in txtai/pipeline/text/extractor.py
def __init__(
    self,
    similarity,
    path,
    quantize=False,
    gpu=True,
    model=None,
    tokenizer=None,
    minscore=None,
    mintokens=None,
    context=None,
    task=None,
    output="default",
    **kwargs,
):
    """
    Builds a new extractor.

    Args:
        similarity: similarity instance (embeddings or similarity pipeline)
        path: path to model, supports Questions, Generator, Sequences or custom pipeline
        quantize: True if model should be quantized before inference, False otherwise.
        gpu: if gpu inference should be used (only works if GPUs are available)
        model: optional existing pipeline model to wrap
        tokenizer: Tokenizer class
        minscore: minimum score to include context match, defaults to None
        mintokens: minimum number of tokens to include context match, defaults to None
        context: topn context matches to include, defaults to 3
        task: model task (language-generation, sequence-sequence or question-answering), defaults to auto-detect
        output: output format, 'default' returns (name, answer), 'flatten' returns answers and 'reference' returns (name, answer, reference)
        kwargs: additional keyword arguments to pass to pipeline model
    """

    # Similarity instance
    self.similarity = similarity

    # Question-Answer model. Can be prompt-driven LLM or extractive qa
    self.model = self.load(path, quantize, gpu, model, task, **kwargs)

    # Tokenizer class use default method if not set
    self.tokenizer = tokenizer if tokenizer else Tokenizer() if hasattr(self.similarity, "scoring") and self.similarity.scoring else None

    # Minimum score to include context match
    self.minscore = minscore if minscore is not None else 0.0

    # Minimum number of tokens to include context match
    self.mintokens = mintokens if mintokens is not None else 0.0

    # Top n context matches to include for context
    self.context = context if context else 3

    # Output format
    self.output = output
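Note how the constructor resolves unset options: `minscore` and `mintokens` fall back to `0.0` only when they are `None`, whereas `context` uses a truthiness check, so `context=0` also falls back to 3. The snippet below reproduces just those three expressions in a standalone function (`resolve` is a hypothetical name) to make the difference visible.

```python
def resolve(minscore=None, mintokens=None, context=None):
    """Mirrors the default handling of the three options shown in the constructor."""
    return (
        minscore if minscore is not None else 0.0,
        mintokens if mintokens is not None else 0.0,
        context if context else 3,
    )

print(resolve())              # (0.0, 0.0, 3)
print(resolve(minscore=0.5))  # (0.5, 0.0, 3)
print(resolve(context=0))     # (0.0, 0.0, 3)
print(resolve(context=5))     # (0.0, 0.0, 5)
```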
__call__(self, queue, texts=None)
special
Finds answers to input questions. This method runs queries to find the top n best matches and uses that as the context. A model is then run against the context for each input question, with the answer returned.
Parameters:
Name | Description | Default |
---|---|---|
queue | input question queue (name, query, question, snippet), can be list of tuples or dicts | required |
texts | optional list of text for context, otherwise runs embeddings search | None |
Returns:
Description |
---|
list of answers matching input format (tuple or dict) containing fields as specified by output format |
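Queue rows can be tuples or dicts. The standalone sketch below mirrors the dict-to-tuple conversion performed at the start of `__call__`; `normalize` is a hypothetical name. Keys missing from a dict become `None`.

```python
FIELDS = ["name", "query", "question", "snippet"]

def normalize(queue):
    """Converts dict rows to (name, query, question, snippet) tuples; tuples pass through."""
    if queue and isinstance(queue[0], dict):
        return [tuple(row.get(field) for field in FIELDS) for row in queue]
    return queue

rows = normalize([{"query": "What was won?", "question": "What was won?"}])
print(rows)  # [(None, 'What was won?', 'What was won?', None)]
```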
Source code in txtai/pipeline/text/extractor.py
def __call__(self, queue, texts=None):
    """
    Finds answers to input questions. This method runs queries to find the top n best matches and uses that as the context.
    A model is then run against the context for each input question, with the answer returned.

    Args:
        queue: input question queue (name, query, question, snippet), can be list of tuples or dicts
        texts: optional list of text for context, otherwise runs embeddings search

    Returns:
        list of answers matching input format (tuple or dict) containing fields as specified by output format
    """

    # Save original queue format
    inputs = queue

    # Convert dictionary inputs to tuples
    if queue and isinstance(queue[0], dict):
        # Convert dict to tuple
        queue = [tuple(row.get(x) for x in ["name", "query", "question", "snippet"]) for row in queue]

    # Rank texts by similarity for each query
    results = self.query([query for _, query, _, _ in queue], texts)

    # Build question-context pairs
    names, queries, questions, contexts, topns, snippets = [], [], [], [], [], []
    for x, (name, query, question, snippet) in enumerate(queue):
        # Get top n best matching segments
        topn = sorted(results[x], key=lambda y: y[2], reverse=True)[: self.context]

        # Generate context using ordering from texts, if available, otherwise order by score
        context = " ".join(text for _, text, _ in (sorted(topn, key=lambda y: y[0]) if texts else topn))

        names.append(name)
        queries.append(query)
        questions.append(question)
        contexts.append(context)
        topns.append(topn)
        snippets.append(snippet)

    # Run pipeline and return answers
    answers = self.answers(names, questions, contexts, [[text for _, text, _ in topn] for topn in topns], snippets)

    # Apply output formatting to answers and return
    return self.apply(inputs, queries, answers, topns)
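The context-building step inside the loop can be tried in isolation. In this sketch, `build_context` is a hypothetical function and the `(id, text, score)` tuples are invented. It keeps the top n matches by score, then joins them in original text order when requested (as happens when `texts` is supplied), or in score order otherwise.

```python
def build_context(matches, topn=3, keep_text_order=False):
    """Selects the best-scoring matches and joins their text into one context string."""
    # Top n matches by score, highest first
    best = sorted(matches, key=lambda y: y[2], reverse=True)[:topn]
    # Optionally restore the original ordering by id
    ordered = sorted(best, key=lambda y: y[0]) if keep_text_order else best
    return " ".join(text for _, text, _ in ordered)

# Invented (id, text, score) matches
matches = [(0, "first.", 0.5), (1, "second.", 0.2), (2, "third.", 0.9), (3, "fourth.", 0.1)]

print(build_context(matches, topn=2))                        # third. first.
print(build_context(matches, topn=2, keep_text_order=True))  # first. third.
```

Ordering by id keeps passages in the sequence they appeared in the supplied texts, which can read more naturally to the model than score order.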