Ingesting various events and documents into Elasticsearch is great for detailed analysis, but when it comes to the common need to analyze data at a higher level, we need to aggregate the individual event data for more interesting insights. This is where Elasticsearch Data Frames come in.
Aggregation queries do a lot of this heavy lifting, but sometimes we need to prebake the aggregations for better performance and more options for analysis and machine learning.
In this lesson, we’ll explore how the Data Frames feature in Elasticsearch can help us create data aggregations for advanced analytics.
Elasticsearch Data Frame Transforms is an important feature in our toolbox that lets us define a mechanism to aggregate data by specific entities (like customer IDs, IPs, etc.) and create new, summarized secondary indices.
Now, you may be asking yourself “with all the many aggregation queries already available to us, why would we want to create a new index with duplicated data?”
Transforms are definitely not here to replace aggregation queries, but rather to overcome some of their limitations.
Let’s review how the Transform mechanism works.
Step 1: Define
To create a Transform, you have two options: use Kibana or use the create transform API. We will try both later, but we’ll dig deeper into the API.
There are four required parts of a Transform definition; we’ll see each of them in the hands-on example below.
Step 2: Run
Here, we have the option to run the Transform one time as a batch transform, or to run it as a continuous transform, which keeps checking the source for new data at a configurable interval. We can easily preview our Transform before running it with the preview transform API.
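To illustrate the continuous variant (in this lesson we’ll only run a batch transform), a continuous Transform adds a sync block and a check frequency to the transform definition we’ll build below. The interval values here are just example placeholders:

"frequency": "5m",
"sync": {
  "time": {
    "field": "time",
    "delay": "60s"
  }
}

With this in place, the Transform polls the source index at each frequency interval and processes only documents whose time field has advanced, with the delay leaving room for ingestion lag.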
If everything looks OK, the Transform can be fired off with the start transform API.
The definition of the actual transformation has two key elements: the pivot.group_by section, which defines the entities we aggregate by, and the pivot.aggregations section, which defines the metrics calculated for each entity.
Diving deeper into the pivot.group_by definition, we have a few ways to choose how the entities are bucketed, such as terms, histogram, and date_histogram groupings.
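As a rough sketch (using field names from the NGINX dataset we’ll load below), a group_by can pivot on exact values with terms, on numeric buckets with histogram, or on time buckets with date_histogram:

"group_by": {
  "ip": { "terms": { "field": "remote_ip" } },
  "bytes_bucket": { "histogram": { "field": "bytes", "interval": 1000 } },
  "day": { "date_histogram": { "field": "time", "calendar_interval": "1d" } }
}

Each key here (ip, bytes_bucket, day) becomes a field in the destination index.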
Don’t worry if that doesn’t fully make sense yet; we’ll now see how it works hands-on.
We will start with the API because it’s generally more effective for understanding the system and the mechanics of the communication. For the data, we’ll use an older dataset published by Elastic: the example NGINX web server logs.
The web server logs are extremely simple but are still close enough to real-world use cases.
Note: This dataset is very small and is just for our example, but the fundamentals remain valid when scaling up to actual deployments.
Let’s start by downloading the dataset to our local machine. The size is slightly over 11MB.
Download: https://github.com/elastic/examples/tree/master/Common%20Data%20Formats/nginx_json_logs
Let’s check one document to see what we’re dealing with here:
$ wget https://raw.githubusercontent.com/elastic/examples/master/Common%20Data%20Formats/nginx_json_logs/nginx_json_logs
$ cat nginx_json_logs | wc -l
51462
$ head -n 1 nginx_json_logs
{"time": "17/May/2015:08:05:32 +0000", "remote_ip": "93.180.71.3", "remote_user": "-", "request": "GET /downloads/product_1 HTTP/1.1", "response": 304, "bytes": 0, "referrer": "-", "agent": "Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21)"}
We will name our index for this data nginx and define its mapping. We’ll map the time field as date (with a custom format), remote_ip as ip, bytes as long, and the rest as keyword.
$ curl --request PUT "http://localhost:9200/nginx" \
  --header 'Content-Type: application/json' \
  -d '{
    "settings": {
      "number_of_shards": 1,
      "number_of_replicas": 0
    },
    "mappings": {
      "properties": {
        "time": {"type": "date", "format": "dd/MMM/yyyy:HH:mm:ss Z"},
        "remote_ip": {"type": "ip"},
        "remote_user": {"type": "keyword"},
        "request": {"type": "keyword"},
        "response": {"type": "keyword"},
        "bytes": {"type": "long"},
        "referrer": {"type": "keyword"},
        "agent": {"type": "keyword"}
      }
    }
  }'
>>> {"acknowledged":true,"shards_acknowledged":true,"index":"nginx"}
Great! Now we need to index our data. To practice something valuable, let’s use the bulk API and index all the data with just one request. Although it requires a bit of preprocessing to match the expected format, it’s the most efficient way to index data.
The preprocessing is simple, as described by the bulk format: before each document there needs to be an action defined (in our case just {"index":{}}) and optionally some metadata.
$ awk '{print "{\"index\":{}}\n" $0}' nginx_json_logs > nginx_json_logs_bulk
$ head -n 2 nginx_json_logs_bulk
>>>
{"index":{}}
{"time": "17/May/2015:08:05:32 +0000", "remote_ip": "93.180.71.3", "remote_user": "-", "request": "GET /downloads/product_1 HTTP/1.1", "response": 304, "bytes": 0, "referrer": "-", "agent": "Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21)"}
Perfect… we’re now ready to index the data!
This is basically a standard POST request to the _bulk endpoint. Notice the Content-Type header and the reference to the file containing the preprocessed data (prefixed with the special @ sign):
$ curl --location --request POST 'http://localhost:9200/nginx/_bulk' \
  --header 'Content-Type: application/x-ndjson' \
  --data-binary '@nginx_json_logs_bulk'
After a few seconds, we have all our documents indexed and ready to play with:
$ curl 'localhost:9200/_cat/indices/nginx?v'
health status index uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   nginx TNvzQrYSTQWe9f2CuYK1Qw   1   0      51462            0      3.1mb          3.1mb
Note: as mentioned before, this dataset is older, so if you want to browse it in full, be sure to set the time range in your queries or in Kibana to between 2015-05-16T00:00:00.000Z and 2015-06-04T23:30:00.000Z.
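For example, one way to sanity-check that range is a simple range query on the time field (the format parameter is included because our mapping uses a custom date format):

$ curl --request GET 'http://localhost:9200/nginx/_search?size=1' \
  --header 'Content-Type: application/json' \
  --data-raw '{
    "query": {
      "range": {
        "time": {
          "gte": "2015-05-16T00:00:00.000Z",
          "lte": "2015-06-04T23:30:00.000Z",
          "format": "strict_date_optional_time"
        }
      }
    }
  }'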
Now let’s dive right into the preview transform API and the create transform API. We’ll group by remote_ip to center our Transform aggregations around that field.
We will do three things:
1. Calculate the average and the total sum of transferred bytes for each IP
2. Count the total number of requests for each IP
3. Find the dates of the first and the last request for each IP (using scripted metric aggregations)
So to define the Transform and preview the results of our request, let’s do the following:
$ curl --location --request POST 'http://localhost:9200/_transform/_preview' \
  --header 'Content-Type: application/json' \
  --data-raw '{
    "source": { "index": "nginx" },
    "pivot": {
      "group_by": {
        "ip": { "terms": { "field": "remote_ip" } }
      },
      "aggregations": {
        "bytes.avg": { "avg": { "field": "bytes" } },
        "bytes.sum": { "sum": { "field": "bytes" } },
        "requests.total": { "value_count": { "field": "_id" } },
        "requests.last": {
          "scripted_metric": {
            "init_script": "state.timestamp = 0L; state.date = '\'''\''",
            "map_script": "def doc_date = doc['\''time'\''].getValue().toInstant().toEpochMilli(); if (doc_date > state.timestamp) {state.timestamp = doc_date; state.date = doc['\''time'\''].getValue();}",
            "combine_script": "return state",
            "reduce_script": "def date = '\'''\''; def timestamp = 0L; for (s in states) {if (s.timestamp > timestamp) {timestamp = s.timestamp; date = s.date;}} return date"
          }
        },
        "requests.first": {
          "scripted_metric": {
            "init_script": "state.timestamp = 1609455599000L; state.date = '\'''\''",
            "map_script": "def doc_date = doc['\''time'\''].getValue().toInstant().toEpochMilli(); if (doc_date < state.timestamp) {state.timestamp = doc_date; state.date = doc['\''time'\''].getValue();}",
            "combine_script": "return state",
            "reduce_script": "def date = '\'''\''; def timestamp = 1609455599000L; for (s in states) {if (s.timestamp < timestamp) {timestamp = s.timestamp; date = s.date;}} return date"
          }
        }
      }
    }
  }'
The outcome will be similar to this. For each IP, we have the defined metrics precalculated. Pretty cool 🙂
...
{
  "bytes": {
    "avg": 289.77777777777777,
    "sum": 5216.0
  },
  "ip": "2.108.119.198",
  "requests": {
    "total": 18.0,
    "last": "2015-05-23T14:05:53.000Z",
    "first": "2015-05-23T14:05:00.000Z"
  }
},
{
  "bytes": {
    "avg": 199.4,
    "sum": 77766.0
  },
  "ip": "5.9.28.15",
  "requests": {
    "total": 390.0,
    "last": "2015-06-02T17:06:59.000Z",
    "first": "2015-05-18T16:05:02.000Z"
  }
}
...
To actually create the Transform we just need to send a PUT command to the _transform/{name} endpoint.
There are just a few other considerations: we need to add a dest object naming the destination index that the aggregated data will be written to, and we can optionally add a human-readable description.
We are not copying the whole Transform definition below (it’s the same as above), so don’t forget to fill in the source and pivot fields with it:
$ curl --location --request PUT 'http://localhost:9200/_transform/nginx_transform' \
  --header 'Content-Type: application/json' \
  --data-raw '{
    "source": { ... },
    "pivot": { ... },
    "description": "Transferred bytes and request dates overview for remote_ip",
    "dest": {
      "index": "nginx_transformed"
    }
  }'
>>> {"acknowledged":true}
When the Transform is created, the transformation job is not started automatically, so to start it we need to call the start transform API like this:
$ curl --request POST 'http://localhost:9200/_transform/nginx_transform/_start'
>>> {"acknowledged":true}
After a few moments, we should see the new index created and filled with the aggregated data grouped by remote_ip.
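If you want to verify, a quick way is to check the job’s statistics with the get transform stats API and then peek into the destination index, for example:

$ curl 'http://localhost:9200/_transform/nginx_transform/_stats'
$ curl 'http://localhost:9200/nginx_transformed/_search?size=1'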
Via the Kibana interface, the Transform creation process is almost the same, just more visual, so we won’t go into detail here. But here are a few pointers to get you started.
Start your journey in the Management > Elasticsearch > Transforms section. By now, you should see the nginx_transform we created via the API, along with some detailed statistics on its execution.
To create a new one, click the “Create a transform” button, pick a source index, and start defining the Transform as we did with the API. Just be aware that the scripted aggregations need to be defined in the advanced pivot editor.
That’s it for Elasticsearch Data Frames! Now you can start running some truly insightful analysis.