小土刀

【通天塔之日志分析平台】肆 从单机到集群

之前所做的工作都是在单机上运行,随着业务数据量的增长和维持服务高可用的需要,基本都得进行从单机到集群的变迁。从单机到集群主要需要克服的问题和多人合作一样,就是沟通问题。如果需要向全球提供服务,全球部署带来的高延迟和不稳定同样也是一个难题。本文会简单介绍一些解决这些问题的思路。


更新历史

  • 2016.11.23: 完成初稿

系列文章

任务目标

  1. 完成 Elasticsearch 集群的搭建
  2. 完成 Kafka 集群的搭建

Elasticsearch 集群配置

开发人员花了很多心思,尽量让一台机上运行和集群上运行的体验一致,具体说,Elasticsearch 在底层主要做的工作有:

  • 根据配置进行数据分片(sharding),并且保存在一个或者多个节点中
  • 将分片均匀分配到各个节点,对索引和搜索做负载均衡
  • 分片冗余,提高容错性
  • 将集群中任意一个节点上的请求路由到相应数据所在的节点
  • 增加或者移除节点时无缝迁移数据

安装

虽然 Elasticsearch 的安装比较简单,不过我还是写了一个安装脚本,可以在这里查看,具体来说其实就两步,下载和解压,如下:

wget https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.4.0/elasticsearch-2.4.0.tar.gz
tar -xvzf elasticsearch-2.4.0.tar.gz

分别在集群中每台机器中完成安装即可,具体的启动也非常简单,如果要在前台,直接 ./bin/elasticsearch 即可,如果要放到后台,则使用 nohup ./bin/elasticsearch &

配置

配置文件位于 config 文件夹中,其中 elasticsearch.yml 是 elasticsearch 的配置,而 logging.yml 是输出日志相关的设置。配置文件的内容有很多,不过因为默认值基本都够用了,所以我们只需要配置很少的内容。假设现在有两个节点,内部 IP 地址分别为 A: 10.1.1.0B: 10.1.1.1,那么配置为:

# 节点 A
cluster.name: wdxtubes
node.name: "es01"
bootstrap.mlockall: true
network.host: 10.1.1.0
network.publish_host: 10.1.1.0
discovery.zen.ping.unicast.hosts: ["10.1.1.1"]
discovery.zen.fd.ping_timeout: 120s
discovery.zen.fd.ping_retries: 6
discovery.zen.fd.ping_interval: 30s
# 节点 B
cluster.name: wdxtubes
node.name: "es02"
bootstrap.mlockall: true
network.host: 10.1.1.1
network.publish_host: 10.1.1.1
discovery.zen.ping.unicast.hosts: ["10.1.1.0"]
discovery.zen.fd.ping_timeout: 120s
discovery.zen.fd.ping_retries: 6
discovery.zen.fd.ping_interval: 30s

这里需要注意的是我们采用单播的方式来进行集群中机器的查找,因为 elasticsearch 已经尽量帮我们做好了集群相关的工作,只要保证 cluster.name 一致,就可以自动发现。另外,我们调大了超时的间隔和互相 ping 发送的频率以及重试次数,防止某台机器在 Full GC 的时候因未能及时响应而造成的连锁反应(后面会详细说明)

多说一句,机器配置的时候,最好确保两台机器可以互相 ping 通,并开放所有端口的内部访问(如果是用云主机的话,尤其需要注意这一点)

如果需要扩展的话,只需要保证 cluster.name 一致即可,比如说现在新加入一台 C: 10.1.1.2,那么配置可以这么写

# 节点 C
cluster.name: wdxtubes
node.name: "es03"
bootstrap.mlockall: true
network.host: 10.1.1.2
network.publish_host: 10.1.1.2
discovery.zen.ping.unicast.hosts: ["10.1.1.0"]
discovery.zen.fd.ping_timeout: 120s
discovery.zen.fd.ping_retries: 6
discovery.zen.fd.ping_interval: 30s

这里 discovery.zen.ping.unicast.hosts 中只需要填写原有集群中任意一台机器的地址即可。

然后我们可以在集群中的机器上使用 curl http://10.1.1.0:9200/_cluster/health 来查看集群状态。比如:

{
"cluster_name":"wdxtub-es",
"status":"green",
"timed_out":false,
"number_of_nodes":2,
"number_of_data_nodes":2,
"active_primary_shards":821,
"active_shards":1642,
"relocating_shards":0,
"initializing_shards":0,
"unassigned_shards":0,
"delayed_unassigned_shards":0,
"number_of_pending_tasks":0,
"number_of_in_flight_fetch":0,
"task_max_waiting_in_queue_millis":0,
"active_shards_percent_as_number":100.0
}

如果状态是 green,那就没有问题啦。下面我们会结合不同的实例进行介绍

重启

Elasticsearch 的重启是一个非常需要按规矩操作的过程,否则会带来一系列的意想不到的问题,所以一定要按照官方建议的步骤来进行。

首先,因为 Elasticsearch 自带的高可用机制,一旦一个节点下线,就会在集群内部进行数据的重分配,会带来很多不必要的开销,所以需要先关闭,关闭方法是给集群发送一个请求,这个请求可以动态修改集群的设置:

PUT /_cluster/settings
{
"persistent": {
"cluster.routing.allocation.enable": "none"
}
}

而在重启之后需要进行数据恢复,如果停止索引并发送一个同步刷新请求,这个过程就会快很多,需要注意的是,如果此时有任何正在进行的索引操作,这个 flush 操作会失败,因此必要时我们可以重试多次,这是安全的:

POST /_flush/synced

现在我们可以停止集群中的各个节点,完成重启或升级的操作。具体单台机器的操作可以看这里

完成之后,我们最好先启动那些 node.master 设置为 true 的节点(这也是默认设置),等到集群选举出了 master 节点,就可以继续添加数据节点了(即那些 node.master 为 false 且 node.data 为 true 的),这里我们可以用以下方式进行监控

GET _cat/health
GET _cat/nodes

每个节点加入集群之后,就会开始恢复本地保存的首要分片,一开始 _cat/health 查询的结果是 red,之后会变成 yellow,也就意味着所有的首要分片已经恢复了,但是其他的复制分片还没有恢复,因为我们一开始已经设置不恢复复制分片。

最后一步,我们需要重新开启集群的数据重分配,以保证集群的高可用性,操作也很简单:

PUT /_cluster/settings
{
"persistent": {
"cluster.routing.allocation.enable": "all"
}
}

当使用 _cat/health 的结果为 green 时,则重启和恢复顺利完成。

监控

无论是 Elasticsearch 官方还是社区,有很多插件可以完成监控的任务,但是本文只介绍默认的 API,主要是 _cat_cluster 这两个接口,具体的文档可以在 cat APIcluster API 中查看,这里简要介绍一下。

对于 _cat 接口,在请求后面加上 ?v 就会输出详细信息,例如:

wdxtub:~$ curl 10.1.1.10:9200/_cat/master?v
id host ip node
AoVFmiU4Q2SAHNVcMGPsWQ 10.1.1.11 10.1.1.11 node-2

如果对于字段的名字有疑问,可以使用 ?help,例如:

wdxtub:~$ curl 10.1.1.10:9200/_cat/master?help
id | | node id
host | h | host name
ip | | ip address
node | n | node name

如果只想要查看指定字段,可以利用 ?h= 来进行指定,例如:

wdxtub:~$ curl 10.1.1.10:9200/_cat/nodes?h=ip,port,heapPercent,name
10.1.1.11 9300 64 node-2
10.1.1.10 9300 71 node-1

对于带数字的输出,可以利用管道来进行排序,比如下面的命令就可以按照索引大小来进行排序(这里的 -rnk8 指的是按照第八列排序):

wdxtub:~$ curl 10.1.1.10:9200/_cat/indices?bytes=b | sort -rnk8
green open slog-2016-09-11 5 1 9729152 0 11128793222 5564396611
green open slog-2016-09-12 5 1 8355880 0 9539380440 4769690220
green open slog-2016-09-25 5 1 6720954 0 7415719218 3707859609
green open slog-2016-09-19 5 1 5840177 0 6575155002 3287577501
green open slog-2016-09-10 5 1 5858916 0 6504251544 3252125772

其他比较常用的命令如下所示,具体的可以参阅文档,这里不再赘述:

  • _cat/count 文档总数
  • _cat/count/[index_name] 某个索引的文档总数
  • _cat/fielddata?v 显示每个节点的字段的堆内存使用量
  • _cat/health?v 节点的健康状况
    • 可以使用下面的命令来自动检查集群状况
    • while true; do curl localhost:9200/_cat/health; sleep 120; done
  • _cat/indices?v 查看每个索引的详细信息,配合管道命令可以有很多应用,比如
    • 找出所有状态为 yellow 的索引 curl localhost:9200/_cat/indices | grep ^yell
    • 排序 curl 'localhost:9200/_cat/indices?bytes=b' | sort -rnk8
    • 指定列及内存使用状况 curl 'localhost:9200/_cat/indices?v&h=i,tm'
  • _cat/nodes 展示集群的拓扑结构
  • _cat/pending_tasks?v 显示正在排队的任务
  • _cat/recovery?v 显示分片恢复的过程
  • _cat/thread_pool?v 显示线程池相关信息,有很多信息,可以根据需要进行查询
  • _cat/shards?v 显示分片的相关信息
  • _cat/shards/[index-name] 显示指定索引的分片信息

_cluster 的接口的用法和 _cat 类似,这里就不再赘述了。

Kafka 集群配置

kafka 使用 ZooKeeper 用于管理、协调代理。每个 Kafka 代理通过 Zookeeper 协调其他 Kafka 代理。当 Kafka 系统中新增了代理或某个代理失效时,Zookeeper 服务将通知生产者和消费者。生产者与消费者据此开始与其他代理协调工作。

安装 Java

先给两台机子安装 Java

sudo add-apt-repository -y ppa:webupd8team/java
sudo apt-get update
sudo apt-get -y install oracle-java8-installer

更新 Hosts

这里用两台机器做例子(理论上最好是 3 台起步,偶数个不是不可以的,但是zookeeper集群是以宕机个数过半才会让整个集群宕机的,所以奇数个集群更佳),分别配置 /etc/hosts 文件为

127.0.0.1 localhost
10.1.1.164 bi03
10.1.1.44 bi02

修改 Zookeeper 配置文件

修改 config/zookeeper.properties

dataDir=/data/home/logger/kafka_2.11-0.10.0.0/zookeeper-logs/
clientPort=2181
# maxClientCnxns=0
tickTime=2000
initLimit=5
syncLimit=2
server.1=bi03:13645:13646
server.2=bi02:13645:13646

参数的意义为:

  • initLimit: zookeeper集群中的包含多台 server,其中一台为 leader,集群中其余的 server 为 follower。initLimit 参数配置初始化连接时,follower 和 leader 之间的最长心跳时间。此时该参数设置为 5,说明时间限制为 5 倍 tickTime,即 5*2000=10000ms=10s
  • syncLimit: 该参数配置 leader 和 follower 之间发送消息,请求和应答的最大时间长度。此时该参数设置为 2,说明时间限制为 2 倍 tickTime,即 4000ms
  • server.X=A:B:C 其中 X 是一个数字, 表示这是第几号 server。A 是该 server 所在的 IP 地址。B 配置该 server 和集群中的 leader 交换消息所使用的端口。C 配置选举 leader 时所使用的端口。

给服务器编号

在 dataDir 目录下建立一个 myid 文件,分别为

# server.1
echo 1 > myid
# server.2
echo 2 > myid

启动 Zookeeper

然后在每台机子上启动 zookeeper 服务

bin/zookeeper-server-start.sh config/zookeeper.properties &

所有机子的 zookeeper 都启动之前会报错,这都是正常的

如果不想要任何输出

nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

修改 Kafka 配置文件

