
Aggregate Data with Elasticsearch Data Frames

  • Stanislav Prihoda
  • September 21, 2020

Ingesting various events and documents into Elasticsearch is great for detailed analysis, but when it comes to the common need to analyze data at a higher level, we need to aggregate the individual event data to surface more interesting insights. This is where Elasticsearch Data Frames come in.

Aggregation queries do a lot of this heavy lifting, but sometimes we need to prebake the aggregations for better performance and more options for analysis and machine learning.

In this lesson, we’ll explore how the Data Frames feature in Elasticsearch can help us create data aggregations for advanced analytics.

Elasticsearch Data Frame Transforms is an important feature in our toolbox that lets us define a mechanism to aggregate data by specific entities (like customer IDs, IPs, etc.) and create new, summarized secondary indices.

Now, you may be asking yourself “with all the many aggregation queries already available to us, why would we want to create a new index with duplicated data?”

Why Elasticsearch Data Frames?

Transforms are definitely not here to replace aggregation queries, but rather to overcome the following limitations:

    • Performance: Complex dashboards may quickly run into performance issues because the aggregation queries get rerun every time they load, unless the results are cached. This can place excessive memory and compute demands on our cluster.
    • Result Limitations: Because of these performance concerns, aggregations are bounded by various limits (for example, the maximum number of buckets returned, or ordering and filtering restrictions), which can affect the search results to the point where we’re not finding what we want.
    • Dimensionality of Data: For higher-level monitoring and insights, the raw data may not be very helpful. Having the data aggregated around a specific entity makes it easier to analyze and also enables us to apply machine learning to our data, like detecting outliers, for example.

How to Use Elasticsearch Data Frames

Let’s review how the Transform mechanism works.

Step 1: Define

To create a Transform you have two options: use Kibana or use the create transform API. We will try both later, but we’ll dig deeper into the API.

There are four required parts of a Transform definition (a minimal sketch follows the list). You need to:

  • Provide an identifier (i.e. a name), which is a path parameter, so it needs to comply with standard URL rules.
  • Specify the source index from which the “raw” data will be drawn.
  • Define the actual transformation mechanism, called a pivot, which we will examine in a moment. If the term reminds you of a pivot table in Excel spreadsheets, that’s because it’s based on the same principle.
  • Specify the destination index in which the transformed data will be stored.
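
Putting those four parts together, a create Transform request has roughly the following shape. This is only a minimal sketch: my_transform, my-source-index, my-destination-index, customer_id and price are placeholder names, and we will build a real definition step by step in the hands-on section below.

curl --request PUT 'http://localhost:9200/_transform/my_transform' \
--header 'Content-Type: application/json' \
-d '{
    "source": { "index": "my-source-index" },
    "pivot": {
        "group_by": {
            "customer": { "terms": { "field": "customer_id" } }
        },
        "aggregations": {
            "spend.avg": { "avg": { "field": "price" } }
        }
    },
    "dest": { "index": "my-destination-index" }
}'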

Step 2: Run

Here, we have the option to run the Transform one time as a batch transform, or on a scheduled basis as a continuous transform. We can easily preview our Transform before running it with the preview transform API.

If everything is ok, the Transform can be fired off with the start transform API, which will:

  • Create the destination index (if it doesn’t exist) and infer a target mapping
  • Perform validation checks and run the transformation on the whole index or a subset of the data
  • This process ends with the creation of what’s called a checkpoint, which marks how far the Transform has processed. Continuous transforms, which run on a scheduled basis, create an incremental checkpoint each time they run (see the sketch after this list).
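
As a quick sketch of what this step looks like with the API (assuming a Transform named my_transform has already been created), starting a Transform and then checking on it comes down to two calls; the stats response includes the checkpoint information mentioned above:

# Kick off the transform job
curl --request POST 'http://localhost:9200/_transform/my_transform/_start'
 
# Inspect its state, progress and latest checkpoint
curl --request GET 'http://localhost:9200/_transform/my_transform/_stats'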

The Transformation Definition

The definition of the actual transformation has two key elements:

  • pivot.group_by is where you define one or more fields to group the data by; each unique combination of values forms a bucket of documents.
  • pivot.aggregations is where you define the actual aggregations you want to calculate in the buckets of data. For example, you can calculate the Average, Max, Min, Sum, Value count, custom Scripted metrics, and more.

Diving deeper into the pivot.group_by definition, we have a few ways we can choose to group the data:

  • Terms: Group data by each distinct value of a field (typically a keyword field, such as a customer ID or an IP address).
  • Histogram: Group data by intervals of a numeric field, e.g. 0–5, 5–10, etc. (a short sketch of both follows below).
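
As a rough sketch (the field names here are made up for illustration), the two options look like this inside pivot.group_by: terms creates one bucket per distinct customer_id value, while histogram buckets the numeric bytes field into ranges of 5.

"group_by": {
    "customer": { "terms": { "field": "customer_id" } },
    "bytes_range": { "histogram": { "field": "bytes", "interval": 5 } }
}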

Don’t worry if that doesn’t fully make sense yet, as we’ll now see how it works hands-on.

Hands-on Exercises

API

We will start with the API because it’s generally the more effective way to understand the system and the mechanics of the communication. For data, we’ll use an older dataset published by Elastic: example NGINX web server logs.

The web server logs are extremely simple but are still close enough to real-world use cases.

Note: This dataset is very small and is just for our example, but the fundamentals remain valid when scaling up to actual deployments.

Let’s start by downloading the dataset to our local machine. The size is slightly over 11MB.

Download: https://github.com/elastic/examples/tree/master/Common%20Data%20Formats/nginx_json_logs

Let’s check one document to see what we’re dealing with here:

vagrant@ubuntu-xenial:~/transforms$ wget https://raw.githubusercontent.com/elastic/examples/master/Common%20Data%20Formats/nginx_json_logs/nginx_json_logs
 
vagrant@ubuntu-xenial:~$ cat nginx_json_logs | wc -l
51462
 
vagrant@ubuntu-xenial:~/transforms$ head -n 1 nginx_json_logs
{"time": "17/May/2015:08:05:32 +0000", "remote_ip": "93.180.71.3", "remote_user": "-", "request": "GET /downloads/product_1 HTTP/1.1", "response": 304, "bytes": 0, "referrer": "-", "agent": "Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21)"}

We will name our index for this data nginx and define its mapping. For the mapping, we will map the time field as date (with a custom format), remote_ip as ip, bytes as long, and the rest as keyword.

vagrant@ubuntu-xenial:~/transforms$ curl --request PUT "http://localhost:9200/nginx" \
--header 'Content-Type: application/json' \
-d '{
   "settings": {
       "number_of_shards": 1,
       "number_of_replicas": 0
   },
   "mappings": {
       "properties": {
           "time": {"type":"date","format":"dd/MMM/yyyy:HH:mm:ss Z"},
           "remote_ip": {"type":"ip"},
           "remote_user": {"type":"keyword"},
           "request": {"type":"keyword"},
           "response": {"type":"keyword"},
           "bytes": {"type":"long"},
           "referrer": {"type":"keyword"},
           "agent": {"type":"keyword"}
       }
   }
}'
>>>
{"acknowledged":true,"shards_acknowledged":true,"index":"nginx"}

Great! Now we need to index our data. To practice something genuinely useful, let’s use the bulk API and index all of the data with just one request. Although it requires a bit of preprocessing, it’s the most efficient way to index data.

The preprocessing is simple and follows the bulk format: before each document there needs to be an action line (in our case just {"index":{}}) and, optionally, some metadata.

awk '{print "{\"index\":{}}\n" $0}' nginx_json_logs > nginx_json_logs_bulk
 
head -n 2 nginx_json_logs_bulk
>>>
{"index":{}}
{"time": "17/May/2015:08:05:32 +0000", "remote_ip": "93.180.71.3", "remote_user": "-", "request": "GET /downloads/product_1 HTTP/1.1", "response": 304, "bytes": 0, "referrer": "-", "agent": "Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21)"}

Perfect… we’re now ready to index the data!

This is basically a standard POST request to the _bulk endpoint. Notice the Content-Type header (application/x-ndjson) and the reference to the file containing the preprocessed data (prefixed with the special @ sign):

curl --location --request POST 'http://localhost:9200/nginx/_bulk' \
--header 'Content-Type: application/x-ndjson' \
--data-binary '@nginx_json_logs_bulk'

After a few seconds, we have all our documents indexed and ready to play with:

vagrant@ubuntu-xenial:~/transforms$ curl localhost:9200/_cat/indices/nginx?v
health status index uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   nginx TNvzQrYSTQWe9f2CuYK1Qw   1   0      51462            0      3.1mb          3.1mb

Note: as mentioned before, the dataset is from an older date range, so if you want to browse it in full, be sure to bound the dates in your queries or in Kibana between 2015-05-16T00:00:00.000Z and 2015-06-04T23:30:00.000Z.
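
For example, a quick search bounded to that window could look like the following (we pass a format in the range clause so we can use ISO dates rather than the custom format from the mapping):

curl --request GET 'http://localhost:9200/nginx/_search?size=1' \
--header 'Content-Type: application/json' \
-d '{
    "query": {
        "range": {
            "time": {
                "gte": "2015-05-16T00:00:00.000Z",
                "lte": "2015-06-04T23:30:00.000Z",
                "format": "strict_date_optional_time"
            }
        }
    }
}'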

Now let’s dive right into the preview transform API and create transform API. We’ll group by remote_ip to center our Transform aggregations around that field.

We will do three things:

  • define the remote_ip field as the one to group by
  • define individual simple aggregations
    • we will calculate the average number of bytes for the ip
    • total sum of the transferred bytes
    • and total number of requests (i.e. count of documents)
  • more complex (and powerful) scripted aggregations can also be defined:
    • we’ll create two scripted metrics that will get the latest and earliest requests (i.e. documents) by time and return their dates.
    • Both are written in Elasticsearch’s own scripting language, Painless, which is essentially a stripped-down version of Groovy/Java.
  • Also know that scripted metric aggregations all follow the same structure (a readable version of one of our scripts follows this list):
    • init_script – runs before document collection starts; this is where you generally initialize your variables
    • map_script – executed once for each document to perform the core calculations (we are comparing the date values in our example)
    • combine_script – executed once for each shard. Here you can start aggregating the results (we’re passing on the state)
    • reduce_script – executed once to aggregate the results from all shards that responded (that’s where the final date comparison happens)
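
Because of the shell quoting, the scripts in the request below end up squeezed onto single lines and are hard to read. Here is the requests.last metric again as plain Painless, spread out and commented; the logic is identical to what we send in the request:

// init_script – runs before document collection; initialize the state
state.timestamp = 0L;
state.date = '';

// map_script – runs per document; remember the latest time seen on this shard
def doc_date = doc['time'].getValue().toInstant().toEpochMilli();
if (doc_date > state.timestamp) {
    state.timestamp = doc_date;
    state.date = doc['time'].getValue();
}

// combine_script – runs once per shard; pass the shard state on
return state;

// reduce_script – runs once over all shard states; pick the overall latest date
def date = '';
def timestamp = 0L;
for (s in states) {
    if (s.timestamp > timestamp) {
        timestamp = s.timestamp;
        date = s.date;
    }
}
return date;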

So to define the Transform and preview the results of our request, let’s do the following:

vagrant@ubuntu-xenial:~/transforms$ curl --location --request POST 'http://localhost:9200/_transform/_preview' \
--header 'Content-Type: application/json' \
--data-raw '{
   "source": {
       "index": "nginx"
   },
   "pivot": {
       "group_by": {
           "ip": {
               "terms": {
                   "field": "remote_ip"
               }
           }
       },
       "aggregations": {
           "bytes.avg": {
               "avg": {
                   "field": "bytes"
               }
           },
           "bytes.sum": {
               "sum": {
                   "field": "bytes"
               }
           },
           "requests.total": {
               "value_count": {
                   "field": "_id"
               }
           },
           "requests.last": {
               "scripted_metric": {
                   "init_script": "state.timestamp = 0L; state.date = '\'''\''",
                   "map_script": "def doc_date = doc['\''time'\''].getValue().toInstant().toEpochMilli();if (doc_date > state.timestamp){state.timestamp = doc_date;state.date = doc['\''time'\''].getValue();}",
                   "combine_script": "return state",
                   "reduce_script": "def date = '\'''\'';def timestamp = 0L;for (s in states) {if (s.timestamp > (timestamp)){timestamp = s.timestamp; date = s.date;}} return date"
               }
           },
           "requests.first": {
               "scripted_metric": {
                   "init_script": "state.timestamp = 1609455599000L; state.date = '\'''\''",
                   "map_script": "def doc_date = doc['\''time'\''].getValue().toInstant().toEpochMilli();if (doc_date < state.timestamp){state.timestamp = doc_date;state.date = doc['\''time'\''].getValue();}", "combine_script": "return state", "reduce_script": "def date = '\'''\'';def timestamp = 0L;for (s in states) {if (s.timestamp > (timestamp)){timestamp = s.timestamp; date = s.date;}} return date"
               }
           }
       }
   }
}'

The outcome will be similar to this: for each IP, we have the defined metrics precalculated. Pretty cool 🙂

...
{
           "bytes": {
               "avg": 289.77777777777777,
               "sum": 5216.0
           },
           "ip": "2.108.119.198",
           "requests": {
               "total": 18.0,
               "last": "2015-05-23T14:05:53.000Z",
               "first": "2015-05-23T14:05:00.000Z"
           }
       },
       {
           "bytes": {
               "avg": 199.4,
               "sum": 77766.0
           },
           "ip": "5.9.28.15",
           "requests": {
               "total": 390.0,
               "last": "2015-06-02T17:06:59.000Z",
               "first": "2015-05-18T16:05:02.000Z"
           }
       }
...

To actually create the Transform we just need to send a PUT command to the _transform/{name} endpoint.

There are just a few other considerations to make:

  • Provide a description of your Transform, which is optional.
  • More importantly, specify the destination index where we’ll store the transformed data.
  • Also, if you’d like to run the Transform continuously (which we won’t do now), you would define frequency and sync clauses (a short sketch follows this list).
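
For reference, a continuous version of our Transform would add something along these lines to the definition below (the values are illustrative: check for new data every minute, synchronize on the time field, and allow 60 seconds for late-arriving documents):

"frequency": "1m",
"sync": {
    "time": {
        "field": "time",
        "delay": "60s"
    }
}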

We are not copying the whole Transform definition below (it’s the same as above), but don’t forget to fill in the source and pivot fields with it:

vagrant@ubuntu-xenial:~/transforms$ curl --location --request PUT 'http://localhost:9200/_transform/nginx_transform' \
--header 'Content-Type: application/json' \
--data-raw '{
   "source": { ... },
   "pivot": { ... },
   "description": "Transferend bytes and request dates overview for remote_ip",
   "dest": {
       "index": "nginx_transformed"
   }
}'
>>>
{"acknowledged":true}

When the Transform is created, the transformation job does not start automatically, so we need to call the start transform API like this:

vagrant@ubuntu-xenial:~/transforms$ curl --request POST 'http://localhost:9200/_transform/nginx_transform/_start'
>>>
{"acknowledged":true}

After a few moments, we should see the new index created and filled with the aggregated data, grouped by the remote_ip field.
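
If you’d like to verify, the _cat API shows the new index and its document count, and the transform stats API shows the state and the checkpoint that was created:

curl 'http://localhost:9200/_cat/indices/nginx_transformed?v'
 
curl 'http://localhost:9200/_transform/nginx_transform/_stats'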

Kibana

Via the Kibana interface, the Transform creation process is almost the same, just more visually appealing, so we won’t go into detail. But here are a few pointers to get you started.

Start your journey in the Management > Elasticsearch > Transforms section. By now, you should see the nginx_transform we created via the API, along with some detailed statistics on its execution.

To create a new one, click the “Create a transform” button, pick a source index and start defining the Transform as we did with the API. Just be aware that the scripted aggregation would need to be defined in the Advanced pivot editor.

That’s it for Elasticsearch Data Frames! Now you can start running some truly insightful analysis.
