Tutorial: Using Cohere with Elasticsearch

Tutorial: Using Cohere with Elasticsearch

The instructions in this tutorial shows you how to compute embeddings with Cohere using the inference API and store them for efficient vector or hybrid search in Elasticsearch. This tutorial will use the Python Elasticsearch client to perform the operations.

You’ll learn how to:

  • create an inference endpoint for text embedding using the Cohere service,
  • create the necessary index mapping for the Elasticsearch index,
  • build an inference pipeline to ingest documents into the index together with the embeddings,
  • perform hybrid search on the data,
  • rerank search results by using Cohere’s rerank model,
  • design a RAG system with Cohere’s Chat API.

The tutorial uses the SciFact data set.

Refer to Cohere’s tutorial for an example using a different data set.

You can also review the Colab notebook version of this tutorial.

Requirements

  • A paid Cohere account is required to use the Inference API with the Cohere service as the Cohere free trial API usage is limited,
  • an Elastic Cloud account,
  • Python 3.7 or higher.

Install required packages

Install Elasticsearch and Cohere:

  1. !pip install elasticsearch
  2. !pip install cohere

Import the required packages:

  1. from elasticsearch import Elasticsearch, helpers
  2. import cohere
  3. import json
  4. import requests

Create the Elasticsearch client

To create your Elasticsearch client, you need:

  1. ELASTICSEARCH_ENDPOINT = "elastic_endpoint"
  2. ELASTIC_API_KEY = "elastic_api_key"
  3. client = Elasticsearch(
  4. cloud_id=ELASTICSEARCH_ENDPOINT,
  5. api_key=ELASTIC_API_KEY
  6. )
  7. # Confirm the client has connected
  8. print(client.info())

Create the inference endpoint

Create the inference endpoint first. In this example, the inference endpoint uses Cohere’s embed-english-v3.0 model and the embedding_type is set to byte.

  1. COHERE_API_KEY = "cohere_api_key"
  2. client.inference.put_model(
  3. task_type="text_embedding",
  4. inference_id="cohere_embeddings",
  5. body={
  6. "service": "cohere",
  7. "service_settings": {
  8. "api_key": COHERE_API_KEY,
  9. "model_id": "embed-english-v3.0",
  10. "embedding_type": "byte"
  11. }
  12. },
  13. )

You can find your API keys in your Cohere dashboard under the API keys section.

Create the index mapping

Create the index mapping for the index that will contain the embeddings.

  1. client.indices.create(
  2. index="cohere-embeddings",
  3. settings={"index": {"default_pipeline": "cohere_embeddings"}},
  4. mappings={
  5. "properties": {
  6. "text_embedding": {
  7. "type": "dense_vector",
  8. "dims": 1024,
  9. "element_type": "byte",
  10. },
  11. "text": {"type": "text"},
  12. "id": {"type": "integer"},
  13. "title": {"type": "text"}
  14. }
  15. },
  16. )

Create the inference pipeline

Now you have an inference endpoint and an index ready to store embeddings. The next step is to create an ingest pipeline with an inference processor that will create the embeddings using the inference endpoint and stores them in the index.

  1. client.ingest.put_pipeline(
  2. id="cohere_embeddings",
  3. description="Ingest pipeline for Cohere inference.",
  4. processors=[
  5. {
  6. "inference": {
  7. "model_id": "cohere_embeddings",
  8. "input_output": {
  9. "input_field": "text",
  10. "output_field": "text_embedding",
  11. },
  12. }
  13. }
  14. ],
  15. )

Prepare data and insert documents

This example uses the SciFact data set that you can find on HuggingFace.

  1. url = 'https://huggingface.co/datasets/mteb/scifact/raw/main/corpus.jsonl'
  2. # Fetch the JSONL data from the URL
  3. response = requests.get(url)
  4. response.raise_for_status() # Ensure noticing bad responses
  5. # Split the content by new lines and parse each line as JSON
  6. data = [json.loads(line) for line in response.text.strip().split('\n') if line]
  7. # Now data is a list of dictionaries
  8. # Change `_id` key to `id` as `_id` is a reserved key in Elasticsearch.
  9. for item in data:
  10. if '_id' in item:
  11. item['id'] = item.pop('_id')
  12. # Prepare the documents to be indexed
  13. documents = []
  14. for line in data:
  15. data_dict = line
  16. documents.append({
  17. "_index": "cohere-embeddings",
  18. "_source": data_dict,
  19. }
  20. )
  21. # Use the bulk endpoint to index
  22. helpers.bulk(client, documents)
  23. print("Data ingestion completed, text embeddings generated!")

