Use a data stream
After you set up a data stream, you can do the following:
- Add documents to a data stream
- Search a data stream
- Get statistics for a data stream
- Manually roll over a data stream
- Open closed backing indices
- Reindex with a data stream
- Update documents in a data stream by query
- Delete documents in a data stream by query
- Update or delete documents in a backing index
Add documents to a data stream
You can add documents to a data stream using two types of indexing requests:
- Individual indexing requests
- Bulk indexing requests
Adding a document to a data stream adds the document to the stream’s current write index.
You cannot add new documents to a stream’s other backing indices, even by sending requests directly to the index. This means you cannot submit the following requests directly to any backing index except the write index:
- An index API request with an op_type of create. The op_type parameter defaults to create when adding new documents.
- A bulk API request using a create action
Individual indexing requests
You can use an index API request with an op_type of create to add individual documents to a data stream. The op_type parameter defaults to create when adding new documents.
The following index API request adds a new document to my-data-stream.
POST /my-data-stream/_doc/
{
"@timestamp": "2020-12-07T11:06:07.000Z",
"user": {
"id": "8a4f500d"
},
"message": "Login successful"
}
You cannot add new documents to a data stream using the index API’s PUT /<target>/_doc/<_id> request format. To specify a document ID, use the PUT /<target>/_create/<_id> format instead.
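For example, here is a minimal sketch that indexes a document with an explicit ID using the _create format. The document ID shown is hypothetical; any ID not already in use works:
PUT /my-data-stream/_create/8fswnYIBaUHvnqlfJfa2
{
  "@timestamp": "2020-12-07T11:06:07.000Z",
  "user": {
    "id": "8a4f500d"
  },
  "message": "Login successful"
}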
Bulk indexing requests
You can use the bulk API to add multiple documents to a data stream in a single request. Each action in the bulk request must use the create action. Data streams do not support other bulk actions, such as index.
The following bulk API request adds several new documents to my-data-stream. Only the create action is used.
PUT /my-data-stream/_bulk?refresh
{"create":{ }}
{ "@timestamp": "2020-12-08T11:04:05.000Z", "user": { "id": "vlb44hny" }, "message": "Login attempt failed" }
{"create":{ }}
{ "@timestamp": "2020-12-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }
{"create":{ }}
{ "@timestamp": "2020-12-09T11:07:08.000Z", "user": { "id": "l7gk7f82" }, "message": "Logout successful" }
Index with an ingest pipeline
You can use an ingest pipeline with an indexing request to pre-process data before it’s indexed to a data stream.
The following put pipeline API request creates the lowercase_message_field ingest pipeline. The pipeline uses the lowercase ingest processor to change the message field value to lowercase before indexing.
PUT /_ingest/pipeline/lowercase_message_field
{
"description" : "Lowercases the message field value",
"processors" : [
{
"lowercase" : {
"field" : "message"
}
}
]
}
The following index API request adds a new document to my-data-stream. The request includes a ?pipeline=lowercase_message_field query parameter. This parameter indicates Elasticsearch should use the lowercase_message_field pipeline to pre-process the document before indexing it.
During pre-processing, the pipeline changes the letter case of the document’s message field value from LOGIN Successful to login successful.
POST /my-data-stream/_doc?pipeline=lowercase_message_field
{
"@timestamp": "2020-12-08T11:12:01.000Z",
"user": {
"id": "I1YBEOxJ"
},
"message": "LOGIN Successful"
}
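If you want to check the pipeline’s output before indexing real data, you can test it with the simulate pipeline API, which runs the pipeline against sample documents without indexing anything. A minimal sketch:
POST /_ingest/pipeline/lowercase_message_field/_simulate
{
  "docs": [
    {
      "_source": {
        "@timestamp": "2020-12-08T11:12:01.000Z",
        "message": "LOGIN Successful"
      }
    }
  ]
}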
Search a data stream
Several Elasticsearch search APIs support data streams, including the search, async search, multi search, field capabilities, and EQL search APIs.
The following search API request searches my-data-stream for documents with a timestamp between today and yesterday that also have a message value of login successful.
GET /my-data-stream/_search
{
"query": {
"bool": {
"must": {
"range": {
"@timestamp": {
"gte": "now-1d/d",
"lt": "now/d"
}
}
},
"should": {
"match": {
"message": "login successful"
}
}
}
}
}
You can use a comma-separated list to search multiple data streams, indices, and index aliases in the same request.
The following request searches my-data-stream and my-data-stream-alt, which are specified as a comma-separated list in the request path.
GET /my-data-stream,my-data-stream-alt/_search
{
"query": {
"match": {
"user.id": "8a4f500d"
}
}
}
Index patterns are also supported.
The following request uses the my-data-stream* index pattern to search any data stream, index, or index alias beginning with my-data-stream.
GET /my-data-stream*/_search
{
"query": {
"match": {
"user.id": "vlb44hny"
}
}
}
The following search request omits a target in the request path. The request searches all data streams and indices in the cluster.
GET /_search
{
"query": {
"match": {
"user.id": "l7gk7f82"
}
}
}
Get statistics for a data stream
You can use the data stream stats API to retrieve statistics for one or more data streams. These statistics include:
- A count of the stream’s backing indices
- The total store size of all shards for the stream’s backing indices
- The highest @timestamp value for the stream
Example
The following data stream stats API request retrieves statistics for my-data-stream.
GET /_data_stream/my-data-stream/_stats?human=true
The API returns the following response.
{
"_shards": {
"total": 6,
"successful": 3,
"failed": 0
},
"data_stream_count": 1,
"backing_indices": 3,
"total_store_size": "624b",
"total_store_size_bytes": 624,
"data_streams": [
{
"data_stream": "my-data-stream",
"backing_indices": 3,
"store_size": "624b",
"store_size_bytes": 624,
"maximum_timestamp": 1607339167000
}
]
}
Manually roll over a data stream
A rollover creates a new backing index for a data stream. This new backing index becomes the stream’s write index and increments the stream’s generation.
In most cases, we recommend using ILM to automate rollovers for data streams. This lets you automatically roll over the current write index when it meets specified criteria, such as a maximum age or size.
However, you can also use the rollover API to manually perform a rollover. This can be useful if you want to apply mapping or setting changes to the stream’s write index after updating a data stream’s template.
The following rollover API request submits a manual rollover request for my-data-stream.
POST /my-data-stream/_rollover/
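If you only want the rollover to occur when the write index meets certain criteria, you can pass conditions in the request body. The threshold values below are illustrative:
POST /my-data-stream/_rollover
{
  "conditions": {
    "max_age": "7d",
    "max_docs": 1000
  }
}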
Open closed backing indices
You may close one or more of a data stream’s backing indices as part of its ILM lifecycle or another workflow. A closed backing index cannot be searched, even for searches targeting its data stream. You also can’t update or delete documents in a closed index.
You can re-open individual backing indices by sending an open request directly to the index.
You can also conveniently re-open all closed backing indices for a data stream by sending an open request directly to the stream.
The following cat indices API request retrieves the status for my-data-stream’s backing indices.
GET /_cat/indices/my-data-stream?v&s=index&h=index,status
The API returns the following response. The response indicates my-data-stream contains two closed backing indices: .ds-my-data-stream-000001 and .ds-my-data-stream-000002.
index status
.ds-my-data-stream-000001 close
.ds-my-data-stream-000002 close
.ds-my-data-stream-000003 open
The following open API request re-opens any closed backing indices for my-data-stream, including .ds-my-data-stream-000001 and .ds-my-data-stream-000002.
POST /my-data-stream/_open/
You can resubmit the original cat indices API request to verify .ds-my-data-stream-000001 and .ds-my-data-stream-000002 were re-opened.
GET /_cat/indices/my-data-stream?v&s=index&h=index,status
The API returns the following response.
index status
.ds-my-data-stream-000001 open
.ds-my-data-stream-000002 open
.ds-my-data-stream-000003 open
Reindex with a data stream
You can use the reindex API to copy documents to a data stream from an existing index, index alias, or data stream.
A reindex copies documents from a source to a destination. The source and destination can be any pre-existing index, index alias, or data stream. However, the source and destination must be different. You cannot reindex a data stream into itself.
Because data streams are append-only, a reindex request to a data stream destination must have an op_type of create. This means a reindex can only add new documents to a data stream. It cannot update existing documents in the data stream destination.
A reindex can be used to:
- Convert an existing index alias and collection of time-based indices into a data stream.
- Apply a new or updated index template by reindexing an existing data stream into a new one. This applies mapping and setting changes in the template to each document and backing index of the data stream destination. See Use reindex to change mappings or settings.
If you only want to update the mappings or settings of a data stream’s write index, we recommend you update the data stream’s template and perform a rollover.
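As a sketch of that alternative, assuming the stream matches a hypothetical index template named my-data-stream-template, you could update the template and then roll over so the new mappings apply to the next write index. Note that this request replaces the entire template, so include its full definition:
PUT /_index_template/my-data-stream-template
{
  "index_patterns": [ "my-data-stream*" ],
  "data_stream": { },
  "template": {
    "mappings": {
      "properties": {
        "message": {
          "type": "text"
        }
      }
    }
  }
}
POST /my-data-stream/_rollover/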
The following reindex request copies documents from the archive index alias to my-data-stream. Because the destination is a data stream, the request’s op_type is create.
POST /_reindex
{
"source": {
"index": "archive"
},
"dest": {
"index": "my-data-stream",
"op_type": "create"
}
}
You can also reindex documents from a data stream to an index, index alias, or data stream.
The following reindex request copies documents from my-data-stream to the existing archive index alias. Because the destination is not a data stream, the op_type does not need to be specified.
POST /_reindex
{
"source": {
"index": "my-data-stream"
},
"dest": {
"index": "archive"
}
}
Update documents in a data stream by query
You cannot send indexing or update requests for existing documents directly to a data stream. These prohibited requests include:
- An index API request with an op_type of index. The op_type parameter defaults to index for existing documents.
- A bulk API request using the index or update action.
Instead, you can use the update by query API to update documents in a data stream that match a provided query.
The following update by query request updates documents in my-data-stream with a user.id of l7gk7f82. The request uses a script to assign matching documents a new user.id value of XgdX0NoX.
POST /my-data-stream/_update_by_query
{
"query": {
"match": {
"user.id": "l7gk7f82"
}
},
"script": {
"source": "ctx._source.user.id = params.new_id",
"params": {
"new_id": "XgdX0NoX"
}
}
}
Delete documents in a data stream by query
You cannot send document deletion requests directly to a data stream. These prohibited requests include:
- A delete API request
- A bulk API request using the delete action.
Instead, you can use the delete by query API to delete documents in a data stream that match a provided query.
The following delete by query request deletes documents in my-data-stream with a user.id of vlb44hny.
POST /my-data-stream/_delete_by_query
{
"query": {
"match": {
"user.id": "vlb44hny"
}
}
}
Update or delete documents in a backing index
Alternatively, you can update or delete documents in a data stream by sending the update or deletion request to the backing index containing the document. To do this, you first need to get:
- The document ID
- The name of the backing index that contains the document
If you want to update a document, you must also get its current sequence number and primary term.
You can use a search request to retrieve this information.
The following search request retrieves documents in my-data-stream with a user.id of yWIumJd7. By default, this search returns the document ID and backing index for any matching documents.
The request includes a "seq_no_primary_term": true argument. This means the search also returns the sequence number and primary term for any matching documents.
GET /my-data-stream/_search
{
"seq_no_primary_term": true,
"query": {
"match": {
"user.id": "yWIumJd7"
}
}
}
The API returns the following response. The hits.hits property contains information for any documents matching the search.
{
"took": 20,
"timed_out": false,
"_shards": {
"total": 3,
"successful": 3,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 1,
"relation": "eq"
},
"max_score": 0.2876821,
"hits": [
{
"_index": ".ds-my-data-stream-000003",
"_type": "_doc",
"_id": "bfspvnIBr7VVZlfp2lqX",
"_seq_no": 0,
"_primary_term": 1,
"_score": 0.2876821,
"_source": {
"@timestamp": "2020-12-07T11:06:07.000Z",
"user": {
"id": "yWIumJd7"
},
"message": "Login successful"
}
}
]
}
}
In this response:
- .ds-my-data-stream-000003 is the backing index containing the matching document
- bfspvnIBr7VVZlfp2lqX is the document ID for the document
- 0 is the current sequence number for the document
- 1 is the primary term for the document
You can use an index API request to update an individual document. To prevent an accidental overwrite, this request must include valid if_seq_no and if_primary_term arguments.
The following index API request updates an existing document in my-data-stream. The request targets document ID bfspvnIBr7VVZlfp2lqX in the .ds-my-data-stream-000003 backing index.
The request also includes the current sequence number and primary term in the respective if_seq_no and if_primary_term query parameters. The request body contains a new JSON source for the document.
PUT /.ds-my-data-stream-000003/_doc/bfspvnIBr7VVZlfp2lqX?if_seq_no=0&if_primary_term=1
{
"@timestamp": "2020-12-07T11:06:07.000Z",
"user": {
"id": "8a4f500d"
},
"message": "Login successful"
}
You can use the delete API to delete individual documents. Deletion requests do not require a sequence number or primary term.
The following delete API request deletes an existing document in my-data-stream. The request targets document ID bfspvnIBr7VVZlfp2lqX in the .ds-my-data-stream-000003 backing index.
DELETE /.ds-my-data-stream-000003/_doc/bfspvnIBr7VVZlfp2lqX
You can use the bulk API to delete or update multiple documents in one request using delete, index, or update actions.
If the action type is index, the action must include valid if_seq_no and if_primary_term arguments.
The following bulk API request uses an index action to update an existing document in my-data-stream. The index action targets document ID bfspvnIBr7VVZlfp2lqX in the .ds-my-data-stream-000003 backing index. The action also includes the current sequence number and primary term in the respective if_seq_no and if_primary_term parameters.
PUT /_bulk?refresh
{ "index": { "_index": ".ds-my-data-stream-000003", "_id": "bfspvnIBr7VVZlfp2lqX", "if_seq_no": 0, "if_primary_term": 1 } }
{ "@timestamp": "2020-12-07T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }