A Practical Guide to Logstash: Shipping Logs to Coralogix with Logstash

  • Amir Raz
  • July 19, 2020

Logstash is a tool for collecting, processing, and forwarding events and log messages, and this tutorial will get you started with it quickly. It was created by Jordan Sissel, who, with a background in operations and system administration, found himself constantly managing huge volumes of log data and needed a centralized system to aggregate and manage them. Logstash was born from that need, and in 2013 Sissel teamed up with Elasticsearch.

Collection is accomplished via configurable input plugins, including raw socket/packet communication, file tailing, and several message bus clients. Once an input plugin has collected data, it can be processed by any number of filter plugins that modify and annotate the event data. Finally, Logstash routes events to output plugins that can forward the data to a variety of external programs, including Elasticsearch, local files, and several message bus implementations.

Logstash Configuration File

The Logstash configuration file specifies which plugins are to be used and how. You can reference event fields in a configuration and use conditionals to process events when they meet certain criteria. When running Logstash, you use -f to specify your config file.

The configuration file has a section for each type of plugin you want to add to the event processing pipeline:

input {
  ...
}
filter {
  ...
}
output {
  ...
}

Multiple filters are applied in the order in which they appear in the configuration file. Within each section, we list the configuration options for the specific plugin.
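To illustrate filter ordering, field references, and conditionals together, here is a minimal sketch. The field name "note", the tag "needs_attention", and the file name pipeline.conf are assumptions made for illustration and do not come from any Coralogix configuration.

```
input {
  stdin { }
}
filter {
  # Runs first: builds a new field from an existing one using %{...} field references.
  mutate {
    add_field => { "note" => "seen: %{message}" }
  }
  # Runs second, and only for events whose message contains the string ERROR.
  if "ERROR" in [message] {
    mutate {
      add_tag => ["needs_attention"]
    }
  }
}
output {
  stdout { codec => rubydebug }
}
```

Saved as pipeline.conf, this could be started with bin/logstash -f pipeline.conf. Lines typed on stdin are then printed as events, and only lines containing ERROR carry the extra tag.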

Settings vary according to the individual plugin. A plugin can require that a value for a setting be of a certain type. The supported types include array, boolean, bytes, codec, hash, number, password, path, string, and URI.
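As a rough illustration of typed settings, the sketch below annotates a few common options with the type each one expects. The paths and the "environment" field are hypothetical values chosen for the example.

```
input {
  file {
    path => ["/var/log/app/*.log", "/var/log/app/*.txt"]   # array
    start_position => "beginning"                          # string
    stat_interval => 2                                     # number
  }
}
filter {
  json {
    source => "message"                                    # string
    skip_on_invalid_json => true                           # boolean
  }
  mutate {
    add_field => { "environment" => "staging" }            # hash
  }
}
```

If a value has the wrong type, for example a string where a number is required, Logstash should reject the configuration at startup rather than at event time.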

Inputs and outputs support codec plugins that enable you to encode or decode the data as it enters or exits the pipeline without having to use a separate filter.
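A small sketch of codecs on both sides of a pipeline, assuming the generator input is available in your Logstash installation. The JSON line it emits is made up for illustration. The json codec on the input decodes the line into event fields, and the rubydebug codec on the output pretty-prints the result, so no separate filter is needed for either step.

```
input {
  generator {
    # One hypothetical JSON document, emitted once.
    lines => ['{"user": "alice", "action": "login"}']
    count => 1
    codec => json
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
```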

RUBY filter plugin

The Ruby filter executes Ruby code. It accepts either inline Ruby code or a Ruby file. Using Ruby code, we can define the application name and the subsystem name by changing the values the code sets. It can be defined using the following structure:

filter {
  ruby {code => "
                event.set('[@metadata][application]', event.get('application'))
                event.set('[@metadata][subsystem]', event.get('subsystem'))
                event.set('[@metadata][event]', event.to_json)
                "}
}
# If you wish to make the application and subsystem names static, you can replace the event.get(...) call with a plain string.
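A variation worth considering, sketched here under the assumption that some events may lack the application or subsystem field: fall back to a fixed name when event.get returns nil. The fallback names 'unknown_app' and 'unknown_subsystem' are hypothetical.

```
filter {
  ruby {code => "
                event.set('[@metadata][application]', event.get('application') || 'unknown_app')
                event.set('[@metadata][subsystem]', event.get('subsystem') || 'unknown_subsystem')
                event.set('[@metadata][event]', event.to_json)
                "}
}
```

This combines both approaches: the field value is used when it exists, and the static string otherwise.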

 

HTTP output plugin

Most of our examples send data to Coralogix using the Logstash HTTP output plugin. The plugin configuration has the following structure:

output {
  http {
        url => "<your cluster singles url>"
        http_method => "post"
        headers => ["private_key", "<private key>"]
        format => "json_batch"
        codec => "json"
        mapping => {
            "applicationName" => "%{[@metadata][application]}"
            "subsystemName" => "%{[@metadata][subsystem]}"
            "computerName" => "%{host}"
            "text" => "%{[@metadata][event]}"
        }
        http_compression => true
        automatic_retries => 5
        retry_non_idempotent => true
        connect_timeout => 30
        keepalive => false
        }
}
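Because the mapping above reads its values from @metadata, and @metadata fields are not printed by default, it can help to inspect them locally before shipping anything. The following is a debugging sketch, not part of the Coralogix configuration. It relies on the rubydebug codec's metadata option.

```
output {
  stdout {
    # metadata => true makes the codec print the @metadata fields as well.
    codec => rubydebug { metadata => true }
  }
}
```

Temporarily adding this output should show [@metadata][application], [@metadata][subsystem], and [@metadata][event] exactly as the HTTP output will see them.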

Examples

The remainder of this document provides examples of working Logstash configurations.

Example 1

# This implementation uses the beats input plugin. It listens on port 5044 for Beats traffic.
input {
   beats {
     port => 5044
   }
}

RUBY filter plugin

# In this example we set the subsystem name to the value of the beat metadata field.

filter {
  ruby {code => "
                event.set('[@metadata][application]', 'my_app')
                event.set('[@metadata][subsystem]', event.get('[@metadata][beat]'))
                event.set('[@metadata][event]', event.to_json)
                "}
}

HTTP output plugin

output {
  http {
        url => "<your cluster singles url>"
        http_method => "post"
        headers => ["private_key", "<private key>"]
        format => "json_batch"
        codec => "json"
        mapping => {
            "applicationName" => "%{[@metadata][application]}"
            "subsystemName" => "%{[@metadata][subsystem]}"
            "computerName" => "%{host}"
            "text" => "%{[@metadata][event]}"
        }
        http_compression => true
        automatic_retries => 5
        retry_non_idempotent => true
        connect_timeout => 30
        keepalive => false
        }
}

Example 2

# This tcp input plugin listens on port 6000. It uses the json_lines codec to identify
# individual JSON documents in a stream of JSON documents separated by newlines.
input{
   tcp{
    port => 6000
    codec => json_lines
   }
}

RUBY filter plugin

filter {
  ruby {code => "
                event.set('[@metadata][application]', 'my_app')
                event.set('[@metadata][subsystem]', 'my_sub')
                event.set('[@metadata][event]', event.to_json)
                "}
}

HTTP output plugin

output {
  http {
        url => "<your cluster singles url>"
        http_method => "post"
        headers => ["private_key", "<private key>"]
        format => "json_batch"
        codec => "json"
        mapping => {
            "applicationName" => "%{[@metadata][application]}"
            "subsystemName" => "%{[@metadata][subsystem]}"
            "computerName" => "%{host}"
            "text" => "%{[@metadata][event]}"
        }
        http_compression => true
        automatic_retries => 5
        retry_non_idempotent => true
        connect_timeout => 30
        keepalive => false
        }
}

Example 3

# This example uses the jdbc input plugin. It can ingest data from any database with a JDBC interface.
# The plugin doesn't come with drivers, hence the jdbc_driver_class and jdbc_driver_library
# configuration options.
input {
  jdbc {
    jdbc_driver_library => "c:/logstash/mssql-jdbc-7.2.2.jre8.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://172.29.85.103:1433;databaseName=AVALogger;"
    jdbc_user => "avalogger"
    jdbc_password => "ZAQ!2wsx"
    connection_retry_attempts => 5
    statement => "SELECT * FROM AVALogger.dbo.parsed_data WHERE id > :sql_last_value"
    use_column_value => true
    tracking_column => "id"
    schedule => "* * * * *"
    last_run_metadata_path => "c:/logstash/logstash_jdbc_last_run"
  }
}
# The json filter plugin takes an existing field that contains JSON and expands it into an
# actual data structure within the Logstash event. In this case it expands the content of
# 'extra_data', an original DB column. skip_on_invalid_json lets the filter skip
# non-JSON or invalid JSON field values without warnings or added logic.
filter {
   json {
    source => "extra_data"
    target => "extra_data"
    skip_on_invalid_json => true
   }
}

RUBY filter plugin

filter {
  ruby {code => "
                event.set('[@metadata][application]', 'my_app')
                event.set('[@metadata][subsystem]', 'my_sub')
                event.set('[@metadata][event]', event.to_json)
                "}
}

HTTP output plugin

output {
  http {
        url => "<your cluster singles url>"
        http_method => "post"
        headers => ["private_key", "<private key>"]
        format => "json_batch"
        codec => "json"
        mapping => {
            "applicationName" => "%{[@metadata][application]}"
            "subsystemName" => "%{[@metadata][subsystem]}"
            "computerName" => "%{host}"
            "text" => "%{[@metadata][event]}"
        }
        http_compression => true
        automatic_retries => 5
        retry_non_idempotent => true
        connect_timeout => 30
        keepalive => false
        }
}

Example 4

# In this example the file input plugin is used. It streams events from files, normally by
# tailing them (this is configurable). Every log gets a field called 'type' added to it
# with the value 'production-log'. 'path' indicates where to read the logs from, and 'codec'
# selects the json codec, which decodes (via inputs) and encodes (via outputs) full JSON messages.
input {
  file {
    type    => "production-log"
    path    => "/home/admin/apps/fiverr/current/log/production.log"
    codec   => "json"
  }
}
# This output sends logs to a Redis queue using the Redis output plugin. It uses an 'if'
# statement to direct only logs with type "production-log" to the output.
output {
  if [type] == "production-log" {
    redis {
        host => "192.168.0.2"
        port => 6379
        data_type => "list"
        key => "logstash-production-log"
        codec   => "json_lines"
    }
  }
}
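For completeness, here is a sketch of how a second Logstash instance might consume the same Redis list. This consumer is not part of the original example. It assumes the redis input plugin is installed, and it reuses the host, port, and key from the output above.

```
input {
  redis {
    host => "192.168.0.2"
    port => 6379
    data_type => "list"
    key => "logstash-production-log"
    codec => "json"
  }
}
output {
  stdout { codec => rubydebug }
}
```

The key and data_type must match on both sides, otherwise the consumer waits on a list that the producer never writes to.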

Example 5

# Like the previous example, this one uses the file input plugin, this time with the 'exclude'
# parameter, which indicates which files to ignore as input. The multiline codec collapses
# multiline messages and merges them into a single event. In this example it starts a
# new event every time it recognizes a string of word characters ending with 4 digits, followed
# by what looks like a timestamp of the form hh:mm:ss.uuuuuu. This is the regex given in
# 'pattern'. negate => true means that a line NOT matching the pattern is the one the
# codec acts on, and what => previous says that such a line belongs to the previous
# event rather than starting a new one.
input {
  file {
    path => "/mnt/data/logs/pipeline/*.1"
    exclude => "*.gz"
    codec => multiline {
      pattern => "\w+\d{4} \d{2}\:\d{2}\:\d{2}\.\d{6}"
      negate => true
      what => previous
    }
  }
}
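To try the multiline settings without touching real log files, the same codec can be attached to stdin. This is a minimal sketch. The sample lines in the comments are invented to fit the pattern and are not taken from a real log.

```
# Hypothetical sample input (three physical lines):
#   I0719 12:34:56.123456 100-200 handler.go:42] request failed
#     caused by: connection reset
#   I0719 12:34:57.000001 100-200 handler.go:50] retrying
# Lines 1 and 3 match the pattern and start new events. Line 2 does not match,
# so negate => true with what => previous appends it to the event begun by line 1.
input {
  stdin {
    codec => multiline {
      pattern => "\w+\d{4} \d{2}\:\d{2}\:\d{2}\.\d{6}"
      negate => true
      what => "previous"
    }
  }
}
output {
  stdout { codec => rubydebug }
}
```

Note that the codec holds the most recent event until it sees the next line that starts a new one, so the last event may not appear immediately.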
# The grok filter plugin parses arbitrary text and structures it. In this example it parses
# the event's message field into additional log fields designated by the regex named
# groups. It captures the rest of the log into a field named log and then removes the
# original message field.
filter {
   grok {
    match => { "message" => "(?<loglevel>[A-Z]{1})(?<time>%{MONTHNUM}%{MONTHDAY} %{TIME}) %{POSINT:process}-%{POSINT:thread} %{DATA:function}:%{POSINT:line}] %{GREEDYDATA:log}" }
    remove_field => [ "message" ]
   }
# Next in line to process the event is the json filter plugin, as in Example 3.
   json {
    source => "log"
    target => "log"
    skip_on_invalid_json => true
   }
# The mutate filter plugin is used to rename, remove, replace, and modify fields in your events.
# The order of mutations is kept by using different blocks.
   mutate {
#This section creates a parent field called message for all these different fields.
    rename => {
      "loglevel" => "[message][loglevel]"
      "process" => "[message][process]"
      "thread" => "[message][thread]"
      "function" => "[message][function]"
      "line" => "[message][line]"
      "log" => "[message][log]"
      "message" => "[message][message]"
     }
#Copies source =>destination
     copy => {
      "time" => "[message][message][time]"
      "path" => "[message][log_path]"
     }
   }
  mutate {
#Converts the field type
    convert => {
      "[message][message][process]" => "integer"
      "[message][message][thread]" => "integer"
      "[message][message][line]" => "integer"
    }
  }
# The truncate filter plugin truncates fields longer than a given length.
  truncate {
    fields => [ "time" ]
    length_bytes => 17
  }
# The date filter plugin parses dates from fields and then uses that date or
# timestamp as the Logstash timestamp for the event. In this case there is only one
# format to look for in the field time. It updates the default @timestamp field for the
# event and then removes the field time from the event.
  date {
    match => [ "time", "MMdd HH:mm:ss.SSS" ]
    remove_field => [ "time" ]
  }
}

RUBY filter plugin

filter {
  ruby {code => "
                event.set('[@metadata][application]', 'my_app')
                event.set('[@metadata][subsystem]', 'my_sub')
                event.set('[@metadata][event]', event.to_json)
                "}
}

HTTP output plugin

output {
  http {
        url => "<your cluster singles url>"
        http_method => "post"
        headers => ["private_key", "<private key>"]
        format => "json_batch"
        codec => "json"
        mapping => {
            "applicationName" => "%{[@metadata][application]}"
            "subsystemName" => "%{[@metadata][subsystem]}"
            "computerName" => "%{host}"
            "text" => "%{[@metadata][event]}"
        }
        http_compression => true
        automatic_retries => 5
        retry_non_idempotent => true
        connect_timeout => 30
        keepalive => false
        }
}

Example 6

# Like previous examples, this one uses the file input plugin. sincedb_path holds the path to the
# file that records the current position of the monitored log files. Read mode means that the files
# are treated as if they are content complete: Logstash looks for EOF and then emits the
# accumulated characters as a line. This helps with processing gzipped files. discover_interval
# sets how often the plugin expands the filename patterns in 'path' to look for new files. stat_interval
# sets how often we check whether files have been modified. file_completed_action => log and
# file_completed_log_path combined append the path of each fully read file to the file specified in
# file_completed_log_path. file_chunk_size sets the block size to be read from the file. In this
# configuration we specified 4x the default 32KB chunk. The json codec decodes (via inputs)
# and encodes (via outputs) full JSON messages.
input {
  file {
    path => [
      "/var/log/genie/*.gz",
      "/var/log/genie/*.gzip"
    ]
    sincedb_path => "/dev/null"
    mode => read
    codec => "json"
    discover_interval => 10
    stat_interval => "500ms"
    file_completed_action => log
    file_completed_log_path => "/dev/null"
    file_chunk_size => 131072
  }
}
filter {
  ruby {code => "
                event.set('[@metadata][application]', 'application')
                event.set('[@metadata][subsystem]', 'subsystem')
                event.set('[@metadata][event]', event.to_json)
                "}
}
output {
  http {
        url => "<your cluster singles url>"
        http_method => "post"
        headers => ["private_key", "<private key>"]
        format => "json_batch"
        codec => "json"
        mapping => {
            "applicationName" => "%{[@metadata][application]}"
            "subsystemName" => "%{[@metadata][subsystem]}"
            "computerName" => "%{host}"
            "text" => "%{[@metadata][event]}"
        }
        http_compression => true
        automatic_retries => 5
        retry_non_idempotent => true
        connect_timeout => 30
        keepalive => false
        }
}