Your index is populated with the SciFact data and text embeddings for the text field.

Let’s start querying the index!

The code below performs a hybrid search. The kNN query computes the relevance of search results based on vector similarity using the text_embedding field, the lexical search query uses BM25 retrieval to compute keyword similarity on the title and text fields.

  1. query = "What is biosimilarity?"
  2. response = client.search(
  3. index="cohere-embeddings",
  4. size=100,
  5. knn={
  6. "field": "text_embedding",
  7. "query_vector_builder": {
  8. "text_embedding": {
  9. "model_id": "cohere_embeddings",
  10. "model_text": query,
  11. }
  12. },
  13. "k": 10,
  14. "num_candidates": 50,
  15. },
  16. query={
  17. "multi_match": {
  18. "query": query,
  19. "fields": ["text", "title"]
  20. }
  21. }
  22. )
  23. raw_documents = response["hits"]["hits"]
  24. # Display the first 10 results
  25. for document in raw_documents[0:10]:
  26. print(f'Title: {document["_source"]["title"]}\nText: {document["_source"]["text"]}\n')
  27. # Format the documents for ranking
  28. documents = []
  29. for hit in response["hits"]["hits"]:
  30. documents.append(hit["_source"]["text"])
Rerank search results

To combine the results more effectively, use Cohere’s Rerank v3 model through the inference API to provide a more precise semantic reranking of the results.

Create an inference endpoint with your Cohere API key and the used model name as the model_id (rerank-english-v3.0 in this example).

  1. client.inference.put_model(
  2. task_type="rerank",
  3. inference_id="cohere_rerank",
  4. body={
  5. "service": "cohere",
  6. "service_settings":{
  7. "api_key": COHERE_API_KEY,
  8. "model_id": "rerank-english-v3.0"
  9. },
  10. "task_settings": {
  11. "top_n": 10,
  12. },
  13. }
  14. )

Rerank the results using the new inference endpoint.

  1. # Pass the query and the search results to the service
  2. response = client.inference.inference(
  3. inference_id="cohere_rerank",
  4. body={
  5. "query": query,
  6. "input": documents,
  7. "task_settings": {
  8. "return_documents": False
  9. }
  10. }
  11. )
  12. # Reconstruct the input documents based on the index provided in the rereank response
  13. ranked_documents = []
  14. for document in response.body["rerank"]:
  15. ranked_documents.append({
  16. "title": raw_documents[int(document["index"])]["_source"]["title"],
  17. "text": raw_documents[int(document["index"])]["_source"]["text"]
  18. })
  19. # Print the top 10 results
  20. for document in ranked_documents[0:10]:
  21. print(f"Title: {document['title']}\nText: {document['text']}\n")

The response is a list of documents in descending order of relevance. Each document has a corresponding index that reflects the order of the documents when they were sent to the inference endpoint.

Retrieval Augmented Generation (RAG) with Cohere and Elasticsearch

RAG is a method for generating text using additional information fetched from an external data source. With the ranked results, you can build a RAG system on the top of what you previously created by using Cohere’s Chat API.

Pass in the retrieved documents and the query to receive a grounded response using Cohere’s newest generative model Command R+.

Then pass in the query and the documents to the Chat API, and print out the response.

  1. response = co.chat(message=query, documents=ranked_documents, model='command-r-plus')
  2. source_documents = []
  3. for citation in response.citations:
  4. for document_id in citation.document_ids:
  5. if document_id not in source_documents:
  6. source_documents.append(document_id)
  7. print(f"Query: {query}")
  8. print(f"Response: {response.text}")
  9. print("Sources:")
  10. for document in response.documents:
  11. if document['id'] in source_documents:
  12. print(f"{document['title']}: {document['text']}")

The response will look similar to this:

  1. Query: What is biosimilarity?
  2. Response: Biosimilarity is based on the comparability concept, which has been used successfully for several decades to ensure close similarity of a biological product before and after a manufacturing change. Over the last 10 years, experience with biosimilars has shown that even complex biotechnology-derived proteins can be copied successfully.
  3. Sources:
  4. Interchangeability of Biosimilars: A European Perspective: (...)