Kafka 指南

提到消息系统,目前最火热的非 Kafka 莫属,公司也打算利用 Kafka 进行各业务日志统一收集,这里结合自己的实践来分享一下具体的配置及使用。Kafka 版本 0.10.0.1


更新记录

  • 2016.08.15: 初稿

介绍

作为云计算大数据的套件,Kafka 是一个分布式的、可分区的、可复制的消息系统。该有的功能基本都有,而且有自己的特色:

  • 以 topic 为单位进行消息归纳
  • 向 topic 发布消息的是 producer
  • 从 topic 获取消息的是 consumer
  • 集群方式运行,每个服务叫 broker
  • 客户端和服务器通过 TCP 进行通信

在Kafka集群中,没有“中心主节点”的概念,集群中所有的服务器都是对等的,因此,可以在不做任何配置的更改的情况下实现服务器的的添加与删除,同样的消息的生产者和消费者也能够做到随意重启和机器的上下线。

对每个 topic 来说,Kafka 会对其进行分区,每个分区都由一系列有序的、不可变的消息组成,这些消息被连续的追加到分区中。分区中的每个消息都有一个连续的序列号叫做 offset,用来在分区中唯一的标识这个消息。

发布消息通常有两种模式:队列模式(queuing)和发布-订阅模式(publish-subscribe)。队列模式中,consumers 可以同时从服务端读取消息,每个消息只被其中一个 consumer 读到;发布-订阅模式中消息被广播到所有的 consumer 中。更常见的是,每个 topic 都有若干数量的 consumer 组,每个组都是一个逻辑上的『订阅者』,为了容错和更好的稳定性,每个组由若干 consumer 组成。这其实就是一个发布-订阅模式,只不过订阅者是个组而不是单个 consumer。

通过分区的概念,Kafka可以在多个consumer组并发的情况下提供较好的有序性和负载均衡。将每个分区分只分发给一个consumer组,这样一个分区就只被这个组的一个consumer消费,就可以顺序的消费这个分区的消息。因为有多个分区,依然可以在多个consumer组之间进行负载均衡。注意consumer组的数量不能多于分区的数量,也就是有多少分区就允许多少并发消费。

Kafka 只能保证一个分区之内消息的有序性,在不同的分区之间是不可以的,这已经可以满足大部分应用的需求。如果需要 topic 中所有消息的有序性,那就只能让这个 topic 只有一个分区,当然也就只有一个 consumer 组消费它。

单机配置

按照下列步骤即可(来自官网教程)

1. 下载 Kafka

  • 下载 wget http://apache.01link.hk/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz 或者 wget http://ftp.cuhk.edu.hk/pub/packages/apache.org/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz(看哪个源比较快)
  • 解压 tar -xzf kafka_2.11-0.10.0.0.tgz
  • 进入文件夹 cd kafka_2.11-0.10.0.0/

2. 启动服务

  • 启动 ZooKeeper bin/zookeeper-server-start.sh config/zookeeper.properties &(利用 &放到后台方便继续操作)
  • 启动 Kafka bin/kafka-server-start.sh config/server.properties &

3. 创建一个叫做 dawang 的 topic,它只有一个分区,一个副本

  • 创建 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic dawang
  • 查看 bin/kafka-topics.sh --list --zookeeper localhost:2181
  • 还可以配置 broker 让它自动创建 topic

4. 发送消息。Kafka 使用一个简单的命令行producer,从文件中或者从标准输入中读取消息并发送到服务端。默认的每条命令将发送一条消息。

  • 发送消息 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic dawang(然后可以随意输入内容,回车可以发送,ctrl+c 退出)

5. 启动 consumer。可以读取消息并输出到标准输出:

  • 接收消息 bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic dawang --from-beginning
  • 在一个终端中运行 consumer 命令行,另一个终端中运行 producer 命令行,就可以在一个终端输入消息,另一个终端读取消息。这两个命令都有自己的可选参数,可以在运行的时候不加任何参数可以看到帮助信息。

6. 搭建一个多个 broker 的集群,启动有 3 个 broker 组成的集群,这些 broker 节点也都在本机

