kafka生产环境集群方案及参数配置

写在前面

本文主要聊一聊生产环境的kafka方案选型和重要集群参数配置相关的东西。生产环境的kafka集群部署方案前期的需要评估的东西还是挺多,比如kafka版本选择,操作系统选择,以及磁盘类型及磁盘容量相关的预估等。下面简单介绍一下:

部署方案

操作系统

kafka由scala语言和java开发,编译之后的都是以class文件跑在jvm上面,所以kafka可以运行在市面上所有的OS,比如windos,Linux,mac OS但是不同的操作系统的差异还是给kafka集群造成的很大的影响。 毋庸置疑,你应该部署在Linux上面,生产环境的Kafka集群部署在Linux上面的最多,虽然这个结论不会出乎你的意料,但是详细了解选择Linux操作系统的原因还是要了解一下,主要包含以下三个方面:

  • I/O模型的使用
  • 数据网络的传输效率
  • 社区支持度(毋庸置疑的是Linux平台的社区支持度最好)

什么是I/O模型?

简单的理解,I/O模型就是操作系统执行I/O指令的方法。主流的I/O模型有5中类型:阻塞式I/O、非阻塞式I/O、I/O多路复用、信号驱动I/O和异步I/O。每种I/O 模型都有各自典型的使用场景,比如Java中Socket对象的阻塞模式和非阻塞模式就对应于前两种模型;而 Linux中的系统调用select函数就属于I/O多路复用模型;大名鼎鼎的epoll系统调用则介于第三种和第四种模型之间;至于第五种模型,其实很少有Linux系统支持,反而是Windows系统提供了一个叫IOCP线程模型属于这一种。我们无需纠结每种I/O模型的实现细节,通常我认为后一种模型比前一种模型更高级,就像epoll比select要好一样。

kafka的客户端底层使用了java的selector, java的selector在Linux的实现机制是epoll,而在windows操作系统的实现机制是select,因此kafka部署在Linux上面是占有优势的,能够获得更高效的I/O性能。

数据网络的传输效率的差别

kafka生产和消费的消息都是通过网络传输的,而消息又保存在磁盘中,所以呢,kafka需要在磁盘和网络进行大量数据的传输,如果你是一名运维,你肯定听过Zero copy技术,如果没有,那你需要google一下这个名词。zero copy技术简言之就是当数据在磁盘和网络进行传输时避免内核态数据拷贝到用户态而实现的技术。Linux实现了这样的零拷贝机制,但在window上面必须你的java版本足够高才能支持,据我所知,只有java 8以上的版本才有这种技术,因此,我个人认为windows平台上面部署kafka集群只能用于个人开发环境。

kafka版本

kafka的版本选择很简单,建议至少使用0.11以上的版本,笔者从未使用过0.11以前的版本,一是工作时间不长,二是觉得到1.0以上的版本没有经过足够的市场验证。建议至少使用0.11版本,0.11是使用者最多的一个版本,也是官方patch版本最多的。所以如果有了问题,技术文档和支持方案也是最多的。

磁盘

磁盘资源是对kakfa性能影响最大的,在做kafka集群磁盘时经常面临3个问题,我应该选择什么类型的磁盘,我应该用多大的容量,我是不是应该使用RAID。

在做磁盘类型选择的时候,我应该选择普通的机械磁盘还是固态硬盘?前者成本低且容量大,但易损坏;后者性能优势大,不过单价高。我给出的建议是使用普通机械硬盘即可。因为kafka虽然大量使用磁盘资源,但他使用的方式大多是顺序读写操作,一定程度上面避免了机械盘随机读写慢的劣势。机械盘物美价廉,它容易损坏造成的可靠性问题可以通过软件层面副本机制来保证,所以使用普通的机械盘即可。当然如果你的集群如果是在像aws这样把计算资源和存储资源完全分离的云上面,这些磁盘类型选择的压根跟你没关系,只需要根据iops来评估需要什么类型就OK。

第二个问题是我是否需要使用RAID,使用RAID的主要优势在与提供冗余的磁盘存储空间以及提供负载均衡。RAID的优势对于任何一个分布式系统都是有很强的吸引力,不过对于kafka而言,就显得不是很重要了,因为kafka自身实现了冗余机制来提供可靠性,另一个是使用了partion的概念来自己实现了负载均衡。所以在追求性价比的公司可以不使用RAID,使用普通的机械盘组成存储就可以。

第三个问题是我应该使用多大的磁盘容量。这是一个比较经典的规划问题,kafka集群本身是把消息以日志的方式存储在磁盘上面,消息会默认被保存一段时间后会被清理。这个参数是可配置的,需要根据具体的业务场景来评估你的磁盘容量。 可以使用单位时间内的数据集大小来评估,也可以根据消息大小和单位时间内的消息条数来预估容量。下面给出了2种预估容量的方式:

