Use a data stream

Use a data stream

After you set up a data stream, you can do the following:

Add documents to a data stream

To add an individual document, use the index API. Ingest pipelines are supported.

  1. POST /my-data-stream/_doc/
  2. {
  3. "@timestamp": "2099-03-08T11:06:07.000Z",
  4. "user": {
  5. "id": "8a4f500d"
  6. },
  7. "message": "Login successful"
  8. }

You cannot add new documents to a data stream using the index API’s PUT /<target>/_doc/<_id> request format. To specify a document ID, use the PUT /<target>/_create/<_id> format instead. Only an op_type of create is supported.

To add multiple documents with a single request, use the bulk API. Only create actions are supported.

  1. PUT /my-data-stream/_bulk?refresh
  2. {"create":{ }}
  3. { "@timestamp": "2099-03-08T11:04:05.000Z", "user": { "id": "vlb44hny" }, "message": "Login attempt failed" }
  4. {"create":{ }}
  5. { "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }
  6. {"create":{ }}
  7. { "@timestamp": "2099-03-09T11:07:08.000Z", "user": { "id": "l7gk7f82" }, "message": "Logout successful" }

Search a data stream

The following search APIs support data streams:

Get statistics for a data stream

Use the data stream stats API to get statistics for one or more data streams:

  1. GET /_data_stream/my-data-stream/_stats?human=true

Manually roll over a data stream

Use the rollover API to manually roll over a data stream:

  1. POST /my-data-stream/_rollover/

Open closed backing indices

You cannot search a closed backing index, even by searching its data stream. You also cannot update or delete documents in a closed index.

To re-open a closed backing index, submit an open index API request directly to the index:

  1. POST /.ds-my-data-stream-2099.03.07-000001/_open/

To re-open all closed backing indices for a data stream, submit an open index API request to the stream:

  1. POST /my-data-stream/_open/

Reindex with a data stream

Use the reindex API to copy documents from an existing index, alias, or data stream to a data stream. Because data streams are append-only, a reindex into a data stream must use an op_type of create. A reindex cannot update existing documents in a data stream.

  1. POST /_reindex
  2. {
  3. "source": {
  4. "index": "archive"
  5. },
  6. "dest": {
  7. "index": "my-data-stream",
  8. "op_type": "create"
  9. }
  10. }

Update documents in a data stream by query

Use the update by query API to update documents in a data stream that match a provided query:

  1. POST /my-data-stream/_update_by_query
  2. {
  3. "query": {
  4. "match": {
  5. "user.id": "l7gk7f82"
  6. }
  7. },
  8. "script": {
  9. "source": "ctx._source.user.id = params.new_id",
  10. "params": {
  11. "new_id": "XgdX0NoX"
  12. }
  13. }
  14. }

Delete documents in a data stream by query

Use the delete by query API to delete documents in a data stream that match a provided query:

  1. POST /my-data-stream/_delete_by_query
  2. {
  3. "query": {
  4. "match": {
  5. "user.id": "vlb44hny"
  6. }
  7. }
  8. }

Update or delete documents in a backing index

If needed, you can update or delete documents in a data stream by sending requests to the backing index containing the document. You’ll need:

To get this information, use a search request:

  1. GET /my-data-stream/_search
  2. {
  3. "seq_no_primary_term": true,
  4. "query": {
  5. "match": {
  6. "user.id": "yWIumJd7"
  7. }
  8. }
  9. }

Response:

  1. {
  2. "took": 20,
  3. "timed_out": false,
  4. "_shards": {
  5. "total": 3,
  6. "successful": 3,
  7. "skipped": 0,
  8. "failed": 0
  9. },
  10. "hits": {
  11. "total": {
  12. "value": 1,
  13. "relation": "eq"
  14. },
  15. "max_score": 0.2876821,
  16. "hits": [
  17. {
  18. "_index": ".ds-my-data-stream-2099.03.08-000003",
  19. "_type": "_doc",
  20. "_id": "bfspvnIBr7VVZlfp2lqX",
  21. "_seq_no": 0,
  22. "_primary_term": 1,
  23. "_score": 0.2876821,
  24. "_source": {
  25. "@timestamp": "2099-03-08T11:06:07.000Z",
  26. "user": {
  27. "id": "yWIumJd7"
  28. },
  29. "message": "Login successful"
  30. }
  31. }
  32. ]
  33. }
  34. }

Backing index containing the matching document

Document ID for the document

Current sequence number for the document

Primary term for the document

To update the document, use an index API request with valid if_seq_no and if_primary_term arguments:

  1. PUT /.ds-my-data-stream-2099-03-08-000003/_doc/bfspvnIBr7VVZlfp2lqX?if_seq_no=0&if_primary_term=1
  2. {
  3. "@timestamp": "2099-03-08T11:06:07.000Z",
  4. "user": {
  5. "id": "8a4f500d"
  6. },
  7. "message": "Login successful"
  8. }

To delete the document, use the delete API:

  1. DELETE /.ds-my-data-stream-2099.03.08-000003/_doc/bfspvnIBr7VVZlfp2lqX

To delete or update multiple documents with a single request, use the bulk API‘s delete, index, and update actions. For index actions, include valid if_seq_no and if_primary_term arguments.

  1. PUT /_bulk?refresh
  2. { "index": { "_index": ".ds-my-data-stream-2099.03.08-000003", "_id": "bfspvnIBr7VVZlfp2lqX", "if_seq_no": 0, "if_primary_term": 1 } }
  3. { "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }