There is a tutorial here
Install filebeat:
curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.5.4-linux-x86_64.tar.gz
tar xzvf filebeat-6.5.4-linux-x86_64.tar.gz
Create the config file:
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /mnt/projects/log_investigation/*.log
  # Multiline log: every event starts with a timestamp such as "2019-09-25 11:55:30.378|"
  multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}\.[0-9]{3}\|'
  # Lines that do NOT match the pattern are appended to the previous matching line.
  multiline.negate: true
  multiline.match: after
  # Tag the events, in case multiple apps are using the same Logstash.
  fields:
    app.name: server_app
#----------------------------- Logstash output --------------------------------
output.logstash:
  # The Logstash host (beats input) that Filebeat sends events to
  hosts: ["127.0.0.1:5044"]
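To see what the multiline settings above do, here is a minimal Python sketch of the grouping rule (negate: true, match: after). It is not Filebeat's implementation; `group_multiline` is a hypothetical helper, and treating a leading non-matching line as its own event is a simplifying assumption.

```python
import re

# Same regular expression as multiline.pattern in filebeat.yml.
EVENT_START = re.compile(
    r'^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}\.[0-9]{3}\|'
)

def group_multiline(lines):
    """Emulate negate: true / match: after (hypothetical helper, not Filebeat code)."""
    events = []
    for line in lines:
        if EVENT_START.match(line) or not events:
            # A line starting with a timestamp opens a new event.
            events.append(line)
        else:
            # Any other line is appended to the event before it.
            events[-1] += "\n" + line
    return events

sample = [
    "2019-09-25 11:55:30.378|-0500|CST|server|server_app|INFO|interface=client|request_response|0x7f7461f38700|295|Message",
    "2019-09-25 11:55:30.378|-0500|CST|server|server_app|INFO|interface=client username=username key2=value query|request_response|0x7f7461f38700|295|Message",
    "dada",
    "das",
]
events = group_multiline(sample)
for event in events:
    print(repr(event))
```

The four input lines collapse into two events, and the second one carries the continuation lines "dada" and "das".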
Test the config file:
filebeat test config -c filebeat.yml
Start filebeat:
filebeat run -c filebeat.yml
Logstash
wget https://artifacts.elastic.co/downloads/logstash/logstash-7.3.2.zip
unzip logstash-7.3.2.zip
Config file:
input {
  # Filebeat port
  beats {
    port => 5044
  }
}
filter {
  grok {
    # Custom patterns for NDC and THREAD
    patterns_dir => ["./patterns"]
    # Regex
    match => { "message" => "%{TIMESTAMP_ISO8601:log_time}\|%{ISO8601_TIMEZONE:server_timezone}\|%{TZ:server_tz}\|%{HOSTNAME:server_hostname}\|%{WORD:app_name}\|%{LOGLEVEL:log_level}\|%{NDC:ndc}\|(?<logger>[A-Za-z0-9$_.]+)\|%{THREAD:thread}\|%{NUMBER:message_size}\|%{GREEDYDATA:message_logged}" }
  }
  # ndc will be parsed into a hash of key/value pairs:
  kv {
    # https://www.elastic.co/guide/en/logstash/current/plugins-filters-kv.html
    # This works with values like "key1=value key2=value"; a bare key without a value (like "query" in our ndc) is not picked up.
    source => "ndc"
    target => "ndc_context"
    # You will be able to access them like this: [ndc_context][username]
    # "ndc_context" => {
    #   "interface" => "client",
    #   "key2" => "value",
    #   "username" => "chreniuc"
    # },
  }
  # Convert to UTC (logs may come from different servers with different timezones)
  mutate {
    add_field => [ "log_time_tz", "%{log_time} %{server_timezone}" ]
  }
  date {
    # log_time_tz looks like this: 2019-09-25 11:55:30.378 -0500
    match => [ "log_time_tz", "yyyy-MM-dd HH:mm:ss.SSS Z" ]
    target => "log_time_tz"
    timezone => "UTC"
  }
  # Get the date in this format: yyyy-MM-dd (we will use it in the output path)
  # The log files will be per day
  mutate {
    # Convert the date back to string.
    convert => ["log_time_tz","string"]
  }
  # Extract the date
  grok {
    # Custom pattern for LOG_DATE
    patterns_dir => ["./patterns"]
    # log_time_tz now looks like this: 2019-10-02T21:09:45.290Z
    match => { "log_time_tz" => "%{LOG_DATE:log_date}%{GREEDYDATA:ignored_log_date}" }
  }
  # We can remove the unwanted fields afterwards.
  mutate {
    # The ndc context is already in "ndc_context", so "ndc" does not need to be sent forward.
    remove_field => [ "log_time_tz", "ignored_log_date", "ndc" ]
  }
}
output {
  # Print on console, just for debug purpose
  stdout {
    codec => rubydebug
  }
  # Centralise the log files per user and per day, from multiple servers.
  # This is how you access nested fields. The first test makes sure the field
  # exists, so events without a username are not written to a file.
  if [ndc_context][username] and [ndc_context][username] != "" {
    file {
      path => "/mnt/projects/log_investigation/out_put_logs/%{log_date}/%{[ndc_context][username]}.log"
      codec => line { format => "%{message}" }
    }
  }
}
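The mutate and date filters above join log_time with server_timezone and normalise the result to UTC before the day is extracted. A rough Python equivalent, as a sketch only (`to_utc` is a hypothetical helper, not part of Logstash), shows that the resulting log_date is the UTC day and not the server's local day:

```python
from datetime import datetime, timezone

def to_utc(log_time, server_timezone):
    """Join e.g. '2019-09-25 11:55:30.378' and '-0500', return an aware UTC datetime."""
    local = datetime.strptime(
        f"{log_time} {server_timezone}", "%Y-%m-%d %H:%M:%S.%f %z"
    )
    return local.astimezone(timezone.utc)

# The sample line used in this post.
utc = to_utc("2019-09-25 11:55:30.378", "-0500")
log_date = utc.strftime("%Y-%m-%d")
print(utc.isoformat(), log_date)

# A late-evening event on the same server lands in the next UTC day.
late = to_utc("2019-09-25 22:00:00.000", "-0500")
late_date = late.strftime("%Y-%m-%d")
print(late.isoformat(), late_date)
```

Because the output path uses log_date, an event logged late in the evening on a server west of UTC ends up in the next day's directory.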
Content of ./patterns (the custom patterns referenced by patterns_dir):
NDC ([0-9A-Za-z=\+\-\s\_])*
THREAD 0x[0-9a-f]+
LOG_DATE [0-9]{4}\-[0-9]{2}\-[0-9]{2}
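Before starting Logstash, the custom patterns can be sanity-checked outside grok. The sketch below rebuilds the log line expression with Python's `re` module. The stock grok patterns (TIMESTAMP_ISO8601, HOSTNAME, LOGLEVEL and so on) are replaced by simplified stand-ins, which is an assumption made for illustration. THREAD is written with the full hexadecimal digit range, which is assumed to be the intent. `re.DOTALL` is used so that the last group spans the continuation lines, as the message_logged value in this post's sample output does.

```python
import re

# Custom patterns from ./patterns, translated to Python syntax.
NDC = r"(?:[0-9A-Za-z=+\-\s_])*"
THREAD = r"0x[0-9a-f]+"
LOG_DATE = r"[0-9]{4}-[0-9]{2}-[0-9]{2}"

# Simplified stand-ins for the stock grok patterns (assumption, not the grok definitions).
LINE = re.compile(
    r"(?P<log_time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3})\|"
    r"(?P<server_timezone>[+-]\d{4})\|"
    r"(?P<server_tz>[A-Z]+)\|"
    r"(?P<server_hostname>[^|]+)\|"
    r"(?P<app_name>\w+)\|"
    r"(?P<log_level>[A-Z]+)\|"
    rf"(?P<ndc>{NDC})\|"
    r"(?P<logger>[A-Za-z0-9$_.]+)\|"
    rf"(?P<thread>{THREAD})\|"
    r"(?P<message_size>\d+)\|"
    r"(?P<message_logged>.*)",
    re.DOTALL,
)

sample = (
    "2019-09-25 11:55:30.378|-0500|CST|server|server_app|INFO|"
    "interface=client username=username key2=value query|"
    "request_response|0x7f7461f38700|295|Message\ndada\ndas"
)
fields = LINE.match(sample).groupdict()
for name, value in fields.items():
    print(f"{name} = {value!r}")
```

If a field comes out wrong here, the same mistake will most likely show up in Logstash as a `_grokparsefailure` tag.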
Start logstash:
logstash -f logstash.conf
Test it:
echo '2019-09-25 11:55:30.378|-0500|CST|server|server_app|INFO|interface=client|request_response|0x7f7461f38700|295|Message' >> file.log
# Multiline:
printf '2019-09-25 11:55:30.378|-0500|CST|server|server_app|INFO|interface=client username=username key2=value query|request_response|0x7f7461f38700|295|Message\ndada\ndas\n' >> file.log
For the multiline event, Logstash should print something like this:
{
    "host" => {
        "name" => "chreniuc-sv"
    },
    "input" => {
        "type" => "log"
    },
    "thread" => "0x7f7461f38700",
    "fields" => {
        "app" => {
            "name" => "server_app"
        }
    },
    "logger" => "request_response",
    "@timestamp" => 2019-10-02T22:41:40.464Z,
    "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
    "message_logged" => "Message\ndada\ndas",
    "log_time" => "2019-09-25 11:55:30.378",
    "app_name" => "server_app",
    "offset" => 6490,
    "@version" => "1",
    "server_timezone" => "-0500",
    "server_hostname" => "server",
    "log_date" => "2019-09-25",
    "beat" => {
        "name" => "chreniuc",
        "hostname" => "chreniuc",
        "version" => "6.5.4"
    },
    "log" => {
        "flags" => [
            [0] "multiline"
        ]
    },
    "prospector" => {
        "type" => "log"
    },
    "server_tz" => "CST",
    "message_size" => "295",
    "message" => "2019-09-25 11:55:30.378|-0500|CST|server|server_app|INFO|interface=client username=username key2=value query|request_response|0x7f7461f38700|295|Message\ndada\ndas",
    "source" => "/mnt/projects/log_investigation/file.log",
    "log_level" => "INFO",
    "ndc_context" => {
        "username" => "username",
        "key2" => "value",
        "interface" => "client"
    }
}
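The ndc_context object in the output above comes from the kv filter, and the bare "query" token is missing from it. A minimal Python sketch of that splitting, assuming whitespace between pairs and "=" between key and value (`parse_ndc` is a hypothetical helper, not the plugin's code):

```python
def parse_ndc(ndc):
    """Split 'k1=v1 k2=v2 bare' on whitespace and keep only key=value tokens."""
    context = {}
    for token in ndc.split():
        key, sep, value = token.partition("=")
        if sep and key:
            # Tokens without '=' (such as 'query') are skipped.
            context[key] = value
    return context

ndc_context = parse_ndc("interface=client username=username key2=value query")
print(ndc_context)
```

If the bare keys matter, one option is to log them as `query=true` (or similar) on the application side so that they survive the split.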
Resources:
- Regex - used in ./patterns
- log4j to grok - here - paste the layout from the log4cxx config into that field and it will translate it to a grok pattern
- Default patterns from Logstash - here
- Multiline in Filebeat
- Test grok patterns