1
2
10G(每小时数据大小) * 168(假设保存7天)* 3(保存副本数)x 1.1(预留10%的容量) = 5.41T(集群总容量)
1KB(平均单条消息大小) * 10000000(每小时1000万条消息)* 168(假设保存7天)* 3(保存副本数)x 1.1(预留10%的容量) = 5.16T(集群总容量)

带宽

目前市面上面的物理机房网络也好,云服务的vpc网络也好,常见的带宽有2种:1Gbps的千兆网络和10Gbps的万兆网络,千兆网络是大多数公司使用的标准配置了。 千兆网络在实际生产测试中在达到650M-700M之间就会产生丢包。所以最多使用的带宽资源占总带宽的70%,也就是最大资源使用情况下单台broker机器能使用的资源是700Mbps,但在生产环境中机器的带宽资源不可能只留给kafka使用,至少应该预留出来2/3的带宽资源给其他进程使用,比如监控,系统其他进程等其他。这样算下来单个broker使用的带宽也就只有240Mbps。

1
集群每小时处理数据量(MB) = 240Mbps/8(单节点每s处理数据大小) * 3600 * n(集群的节点数) / m(副本数)

最小的3个broker的集群,在3个副本的情况下,集群每小时大约处理108G的数据。

“不谋万世者,不足谋一时;不谋全局者,不足谋一域”, 与其在集群使用过程中费时费力的调整,不如提前在集群部署时就根据实际的业务场景规划所需的环境,通盘考虑部署方案,从多个维度思考部署方案应该如何评估。


集群重要参数

在生产环境的kafka集群有很多集群相关配置并未在官方文档中提现,但是这些参数对整个集群的影响是相当大的。我主要根据我所维护的生产环境的参数出发列出一些重要的。

broker级别参数

kafka broker的参数相当多,有近200多个,大多数参数我们使用默认值就可以。下面简单介绍一下:
存储相关参数:

  • log.dirs: 这个参数是笔者认为相当重要的参数,这个参数指定了broker日志存储的目录路径信息,此参数没有默认值,需要你显示的指定。他的格式是使用分号分割的一组目录。 笔者希望你把多块物理磁盘挂载到不同的log.dirs目录里面,这样做也是前文所述不需要使用SSD的原因,这样可以提升读写性能,比起单块磁盘,多块盘有更高的吞吐,相当于逻辑上做了一个RAID0. 而且如果你使用的kafka版本在1.1及其以上版本的话,还能实现故障转移,也就是Failover, broker上面的使用的任何一块磁盘坏了,此磁盘的数据会自动转移到其他正常的磁盘上面,broker还能正常提供工作。 这也是不需要使用RAID的原因。但是笔者使用的0.11版本并不支持此功能,但是笔者维护的集群使用的是AWS的EBS也无需考虑RAID等问题,所以使用0.11版本的用户如果不使用云盘的情况,考虑数据可靠性的情况下使用RAID还是很有必要的。

zookeeper参数:
zookeeper是一个分布式的协调框架,负责协调管理并保存kafka集群的所有元数据信息,比如集群都有哪些broker在运行,已经创建了哪些topic, 以及topic的分区及leader副本等信息。

  • zookeeper.connect 这也是一个已逗号分隔的CSV格式参数,值为zookeeper的各节点及端口信息,如zookeeper1:2181,zookeeper2:2181,zookeeper3:2181, 如果多个kafka集群使用同一个zookeeper集群,需要使用zookeeper的chroot概念,chroot类似于别名。假设我们有2个kafka集群(kafka1,kafka2),zookeeper.connect参数的配置如下:
    1
    2
    zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka1
    zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka2

topic管理相关参数:

  • auto.create.topics.enable:是否允许自动创建Topic。生产环境建议设置为false,从运维角度出发,线上应该创建什么topic必须有运维来把控,防止研发误操作来创建很多无用的topic来增加后期的维护成本。
  • unclean.leader.election.enable:是否允许Unclean Leader选举。从数据完整性角度考虑,强烈建议关闭Unclean Leader选举这个参数。 每个partition都有多个副本来提供高可用,这些副本只有一个对外提供副本即leader副本。在做leader选举过程中,只有保存数据数据比较多的副本才有资格竞选leader, 那些进度落后太多的副本是没资格做这件事的。如果此参数为false状态,就是不让unclean的副本去做竞选leader的操作。反之,如果一个进度落后很多的副本能够竞选leader,并成功成为了leader,就会造成数据丢失。这种情况在生产环境是不允许出现的。
  • auto.leader.rebalance.enable:是否允许定期进行Leader选举。此参数也应该设置为false. 因为定期进行leader选举对性能来讲没有收益,而且如果定期更换leader对客户端程序来讲成本是很高的。虽然此参数的默认参数也是false,但笔者还是建议显示的指定为false。