首先复制一下配置文件:cp config/server.properties config/server-1.propertiescp config/server.properties config/server-2.properties

两个文件需要改动的内容为:

config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2

这里我们把 broker id, 端口号和日志地址配置成和之前不一样,然后我们启动这两个 broker:

bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &

然后创建一个复制因子为 3 的 topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic oh3topic

可以使用 describe 命令来显示 topic 详情

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic oh3topic
Topic:oh3topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: oh3topic Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2

这里简单解释一下

  • Leader 是给定分区的节点编号,每个分区的部分数据会随机指定不同的节点
  • Replicas 是该日志会保存的复制
  • Isr 表示正在同步的复制

我们也可以来看看之前的另一个 topic 的情况

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic dawang
Topic:dawang PartitionCount:1 ReplicationFactor:1 Configs:
Topic: dawang Partition: 0 Leader: 0 Replicas: 0 Isr: 0

最后我们可以按照同样的方法来生产和消费消息,例如

# 生产
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic oh3topic
# 消费
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic oh3topic

开俩终端就可以一边生产消息,一边消费消息了。

注意事项

如果要配置自定义端口,server.properties 中 listeners 一定要配置成为 IP 地址;如果配置为 localhost 或服务器的 hostname,在使用 java 发送数据时就会抛出异

# 创建 topic
bin/kafka-topics.sh --create --zookeeper bi03:2181 --replication-factor 1 --partitions 1 --topic logs
# 生产消息
bin/kafka-console-producer.sh --broker-list localhost:13647 --topic logs
# 消费消息
# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic logs

如果 Zookeeper 出现 fsync-ing the write ahead log in SyncThread:1 took 2243ms which will adversely effect operation latency. See the ZooKeeper troubleshooting guide,是因为 FOLLOWER 在跟 LEADER 同步时,fsync 操作时间过长,导致超时。增加 tickTime 或者 initLimitsyncLimit 的值即可

集群配置

kafka 使用 ZooKeeper 用于管理、协调代理。每个 Kafka 代理通过 Zookeeper 协调其他 Kafka 代理。当 Kafka 系统中新增了代理或某个代理失效时,Zookeeper 服务将通知生产者和消费者。生产者与消费者据此开始与其他代理协调工作。

安装 Java

先给两台机子安装 Java

sudo add-apt-repository -y ppa:webupd8team/java
sudo apt-get update
sudo apt-get -y install oracle-java8-installer

更新 Hosts

这里用两台机器做例子(理论上最好是 3 台起步,偶数个不是不可以的,但是zookeeper集群是以宕机个数过半才会让整个集群宕机的,所以奇数个集群更佳),分别配置 /etc/hosts 文件为

127.0.0.1 localhost
10.1.1.164 bi03
10.1.1.44 bi02

修改 Zookeeper 配置文件

修改 config/zookeeper.properties

dataDir=/data/home/logger/kafka_2.11-0.10.0.0/zookeeper-logs/
clientPort=2181
# maxClientCnxns=0
tickTime=2000
initLimit=5
syncLimit=2
server.1=bi03:13645:13646
server.2=bi02:13645:13646

参数的意义为:

  • initLimit: zookeeper集群中的包含多台 server,其中一台为 leader,集群中其余的 server 为 follower。initLimit 参数配置初始化连接时,follower 和 leader 之间的最长心跳时间。此时该参数设置为 5,说明时间限制为 5 倍 tickTime,即 5*2000=10000ms=10s
  • syncLimit: 该参数配置 leader 和 follower 之间发送消息,请求和应答的最大时间长度。此时该参数设置为 2,说明时间限制为 2 倍 tickTime,即 4000ms
  • server.X=A:B:C 其中 X 是一个数字, 表示这是第几号 server。A 是该 server 所在的 IP 地址。B 配置该 server 和集群中的 leader 交换消息所使用的端口。C 配置选举 leader 时所使用的端口。

给服务器编号

在 dataDir 目录下建立一个 myid 文件,分别为

# server.1
echo 1 > myid
# server.2
echo 2 > myid

启动 Zookeeper

然后在每台机子上启动 zookeeper 服务

bin/zookeeper-server-start.sh config/zookeeper.properties &

