6.6. Logstash 配置示例【翻译中】
以下这些示例阐释了如何通过配置 Logstash 过滤事件,处理 Apache 日志和 syslog 消息,以及通过使用条件语句控制筛选和输出插件处理的事件。
小技巧:如果需要帮助构建 grok 模式,请尝试 Grok 调试器。
配置过滤器
过滤器是一种内联处理机制,它提供了切割数据以满足需求的灵活性。让我们来看看一些实际使用的过滤器。以下配置文件设置了 grok
和 date
过滤器。
input { stdin { } }
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
date {
match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
}
}
output {
elasticsearch { hosts => ["localhost:9200"] }
stdout { codec => rubydebug }
}
使用该配置运行 Logstash:
bin/logstash -f logstash-filter.conf
接下来,粘贴以下行到终端并按回车运行,然后数据会被标准输入插件处理:
127.0.0.1 - - [11/Dec/2013:00:01:45 -0800] "GET /xampp/status.php HTTP/1.1" 200 3891 "http://cadenza/xampp/navi.php" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0"
你将会在标准输出中看到类似以下的返回信息:
{
"message" => "127.0.0.1 - - [11/Dec/2013:00:01:45 -0800] \"GET /xampp/status.php HTTP/1.1\" 200 3891 \"http://cadenza/xampp/navi.php\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0\"",
"@timestamp" => "2013-12-11T08:01:45.000Z",
"@version" => "1",
"host" => "cadenza",
"clientip" => "127.0.0.1",
"ident" => "-",
"auth" => "-",
"timestamp" => "11/Dec/2013:00:01:45 -0800",
"verb" => "GET",
"request" => "/xampp/status.php",
"httpversion" => "1.1",
"response" => "200",
"bytes" => "3891",
"referrer" => "\"http://cadenza/xampp/navi.php\"",
"agent" => "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0\""
}
正如你所看到的,Logstash(借助 grok
过滤器)能够解析日志行(恰好是 Apache 的「组合日志」格式),并把它分解成许多不同的离散信息单元。一旦你查询和分析日志数据的时候,这是相当有用的。例如,你可以轻松地获取到关于 HTTP 响应码、 IP 地址、引用程序等的报告。
处理 Apache 日志
让我们来做一些有用的事情:处理 apache2 访问日志文件!我们在本地文件中读取输入,根据我们的需要使用条件语句处理事件。首先,创建一个名为 logstash-apache.conf
的文件,内容如下(你可以根据你的需要改变日志文件的路径):
input {
file {
path => "/tmp/access_log"
start_position => "beginning"
}
}
filter {
if [path] =~ "access" {
mutate { replace => { "type" => "apache_access" } }
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
}
date {
match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
}
stdout { codec => rubydebug }
}
然后,使用以下日志条目(或者使用你自己 web 服务器中的条目)创建上面配置的输入文件(在本例中为 /tmp/access_log
文件):
71.141.244.242 - kurt [18/May/2011:01:48:10 -0700] "GET /admin HTTP/1.1" 301 566 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.9.2.3) Gecko/20100401 Firefox/3.6.3"
134.39.72.245 - - [18/May/2011:12:40:18 -0700] "GET /favicon.ico HTTP/1.1" 200 1189 "-" "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; .NET CLR 2.0.50727; .NET CLR 3.0.4506.2152; .NET CLR 3.5.30729; InfoPath.2; .NET4.0C; .NET4.0E)"
98.83.179.51 - - [18/May/2011:19:35:08 -0700] "GET /css/main.css HTTP/1.1" 200 1837 "http://www.safesand.com/information.htm" "Mozilla/5.0 (Windows NT 6.0; WOW64; rv:2.0.1) Gecko/20100101 Firefox/4.0.1"
现在,通过 -f
标识传递配置文件来运行 Logstash:
bin/logstash -f logstash-apache.conf
现在,你应该就可以在 Elasticsearch 中看到你的 apache 日志数据了。Logstash 打开并读取指定的输入文件,处理它遇到的每一个事件。任何记录到该文件的添加行都将被 Logstash 事件捕获,处理,然后存储到 Elasticsearch 。作为额外的好处,它们被隐藏在type
字段中,设置为apache_access
(这是通过在输入配置中输入apache_access
行完成的)。
使用条件语句
你可以使用条件语句来控制过滤器或者输出插件处理哪些事件。你可以根据事件出现在哪个文件中来标记每个事件(访问日志,错误日志或者其他以log
结尾的任意文件)。
input {
file {
path => "/tmp/*_log"
}
}
filter {
if [path] =~ "access" {
mutate { replace => { type => "apache_access" } }
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
date {
match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
}
} else if [path] =~ "error" {
mutate { replace => { type => "apache_error" } }
} else {
mutate { replace => { type => "random_logs" } }
}
}
output {
elasticsearch { hosts => ["localhost:9200"] }
stdout { codec => rubydebug }
}
此示例使用type
字段标记所有事件,但实际上并不解析错误或者随机文件。这是因为错误日志有很多类型,如何标记取决于你使用的那种类型的日志。
同样的,你可以使用条件语句将事件定向到特定的输出。例如你可以:
- 对所有状态值为 5xx 的 apache 事件抛出 nagios 告警
- 对所有状态值为 4xx 的事件记录到 Elasticsearch
- 通过 statsd 记录所有状态码的命中率
处理 Syslog 消息
input {
tcp {
port => 5000
type => syslog
}
udp {
port => 5000
type => syslog
}
}
filter {
if [type] == "syslog" {
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
add_field => [ "received_at", "%{@timestamp}" ]
add_field => [ "received_from", "%{host}" ]
}
date {
match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
}
}
output {
elasticsearch { hosts => ["localhost:9200"] }
stdout { codec => rubydebug }
}
bin/logstash -f logstash-syslog.conf
telnet localhost 5000
Dec 23 12:11:43 louis postfix/smtpd[31499]: connect from unknown[95.75.93.154]
Dec 23 14:42:56 louis named[16000]: client 199.48.164.7#64817: query (cache) 'amsterdamboothuren.com/MX/IN' denied
Dec 23 14:30:01 louis CRON[619]: (www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)
Dec 22 18:28:06 louis rsyslogd: [origin software="rsyslogd" swVersion="4.2.0" x-pid="2253" x-info="http://www.rsyslog.com"] rsyslogd was HUPed, type 'lightweight'.
{
"message" => "Dec 23 14:30:01 louis CRON[619]: (www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)",
"@timestamp" => "2013-12-23T22:30:01.000Z",
"@version" => "1",
"type" => "syslog",
"host" => "0:0:0:0:0:0:0:1:52617",
"syslog_timestamp" => "Dec 23 14:30:01",
"syslog_hostname" => "louis",
"syslog_program" => "CRON",
"syslog_pid" => "619",
"syslog_message" => "(www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)",
"received_at" => "2013-12-23 22:49:22 UTC",
"received_from" => "0:0:0:0:0:0:0:1:52617",
"syslog_severity_code" => 5,
"syslog_facility_code" => 1,
"syslog_facility" => "user-level",
"syslog_severity" => "notice"
}
推荐文章: