《Learning ELK Stack》3 使用Logstash采集、解析和转换数据

3 使用Logstash采集、解析和转换数据

  • 理解Logstash如何采集、解析并将各种格式和类型的数据转换成通用格式,然后被用来为不同的应用构建多样的分析系统

配置Logstash

  • 输入插件将源头数据转换成通用格式的事件,过滤插件修改这些事件,最终输出插件将它们输出到其他系统

image-20200625235030110


Logstash插件

列出Logstash的所有插件

bin/plugin list
  • 使用下面命令列出指定分组的插件
bin/plugin list --group <group_name>
bin/plugin list --group output

插件属性的数据类型

数组(Array

path => ["value1", "value2"]

布尔值(Boolean

periodic_flush => false

编解码器(Codec

  • 编解码器实际上并不是一种数据类型,它是在输入或输出的时候对数据进行解码或编码的一种方式。上面例子指定在输出时,编解码器会将所有输出数据编码成json格式
codec => "json"

哈希(Hash

  • 由一系列键值对组成的集合
match => {
    "key1" => "value1", "key2" => "value2"
}

字符串(String

value => "welcome"

注释(Comment

  • 以字符#开头
# 这是一个注释

字段引用

  • 可使用[field_name]的方式引用,嵌套字段可以使用[level1][level2]的方式指定

Logstash条件语句

  • 在某些条件下Logstash可以用条件语句来过滤事件或日志记录。Logstash中的条件处理和其他编程语言中的类似,使用ifif elseelse语句。多个if else语句块可以嵌套
if <conditional expression1> {
# 一些处理语句
}
else if <conditional expression2> {
# 一些处理语句
}
else {
# 一些其他语句
}
  • 条件语句可以与比较运算符、逻辑运算符和单目运算符一起使用
  • 比较运算符包括以下几种
  1. 相等运算符:==、!=、<、>、<=、>=
  2. 正则表达式:=、!
  3. 包含:innot in
  4. 逻辑运算符:andornandxor
  5. 单目运算符:!
filter {
  if [action] == "login" {
    mutate { remove => "password" }
  }
}
output {
    if [loglevel] == "ERROR" and [deployment] == "production" {
        email {}
    }
}

Logstash插件的类型

  1. 输入(Input
  2. 过滤器(Filter
  3. 输出(Output
  4. 编解码(Codec

输入插件

文件(file

Logstash文件输入插件将文件读取的最新位点保存在$HOME/.sincdb*的文件中。文件路径和刷新频率可以通过sincedb_path和sincdb_write_interval配置

input {
    file {
        path => "/GOOG.csv"
        add_field => {"input_time" => "%{@timestamp}"}
        codec => "json"
        delimiter => "\n"
        exclude => "*.gz"
        sincedb_path => "$HOME/.sincedb*"
        sincedb_write_interval => 15
        start_position => "end"
        tags => ["login"]
        type => ["apache"]
    }
}
选项 数据类型 是否必选 默认值 说明
add_field hash {} 增加字段
codec string plain 用于指定编解码器输入
delimiter string `n ` 分隔符
exclude array [] 排除指定类型文件
sincedb_path string $HOME/.sincedb 监视文件当前读取位点
sincedb_write_interval int 15 指定sincedb文件写入频率
start_position string end 输入文件的初始读取位点
tags array 给输入事件增加一系列标签
type string 给多个输入路径中配置的不同类型的事件指定type名称
path array 日志文件路径
input {
    file {
        path => ["/var/log/syslog/*"]
        type => "syslog"
    }
    file {
        path => ["/var/log/apache/*"]
        type => "apache"
    }
}
filter {
    if [type] == "syslog" {
        grok{}
    }
    if [type] == "apache" {
        grok{}
    }
    if "login" == tags[] {}
}
Redis
  • redis实例中读取事件和日志。经常用于输入数据的消息代理,将输入数据缓存到队列,等待索引器读取日志
选项 数据类型 是否必选 默认值 说明
add_field hash {} 增加字段
codec string plain 用于指定编解码器输入
data_type string list listBLPOP)、channelSUBSCRIBE命令订阅key)、pattern_channelPSUBSCRIBE命令订阅key
host string 127.0.0.1
key string
password string
port int 6379

文档链接:www.elastic.co/guide/en/logstash/c...

输出

elasticsearch
  • 最重要的输出插件

image-20200626065529981

过滤器

  • 用于在输出插件输出结果之前,对输入插件中读取的事件进行中间处理。常用于识别输入事件的字段,并对输入事件的部分内容进行条件判断处理
csv
  • 用于将csv文件输入的数据进行解析,并将值赋给字段
csv {
    columns => ["date_of_record","open","high","low","close","volume","adj_close"]
    separator => ","
}
date
  • 给事件赋予正确的时间戳非常重要,只有这样才能在Kibana中使用时间过滤器对事件进行分析
date {
    match => ["date_of_record", "yyyy-MM-dd"]
}
drop
  • 将满足条件的所有事件都丢弃掉,这个过滤插件有下面这些配置选项
  1. add_field
  2. add_tag
  3. remove_field
  4. remove_tag
filter {
    if [fieldname == "test"] {
        drop {}
    }
}
geoip
  • 基于输入事件中的IP地址给事件增加地理位置信息。这些信息从Maxmind数据库中读取

Maxmind是一个专门提供IP地址信息产品的公司。GeoIP是它们开发的智能IP产品,用于IP地址的位置跟踪。所有Logstash版本都自带一个Maxmind的GeoLite城市数据库。这个地址数据库可以从https://dev.maxmind.com/geoip/geoip2/geolite2/获取

geoip {
    source => # 必选字符串,需要使用geoip服务进行映射的ip地址或主机名
}
grok
  • 目前为止最流行、最强大的插件。使用它可以解析任何非结构化的日志事件,并将日志转化成一系列结构化的字段,用于后续的日志处理和分析
  • 可以用于解析任何类型的日志,包括apachemysql、自定义应用日志或者任何事件中非结构化的文本
  • Logstash默认包含了很多grok模式,可以直接用来识别特定类型的字段,也支持自定义正则表达式
  • 所有可用grok模式从这里获取:https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns
HOSTNAME \b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\.?|\b)
DAY (?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?)
YEAR (?>\d\d){1,2}
HOUR (?:2[0123]|[01]?[0-9])
MINUTE (?:[0-5][0-9])
  • 上面grok模式可以使用下面这样的操作符直接识别这些类型的字段。希望将日志事件中代表主机名的文本赋值给host_name这个字段
%{HOSTNAME:host_name}
  • 看一下如何用grok模式表示一行HTTP日志
54.3.245.1 GET /index.html 14562 0.056
  • grok模式是这样的
%{IP:client_ip} %{WORD:request_method} %{URIPATHPARAM:uri_path} %{NUMBER:bytes_transfered} %{NUMBER:duration}
filter {
    grok {
        match => { "message" => "%{IP:client_ip} %{WORD:request_method} %{URIPATHPARAM:uri_path} %{NUMBER:bytes_transfered} %{NUMBER:duration}" }
    }
}
  • 使用grok过滤器处理上面的事件后,可以看到事件中增加了如下字段和值
  1. client_ip:54.3.245.1
  2. request_methodGET
  3. uri_path:/index.html
  4. bytes_transferred:14562
  5. duration:0.056
  • 如果grok模式中没有需要的模式,可以使用正则表达式创建自定义模式

设计和测试grok模式

grokdebug.herokuapp.com/

grokconstructor.appspot.com/do/matc...

image-20200626073227543

mutate
  • 对输入事件进行重命名、移除、替换和修改字段。也用于转换字段的数据类型、合并两个字段、将文本从小写转换为大写等

image-20200626073755986

sleep
  • Logstash置于sleep模式,时间由参数指定,也可以基于事件指定sleep频率
  • 如果希望每处理五个事件就sleep一秒,可以这样配置
filter {
    sleep {
        time => "1"
        every => 5
    }
}

编解码

  • 用于对输入事件进行解码,对输出事件进行解码,以流式过滤器的形式在输入插件和输出插件中工作,重要的编解码插件包括
  1. avro
  2. json
  3. line
  4. multiline
  5. plain
  6. rubydebug
  7. spool

输入事件或输出事件是完整的json文档,可以这样配置(其中一种方式就可以)

input {
    stdin { codec => "json" }
    stdin { codec => json{} }
}

将每行输入日志作为一个事件,将每个输出事件解码成一行

input {
    stdin { codec => line{} }
    stdin { codec => "line" }
}

把多行日志作为一个事件处理

input {
    file {
        path => "/var/log/someapp.log"
        codec => multiline {
            pattern => "^%{TIMESTAMP_ISO8601}"
            negate => true
            what => previous
        }
    }
}

rubydebug在输出事件时使用,使用Ruby Awesome打印库打印输出事件

本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!