所有机子的 zookeeper 都启动之前会报错,这都是正常的

如果不想要任何输出

nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

修改 Kafka 配置文件

修改 config/server.properties,几个要改的部分是

# 允许删除 topic
delete.topic.enable=true
broker.id=0 # 这里不能重复
listeners=PLAINTEXT://bi03:13647 # 这里要配置成本机的 host name
# 这里需要配置成外网能够访问的地址及端口
advertised.listeners=PLAINTEXT://external.ip:8080
log.dirs=/data/home/logger/kafka_2.11-0.10.0.0/kafka-logs
num.partitions=2
zookeeper.connect=bi03:2181,bi02:2181

启动 Kafka

在每个节点上执行

bin/kafka-server-start.sh config/server.properties &

如果不想要任何输出

nohup bin/kafka-server-start.sh config/server.properties &

验证安装

创建一个 topic

bin/kafka-topics.sh --create --zookeeper bi03:2181,bi02:2181 --replication-factor 2 --partitions 1 --topic test

查看集群状态

bin/kafka-topics.sh --describe --zookeeper bi03:2181,bi02:2181 --topic test

生产消息,这里注意要生产到前面设置的监听端口,而不是 zookeeper 的端口

bin/kafka-console-producer.sh --broker-list bi03:13647,bi02:13647 --topic test

消费消息,这里注意是 zookeeper 的端口,而不是 kafka 的端口

bin/kafka-console-consumer.sh --zookeeper bi03:2181,bi02:2181 --from-beginning --topic test

显示 topic 列表

bin/kafka-topics.sh --zookeeper bi03:2181,bi02:2181 --list

删除 topic

bin/kafka-topics.sh --zookeeper bi03:2181,bi02:2181 --delete --topic hello

其他配置

Kafka 使用键值对的属性文件格式来进行配置,比如 config/server.properties,具体的值可以从文件中读取,或者在代码中进行指定。最重要的三个属性是:

  • broker.id: broker 的编号,不能相同
  • log.dirs: 日志保存的文件夹,默认为 /tmp/kafka-logs
  • zookeeper.connect: zookeeper 的 host

其他一些我觉得比较有用的属性为

  • auto.create.topics.enable 是否允许自动创建 topic,boolean 值,默认为 true
  • auto.leader.rebalance.enable 是否允许 leader 进行自动平衡,boolean 值,默认为 true
  • background.threads 后台进程数目,int 值,默认为 10 个
  • compression.type 指定 topic 的压缩方式,string 值,可选有
    • gzip, snappy, lz4 压缩方法
    • uncompressed 不压缩
    • producer 跟随 producer 的压缩方式
  • delete.topic.enable 是否允许删除 topic,boolean 值,默认为 false(主要用于控制 admin 界面中的控制)
  • leader.imbalance.check.interval.seconds 检查是否平衡的时间间隔,long 值,默认为 300
  • leader.imbalance.per.broker.percentage 允许的不平衡的百分比,超出则会进行重平衡,int 值,默认为 10
  • log.flush.interval.messages 攒了多少条消息之后会把数据刷入磁盘,long 值,默认是 9223372036854775807
  • log.flush.interval.ms 每条消息在保存到磁盘中前会在内存中待多久,单位毫秒,long 值,如果不设定,默认使用 log.flush.scheduler.interval.ms,也就是 9223372036854775807

更多的配置可以参考这里,以上的配置均针对 broker,因为目前我只用 broker 的部分

基本操作

所有的工具都可以在 bin/ 文件夹下查看,如果不带任何参数,就会给出所有命令的列表说明,这里只简要说明一些常用的命令

创建和移除 topic

可以手动创建 topic,或在数据进来时自动创建不存在的 topic,如果是自动创建的话,可能需要根据这里来进行对应调整。

创建 topic

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name --partitions 20 --replication-factor 3 --config x=y

replication-factor 控制复制的份数,建议 2-3 份来兼顾容错和效率。partitions 控制该 topic 将被分区的数目,partitions 的数目最好不要超过服务器的个数(因为分区的意义是增加并行效率,而服务器数量决定了并行的数量,假设只有 2 台服务器,分 4 个区和 2 个区其实差别不大)。另外,topic 的名称不能超过 249 个字符

