Methods

- __init__(self, config=None, models=None, **kwargs) (special)
- batchexplain(self, queries, texts=None, limit=None)
- batchsearch(self, queries, limit=None, weights=None, index=None)
- batchsimilarity(self, queries, data)
- batchterms(self, queries)
- batchtransform(self, documents, category=None)
- checkarchive(self, path)
- close(self)
- columns(self, config)
- configure(self, config)
- count(self)
- createann(self)
- createcloud(self, **cloud)
- createdatabase(self)
- creategraph(self)
- createindexes(self)
- createscoring(self)
- defaultallowed(self)
- defaults(self)
- delete(self, ids)
- exists(self, path=None, cloud=None, **kwargs)
- explain(self, query, texts=None, limit=None)
- index(self, documents, reindex=False)
- info(self)
- initindex(self, reindex)
- isdense(self)
- issparse(self)
- isweighted(self)
- load(self, path=None, cloud=None, **kwargs)
- loadconfig(self, path)
- loadquery(self)
- loadvectors(self)
- normalize(self, embeddings)
- reindex(self, config=None, function=None, **kwargs)
- save(self, path, cloud=None, **kwargs)
- saveconfig(self, path)
- score(self, documents)
- search(self, query, limit=None, weights=None, index=None)
- similarity(self, query, data)
- terms(self, query)
- transform(self, document)
- upsert(self, documents)
Embeddings
Embeddings is the engine that delivers semantic search. Data is transformed into embeddings vectors where similar concepts will produce similar vectors. Indexes both large and small are built with these vectors. The indexes are used to find results that have the same meaning, not necessarily the same keywords.
Source code in txtai/embeddings/base.py
```python
class Embeddings:
    """
    Embeddings is the engine that delivers semantic search. Data is transformed into embeddings vectors where similar concepts
    will produce similar vectors. Indexes both large and small are built with these vectors. The indexes are used to find results
    that have the same meaning, not necessarily the same keywords.
    """
```
__init__(self, config=None, models=None, **kwargs) (special)
Creates a new embeddings index. Embeddings indexes are thread-safe for read operations but writes must be synchronized.
Parameters:

Name | Description | Default
--- | --- | ---
config | embeddings configuration | None
models | models cache, used for model sharing between embeddings | None
kwargs | additional configuration as keyword args | {}
Source code in txtai/embeddings/base.py
```python
def __init__(self, config=None, models=None, **kwargs):
    """
    Creates a new embeddings index. Embeddings indexes are thread-safe for read operations but writes must be synchronized.

    Args:
        config: embeddings configuration
        models: models cache, used for model sharing between embeddings
        kwargs: additional configuration as keyword args
    """

    # Index configuration
    self.config = None
    # Dimensionality reduction - word vectors only
    self.reducer = None
    # Dense vector model - transforms data into similarity vectors
    self.model = None
    # Approximate nearest neighbor index
    self.ann = None
    # Document database
    self.database = None
    # Resolvable functions
    self.functions = None
    # Graph network
    self.graph = None
    # Sparse vectors
    self.scoring = None
    # Query model
    self.query = None
    # Index archive
    self.archive = None
    # Subindexes for this embeddings instance
    self.indexes = None
    # Models cache
    self.models = models

    # Merge configuration into single dictionary
    config = {**config, **kwargs} if config and kwargs else kwargs if kwargs else config

    # Set initial configuration
    self.configure(config)
```
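A minimal construction sketch, not taken from the txtai docs: keyword arguments and the config dictionary are merged into a single configuration, so the two forms below are equivalent. The model path is the library's documented default.

```python
from txtai.embeddings import Embeddings

# Keyword arguments and the config dictionary are merged into one configuration,
# so these two forms are equivalent
embeddings = Embeddings(path="sentence-transformers/all-MiniLM-L6-v2", content=True)
embeddings = Embeddings({"path": "sentence-transformers/all-MiniLM-L6-v2", "content": True})
```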
batchexplain(self, queries, texts=None, limit=None)
Explains the importance of each input token in text for a list of queries. This method requires either content to be enabled or texts to be provided.
Parameters:

Name | Description | Default
--- | --- | ---
queries | input queries | required
texts | optional list of (text\|list of tokens), otherwise runs search queries | None
limit | optional limit if texts is None | None

Returns:

list of dict per input text per query, where higher token scores represent higher importance relative to the query
Source code in txtai/embeddings/base.py
```python
def batchexplain(self, queries, texts=None, limit=None):
    """
    Explains the importance of each input token in text for a list of queries. This method requires either content to be enabled
    or texts to be provided.

    Args:
        queries: input queries
        texts: optional list of (text|list of tokens), otherwise runs search queries
        limit: optional limit if texts is None

    Returns:
        list of dict per input text per query where a higher token scores represents higher importance relative to the query
    """

    return Explain(self)(queries, texts, limit)
```
batchsearch(self, queries, limit=None, weights=None, index=None)
Finds documents most similar to the input queries. This method will run either an index search or an index + database search depending on whether a database is available.

Parameters:

Name | Description | Default
--- | --- | ---
queries | input queries | required
limit | maximum results | None
weights | hybrid score weights, if applicable | None
index | index name, if applicable | None

Returns:

list of (id, score) per query for index search, list of dict per query for an index + database search
Source code in txtai/embeddings/base.py
```python
def batchsearch(self, queries, limit=None, weights=None, index=None):
    """
    Finds documents most similar to the input queries. This method will run either an index search
    or an index + database search depending on if a database is available.

    Args:
        queries: input queries
        limit: maximum results
        weights: hybrid score weights, if applicable
        index: index name, if applicable

    Returns:
        list of (id, score) per query for index search, list of dict per query for an index + database search
    """

    return Search(self)(queries, limit, weights, index)
```
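A usage sketch, not from the txtai docs; the indexed records and queries are illustrative.

```python
from txtai.embeddings import Embeddings

embeddings = Embeddings(path="sentence-transformers/all-MiniLM-L6-v2")
embeddings.index([(0, "US tops 5 million confirmed virus cases", None),
                  (1, "Maine man wins $1M from $25 lottery ticket", None)])

# One result list per query, in query order
for results in embeddings.batchsearch(["feel good story", "health"], limit=1):
    print(results)
```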
batchsimilarity(self, queries, data)
Computes the similarity between list of queries and list of data. Returns a list of (id, score) sorted by highest score per query, where id is the index in data.
Parameters:

Name | Description | Default
--- | --- | ---
queries | input queries | required
data | list of data | required

Returns:

list of (id, score) per query
Source code in txtai/embeddings/base.py
```python
def batchsimilarity(self, queries, data):
    """
    Computes the similarity between list of queries and list of data. Returns a list
    of (id, score) sorted by highest score per query, where id is the index in data.

    Args:
        queries: input queries
        data: list of data

    Returns:
        list of (id, score) per query
    """

    # Convert queries to embedding vectors
    queries = self.batchtransform(((None, query, None) for query in queries), "query")
    data = self.batchtransform(((None, row, None) for row in data), "data")

    # Dot product on normalized vectors is equal to cosine similarity
    scores = np.dot(queries, data.T).tolist()

    # Add index and sort desc based on score
    return [sorted(enumerate(score), key=lambda x: x[1], reverse=True) for score in scores]
```
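A usage sketch with illustrative data; no index is required since similarity operates directly on the inputs.

```python
from txtai.embeddings import Embeddings

embeddings = Embeddings(path="sentence-transformers/all-MiniLM-L6-v2")
data = ["US tops 5 million confirmed virus cases", "Beijing mobilises invasion craft along coast"]

# One (id, score) list per query, where id is the position in data
for scores in embeddings.batchsimilarity(["health", "war"], data):
    print(scores)
```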
batchterms(self, queries)
Extracts keyword terms from a list of queries.
Parameters:

Name | Description | Default
--- | --- | ---
queries | list of queries | required

Returns:

list of queries reduced down to keyword term strings
Source code in txtai/embeddings/base.py
```python
def batchterms(self, queries):
    """
    Extracts keyword terms from a list of queries.

    Args:
        queries: list of queries

    Returns:
        list of queries reduced down to keyword term strings
    """

    return Terms(self)(queries)
```
batchtransform(self, documents, category=None)
Transforms documents into embeddings vectors.
Parameters:

Name | Description | Default
--- | --- | ---
documents | iterable of (id, data, tags), (id, data) or data | required
category | category for instruction-based embeddings | None

Returns:

embeddings vectors
Source code in txtai/embeddings/base.py
```python
def batchtransform(self, documents, category=None):
    """
    Transforms documents into embeddings vectors.

    Args:
        documents: iterable of (id, data, tags), (id, data) or data
        category: category for instruction-based embeddings

    Returns:
        embeddings vectors
    """

    # Initialize default parameters, if necessary
    self.defaults()

    # Convert documents into sentence embeddings
    embeddings = self.model.batchtransform(Stream(self)(documents), category)

    # Reduce the dimensionality of the embeddings. Scale the embeddings using this
    # model to reduce the noise of common but less relevant terms.
    if self.reducer:
        self.reducer(embeddings)

    # Normalize embeddings
    self.normalize(embeddings)

    return embeddings
```
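A usage sketch with illustrative inputs; one vector is produced per input.

```python
from txtai.embeddings import Embeddings

embeddings = Embeddings(path="sentence-transformers/all-MiniLM-L6-v2")

# One L2-normalized vector per input
vectors = embeddings.batchtransform(["first text", "second text"])
print(len(vectors), len(vectors[0]))
```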
checkarchive(self, path)
Checks if path is an archive file.
Parameters:

Name | Description | Default
--- | --- | ---
path | path to check | required

Returns:

(working directory, current path) if this is an archive, original path otherwise
Source code in txtai/embeddings/base.py
```python
def checkarchive(self, path):
    """
    Checks if path is an archive file.

    Args:
        path: path to check

    Returns:
        (working directory, current path) if this is an archive, original path otherwise
    """

    # Create archive instance, if necessary
    self.archive = ArchiveFactory.create()

    # Check if path is an archive file
    if self.archive.isarchive(path):
        # Return temporary archive working directory and original path
        return self.archive.path(), path

    return path, None
```
close(self)
Closes this embeddings index and frees all resources.
Source code in txtai/embeddings/base.py
```python
def close(self):
    """
    Closes this embeddings index and frees all resources.
    """

    self.ann, self.config, self.graph, self.archive = None, None, None, None
    self.reducer, self.query, self.model, self.models = None, None, None, None

    # Close database connection if open
    if self.database:
        self.database.close()
        self.database, self.functions = None, None

    # Close scoring instance if open
    if self.scoring:
        self.scoring.close()
        self.scoring = None

    # Close indexes if open
    if self.indexes:
        self.indexes.close()
        self.indexes = None
```
columns(self, config)
Adds custom text/object column information if it's provided.

Parameters:

Name | Description | Default
--- | --- | ---
config | input configuration | required

Returns:

config with column information added
Source code in txtai/embeddings/base.py
```python
def columns(self, config):
    """
    Adds custom text/object column information if it's provided.

    Args:
        config: input configuration

    Returns:
        config with column information added
    """

    # Add text/object columns if custom
    if "columns" in self.config:
        # Work on copy of configuration
        config = config.copy()

        # Copy columns to config
        config["columns"] = self.config["columns"]

    return config
```
configure(self, config)
Sets the configuration for this embeddings index and loads config-driven models.
Parameters:

Name | Description | Default
--- | --- | ---
config | embeddings configuration | required
Source code in txtai/embeddings/base.py
```python
def configure(self, config):
    """
    Sets the configuration for this embeddings index and loads config-driven models.

    Args:
        config: embeddings configuration
    """

    # Configuration
    self.config = config

    # Dimensionality reduction model
    self.reducer = None

    # Create scoring instance for word vectors term weighting
    scoring = self.config.get("scoring") if self.config else None
    self.scoring = self.createscoring() if scoring and (not isinstance(scoring, dict) or not scoring.get("terms")) else None

    # Dense vectors - transforms data to embeddings vectors
    self.model = self.loadvectors() if self.config else None

    # Query model
    self.query = self.loadquery() if self.config else None
```
count(self)
Total number of elements in this embeddings index.
Returns:

number of elements in this embeddings index
Source code in txtai/embeddings/base.py
```python
def count(self):
    """
    Total number of elements in this embeddings index.

    Returns:
        number of elements in this embeddings index
    """

    if self.ann:
        return self.ann.count()
    if self.scoring:
        return self.scoring.count()
    if self.database:
        return self.database.count()
    if self.config.get("ids"):
        return len([uid for uid in self.config["ids"] if uid is not None])

    # Default to 0 when no suitable method found
    return 0
```
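A quick sketch with illustrative records.

```python
from txtai.embeddings import Embeddings

embeddings = Embeddings(path="sentence-transformers/all-MiniLM-L6-v2")
embeddings.index([(0, "first text", None), (1, "second text", None)])
print(embeddings.count())  # 2
```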
createann(self)
Creates an ANN from config.
Returns:

new ANN, if enabled in config
Source code in txtai/embeddings/base.py
```python
def createann(self):
    """
    Creates an ANN from config.

    Returns:
        new ANN, if enabled in config
    """

    return ANNFactory.create(self.config) if self.config.get("path") or self.defaultallowed() else None
```
createcloud(self, **cloud)
Creates a cloud instance from config.
Parameters:

Name | Description | Default
--- | --- | ---
cloud | cloud configuration | {}
Source code in txtai/embeddings/base.py
```python
def createcloud(self, **cloud):
    """
    Creates a cloud instance from config.

    Args:
        cloud: cloud configuration
    """

    # Merge keyword args and keys under the cloud parameter
    config = cloud
    if "cloud" in config and config["cloud"]:
        config.update(config.pop("cloud"))

    # Create cloud instance from config and return
    return CloudFactory.create(config) if config else None
```
createdatabase(self)
Creates a database from config. This method will also close any existing database connection.
Returns:

new database, if enabled in config
Source code in txtai/embeddings/base.py
```python
def createdatabase(self):
    """
    Creates a database from config. This method will also close any existing database connection.

    Returns:
        new database, if enabled in config
    """

    # Free existing database resources
    if self.database:
        self.database.close()

    config = self.config.copy()

    # Create references to callable functions
    self.functions = Functions(self) if "functions" in config else None
    if self.functions:
        config["functions"] = self.functions(config)

    # Create database from config and return
    return DatabaseFactory.create(config)
```
creategraph(self)
Creates a graph from config.
Returns:

new graph, if enabled in config
Source code in txtai/embeddings/base.py
```python
def creategraph(self):
    """
    Creates a graph from config.

    Returns:
        new graph, if enabled in config
    """

    if "graph" in self.config:
        # Get or create graph configuration
        config = self.config["graph"] if self.config["graph"] else {}

        # Create configuration with custom columns, if necessary
        config = self.columns(config)
        return GraphFactory.create(config)

    return None
```
createindexes(self)
Creates subindexes from config.
Returns:

list of subindexes
Source code in txtai/embeddings/base.py
```python
def createindexes(self):
    """
    Creates subindexes from config.

    Returns:
        list of subindexes
    """

    # Load subindexes
    if "indexes" in self.config:
        indexes = {}
        for index, config in self.config["indexes"].items():
            # Create index with shared model cache
            indexes[index] = Embeddings(config, models=self.models)

        # Wrap as Indexes object
        return Indexes(self, indexes)

    return None
```
createscoring(self)
Creates a scoring from config.
Returns:

new scoring, if enabled in config
Source code in txtai/embeddings/base.py
```python
def createscoring(self):
    """
    Creates a scoring from config.

    Returns:
        new scoring, if enabled in config
    """

    # Free existing resources
    if self.scoring:
        self.scoring.close()

    if "scoring" in self.config:
        # Expand scoring to a dictionary, if necessary
        config = self.config["scoring"]
        config = config if isinstance(config, dict) else {"method": config}

        # Create configuration with custom columns, if necessary
        config = self.columns(config)
        return ScoringFactory.create(config)

    return None
```
defaultallowed(self)
Tests if this embeddings instance can use a default model if not otherwise provided.
Returns:

True if a default model is allowed, False otherwise
Source code in txtai/embeddings/base.py
```python
def defaultallowed(self):
    """
    Tests if this embeddings instance can use a default model if not otherwise provided.

    Returns:
        True if a default model is allowed, False otherwise
    """

    params = [("keyword", False), ("defaults", True)]
    return all(self.config.get(key, default) == default for key, default in params)
```
defaults(self)
Apply default parameters to current configuration.
Returns:

configuration with default parameters set
Source code in txtai/embeddings/base.py
```python
def defaults(self):
    """
    Apply default parameters to current configuration.

    Returns:
        configuration with default parameters set
    """

    self.config = self.config if self.config else {}

    # Expand sparse index shortcuts
    if not self.config.get("scoring") and any(self.config.get(key) for key in ["keyword", "hybrid"]):
        self.config["scoring"] = {"method": "bm25", "terms": True, "normalize": True}

    # Check if default model should be loaded
    if not self.model and self.defaultallowed():
        self.config["path"] = "sentence-transformers/all-MiniLM-L6-v2"

        # Load dense vectors model
        self.model = self.loadvectors()
```
delete(self, ids)
Deletes from an embeddings index. Returns list of ids deleted.
Parameters:

Name | Description | Default
--- | --- | ---
ids | list of ids to delete | required

Returns:

list of ids deleted
Source code in txtai/embeddings/base.py
```python
def delete(self, ids):
    """
    Deletes from an embeddings index. Returns list of ids deleted.

    Args:
        ids: list of ids to delete

    Returns:
        list of ids deleted
    """

    # List of internal indices for each candidate id to delete
    indices = []

    # List of deleted ids
    deletes = []

    if self.database:
        # Retrieve indexid-id mappings from database
        ids = self.database.ids(ids)

        # Parse out indices and ids to delete
        indices = [i for i, _ in ids]
        deletes = sorted(set(uid for _, uid in ids))

        # Delete ids from database
        self.database.delete(deletes)
    elif self.ann or self.scoring:
        # Lookup indexids from config for indexes with no database
        indexids = self.config["ids"]

        # Find existing ids
        for uid in ids:
            indices.extend([index for index, value in enumerate(indexids) if uid == value])

        # Clear config ids
        for index in indices:
            deletes.append(indexids[index])
            indexids[index] = None

    # Delete indices for all indexes and data stores
    if indices:
        # Delete ids from ann
        if self.isdense():
            self.ann.delete(indices)

        # Delete ids from scoring
        if self.issparse():
            self.scoring.delete(indices)

        # Delete ids from subindexes
        if self.indexes:
            self.indexes.delete(indices)

        # Delete ids from graph
        if self.graph:
            self.graph.delete(indices)

    return deletes
```
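A usage sketch with illustrative records.

```python
from txtai.embeddings import Embeddings

embeddings = Embeddings(path="sentence-transformers/all-MiniLM-L6-v2")
embeddings.index([(0, "first text", None), (1, "second text", None)])

print(embeddings.delete([1]))  # [1] - ids that were removed
print(embeddings.count())      # 1
```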
exists(self, path=None, cloud=None, **kwargs)
Checks if an index exists at path.
Parameters:

Name | Description | Default
--- | --- | ---
path | input path | None
cloud | cloud storage configuration | None
kwargs | additional configuration as keyword args | {}

Returns:

True if index exists, False otherwise
Source code in txtai/embeddings/base.py
```python
def exists(self, path=None, cloud=None, **kwargs):
    """
    Checks if an index exists at path.

    Args:
        path: input path
        cloud: cloud storage configuration
        kwargs: additional configuration as keyword args

    Returns:
        True if index exists, False otherwise
    """

    # Check if this exists in a cloud instance
    cloud = self.createcloud(cloud=cloud, **kwargs)
    if cloud:
        return cloud.exists(path)

    # Check if this is an archive file and exists
    path, apath = self.checkarchive(path)
    if apath:
        return os.path.exists(apath)

    # Return true if path has a config or config.json file and an embeddings (dense) or scoring (sparse) file
    return (
        path
        and (os.path.exists(f"{path}/config") or os.path.exists(f"{path}/config.json"))
        and (os.path.exists(f"{path}/embeddings") or os.path.exists(f"{path}/scoring"))
    )
```
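A usage sketch; the paths are hypothetical and pair with the save example below.

```python
from txtai.embeddings import Embeddings

embeddings = Embeddings()
print(embeddings.exists("/tmp/txtai-index"))  # True if an index was saved there
print(embeddings.exists("/tmp/missing"))      # False
```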
explain(self, query, texts=None, limit=None)
Explains the importance of each input token in text for a query. This method requires either content to be enabled or texts to be provided.
Parameters:

Name | Description | Default
--- | --- | ---
query | input query | required
texts | optional list of (text\|list of tokens), otherwise runs search query | None
limit | optional limit if texts is None | None

Returns:

list of dict per input text, where higher token scores represent higher importance relative to the query
Source code in txtai/embeddings/base.py
```python
def explain(self, query, texts=None, limit=None):
    """
    Explains the importance of each input token in text for a query. This method requires either content to be enabled
    or texts to be provided.

    Args:
        query: input query
        texts: optional list of (text|list of tokens), otherwise runs search query
        limit: optional limit if texts is None

    Returns:
        list of dict per input text where a higher token scores represents higher importance relative to the query
    """

    results = self.batchexplain([query], texts, limit)
    return results[0] if results else results
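A usage sketch, assuming the index was built with content storage enabled (content=True); without it, pass texts explicitly. The record and query are illustrative.

```python
from txtai.embeddings import Embeddings

embeddings = Embeddings(path="sentence-transformers/all-MiniLM-L6-v2", content=True)
embeddings.index([(0, "Maine man wins $1M from $25 lottery ticket", None)])

# Token-level importance for the top search result
print(embeddings.explain("feel good story", limit=1))
```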
index(self, documents, reindex=False)
Builds an embeddings index. This method overwrites an existing index.
Parameters:

Name | Description | Default
--- | --- | ---
documents | iterable of (id, data, tags), (id, data) or data | required
reindex | whether this is a reindex operation, in which case database creation is skipped | False
Source code in txtai/embeddings/base.py
```python
def index(self, documents, reindex=False):
    """
    Builds an embeddings index. This method overwrites an existing index.

    Args:
        documents: iterable of (id, data, tags), (id, data) or data
        reindex: if this is a reindex operation in which case database creation is skipped, defaults to False
    """

    # Initialize index
    self.initindex(reindex)

    # Create transform and stream
    transform = Transform(self, Action.REINDEX if reindex else Action.INDEX)
    stream = Stream(self, Action.REINDEX if reindex else Action.INDEX)

    with tempfile.NamedTemporaryFile(mode="wb", suffix=".npy") as buffer:
        # Load documents into database and transform to vectors
        ids, dimensions, embeddings = transform(stream(documents), buffer)

        if embeddings is not None:
            # Build LSA model (if enabled). Remove principal components from embeddings.
            if self.config.get("pca"):
                self.reducer = Reducer(embeddings, self.config["pca"])
                self.reducer(embeddings)

            # Normalize embeddings
            self.normalize(embeddings)

            # Save index dimensions
            self.config["dimensions"] = dimensions

            # Create approximate nearest neighbor index
            self.ann = self.createann()

            # Add embeddings to the index
            self.ann.index(embeddings)

        # Save indexids-ids mapping for indexes with no database, except when this is a reindex
        if ids and not reindex and not self.database:
            self.config["ids"] = ids

    # Index scoring, if necessary
    # This must occur before graph index in order to be available to the graph
    if self.issparse():
        self.scoring.index()

    # Index subindexes, if necessary
    if self.indexes:
        self.indexes.index()

    # Index graph, if necessary
    if self.graph:
        self.graph.index(Search(self, True), self.batchsimilarity)
```
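A minimal indexing sketch, not from the txtai docs; the records and ids are illustrative and the model path is the library's default.

```python
from txtai.embeddings import Embeddings

embeddings = Embeddings(path="sentence-transformers/all-MiniLM-L6-v2")

# Iterable of (id, data, tags) tuples; plain strings also work
embeddings.index([(0, "US tops 5 million confirmed virus cases", None),
                  (1, "Maine man wins $1M from $25 lottery ticket", None)])
print(embeddings.search("feel good story", limit=1))
```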
info(self)
Prints the current embeddings index configuration.
Source code in txtai/embeddings/base.py
```python
def info(self):
    """
    Prints the current embeddings index configuration.
    """

    if self.config:
        # Copy and edit config
        config = self.config.copy()

        # Remove ids array if present
        config.pop("ids", None)

        # Print configuration
        print(json.dumps(config, sort_keys=True, default=str, indent=2))
```
initindex(self, reindex)
Initialize new index.
Parameters:

Name | Description | Default
--- | --- | ---
reindex | whether this is a reindex operation, in which case database creation is skipped | required
Source code in txtai/embeddings/base.py
```python
def initindex(self, reindex):
    """
    Initialize new index.

    Args:
        reindex: if this is a reindex operation in which case database creation is skipped, defaults to False
    """

    # Initialize default parameters, if necessary
    self.defaults()

    # Create document database, if necessary
    if not reindex:
        self.database = self.createdatabase()

        # Reset archive since this is a new index
        self.archive = None

    # Initialize ANN, will be created after index transformations complete
    self.ann = None

    # Create scoring only if term indexing is enabled
    scoring = self.config.get("scoring")
    if scoring and isinstance(scoring, dict) and self.config["scoring"].get("terms"):
        self.scoring = self.createscoring()

    # Create subindexes, if necessary
    self.indexes = self.createindexes()

    # Create graph, if necessary
    self.graph = self.creategraph()
```
isdense(self)
Checks if this instance has an associated ANN instance.
Returns:

True if this instance has an associated ANN, False otherwise
Source code in txtai/embeddings/base.py
```python
def isdense(self):
    """
    Checks if this instance has an associated ANN instance.

    Returns:
        True if this instance has an associated ANN, False otherwise
    """

    return self.ann is not None
```
issparse(self)
Checks if this instance has an associated scoring instance with term indexing enabled.
Returns:

True if term index is enabled, False otherwise
Source code in txtai/embeddings/base.py
```python
def issparse(self):
    """
    Checks if this instance has an associated scoring instance with term indexing enabled.

    Returns:
        True if term index is enabled, False otherwise
    """

    return self.scoring and self.scoring.hasterms()
```
isweighted(self)
Checks if this instance has an associated scoring instance with term weighting enabled.
Returns:

True if term weighting is enabled, False otherwise
Source code in txtai/embeddings/base.py
```python
def isweighted(self):
    """
    Checks if this instance has an associated scoring instance with term weighting enabled.

    Returns:
        True if term weighting is enabled, False otherwise
    """

    return self.scoring and not self.scoring.hasterms()
```
load(self, path=None, cloud=None, **kwargs)
Loads an existing index from path.
Parameters:

Name | Description | Default
--- | --- | ---
path | input path | None
cloud | cloud storage configuration | None
kwargs | additional configuration as keyword args | {}
Source code in txtai/embeddings/base.py
```python
def load(self, path=None, cloud=None, **kwargs):
    """
    Loads an existing index from path.

    Args:
        path: input path
        cloud: cloud storage configuration
        kwargs: additional configuration as keyword args
    """

    # Load from cloud, if configured
    cloud = self.createcloud(cloud=cloud, **kwargs)
    if cloud:
        path = cloud.load(path)

    # Check if this is an archive file and extract
    path, apath = self.checkarchive(path)
    if apath:
        self.archive.load(apath)

    # Load index configuration
    self.config = self.loadconfig(path)

    # Approximate nearest neighbor index - stores dense vectors
    self.ann = self.createann()
    if self.ann:
        self.ann.load(f"{path}/embeddings")

    # Dimensionality reduction model - word vectors only
    if self.config.get("pca"):
        self.reducer = Reducer()
        self.reducer.load(f"{path}/lsa")

    # Document database - stores document content
    self.database = self.createdatabase()
    if self.database:
        self.database.load(f"{path}/documents")

    # Sparse vectors - stores term sparse arrays
    self.scoring = self.createscoring()
    if self.scoring:
        self.scoring.load(f"{path}/scoring")

    # Subindexes
    self.indexes = self.createindexes()
    if self.indexes:
        self.indexes.load(f"{path}/indexes")

    # Graph network - stores relationships
    self.graph = self.creategraph()
    if self.graph:
        self.graph.load(f"{path}/graph")

    # Dense vectors - transforms data to embeddings vectors
    self.model = self.loadvectors()

    # Query model
    self.query = self.loadquery()
```
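A usage sketch pairing with the save example below; the path is hypothetical.

```python
from txtai.embeddings import Embeddings

# Load an index previously written with save, from a directory or compressed archive
embeddings = Embeddings()
embeddings.load("/tmp/txtai-index")
print(embeddings.count())
```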
loadconfig(self, path)
Loads index configuration. This method supports both config pickle files and config.json files.
Parameters:

Name | Description | Default
--- | --- | ---
path | path to directory | required

Returns:

dict
Source code in txtai/embeddings/base.py
```python
def loadconfig(self, path):
    """
    Loads index configuration. This method supports both config pickle files and config.json files.

    Args:
        path: path to directory

    Returns:
        dict
    """

    # Configuration
    config = None

    # Determine if config is json or pickle
    jsonconfig = os.path.exists(f"{path}/config.json")

    # Set config file name
    name = "config.json" if jsonconfig else "config"

    # Load configuration
    with open(f"{path}/{name}", "r" if jsonconfig else "rb") as handle:
        config = json.load(handle) if jsonconfig else pickle.load(handle)

    # Build full path to embedding vectors file
    if config.get("storevectors"):
        config["path"] = os.path.join(path, config["path"])

    return config
```
loadquery(self)
Loads a query model set in config.
Returns:

query model
Source code in txtai/embeddings/base.py
```python
def loadquery(self):
    """
    Loads a query model set in config.

    Returns:
        query model
    """

    if "query" in self.config:
        return Query(**self.config["query"])

    return None
```
loadvectors(self)
Loads a vector model set in config.
Returns:

vector model
Source code in txtai/embeddings/base.py
```python
def loadvectors(self):
    """
    Loads a vector model set in config.

    Returns:
        vector model
    """

    # Create model cache if subindexes are enabled
    if "indexes" in self.config and self.models is None:
        self.models = {}

    # Model path
    path = self.config.get("path")

    # Check if model is cached
    if self.models and path in self.models:
        return self.models[path]

    # Load and store uncached model
    model = VectorsFactory.create(self.config, self.scoring)
    if self.models is not None and path:
        self.models[path] = model

    return model
```
normalize(self, embeddings)
Normalizes embeddings using L2 normalization. Operation applied directly on array.
Parameters:

Name | Description | Default
--- | --- | ---
embeddings | input embeddings matrix | required
Source code in txtai/embeddings/base.py
```python
def normalize(self, embeddings):
    """
    Normalizes embeddings using L2 normalization. Operation applied directly on array.

    Args:
        embeddings: input embeddings matrix
    """

    # Calculation is different for matrices vs vectors
    if len(embeddings.shape) > 1:
        embeddings /= np.linalg.norm(embeddings, axis=1)[:, np.newaxis]
    else:
        embeddings /= np.linalg.norm(embeddings)
```
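A standalone numpy sketch of the same operation: dividing each row by its L2 norm yields unit-length rows, so a dot product between rows equals cosine similarity, which is the property batchsimilarity relies on.

```python
import numpy as np

vectors = np.array([[3.0, 4.0], [1.0, 0.0]])

# Same matrix branch as above: divide each row by its L2 norm
vectors /= np.linalg.norm(vectors, axis=1)[:, np.newaxis]

print(np.linalg.norm(vectors, axis=1))        # [1. 1.] - unit-length rows
print(float(np.dot(vectors[0], vectors[1])))  # 0.6 - cosine similarity
```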
reindex(self, config=None, function=None, **kwargs)
Recreates embeddings index using config. This method only works if document content storage is enabled.
Parameters:

Name | Description | Default
--- | --- | ---
config | new config | None
function | optional function to prepare content for indexing | None
kwargs | additional configuration as keyword args | {}
Source code in txtai/embeddings/base.py
```python
def reindex(self, config=None, function=None, **kwargs):
    """
    Recreates embeddings index using config. This method only works if document content storage is enabled.

    Args:
        config: new config
        function: optional function to prepare content for indexing
        kwargs: additional configuration as keyword args
    """

    if self.database:
        # Merge configuration into single dictionary
        config = {**config, **kwargs} if config and kwargs else config if config else kwargs

        # Keep content and objects parameters to ensure database is preserved
        config["content"] = self.config["content"]
        if "objects" in self.config:
            config["objects"] = self.config["objects"]

        # Reset configuration
        self.configure(config)

        # Reset function references
        if self.functions:
            self.functions.reset()

        # Reindex
        if function:
            self.index(function(self.database.reindex(self.config)), True)
        else:
            self.index(self.database.reindex(self.config), True)
```
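A usage sketch, assuming the index was created with content=True; the new configuration values are illustrative.

```python
from txtai.embeddings import Embeddings

embeddings = Embeddings(path="sentence-transformers/all-MiniLM-L6-v2", content=True)
embeddings.index([(0, "sample text", None)])

# Rebuild the index with a new configuration; stored content is carried over
embeddings.reindex({"path": "sentence-transformers/all-MiniLM-L6-v2", "hybrid": True})
```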
save(self, path, cloud=None, **kwargs)
Saves an index in a directory at path unless path ends with tar.gz, tar.bz2, tar.xz or zip. In those cases, the index is stored as a compressed file.
Parameters:

Name | Description | Default
--- | --- | ---
path | output path | required
cloud | cloud storage configuration | None
kwargs | additional configuration as keyword args | {}
Source code in txtai/embeddings/base.py
```python
def save(self, path, cloud=None, **kwargs):
    """
    Saves an index in a directory at path unless path ends with tar.gz, tar.bz2, tar.xz or zip.
    In those cases, the index is stored as a compressed file.

    Args:
        path: output path
        cloud: cloud storage configuration
        kwargs: additional configuration as keyword args
    """

    if self.config:
        # Check if this is an archive file
        path, apath = self.checkarchive(path)

        # Create output directory, if necessary
        os.makedirs(path, exist_ok=True)

        # Copy sentence vectors model
        if self.config.get("storevectors"):
            shutil.copyfile(self.config["path"], os.path.join(path, os.path.basename(self.config["path"])))
            self.config["path"] = os.path.basename(self.config["path"])

        # Save index configuration
        self.saveconfig(path)

        # Save approximate nearest neighbor index
        if self.ann:
            self.ann.save(f"{path}/embeddings")

        # Save dimensionality reduction model (word vectors only)
        if self.reducer:
            self.reducer.save(f"{path}/lsa")

        # Save document database
        if self.database:
            self.database.save(f"{path}/documents")

        # Save scoring index
        if self.scoring:
            self.scoring.save(f"{path}/scoring")

        # Save subindexes
        if self.indexes:
            self.indexes.save(f"{path}/indexes")

        # Save graph
        if self.graph:
            self.graph.save(f"{path}/graph")

        # If this is an archive, save it
        if apath:
            self.archive.save(apath)

        # Save to cloud, if configured
        cloud = self.createcloud(cloud=cloud, **kwargs)
        if cloud:
            cloud.save(apath if apath else path)
```
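A usage sketch; both paths are hypothetical and the archive format is selected by the file extension.

```python
from txtai.embeddings import Embeddings

embeddings = Embeddings(path="sentence-transformers/all-MiniLM-L6-v2")
embeddings.index([(0, "sample text", None)])

embeddings.save("/tmp/txtai-index")         # directory
embeddings.save("/tmp/txtai-index.tar.gz")  # compressed archive
```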
saveconfig(self, path)
Saves index configuration. This method saves to JSON if possible, otherwise it falls back to pickle.
Parameters:

Name | Description | Default
--- | --- | ---
path | path to directory | required
Source code in txtai/embeddings/base.py
```python
def saveconfig(self, path):
    """
    Saves index configuration. This method saves to JSON if possible, otherwise it falls back to pickle.

    Args:
        path: path to directory

    Returns:
        dict
    """

    # Default to pickle config
    jsonconfig = self.config.get("format", "pickle") == "json"

    # Set config file name
    name = "config.json" if jsonconfig else "config"

    # Write configuration
    with open(f"{path}/{name}", "w" if jsonconfig else "wb", encoding="utf-8" if jsonconfig else None) as handle:
        if jsonconfig:
            # Write config as JSON
            json.dump(self.config, handle, default=str, indent=2)
        else:
            # Write config as pickle format
            pickle.dump(self.config, handle, protocol=__pickle__)
```
score(self, documents)
Builds a term weighting scoring index. Only used by word vectors models.
Parameters:

Name | Description | Default
--- | --- | ---
documents | iterable of (id, data, tags), (id, data) or data | required
Source code in txtai/embeddings/base.py
```python
def score(self, documents):
    """
    Builds a term weighting scoring index. Only used by word vectors models.

    Args:
        documents: iterable of (id, data, tags), (id, data) or data
    """

    # Build scoring index for word vectors term weighting
    if self.isweighted():
        self.scoring.index(Stream(self)(documents))
```
search(self, query, limit=None, weights=None, index=None)
Finds documents most similar to the input query. This method will run either an index search or an index + database search depending on whether a database is available.

Parameters:

Name | Description | Default
--- | --- | ---
query | input query | required
limit | maximum results | None
weights | hybrid score weights, if applicable | None
index | index name, if applicable | None

Returns:

list of (id, score) for index search, list of dict for an index + database search
Source code in txtai/embeddings/base.py
```python
def search(self, query, limit=None, weights=None, index=None):
    """
    Finds documents most similar to the input query. This method will run either an index search
    or an index + database search depending on if a database is available.

    Args:
        query: input query
        limit: maximum results
        weights: hybrid score weights, if applicable
        index: index name, if applicable

    Returns:
        list of (id, score) for index search, list of dict for an index + database search
    """

    results = self.batchsearch([query], limit, weights, index)
    return results[0] if results else results
```
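A usage sketch with illustrative data; results are (id, score) tuples here, or dicts when content storage is enabled.

```python
from txtai.embeddings import Embeddings

embeddings = Embeddings(path="sentence-transformers/all-MiniLM-L6-v2")
embeddings.index([(0, "Maine man wins $1M from $25 lottery ticket", None)])

print(embeddings.search("feel good story", limit=1))
```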
similarity(self, query, data)
Computes the similarity between query and list of data. Returns a list of (id, score) sorted by highest score, where id is the index in data.
Parameters:

Name | Description | Default
--- | --- | ---
query | input query | required
data | list of data | required

Returns:

list of (id, score)
Source code in txtai/embeddings/base.py
```python
def similarity(self, query, data):
    """
    Computes the similarity between query and list of data. Returns a list of
    (id, score) sorted by highest score, where id is the index in data.

    Args:
        query: input query
        data: list of data

    Returns:
        list of (id, score)
    """

    return self.batchsimilarity([query], data)[0]
```
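A usage sketch with illustrative data.

```python
from txtai.embeddings import Embeddings

embeddings = Embeddings(path="sentence-transformers/all-MiniLM-L6-v2")
data = ["US tops 5 million confirmed virus cases", "Maine man wins $1M from $25 lottery ticket"]

# id is the position in data, sorted by descending score
print(embeddings.similarity("health", data))
```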
terms(self, query)
Extracts keyword terms from a query.
Parameters:

Name | Description | Default
--- | --- | ---
query | input query | required

Returns:

query reduced down to keyword terms
Source code in txtai/embeddings/base.py
```python
def terms(self, query):
    """
    Extracts keyword terms from a query.

    Args:
        query: input query

    Returns:
        query reduced down to keyword terms
    """

    return self.batchterms([query])[0]
```
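A usage sketch, assuming an index configured with terms indexing (enabled here via the keyword shortcut); the record and query are illustrative.

```python
from txtai.embeddings import Embeddings

embeddings = Embeddings(keyword=True, content=True)
embeddings.index([(0, "Maine man wins $1M from $25 lottery ticket", None)])

# Reduces a natural language query down to its keyword terms
print(embeddings.terms("select rows about lottery winners"))
```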
transform(self, document)
Transforms document into an embeddings vector.
Parameters:

Name | Description | Default
--- | --- | ---
document | (id, data, tags), (id, data) or data | required

Returns:

embeddings vector
Source code in txtai/embeddings/base.py
```python
def transform(self, document):
    """
    Transforms document into an embeddings vector.

    Args:
        documents: iterable of (id, data, tags), (id, data) or data

    Returns:
        embeddings vector
    """

    return self.batchtransform([document])[0]
```
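A usage sketch; the text is illustrative.

```python
from txtai.embeddings import Embeddings

embeddings = Embeddings(path="sentence-transformers/all-MiniLM-L6-v2")

# Single L2-normalized vector
vector = embeddings.transform("sample text")
print(len(vector))  # vector dimensions, 384 for this model
```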
upsert(self, documents)
Runs an embeddings upsert operation. If the index exists, new data is appended to the index and existing data is updated. If the index doesn't exist, this method runs a standard index operation.

Parameters:

Name | Description | Default
--- | --- | ---
documents | iterable of (id, data, tags), (id, data) or data | required
Source code in txtai/embeddings/base.py
```python
def upsert(self, documents):
    """
    Runs an embeddings upsert operation. If the index exists, new data is
    appended to the index, existing data is updated. If the index doesn't exist,
    this method runs a standard index operation.

    Args:
        documents: iterable of (id, data, tags), (id, data) or data
    """

    # Run standard insert if index doesn't exist or it has no records
    if not self.count():
        self.index(documents)
        return

    # Create transform and stream
    transform = Transform(self, Action.UPSERT)
    stream = Stream(self, Action.UPSERT)

    with tempfile.NamedTemporaryFile(mode="wb", suffix=".npy") as buffer:
        # Load documents into database and transform to vectors
        ids, _, embeddings = transform(stream(documents), buffer)

        if embeddings is not None:
            # Remove principal components from embeddings, if necessary
            if self.reducer:
                self.reducer(embeddings)

            # Normalize embeddings
            self.normalize(embeddings)

            # Append embeddings to the index
            self.ann.append(embeddings)

        # Save indexids-ids mapping for indexes with no database
        if ids and not self.database:
            self.config["ids"] = self.config["ids"] + ids

    # Scoring upsert, if necessary
    # This must occur before graph upsert in order to be available to the graph
    if self.issparse():
        self.scoring.upsert()

    # Subindexes upsert, if necessary
    if self.indexes:
        self.indexes.upsert()

    # Graph upsert, if necessary
    if self.graph:
        self.graph.upsert(Search(self, True), self.batchsimilarity)
```
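A usage sketch with illustrative records, showing an update plus an insert.

```python
from txtai.embeddings import Embeddings

embeddings = Embeddings(path="sentence-transformers/all-MiniLM-L6-v2")
embeddings.index([(0, "original text", None)])

# Id 0 is updated, id 1 is appended
embeddings.upsert([(0, "updated text", None), (1, "new text", None)])
print(embeddings.count())  # 2
```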