Painless examples for transforms

The examples that use the scripted_metric aggregation are not supported on Elasticsearch Serverless.

These examples demonstrate how to use Painless in transforms. You can learn more about the Painless scripting language in the Painless guide.

Getting top hits by using scripted metric aggregation

This snippet shows how to find the latest document, that is, the document with the most recent timestamp. Technically, it reproduces the behavior of a top hits aggregation in a transform by using a scripted metric aggregation, which provides a metric output.

This example uses a scripted_metric aggregation which is not supported on Elasticsearch Serverless.

    "aggregations": {
      "latest_doc": {
        "scripted_metric": {
          "init_script": "state.timestamp_latest = 0L; state.last_doc = ''",
          "map_script": """
            def current_date = doc['@timestamp'].getValue().toInstant().toEpochMilli();
            if (current_date > state.timestamp_latest) {
              state.timestamp_latest = current_date;
              state.last_doc = new HashMap(params['_source']);
            }
          """,
          "combine_script": "return state",
          "reduce_script": """
            def last_doc = '';
            def timestamp_latest = 0L;
            for (s in states) {
              if (s.timestamp_latest > timestamp_latest) {
                timestamp_latest = s.timestamp_latest;
                last_doc = s.last_doc;
              }
            }
            return last_doc
          """
        }
      }
    }

The init_script creates a long type timestamp_latest and a string type last_doc in the state object.

The map_script defines current_date based on the timestamp of the document, then compares current_date with state.timestamp_latest. If the document is newer, the script updates state.timestamp_latest and stores a copy of the source document in state.last_doc. By using new HashMap(…) you copy the source document; this is important whenever you want to pass the full source object from one phase to the next.

The combine_script returns state from each shard.

The reduce_script iterates through the value of s.timestamp_latest returned by each shard and returns the document with the latest timestamp (last_doc). In the response, the top hit (in other words, the latest_doc) is nested below the latest_doc field.

See the scope of scripts for a detailed explanation of the respective scripts.
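To see how the per-shard state flows into the reduce step, the four phases can be mimicked outside Elasticsearch. The following is a minimal Python sketch, not Painless and not part of any Elasticsearch client. The function names, the shard layout, and the sample documents are hypothetical and exist only for illustration.

```python
def init_state():
    # Mirrors init_script: start with no timestamp and an empty document.
    return {"timestamp_latest": 0, "last_doc": ""}


def map_doc(state, doc):
    # Mirrors map_script: keep a copy of the newest document seen on this shard.
    if doc["@timestamp"] > state["timestamp_latest"]:
        state["timestamp_latest"] = doc["@timestamp"]
        state["last_doc"] = dict(doc)


def reduce_states(states):
    # Mirrors reduce_script: pick the newest document across all shard states.
    last_doc, timestamp_latest = "", 0
    for s in states:
        if s["timestamp_latest"] > timestamp_latest:
            timestamp_latest = s["timestamp_latest"]
            last_doc = s["last_doc"]
    return last_doc


# Hypothetical documents spread over two shards (timestamps in epoch millis).
shards = [
    [{"@timestamp": 1000, "value": "a"}, {"@timestamp": 3000, "value": "c"}],
    [{"@timestamp": 2000, "value": "b"}],
]

states = []
for shard_docs in shards:
    state = init_state()
    for doc in shard_docs:
        map_doc(state, doc)
    states.append(state)  # combine_script simply returns the state

latest_doc = reduce_states(states)
print(latest_doc)
```

Each shard only ever keeps one document in its state, so the reduce step compares at most one candidate per shard.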

You can retrieve the last value in a similar way:

    "aggregations": {
      "latest_value": {
        "scripted_metric": {
          "init_script": "state.timestamp_latest = 0L; state.last_value = ''",
          "map_script": """
            def current_date = doc['@timestamp'].getValue().toInstant().toEpochMilli();
            if (current_date > state.timestamp_latest) {
              state.timestamp_latest = current_date;
              state.last_value = params['_source']['value'];
            }
          """,
          "combine_script": "return state",
          "reduce_script": """
            def last_value = '';
            def timestamp_latest = 0L;
            for (s in states) {
              if (s.timestamp_latest > timestamp_latest) {
                timestamp_latest = s.timestamp_latest;
                last_value = s.last_value;
              }
            }
            return last_value
          """
        }
      }
    }

Getting top hits by using stored scripts

You can also use the power of stored scripts to get the latest value. Stored scripts are updatable, enable collaboration, and avoid duplication across queries.

  1. Create the stored scripts:

    POST _scripts/last-value-map-init
    {
      "script": {
        "lang": "painless",
        "source": """
          state.timestamp_latest = 0L; state.last_value = ''
        """
      }
    }

    POST _scripts/last-value-map
    {
      "script": {
        "lang": "painless",
        "source": """
          def current_date = doc['@timestamp'].getValue().toInstant().toEpochMilli();
          if (current_date > state.timestamp_latest) {
            state.timestamp_latest = current_date;
            state.last_value = doc[params['key']].value;
          }
        """
      }
    }

    POST _scripts/last-value-combine
    {
      "script": {
        "lang": "painless",
        "source": """
          return state
        """
      }
    }

    POST _scripts/last-value-reduce
    {
      "script": {
        "lang": "painless",
        "source": """
          def last_value = '';
          def timestamp_latest = 0L;
          for (s in states) {
            if (s.timestamp_latest > timestamp_latest) {
              timestamp_latest = s.timestamp_latest;
              last_value = s.last_value;
            }
          }
          return last_value
        """
      }
    }

  2. Use the stored scripts in a scripted metric aggregation.

    "aggregations": {
      "latest_value": {
        "scripted_metric": {
          "init_script": {
            "id": "last-value-map-init"
          },
          "map_script": {
            "id": "last-value-map",
            "params": {
              "key": "field_with_last_value"
            }
          },
          "combine_script": {
            "id": "last-value-combine"
          },
          "reduce_script": {
            "id": "last-value-reduce"
          }
        }
      }
    }

    The value of the key parameter, field_with_last_value, can be set to any field for which you want the latest value.
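Because the aggregation only references the stored scripts by ID, the same four scripts can be reused for any field by changing the key parameter. As a rough sketch, the helper below builds the aggregation body as a Python dictionary. The function stored_scripted_metric and its prefix argument are hypothetical conveniences, not part of any Elasticsearch client; the script IDs are the ones created in step 1.

```python
import json


def stored_scripted_metric(prefix, key):
    """Build a scripted_metric body that references stored scripts by ID."""
    return {
        "scripted_metric": {
            "init_script": {"id": f"{prefix}-map-init"},
            "map_script": {"id": f"{prefix}-map", "params": {"key": key}},
            "combine_script": {"id": f"{prefix}-combine"},
            "reduce_script": {"id": f"{prefix}-reduce"},
        }
    }


# Same structure as the aggregation in step 2.
aggregations = {
    "latest_value": stored_scripted_metric("last-value", "field_with_last_value")
}
print(json.dumps(aggregations, indent=2))
```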

Getting time features by using aggregations

This snippet shows how to extract time-based features by using Painless in a transform. The snippet uses an index where @timestamp is defined as a date type field.

    "aggregations": {
      "avg_hour_of_day": {
        "avg": {
          "script": {
            "source": """
              ZonedDateTime date = doc['@timestamp'].value;
              return date.getHour();
            """
          }
        }
      },
      "avg_month_of_year": {
        "avg": {
          "script": {
            "source": """
              ZonedDateTime date = doc['@timestamp'].value;
              return date.getMonthValue();
            """
          }
        }
      },
      ...
    }

avg_hour_of_day is the name of the first aggregation. It contains the Painless script that returns the hour of the day: the script sets date based on the timestamp of the document, then returns the hour value from date.

avg_month_of_year is the name of the second aggregation. It contains the Painless script that returns the month of the year: the script sets date based on the timestamp of the document, then returns the month value from date.
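The effect of the two avg aggregations can be approximated in plain Python to check what values to expect. This is only a sketch with hypothetical sample timestamps; it does not run against Elasticsearch.

```python
from datetime import datetime, timezone
from statistics import mean

# Hypothetical @timestamp values of three documents in one group.
timestamps = [
    datetime(2017, 1, 10, 21, 0, tzinfo=timezone.utc),
    datetime(2017, 3, 10, 9, 0, tzinfo=timezone.utc),
    datetime(2017, 5, 1, 12, 0, tzinfo=timezone.utc),
]

# Equivalent of averaging date.getHour() over the documents.
avg_hour_of_day = mean(ts.hour for ts in timestamps)

# Equivalent of averaging date.getMonthValue() over the documents.
avg_month_of_year = mean(ts.month for ts in timestamps)

print(avg_hour_of_day, avg_month_of_year)
```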

Getting duration by using bucket script

This example shows you how to get the duration of a session by client IP from a data log by using bucket script. The example uses the Kibana sample web logs dataset.

Python

    # Assumes client is an already configured Elasticsearch Python client.
    resp = client.transform.put_transform(
        transform_id="data_log",
        source={
            "index": "kibana_sample_data_logs"
        },
        dest={
            "index": "data-logs-by-client"
        },
        pivot={
            "group_by": {
                "machine.os": {
                    "terms": {
                        "field": "machine.os.keyword"
                    }
                },
                "machine.ip": {
                    "terms": {
                        "field": "clientip"
                    }
                }
            },
            "aggregations": {
                "time_frame.lte": {
                    "max": {
                        "field": "timestamp"
                    }
                },
                "time_frame.gte": {
                    "min": {
                        "field": "timestamp"
                    }
                },
                "time_length": {
                    "bucket_script": {
                        "buckets_path": {
                            "min": "time_frame.gte.value",
                            "max": "time_frame.lte.value"
                        },
                        "script": "params.max - params.min"
                    }
                }
            }
        },
    )
    print(resp)


JavaScript

    // Assumes client is an already configured Elasticsearch JavaScript client.
    const response = await client.transform.putTransform({
      transform_id: "data_log",
      source: {
        index: "kibana_sample_data_logs",
      },
      dest: {
        index: "data-logs-by-client",
      },
      pivot: {
        group_by: {
          "machine.os": {
            terms: {
              field: "machine.os.keyword",
            },
          },
          "machine.ip": {
            terms: {
              field: "clientip",
            },
          },
        },
        aggregations: {
          "time_frame.lte": {
            max: {
              field: "timestamp",
            },
          },
          "time_frame.gte": {
            min: {
              field: "timestamp",
            },
          },
          time_length: {
            bucket_script: {
              buckets_path: {
                min: "time_frame.gte.value",
                max: "time_frame.lte.value",
              },
              script: "params.max - params.min",
            },
          },
        },
      },
    });
    console.log(response);


Console

    PUT _transform/data_log
    {
      "source": {
        "index": "kibana_sample_data_logs"
      },
      "dest": {
        "index": "data-logs-by-client"
      },
      "pivot": {
        "group_by": {
          "machine.os": {"terms": {"field": "machine.os.keyword"}},
          "machine.ip": {"terms": {"field": "clientip"}}
        },
        "aggregations": {
          "time_frame.lte": {
            "max": {
              "field": "timestamp"
            }
          },
          "time_frame.gte": {
            "min": {
              "field": "timestamp"
            }
          },
          "time_length": {
            "bucket_script": {
              "buckets_path": {
                "min": "time_frame.gte.value",
                "max": "time_frame.lte.value"
              },
              "script": "params.max - params.min"
            }
          }
        }
      }
    }

To define the length of the sessions, we use a bucket script.

The bucket path is a map of script variables and their associated path to the buckets you want to use for the variable. In this particular case, min and max are variables mapped to time_frame.gte.value and time_frame.lte.value.

Finally, the script subtracts the start date of the session from the end date, which results in the duration of the session.
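The grouping, the min and max aggregations, and the bucket script can be imitated in a few lines of Python to make the data flow explicit. This is a sketch with hypothetical events, not output from the Kibana sample data; timestamps are epoch milliseconds.

```python
from collections import defaultdict

# Hypothetical log events; timestamps are epoch milliseconds.
events = [
    {"machine.os": "win 8", "clientip": "10.0.0.1", "timestamp": 1000},
    {"machine.os": "win 8", "clientip": "10.0.0.1", "timestamp": 6000},
    {"machine.os": "osx", "clientip": "10.0.0.2", "timestamp": 2500},
]

# group_by: collect timestamps per (machine.os, clientip) pair.
groups = defaultdict(list)
for event in events:
    groups[(event["machine.os"], event["clientip"])].append(event["timestamp"])

sessions = {}
for key, timestamps in groups.items():
    # buckets_path: map the script variables min and max to the two metrics.
    params = {"min": min(timestamps), "max": max(timestamps)}
    sessions[key] = {
        "time_frame.gte": params["min"],
        "time_frame.lte": params["max"],
        # bucket_script: params.max - params.min
        "time_length": params["max"] - params["min"],
    }

for key, session in sessions.items():
    print(key, session["time_length"])
```

A group with a single event has equal min and max values, so its time_length is 0.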

Counting HTTP responses by using scripted metric aggregation

You can count the different HTTP response types in a web log data set by using scripted metric aggregation as part of the transform. You can achieve a similar function with filter aggregations; check the Finding suspicious client IPs example for details.

The example below assumes that the HTTP response codes are stored as keywords in the response field of the documents.

This example uses a scripted_metric aggregation which is not supported on Elasticsearch Serverless.

    "aggregations": {
      "responses.counts": {
        "scripted_metric": {
          "init_script": "state.responses = ['error':0L,'success':0L,'other':0L]",
          "map_script": """
            def code = doc['response.keyword'].value;
            if (code.startsWith('5') || code.startsWith('4')) {
              state.responses.error += 1;
            } else if (code.startsWith('2')) {
              state.responses.success += 1;
            } else {
              state.responses.other += 1;
            }
          """,
          "combine_script": "state.responses",
          "reduce_script": """
            def counts = ['error': 0L, 'success': 0L, 'other': 0L];
            for (responses in states) {
              counts.error += responses['error'];
              counts.success += responses['success'];
              counts.other += responses['other'];
            }
            return counts;
          """
        }
      },
      ...
    }

The aggregations object of the transform that contains all aggregations.

Object of the scripted_metric aggregation.

This scripted_metric performs a distributed operation on the web log data to count specific types of HTTP responses (error, success, and other).

The init_script creates a responses map in the state object with three keys (error, success, other), each holding a long value.

The map_script defines code based on the response.keyword value of the document, then counts the errors, successes, and other responses based on the first digit of the response code.

The combine_script returns state.responses from each shard.

The reduce_script creates a counts map with the error, success, and other keys, then iterates through the responses returned by each shard and adds each response type to the matching key of counts: error responses to the error count, success responses to the success count, and other responses to the other count. Finally, it returns the counts map with the response counts.
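The same counting logic can be traced in plain Python, which makes it easier to see what each shard contributes before the reduce step. This is an illustrative sketch only; the function names, the two-shard layout, and the response codes are hypothetical.

```python
def init_state():
    # Mirrors init_script: one counter per response class.
    return {"responses": {"error": 0, "success": 0, "other": 0}}


def map_code(state, code):
    # Mirrors map_script: classify by the first digit of the response code.
    if code.startswith("5") or code.startswith("4"):
        state["responses"]["error"] += 1
    elif code.startswith("2"):
        state["responses"]["success"] += 1
    else:
        state["responses"]["other"] += 1


def reduce_states(states):
    # Mirrors reduce_script: sum the per-shard counters.
    counts = {"error": 0, "success": 0, "other": 0}
    for responses in states:
        for key in counts:
            counts[key] += responses[key]
    return counts


# Hypothetical response codes (stored as keywords) on two shards.
shards = [["200", "404", "500"], ["301", "204"]]

states = []
for codes in shards:
    state = init_state()
    for code in codes:
        map_code(state, code)
    states.append(state["responses"])  # combine_script returns state.responses

counts = reduce_states(states)
print(counts)
```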

Comparing indices by using scripted metric aggregations

This example shows how to compare the content of two indices by a transform that uses a scripted metric aggregation.

This example uses a scripted_metric aggregation which is not supported on Elasticsearch Serverless.

Python

    # Assumes client is an already configured Elasticsearch Python client.
    resp = client.transform.preview_transform(
        id="index_compare",
        source={
            "index": [
                "index1",
                "index2"
            ],
            "query": {
                "match_all": {}
            }
        },
        dest={
            "index": "compare"
        },
        pivot={
            "group_by": {
                "unique-id": {
                    "terms": {
                        "field": "<unique-id-field>"
                    }
                }
            },
            "aggregations": {
                "compare": {
                    "scripted_metric": {
                        "map_script": "state.doc = new HashMap(params['_source'])",
                        "combine_script": "return state",
                        "reduce_script": " \n if (states.size() != 2) {\n return \"count_mismatch\"\n }\n if (states.get(0).equals(states.get(1))) {\n return \"match\"\n } else {\n return \"mismatch\"\n }\n "
                    }
                }
            }
        },
    )
    print(resp)


JavaScript

    // Assumes client is an already configured Elasticsearch JavaScript client.
    const response = await client.transform.previewTransform({
      id: "index_compare",
      source: {
        index: ["index1", "index2"],
        query: {
          match_all: {},
        },
      },
      dest: {
        index: "compare",
      },
      pivot: {
        group_by: {
          "unique-id": {
            terms: {
              field: "<unique-id-field>",
            },
          },
        },
        aggregations: {
          compare: {
            scripted_metric: {
              map_script: "state.doc = new HashMap(params['_source'])",
              combine_script: "return state",
              reduce_script:
                ' \n if (states.size() != 2) {\n return "count_mismatch"\n }\n if (states.get(0).equals(states.get(1))) {\n return "match"\n } else {\n return "mismatch"\n }\n ',
            },
          },
        },
      },
    });
    console.log(response);


Console

    POST _transform/_preview
    {
      "id" : "index_compare",
      "source" : {
        "index" : [
          "index1",
          "index2"
        ],
        "query" : {
          "match_all" : { }
        }
      },
      "dest" : {
        "index" : "compare"
      },
      "pivot" : {
        "group_by" : {
          "unique-id" : {
            "terms" : {
              "field" : "<unique-id-field>"
            }
          }
        },
        "aggregations" : {
          "compare" : {
            "scripted_metric" : {
              "map_script" : "state.doc = new HashMap(params['_source'])",
              "combine_script" : "return state",
              "reduce_script" : """
                if (states.size() != 2) {
                  return "count_mismatch"
                }
                if (states.get(0).equals(states.get(1))) {
                  return "match"
                } else {
                  return "mismatch"
                }
              """
            }
          }
        }
      }
    }

The indices referenced in the source object are compared to each other.

The dest index contains the results of the comparison.

The group_by field needs to be a unique identifier for each document.

Object of the scripted_metric aggregation.

The map_script defines doc in the state object. By using new HashMap(…) you copy the source document; this is important whenever you want to pass the full source object from one phase to the next.

The combine_script returns state from each shard.

The reduce_script first checks whether exactly two states were returned for the unique identifier. If not, it reports back a count_mismatch. Otherwise, it compares the two states with equals(). If they are equal, it returns a match; otherwise it returns a mismatch.
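The three possible outcomes can be illustrated with a small Python sketch. It assumes, as a simplification, that each index contributes exactly one state per unique identifier; the index contents and the id and v field names are hypothetical.

```python
from collections import defaultdict


def reduce_states(states):
    # Mirrors reduce_script: expect exactly one state from each of two indices.
    if len(states) != 2:
        return "count_mismatch"
    return "match" if states[0] == states[1] else "mismatch"


# Hypothetical contents of the two indices being compared.
index1 = [{"id": "a", "v": 1}, {"id": "b", "v": 2}, {"id": "c", "v": 3}]
index2 = [{"id": "a", "v": 1}, {"id": "b", "v": 9}]

# group_by on the unique identifier; map_script copies each source document.
states_by_id = defaultdict(list)
for index in (index1, index2):
    for doc in index:
        states_by_id[doc["id"]].append({"doc": dict(doc)})

results = {uid: reduce_states(states) for uid, states in states_by_id.items()}
print(results)
```

Here a is identical in both indices, b differs in one field, and c exists in only one index.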

Getting web session details by using scripted metric aggregation

This example shows how to derive multiple features from a single transaction. Take a look at the example source document from the data:

Source document

    {
      "_index":"apache-sessions",
      "_type":"_doc",
      "_id":"KvzSeGoB4bgw0KGbE3wP",
      "_score":1.0,
      "_source":{
        "@timestamp":1484053499256,
        "apache":{
          "access":{
            "sessionid":"571604f2b2b0c7b346dc685eeb0e2306774a63c2",
            "url":"http://www.leroymerlin.fr/v3/search/search.do?keyword=Carrelage%20salle%20de%20bain",
            "path":"/v3/search/search.do",
            "query":"keyword=Carrelage%20salle%20de%20bain",
            "referrer":"http://www.leroymerlin.fr/v3/p/produits/carrelage-parquet-sol-souple/carrelage-sol-et-mur/decor-listel-et-accessoires-carrelage-mural-l1308217717?resultOffset=0&resultLimit=51&resultListShape=MOSAIC&priceStyle=SALEUNIT_PRICE",
            "user_agent":{
              "original":"Mobile Safari 10.0 Mac OS X (iPad) Apple Inc.",
              "os_name":"Mac OS X (iPad)"
            },
            "remote_ip":"0337b1fa-5ed4-af81-9ef4-0ec53be0f45d",
            "geoip":{
              "country_iso_code":"FR",
              "location":{
                "lat":48.86,
                "lon":2.35
              }
            },
            "response_code":200,
            "method":"GET"
          }
        }
      }
    }
    ...

By using the sessionid as a group-by field, you are able to enumerate events through the session and get more details of the session by using scripted metric aggregation.

This example uses a scripted_metric aggregation which is not supported on Elasticsearch Serverless.

    POST _transform/_preview
    {
      "source": {
        "index": "apache-sessions"
      },
      "pivot": {
        "group_by": {
          "sessionid": {
            "terms": {
              "field": "apache.access.sessionid"
            }
          }
        },
        "aggregations": {
          "distinct_paths": {
            "cardinality": {
              "field": "apache.access.path"
            }
          },
          "num_pages_viewed": {
            "value_count": {
              "field": "apache.access.url"
            }
          },
          "session_details": {
            "scripted_metric": {
              "init_script": "state.docs = []",
              "map_script": """
                Map span = [
                  '@timestamp':doc['@timestamp'].value,
                  'url':doc['apache.access.url'].value,
                  'referrer':doc['apache.access.referrer'].value
                ];
                state.docs.add(span)
              """,
              "combine_script": "return state.docs;",
              "reduce_script": """
                def all_docs = [];
                for (s in states) {
                  for (span in s) {
                    all_docs.add(span);
                  }
                }
                all_docs.sort((HashMap o1, HashMap o2)->o1['@timestamp'].toEpochMilli().compareTo(o2['@timestamp'].toEpochMilli()));
                def size = all_docs.size();
                def min_time = all_docs[0]['@timestamp'];
                def max_time = all_docs[size-1]['@timestamp'];
                def duration = max_time.toEpochMilli() - min_time.toEpochMilli();
                def entry_page = all_docs[0]['url'];
                def exit_path = all_docs[size-1]['url'];
                def first_referrer = all_docs[0]['referrer'];
                def ret = new HashMap();
                ret['first_time'] = min_time;
                ret['last_time'] = max_time;
                ret['duration'] = duration;
                ret['entry_page'] = entry_page;
                ret['exit_path'] = exit_path;
                ret['first_referrer'] = first_referrer;
                return ret;
              """
            }
          }
        }
      }
    }

The data is grouped by sessionid.

The aggregations count the distinct paths and the number of pages viewed during the session.

The init_script creates a list named docs in the state object.

The map_script defines a span map with a timestamp, a URL, and a referrer value, which are based on the corresponding values of the document, then adds span to the state.docs list.

The combine_script returns state.docs from each shard.

The reduce_script collects the spans from all shards into all_docs and sorts them by timestamp. It then defines variables such as min_time, max_time, and duration based on the first and last entries, creates a ret object by using new HashMap(), sets first_time, last_time, duration, and the other fields inside ret based on the variables defined earlier, and finally returns ret.
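The reduce step is the part that turns an unordered set of page views into session features. As a rough Python sketch of that logic, with hypothetical spans and epoch-millisecond timestamps instead of date objects:

```python
def reduce_session(states):
    # Flatten the per-shard lists of spans, as the reduce_script does.
    all_docs = [span for shard_docs in states for span in shard_docs]
    # Order the page views by time so first and last are well defined.
    all_docs.sort(key=lambda span: span["@timestamp"])
    first, last = all_docs[0], all_docs[-1]
    return {
        "first_time": first["@timestamp"],
        "last_time": last["@timestamp"],
        "duration": last["@timestamp"] - first["@timestamp"],
        "entry_page": first["url"],
        "exit_path": last["url"],
        "first_referrer": first["referrer"],
    }


# Hypothetical spans of one session, returned by two shards.
states = [
    [{"@timestamp": 2000, "url": "/b", "referrer": "/a"}],
    [
        {"@timestamp": 1000, "url": "/a", "referrer": "https://example.com/"},
        {"@timestamp": 5000, "url": "/c", "referrer": "/b"},
    ],
]

session_details = reduce_session(states)
print(session_details)
```

Sorting happens only in the reduce step because no single shard is guaranteed to hold all page views of a session.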

The API call returns a response similar to the following:

    {
      "num_pages_viewed" : 2.0,
      "session_details" : {
        "duration" : 100300001,
        "first_referrer" : "https://www.bing.com/",
        "entry_page" : "http://www.leroymerlin.fr/v3/p/produits/materiaux-menuiserie/porte-coulissante-porte-interieure-escalier-et-rambarde/barriere-de-securite-l1308218463",
        "first_time" : "2017-01-10T21:22:52.982Z",
        "last_time" : "2017-01-10T21:25:04.356Z",
        "exit_path" : "http://www.leroymerlin.fr/v3/p/produits/materiaux-menuiserie/porte-coulissante-porte-interieure-escalier-et-rambarde/barriere-de-securite-l1308218463?__result-wrapper?pageTemplate=Famille%2FMat%C3%A9riaux+et+menuiserie&resultOffset=0&resultLimit=50&resultListShape=PLAIN&nomenclatureId=17942&priceStyle=SALEUNIT_PRICE&fcr=1&*4294718806=4294718806&*14072=14072&*4294718593=4294718593&*17942=17942"
      },
      "distinct_paths" : 1.0,
      "sessionid" : "000046f8154a80fd89849369c984b8cc9d795814"
    },
    {
      "num_pages_viewed" : 10.0,
      "session_details" : {
        "duration" : 343100405,
        "first_referrer" : "https://www.google.fr/",
        "entry_page" : "http://www.leroymerlin.fr/",
        "first_time" : "2017-01-10T16:57:39.937Z",
        "last_time" : "2017-01-10T17:03:23.049Z",
        "exit_path" : "http://www.leroymerlin.fr/v3/p/produits/porte-de-douche-coulissante-adena-e168578"
      },
      "distinct_paths" : 8.0,
      "sessionid" : "000087e825da1d87a332b8f15fa76116c7467da6"
    }
    ...