修改 topic

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --partitions 40

这里需要注意,即使修改了分区的个数,已有的数据也不会进行变动,Kafka 不会做任何自动重分布

增加配置

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --config x=y

移除配置

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --delete-config x

删除 topic

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name

这个需要 delete.topic.enable=true,目前 Kafka 不支持减少 topic 的分区数目

优雅关闭

Kafka 会自动检测 broker 的状态并根据机器状态选举出新的 leader。但是如果需要进行配置更改停机的时候,我们就需要使用优雅关闭了,好处在于:

  1. 会把所有的日志同步到磁盘上,避免重启之后的日志恢复,减少重启时间
  2. 会在关闭前把以这台机为 leader 的分区数据迁移到其他节点,会减少不可用的时间

但是这个需要开启 controlled.shutdown.enable=true

刚重启之后的节点不是任何分区的 leader,所以这时候需要进行重新分配:

bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot

这里需要开启 auto.leader.rebalance.enable=true

然后可以使用脚本 bin/kafka-server-stop.sh

注意,如果配置文件中没有 auto.leader.rebalance.enable=true,就还需要重新平衡

深入理解

这里只是一部分摘录,更多内容可查阅参考链接(尤其是美团技术博客的那篇)

文件系统

Kafka 大量依赖文件系统去存储和缓存消息。而文件系统最终会放在硬盘上,不过不用担心,很多时候硬盘的快慢完全取决于使用它的方式。设计良好的硬盘架构可以和内存一样快。

所以与传统的将数据缓存在内存中然后刷到硬盘的设计不同,Kafka直接将数据写到了文件系统的日志中,因此也避开了 JVM 的劣势——Java 对象占用空间巨大,数据量增大后垃圾回收有困难。使用文件系统,即使系统重启了,也不需要刷新数据,也简化了维护数据一致性的逻辑。

对于主要用于日志处理的消息系统,数据的持久化可以简单的通过将数据追加到文件中实现,读的时候从文件中读就好了。这样做的好处是读和写都是 O(1) 的,并且读操作不会阻塞写操作和其他操作。这样带来的性能优势是很明显的,因为性能和数据的大小没有关系了。

既然可以使用几乎没有容量限制(相对于内存来说)的硬盘空间建立消息系统,就可以在没有性能损失的情况下提供一些一般消息系统不具备的特性。比如,一般的消息系统都是在消息被消费后立即删除,Kafka却可以将消息保存一段时间(比如一星期),这给consumer提供了很好的机动性和灵活性。

事务定义

数据传输的事务定义通常有以下三种级别:

  • 最多一次: 消息不会被重复发送,最多被传输一次,但也有可能一次不传输。
  • 最少一次: 消息不会被漏发送,最少被传输一次,但也有可能被重复传输.
  • 精确的一次(Exactly once): 不会漏传输也不会重复传输,每个消息都传输被一次而且仅仅被传输一次,这是大家所期望的。

Kafka 的机制和 git 有点类似,有一个 commit 的概念,一旦提交且 broker 在工作,那么数据就不会丢失。如果 producer 发布消息时发生了网络错误,但又不确定实在提交之前发生的还是提交之后发生的,这种情况虽然不常见,但是必须考虑进去,现在Kafka版本还没有解决这个问题,将来的版本正在努力尝试解决。

并不是所有的情况都需要“精确的一次”这样高的级别,Kafka 允许 producer 灵活的指定级别。比如 producer 可以指定必须等待消息被提交的通知,或者完全的异步发送消息而不等待任何通知,或者仅仅等待 leader 声明它拿到了消息(followers没有必要)。

