小土刀

Logstash 连接 Kafka 指南

虽然我对 Logstash 颇有微词,但是与系统自带的 rsyslog 相比还是好用很多的,本文介绍如何快速连接 Logstash 与 Kafka。


更新记录

  • 2016.08.18: 初稿
  • 2016.11.19: 更新通天塔之日志分析平台系列文章链接

通天塔之日志分析平台系列文章

总体思路

还是之前的场景,我需要把各个业务的线上服务器日志接入到统一的日志处理平台中。具体会用 Kafka 做中间件,所以需要解决的就是如何把日志传到 Kafka。原先的考虑是利用系统自带的 rsyslog,这样我只需要自动配置一下 rsyslog 的处理发送规则就可以了,免去了安装和维护的麻烦。但是系统自带的 rsyslog 版本太低,所以到头来还是要更新维护,那就不如直接用更强大且更好用的 Logstash 了。

需要注意的有两点:

  • 不要即时推送日志,以免增加服务器负担
  • 能够妥善处理 logrotate 的情况

幸运的是,这 Logstash 都考虑到了,我们只需要简单配置一下即可。

安装 Java

因为大部分线上服务器跑的是 Ruby,所以需要先安装一下 Java

sudo add-apt-repository -y ppa:webupd8team/java
sudo apt-get update
sudo apt-get -y install oracle-java8-installer

安装 Logstash

ELK 指南中介绍了用 apt-get 进行安装的方法,这里介绍如何手动下载安装

  • 下载到服务器 wget https://download.elastic.co/logstash/logstash/logstash-all-plugins-2.3.4.tar.gz
  • 解压 tar -xvzf logstash-all-plugins-2.3.4.tar.gz
  • 进入 Logstash 文件夹并创建配置文件夹(个人习惯) cd logstash-2.3.4; mkdir confs

之后所有的配置文件均可放在 confs 文件夹中。

配置 Logstash 到 Kafka

这里 Logstash 相当于 producer

Input 读取文件

Logstash 使用一个名叫 FileWatch 的 Ruby Gem 库来监听文件变化。这个库支持 glob 展开文件路径,而且会记录一个叫 .sincedb 的数据库文件来跟踪被监听的日志文件的当前读取位置。通过记录下来的 inode, major number, minor numberpos 就可以保证不漏过每一条日志。一个可能的配置文件是这样的

input {
file {
path => ["/data/home/service/project/current/log/logstash_production.log", "/data/home/service/project/current/log/logstash_production.log.1"]
codec => "json"
add_field => { "topic" => "djiservice"}
stat_interval => 1800
}
}

这里说一下 File rotation 的情况,为了处理被 rotate 的情况,最好把 rotate 之后的文件名也加到 path 中(如上面所示),这里注意,如果 start_position 被设为 beginning,被 rotate 的文件因为会被认为是新文件,而重新导入。如果用默认值 end,那么在最后一次读之后到被 rotate 结束前生成的日志不会被采集。

其他一些配置的设定原因

  • add_field 添加一个 topic 字段,用作之后导入 elasticsearch 的索引标识
  • stat_interval 单位是秒,这里 30 分钟进行一次检测,不过测试的时候需要去掉这个配置
  • codec 因为已经处理成 logstash 兼容格式,就直接以 json 解析

Filter 内容定制

Filter 主要是对数据进行一些处理,比如说我用的是:

filter {
mutate {
remove_field => ["format"]
}
geoip {
source => "ip"
fields => ["location", "city_name", "country_name", "country_code2","country_code3", "region_name", "continent_code"]
}
}

这里做的操作一是移除无效的域,二是把 ip 转换为地理位置,方便后期的处理。

Output 输出到 Kafka

因为 Logstash 自带 Kafka 插件,直接配置上即可,比如:

output {
kafka {
topic_id => "test"
bootstrap_servers => "kafka_url:port"
}
}

下面是基本的配置及其设定原因

  • topic_id 指定 topic 来进行发送
  • bootstrap_servers 这里是 Kafka 的接入地址

其他一些需要注意的配置

  • acks 可以选的值为 0, 1, all,这里解释一下,0 表示不需要 server 返回就认为请求已完成;1 表示需要 leader 返回才认为请求完成;all 表示需要所有的服务器返回才认为请求完成
  • batch_size 单位是字节,如果是发送到同一分区,会攒够这个大小才发送一次请求
  • block_on_buffer_full 这个设置在缓冲区慢了之后阻塞还是直接报错
  • buffer_memory 发送给服务器之前的缓冲区大小,单位是字节
  • client_id 可以在这里设定有意义的名字,就不一定要用 ip 和 端口来区分
  • compression_type 压缩方式,默认是 none,其他可选的是 gzipsnappy

利用 Logstash 从 Kafka 导出数据到 Elasticsearch

这一步就比较简单了,先从 Kafka 中读取,然后写入到 elasticsearch,这里 Logstash 作为 consumer

output {
input {
kafka {
zk_connect => "localhost:2181"
topic_id => "log"
}
}
stdout {
codec => rubydebug
}
elasticsearch {
hosts => ["url:port"]
user => "name"
password => "password"
index => "%{service}-%{+YYYY-MM-dd}"
}
}

这样可以按照配置的服务名称和日期来切割。

至此,我们完成了从 Logstash 到 Kafka 再到 Elasticsearch 的连接,下一步就可以用 kibana 来展示日志的监控分析结果了。

参考链接

您的支持是对我创作最大的鼓励!

热评文章