Section 1: Getting Started with Logstash and Architecture Overview
1、Logstash
Logstash: an ETL tool / data collection and processing engine. Supports 200+ plugins
2、Logstash Concepts
1-1 Pipeline
- Contains the three-stage processing flow: input → filter → output
- Plugin lifecycle management
- Queue management
1-2 Logstash Event
- The concrete representation of data as it flows through Logstash internally. Data is converted into an Event at the input stage, and converted into target-format data at the output stage
- An Event is actually a Java object; in the configuration file you can create, read, update, and delete Event properties (a minimal ruby-filter sketch follows)
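For illustration, a minimal sketch of manipulating Event properties with the ruby filter (the field names here are hypothetical):

filter {
  ruby {
    code => "
      # read a field off the Event, then set a derived field
      msg = event.get('message')
      event.set('message_length', msg ? msg.length : 0)
      # delete a field we no longer need (hypothetical name)
      event.remove('unwanted_field')
    "
  }
}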
3、Logstash Architecture Overview
Codec (Encode / Decode):
- Decodes raw data into Events
- Encodes Events into target-format data
4、Logstash Configuration File Structure
bin/logstash -f demo.conf
- Pipeline
- Input / Filter / Output
- Codec
- Line / json
- Input: stdin
- filter:
- grok
- date
- output:
- elasticsearch
- stdout
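Assembling the outline above, a sketch of what demo.conf might look like (it mirrors the demo in 9-1 below; the Elasticsearch address is an assumption):

input { stdin { codec => line } }
filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  date {
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
}
output {
  elasticsearch { hosts => ["http://localhost:9200"] }
  stdout { codec => rubydebug }
}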
4-1 Input Plugins
- A pipeline can have multiple input plugins (see the sketch after this list)
- Stdin / File
- Beats / Log4J / Elasticsearch / JDBC / Kafka / Rabbitmq / Redis
- JMX / HTTP / Websocket / UDP / TCP
- Google Cloud Storage / S3
- Github / Twitter
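Since one pipeline accepts several inputs, a sketch combining three of the plugins above (the path and port are illustrative):

input {
  stdin { }                             # read from the console
  file { path => "/var/log/syslog" }    # tail a log file
  tcp { port => 5000 }                  # listen on a TCP port
}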
4-2 Output Plugins
- Sends Events to a specific destination; this is the last stage of the pipeline (a multi-output sketch follows this list)
- Common output plugins: https://www.elastic.co/guide/en/logstash/7.1/output-plugins.html
- Elasticsearch
- Email / PagerDuty
- Influxdb / Kafka / Mongodb / Opentsdb / Zabbix
- Http / TCP / Websocket
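Every output in the output block receives each Event, so several destinations can be fed at once; a sketch that indexes into Elasticsearch while keeping a console copy for debugging (host and index name are assumptions):

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "app-logs-%{+YYYY.MM.dd}"   # one index per day, from the event date
  }
  stdout { codec => rubydebug }          # debugging copy on the console
}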
4-3 Codec Plugins
- Decodes raw data into Events
- Encodes Events into target-format data
- Built-in codec plugins: https://www.elastic.co/guide/en/logstash/7.1/codec-plugins.html
- Line / Multiline
- JSON / Avro / Cef (ArcSight Common Event Format)
- Dots / Rubydebug
4-4 Filter Plugins
- Processes Events
- Built-in filter plugins: https://www.elastic.co/guide/en/logstash/7.1/filter-plugins.html
- Mutate – manipulates Event fields
- Metrics – aggregates metrics
- Ruby – executes Ruby code
5、Queue
5-1 Multiple Pipeline Instances
- pipeline.workers: number of pipeline worker threads; defaults to the number of CPU cores
- pipeline.batch.size: how many pending documents the batcher fetches in one batch; defaults to 125. Larger batches need jvm.options tuned accordingly
- pipeline.batch.delay: how long the batcher waits before dispatching an undersized batch (see the pipelines.yml sketch below)
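A sketch of config/pipelines.yml running two pipelines with per-pipeline overrides of these settings (the ids and paths are illustrative):

# config/pipelines.yml
- pipeline.id: apache-logs
  path.config: "/etc/logstash/conf.d/apache.conf"
  pipeline.workers: 4         # override the CPU-core default
  pipeline.batch.size: 125
- pipeline.id: movielens
  path.config: "/etc/logstash/conf.d/movielens.conf"
  pipeline.batch.delay: 50    # ms to wait before flushing an undersized batch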
5-2 Logstash Queue
In Memory Queue
Data is lost if the process crashes or the machine goes down
Persistent Queue
- queue.type: persisted (the default is memory)
- queue.max_bytes: 4gb
- Data is not lost even if the machine goes down
- Data is guaranteed to be consumed
- Can take over the buffering role of message queues such as Kafka (see the logstash.yml sketch after this list)
- https://www.elastic.co/guide/en/logstash/7.1/persistent-queues.html
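Enabling the persistent queue is a two-line change in config/logstash.yml (4gb is the example value from above):

# config/logstash.yml
queue.type: persisted    # the default is "memory"
queue.max_bytes: 4gb     # upper bound on the on-disk queue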
6、Codec Plugin
6-1 Single Line (see the codec=>line demo in 9-2 below)
6-2 Multiline
Configuration parameters:
- pattern: the regular expression that lines are matched against
- what: if a line matches, whether it belongs to the previous or the next event
  - previous / next
- negate: whether to negate the pattern's match result (a sketch using negate follows this list)
  - true / false
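For instance, a sketch that treats every line not starting with a timestamp as a continuation of the previous event (the timestamp pattern is an assumption about the log format):

input {
  stdin {
    codec => multiline {
      pattern => "^%{TIMESTAMP_ISO8601} "
      negate => true       # lines NOT matching the pattern...
      what => "previous"   # ...are appended to the previous event
    }
  }
}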
6-3 Codec Plugin – Multiline (exception logs; demonstrated in 9-5 below)
7、Input Plugin – File
- Supports reading data from files, e.g. log files
- Problems file reading must solve (see the sketch after this list):
  - Each line is read only once; after a restart, reading resumes from the last position (implemented via sincedb)
  - Detect newly appended content and newly created files
  - File archiving (the file's location changes, i.e. log rotation) must not disrupt the reading in progress
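A sketch of a file input addressing these points (the paths are illustrative); sincedb_path is where read offsets are persisted so a restart resumes where it left off:

input {
  file {
    path => "/var/log/app/*.log"                       # assumption: where the logs live
    start_position => "beginning"                      # first run only: read from the top
    sincedb_path => "/var/lib/logstash/sincedb-app"    # persisted read positions
  }
}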
8、Filter Plugin
Filter plugins can apply all kinds of processing to a Logstash Event, such as parsing, deleting fields, and type conversion
- Date: date parsing
- Dissect: delimiter-based parsing (sketched after this list)
- Grok: regular-expression parsing
- Mutate: field manipulation: rename, delete, replace
- Ruby: modify the Event dynamically with Ruby code
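Grok and Date are demonstrated in 9-1 below. Dissect, which splits on literal delimiters and avoids the cost of regular expressions, might look like this sketch (the field names are hypothetical):

filter {
  dissect {
    # splits e.g. "2020-12-12 19:13:11 INFO Pipeline started" on spaces;
    # the last field captures the remainder of the line
    mapping => { "message" => "%{date} %{time} %{level} %{msg}" }
  }
}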
8-1 Filter Plugin - Mutate
Performs various operations on fields (a combined sketch follows this list):
- convert: type conversion
- gsub: string substitution
- split / join / merge: split a string into an array, join an array into a string, merge two arrays
- rename: rename a field
- update / replace: update or replace a field's contents
- remove_field: delete a field
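A sketch combining several of these operations (the field names are illustrative):

filter {
  mutate {
    convert      => { "year" => "integer" }      # string -> integer
    gsub         => [ "message", "\t", " " ]     # replace tabs with spaces
    split        => { "genre" => "|" }           # "a|b|c" -> ["a", "b", "c"]
    rename       => { "host" => "hostname" }
    remove_field => [ "tmp_field" ]              # hypothetical field to drop
  }
}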
9、DEMO
A JVM upgrade causes Logstash startup to fail (recent JVMs removed the CMS collector, so the old GC flags are rejected); the fix is to comment out the CMS options in jvm.options and switch to G1GC:
$ logstash -h
Unrecognized VM option 'UseConcMarkSweepGC'
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.
vim /usr/share/logstash/config/jvm.options
...
## GC configuration
## -XX:+UseConcMarkSweepGC
## -XX:CMSInitiatingOccupancyFraction=75
## -XX:+UseCMSInitiatingOccupancyOnly
-XX:+UseG1GC
...
9-1 logstash-filter.conf
input { stdin { } }
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
date {
match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
}
}
output {
stdout { codec => rubydebug }
}
# Run the demo. Note that "hello world" does not match COMBINEDAPACHELOG, hence the _grokparsefailure tag in the output below:
logstash -f logstash-filter.conf
...
[2020-12-12T19:13:11,145][INFO ][logstash.javapipeline ][main] Pipeline started {"pipeline.id"=>"main"}
[2020-12-12T19:13:11,190][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2020-12-12T19:13:11,599][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
hello world
{
"@timestamp" => 2020-12-12T19:15:13.324Z,
"tags" => [
[0] "_grokparsefailure"
],
"@version" => "1",
"message" => "hello world",
"host" => "elasticsearch7"
}
9-2 codec=>line
logstash -e "input{stdin{codec=>line}}output{stdout{codec=> rubydebug}}"
hello world
{
"@timestamp" => 2020-12-12T19:17:36.692Z,
"message" => "hello world",
"host" => "elasticsearch7",
"@version" => "1"
}
9-3 codec=> json
logstash -e "input{stdin{codec=>json}}output{stdout{codec=> rubydebug}}"
# a plain text line fails JSON parsing:
hello world
[2020-12-12T19:19:45,222][WARN ][logstash.codecs.jsonlines][main][88ac75a22ddd4bf4cc22d3c0ae803da99e4ef96a529188df0a584d9d02739682] JSON parse error, original data now in message field {:error=>#<LogStash::Json::ParserError: Unrecognized token 'hello': was expecting ('true', 'false' or 'null')
at [Source: (String)"hello world"; line: 1, column: 6]>, :data=>"hello world"}
{
"tags" => [
[0] "_jsonparsefailure"
],
"@timestamp" => 2020-12-12T19:19:45.230Z,
"message" => "hello world",
"host" => "elasticsearch7",
"@version" => "1"
}
# valid JSON parses successfully:
{"hello":"world"}
{
"hello" => "world",
"@timestamp" => 2020-12-12T19:20:12.973Z,
"host" => "elasticsearch7",
"@version" => "1"
}
9-4 stdout{codec=> dots}
logstash -e "input{stdin{codec=>line}}output{stdout{codec=> dots}}"
hello world
.
9-5 multiline-exception.conf
input {
stdin {
codec => multiline {
pattern => "^\s"
what => "previous"
}
}
}
filter {}
output {
stdout { codec => rubydebug }
}
logstash -f multiline-exception.conf
# multi-line input: a Java exception stack trace
Exception in thread "main" java.lang.NullPointerException
at com.example.myproject.Book.getTitle(Book.java:16)
at com.example.myproject.Author.getBookTitles(Author.java:25)
at com.example.myproject.Bootstrap.main(Bootstrap.java:14)
hello world
{
"@timestamp" => 2020-12-12T19:28:38.285Z,
"message" => "Exception in thread \"main\" java.lang.NullPointerException\n at com.example.myproject.Book.getTitle(Book.java:16)\n at com.example.myproject.Author.getBookTitles(Author.java:25)\n at com.example.myproject.Bootstrap.main(Bootstrap.java:14)",
"@version" => "1",
"tags" => [
[0] "multiline"
],
"host" => "elasticsearch7"
}
9-6 movielens/logstash.conf
input {
file {
path => "/home/vagrant/logstash/movielens/ml-latest-small/movies.csv"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
filter {
csv {
separator => ","
columns => ["id","content","genre"]
}
mutate {
split => { "genre" => "|" }
remove_field => ["path", "host","@timestamp","message"]
}
mutate {
split => ["content", "("]
add_field => { "title" => "%{[content][0]}"}
add_field => { "year" => "%{[content][1]}"}
}
mutate {
convert => {
"year" => "integer"
}
strip => ["title"]
remove_field => ["path", "host","@timestamp","message","content"]
}
}
output {
elasticsearch {
hosts => "http://localhost:9200"
index => "movies"
document_id => "%{id}"
}
stdout {}
}
- path: the location of the file to read
- sincedb_path: we don't need resume-on-restart here, so it is pointed at "/dev/null"
- output: elasticsearch
  - hosts / index / document_id
10、Q&A
Q: How can I use Logstash to import Word documents into ES?
A: Logstash does not appear to be able to process Word documents. To handle documents such as PDF and PPT, install the ingest-attachment plugin: https://www.elastic.co/guide/en/elasticsearch/plugins/current/ingest-attachment.html