数据留存相关参数:

  • log.retention.{hour|minutes|ms}:这是一组参数,都是控制一条消息数据被保存多长时 间。从优先级上来说ms设置最高、minutes次之、hour最低。通常情况下使用hour级别的最多,笔者也是使用hour级别参数,log.retention.hour=72表示默认保存7天的数据,自动清除3天前的数据。如果消息要保存的时间要调长,也应相应的调大此值
  • log.retention.bytes:这是指定Broker为消息保存的总磁盘容量大小。 默认值是-1,表示你想在这台broker上面保存多少数据都行。建议设置一个合理的上限。
  • message.max.bytes:控制Broker能够接收的最大消息大小。默认值是1000012。实际生产环境中单条消息突破1M的还是很多的,所以设置一个比较大的值是比较保险的,设置的大一些也仅仅只能限制最大的消息大小,并不会多耗费其他资源。

topic级别参数

在新版本的kafka中提供了一些topic的相关参数,笔者在生产环境中并未做这些参数配置,这里简单讲说下这些参数:

如果同时设置了topic级别参数和全局的broker参数,topic级别参数会覆盖全局参数。这个topic参数只是作为一个补充,让我们的生成环境更实用与具体的业务场景,也更灵活。

消息保存相关参数:

  • retention.ms:规定了该Topic消息被保存的时长。默认是7天,即该Topic只保存最近7天的消息。一旦 设置了这个值,它会覆盖掉Broker端的全局参数值。
  • retention.bytes:规定了要为该Topic预留多大的磁盘空间。和全局参数作用相似,这个值通常在多 租户的Kafka集群中会有用武之地。当前默认值是-1,表示可以无限使用磁盘空间。

topic也有最大消息大小相关的参数max.message.byte,如果在全局方面我们不好给定一个合适的值,就设置一个相对大的保险值,然后再有具体的topic来设置max.message.byte,这个在具体是业务场景下是很常见的。

这些参数可以在创建topic时指定,也可以在修改topic时指定,个人觉得使用一种方式去操作会容易记忆,即在修改topic时区指定对应的参数,因为修改topic这种方式社区应该会统一由kafka-configs脚本来统一管理这些参数。

JVM参数

kafka服务端代码是在scala语言编写,最后运行在jvm上。所以说jvm的性能调优是很必要的。如果没有调整过建议调整到你本机内存的一半。只需设定下面两个环境变量就好。

  • KAFKA_HEAP_OPTS:指定堆大小。
  • KAFKA_JVM_PERFORMANCE_OPTS:指定GC参数。
    1
    2
    export KAFKA_HEAP_OPTS=--Xms6g  --Xmx6g
    export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true"

操作系统参数

Kafka并不需要设置太多的OS 参数,但有些因素最好还是关注一下,比如下面这几个:

  • 文件描述符限制,笔者调整到100000,调整这个值并不会对系统有太大的影响,其次,如果你和笔者一样使用supervisord来管理进程的话,要把supervisord的minfds设置为和ulimit大小相差不大的值,不设置后果很严重,你会看到Too many open files”的错误。
  • 文件系统类型 , 我建议使用xfs,官方测试出来的结果也要强于ext4等文件系统
  • Swappiness ,很多运维在做规划服务器规划过程中是不设置swap的。我们生产环境也是如此,但是我个人反倒觉得不要禁用swap分区,可以设置一个较小的值用于报警,因为一旦禁用swap分区,当内存耗尽时会触发OOM killer组件,会随机挑选进程去kill掉,根本不给用户任何预警,这也是我们使用supervisord去管理服务的一个方面,当进程挂掉之后,supervisord会把此进程重新拉起。
  • 提交时间, 提交时间也就是flush的落盘时间。向Kafka发送数据并不是真要等数据被写入磁盘才会认为成功,而是只要数据被写入到操作系统的页缓存(Page Cache)上就可以了,随后操作系统根据LRU算法会定期将页缓存上的“脏”数据落盘到物理磁盘上。这个定期就是由提交时间来确定的,默认是5秒。一般情况下我们 会认为这个时间太频繁了,可以适当地增加提交间隔来降低物理磁盘的写操作。当然你可能会有这样的疑问:如果在页缓存中的数据在写入到磁盘前机器宕机了,那岂不是数据就丢失了。的确,这种情况数据确实就丢失了,但鉴于Kafka在软件层面已经提供了多副本的冗余机制,因此这里稍微拉大提交间隔去换取性能 还是一个合理的做法。