Using externally hosted ML models for batch ingestion

Introduced 2.15

If you are ingesting multiple documents and generating embeddings by invoking an externally hosted model, you can use batch ingestion to improve performance.

When using the Bulk API to ingest documents, processors that support batch ingestion will split documents into batches and send each batch of documents to an externally hosted model in a single request.

The text_embedding and sparse_encoding processors currently support batch ingestion.

Step 1: Register a model group

You can register a model in two ways:

  • You can use model_group_id to register a model version to an existing model group.
  • If you do not use model_group_id, then ML Commons creates a model with a new model group.

To register a model group, send the following request:

  POST /_plugins/_ml/model_groups/_register
  {
    "name": "remote_model_group",
    "description": "A model group for external models"
  }


The response contains the model group ID that you’ll use to register a model to this model group:

  {
    "model_group_id": "wlcnb4kBJ1eYAeTMHlV6",
    "status": "CREATED"
  }

To learn more about model groups, see Model access control.

Step 2: Create a connector

You can create a standalone connector that can be reused for multiple models. Alternatively, you can specify a connector when creating a model so that it can be used only for that model. For more information and example connectors, see Connectors.

The Connectors Create API, /_plugins/_ml/connectors/_create, creates connectors that facilitate registering and deploying external models in OpenSearch. Using the endpoint parameter, you can connect ML Commons to any supported ML tool by using its specific API endpoint. For example, you can connect to a ChatGPT model by using the api.openai.com endpoint:

  POST /_plugins/_ml/connectors/_create
  {
    "name": "OpenAI Chat Connector",
    "description": "The connector to public OpenAI model service for GPT 3.5",
    "version": 1,
    "protocol": "http",
    "parameters": {
      "endpoint": "api.openai.com",
      "model": "gpt-3.5-turbo",
      "input_docs_processed_step_size": 100
    },
    "credential": {
      "openAI_key": "..."
    },
    "actions": [
      {
        "action_type": "predict",
        "method": "POST",
        "url": "https://${parameters.endpoint}/v1/chat/completions",
        "headers": {
          "Authorization": "Bearer ${credential.openAI_key}"
        },
        "request_body": "{ \"model\": \"${parameters.model}\", \"messages\": ${parameters.messages} }"
      }
    ]
  }


The parameters.input_docs_processed_step_size parameter is used to set the maximum batch size for documents sent to a remote server. You can set this parameter to the maximum batch size supported by the remote server or to a smaller number for optimal performance.

The response contains the connector ID for the newly created connector:

  {
    "connector_id": "a1eMb4kBJ1eYAeTMAljY"
  }
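
If you later want to confirm the connector configuration, including the configured input_docs_processed_step_size, you can retrieve the connector by ID. A minimal sketch using the connector ID from the response above:

  GET /_plugins/_ml/connectors/a1eMb4kBJ1eYAeTMAljY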

Step 3: Register an externally hosted model

To register an externally hosted model to the model group created in step 1, provide the model group ID from step 1 and the connector ID from step 2 in the following request. You must specify the function_name as remote:

  POST /_plugins/_ml/models/_register
  {
    "name": "openAI-gpt-3.5-turbo",
    "function_name": "remote",
    "model_group_id": "wlcnb4kBJ1eYAeTMHlV6",
    "description": "test model",
    "connector_id": "a1eMb4kBJ1eYAeTMAljY"
  }


OpenSearch returns the task ID of the register operation:

  {
    "task_id": "cVeMb4kBJ1eYAeTMFFgj",
    "status": "CREATED"
  }

To check the status of the operation, provide the task ID to the Tasks API:

  GET /_plugins/_ml/tasks/cVeMb4kBJ1eYAeTMFFgj


When the operation is complete, the state changes to COMPLETED:

  {
    "model_id": "cleMb4kBJ1eYAeTMFFg4",
    "task_type": "REGISTER_MODEL",
    "function_name": "REMOTE",
    "state": "COMPLETED",
    "worker_node": [
      "XPcXLV7RQoi5m8NI_jEOVQ"
    ],
    "create_time": 1689793598499,
    "last_update_time": 1689793598530,
    "is_async": false
  }

Take note of the returned model_id because you’ll need it to deploy the model.
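
You can also retrieve the registered model to confirm its details before deploying it. A minimal sketch using the model ID returned above:

  GET /_plugins/_ml/models/cleMb4kBJ1eYAeTMFFg4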

Step 4: Deploy the model

Starting in OpenSearch version 2.13, externally hosted models are deployed automatically when you send a Predict API request for the first time. To disable automatic deployment for an externally hosted model, set plugins.ml_commons.model_auto_deploy.enable to false:

  PUT _cluster/settings
  {
    "persistent": {
      "plugins.ml_commons.model_auto_deploy.enable": "false"
    }
  }


To deploy the model, provide the model ID to the Deploy API:

  POST /_plugins/_ml/models/cleMb4kBJ1eYAeTMFFg4/_deploy


The response contains the task ID, which you can use to check the status of the deploy operation:

  {
    "task_id": "vVePb4kBJ1eYAeTM7ljG",
    "status": "CREATED"
  }

As in the previous step, check the status of the operation by calling the Tasks API:

  GET /_plugins/_ml/tasks/vVePb4kBJ1eYAeTM7ljG


When the operation is complete, the state changes to COMPLETED:

  {
    "model_id": "cleMb4kBJ1eYAeTMFFg4",
    "task_type": "DEPLOY_MODEL",
    "function_name": "REMOTE",
    "state": "COMPLETED",
    "worker_node": [
      "n-72khvBTBi3bnIIR8FTTw"
    ],
    "create_time": 1689793851077,
    "last_update_time": 1689793851101,
    "is_async": true
  }
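
After the model is deployed, you can optionally send a test Predict API request to verify that the connector works end to end. The following is a minimal sketch for the example OpenAI chat connector; the parameters you pass must match the placeholders in your connector's request_body template (here, ${parameters.messages}):

  POST /_plugins/_ml/models/cleMb4kBJ1eYAeTMFFg4/_predict
  {
    "parameters": {
      "messages": [
        { "role": "user", "content": "Hello!" }
      ]
    }
  }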

Step 5: Create an ingest pipeline

The following example request creates an ingest pipeline with a text_embedding processor. The processor converts the text in the passage_text field into text embeddings and stores the embeddings in passage_embedding:

  PUT /_ingest/pipeline/nlp-ingest-pipeline
  {
    "description": "A text embedding pipeline",
    "processors": [
      {
        "text_embedding": {
          "model_id": "cleMb4kBJ1eYAeTMFFg4",
          "field_map": {
            "passage_text": "passage_embedding"
          },
          "batch_size": 5
        }
      }
    ]
  }
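
If you are using a sparse model, the sparse_encoding processor is configured the same way and also accepts a batch_size. A minimal sketch, assuming you have already registered and deployed a sparse encoding model (the model ID below is a placeholder):

  PUT /_ingest/pipeline/nlp-sparse-ingest-pipeline
  {
    "description": "A sparse encoding pipeline",
    "processors": [
      {
        "sparse_encoding": {
          "model_id": "<your sparse encoding model ID>",
          "field_map": {
            "passage_text": "passage_embedding"
          },
          "batch_size": 5
        }
      }
    ]
  }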

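Before running a bulk ingestion job, you can optionally test the pipeline with the Simulate Pipeline API. A minimal sketch using a single sample document:

  POST /_ingest/pipeline/nlp-ingest-pipeline/_simulate
  {
    "docs": [
      {
        "_index": "testindex1",
        "_id": "1",
        "_source": {
          "passage_text": "hello world"
        }
      }
    ]
  }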

Step 6: Perform bulk indexing

To ingest documents in bulk, call the Bulk API and provide the pipeline parameter. If you don’t provide a pipeline parameter, then the default ingest pipeline for the index will be used for ingestion:

  POST _bulk?batch_size=5&pipeline=nlp-ingest-pipeline
  { "create": { "_index": "testindex1", "_id": "2" } }
  { "passage_text": "hello world" }
  { "create": { "_index": "testindex1", "_id": "3" } }
  { "passage_text": "big apple" }
  { "create": { "_index": "testindex1", "_id": "4" } }
  { "passage_text": "golden gate bridge" }
  { "create": { "_index": "testindex1", "_id": "5" } }
  { "passage_text": "fine tune" }
  { "create": { "_index": "testindex1", "_id": "6" } }
  { "passage_text": "random test" }
  { "create": { "_index": "testindex1", "_id": "7" } }
  { "passage_text": "sun and moon" }
  { "create": { "_index": "testindex1", "_id": "8" } }
  { "passage_text": "windy" }
  { "create": { "_index": "testindex1", "_id": "9" } }
  { "passage_text": "new york" }
  { "create": { "_index": "testindex1", "_id": "10" } }
  { "passage_text": "fantastic" }

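If you prefer not to pass the pipeline parameter on every bulk request, you can set the pipeline as the index default instead. A minimal sketch, assuming the testindex1 index has not been created yet:

  PUT /testindex1
  {
    "settings": {
      "index.default_pipeline": "nlp-ingest-pipeline"
    }
  }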