现在从 consumer 的方面考虑这个问题,所有的副本都有相同的日志文件和相同的offset,consumer 维护自己消费的消息的 offset。如果 consumer 崩溃了,会有另外一个 consumer 接着消费消息,它需要从一个合适的 offset 继续处理。这种情况下可以有以下选择:

  • consumer 可以先读取消息,然后将 offset 写入日志文件中,然后再处理消息。这存在一种可能就是在存储 offset 后还没处理消息就 crash 了,新的 consumer 继续从这个 offset 处理,那么就会有些消息永远不会被处理,这就是上面说的『最多一次』
  • consumer 可以先读取消息,处理消息,最后记录o ffset,当然如果在记录 offset 之前就 crash 了,新的 consumer 会重复的消费一些消息,这就是上面说的『最少一次』
  • 『精确一次』可以通过将提交分为两个阶段来解决:保存了 offset 后提交一次,消息处理成功之后再提交一次。但是还有个更简单的做法:将消息的 offset 和消息被处理后的结果保存在一起。比如用 Hadoop ETL 处理消息时,将处理后的结果和 offset 同时保存在 HDFS 中,这样就能保证消息和 offser 同时被处理了

性能优化

Kafka 在提高效率方面做了很大努力。Kafka 的一个主要使用场景是处理网站活动日志,吞吐量是非常大的,每个页面都会产生好多次写操作。读方面,假设每个消息只被消费一次,读的量的也是很大的,Kafka 也尽量使读的操作更轻量化。

线性读写的情况下影响磁盘性能问题大约有两个方面:太多的琐碎的 I/O 操作和太多的字节拷贝。I/O 问题发生在客户端和服务端之间,也发生在服务端内部的持久化的操作中。

消息集(message set)

为了避免这些问题,Kafka 建立了消息集(message set)的概念,将消息组织到一起,作为处理的单位。以消息集为单位处理消息,比以单个的消息为单位处理,会提升不少性能。Producer 把消息集一块发送给服务端,而不是一条条的发送;服务端把消息集一次性的追加到日志文件中,这样减少了琐碎的 I/O 操作。consumer 也可以一次性的请求一个消息集。

另外一个性能优化是在字节拷贝方面。在低负载的情况下这不是问题,但是在高负载的情况下它的影响还是很大的。为了避免这个问题,Kafka 使用了标准的二进制消息格式,这个格式可以在 producer, broker 和 producer 之间共享而无需做任何改动。

zero copy

Broker 维护的消息日志仅仅是一些目录文件,消息集以固定队的格式写入到日志文件中,这个格式 producer 和 consumer 是共享的,这使得 Kafka 可以一个很重要的点进行优化:消息在网络上的传递。现代的 unix 操作系统提供了高性能的将数据从页面缓存发送到 socket 的系统函数,在 linux 中,这个函数是 sendfile

为了更好的理解 sendfile 的好处,我们先来看下一般将数据从文件发送到 socket 的数据流向:

  • 操作系统把数据从文件拷贝内核中的页缓存中
  • 应用程序从页缓存从把数据拷贝自己的内存缓存中
  • 应用程序将数据写入到内核中 socket 缓存中
  • 操作系统把数据从 socket 缓存中拷贝到网卡接口缓存,从这里发送到网络上。

这显然是低效率的,有 4 次拷贝和 2 次系统调用。sendfile 通过直接将数据从页面缓存发送网卡接口缓存,避免了重复拷贝,大大的优化了性能。

在一个多consumers的场景里,数据仅仅被拷贝到页面缓存一次而不是每次消费消息的时候都重复的进行拷贝。这使得消息以近乎网络带宽的速率发送出去。这样在磁盘层面你几乎看不到任何的读操作,因为数据都是从页面缓存中直接发送到网络上去了。

数据压缩

很多时候,性能的瓶颈并非CPU或者硬盘而是网络带宽,对于需要在数据中心之间传送大量数据的应用更是如此。当然用户可以在没有 Kafka 支持的情况下各自压缩自己的消息,但是这将导致较低的压缩率,因为相比于将消息单独压缩,将大量文件压缩在一起才能起到最好的压缩效果。

Kafka 采用了端到端的压缩:因为有『消息集』的概念,客户端的消息可以一起被压缩后送到服务端,并以压缩后的格式写入日志文件,以压缩的格式发送到 consumer,消息从 producer 发出到 consumer 拿到都被是压缩的,只有在 consumer 使用的时候才被解压缩,所以叫做『端到端的压缩』。Kafka支持GZIP和Snappy压缩协议。

参考链接

捧个钱场?