修改 config/server.properties,几个要改的部分是

# 允许删除 topic
delete.topic.enable=true
broker.id=0 # 这里不能重复
listeners=PLAINTEXT://bi03:13647 # 这里要配置成本机的 host name
# 这里需要配置成外网能够访问的地址及端口
advertised.listeners=PLAINTEXT://external.ip:8080
log.dirs=/data/home/logger/kafka_2.11-0.10.0.0/kafka-logs
num.partitions=2
zookeeper.connect=bi03:2181,bi02:2181

启动 Kafka

在每个节点上执行

bin/kafka-server-start.sh config/server.properties &

如果不想要任何输出

nohup bin/kafka-server-start.sh config/server.properties &

验证安装

创建一个 topic

bin/kafka-topics.sh --create --zookeeper bi03:2181,bi02:2181 --replication-factor 2 --partitions 1 --topic test

查看集群状态

bin/kafka-topics.sh --describe --zookeeper bi03:2181,bi02:2181 --topic test

生产消息,这里注意要生产到前面设置的监听端口,而不是 zookeeper 的端口

bin/kafka-console-producer.sh --broker-list bi03:13647,bi02:13647 --topic test

消费消息,这里注意是 zookeeper 的端口,而不是 kafka 的端口

bin/kafka-console-consumer.sh --zookeeper bi03:2181,bi02:2181 --from-beginning --topic test

显示 topic 列表

bin/kafka-topics.sh --zookeeper bi03:2181,bi02:2181 --list

删除 topic

bin/kafka-topics.sh --zookeeper bi03:2181,bi02:2181 --delete --topic hello

其他配置

Kafka 使用键值对的属性文件格式来进行配置,比如 config/server.properties,具体的值可以从文件中读取,或者在代码中进行指定。最重要的三个属性是:

  • broker.id: broker 的编号,不能相同
  • log.dirs: 日志保存的文件夹,默认为 /tmp/kafka-logs
  • zookeeper.connect: zookeeper 的 host

其他一些我觉得比较有用的属性为

  • auto.create.topics.enable 是否允许自动创建 topic,boolean 值,默认为 true
  • auto.leader.rebalance.enable 是否允许 leader 进行自动平衡,boolean 值,默认为 true
  • background.threads 后台进程数目,int 值,默认为 10 个
  • compression.type 指定 topic 的压缩方式,string 值,可选有
    • gzip, snappy, lz4 压缩方法
    • uncompressed 不压缩
    • producer 跟随 producer 的压缩方式
  • delete.topic.enable 是否允许删除 topic,boolean 值,默认为 false(主要用于控制 admin 界面中的控制)
  • leader.imbalance.check.interval.seconds 检查是否平衡的时间间隔,long 值,默认为 300
  • leader.imbalance.per.broker.percentage 允许的不平衡的百分比,超出则会进行重平衡,int 值,默认为 10
  • log.flush.interval.messages 攒了多少条消息之后会把数据刷入磁盘,long 值,默认是 9223372036854775807
  • log.flush.interval.ms 每条消息在保存到磁盘中前会在内存中待多久,单位毫秒,long 值,如果不设定,默认使用 log.flush.scheduler.interval.ms,也就是 9223372036854775807

更多的配置可以参考这里,以上的配置均针对 broker,因为目前我只用 broker 的部分

试一试

  1. 尝试主动关闭集群中某一台机器,看看整个集群是如何保证高可用的
  2. Zookeeper 在 Kafka 集群中起到了什么作用?尝试使用单独的 zookeeper 而不是 Kafka 自带的

总结

本节中的配置需要多台机器或者是一台机器开多个实例进行测试,对于没有条件的同学来说可能会比较麻烦,不过现在 AWS 有一年的免费使用时间(当然只有最基础的机器),所以大家其实可以申请一个 AWS 帐号,在上面进行学习和测试(而且速度也会更快)。接下来的几篇文章会具体介绍 Logstash, Elasticsearch 和 Kibana 的相关使用。

捧个钱场?

热评文章