文章目录[隐藏]
提到消息系统,目前最火热的非 Kafka 莫属,公司也打算利用 Kafka 进行各业务日志统一收集,这里结合自己的实践来分享一下具体的配置及使用。Kafka 版本 0.10.0.1
介绍
作为云计算大数据的套件,Kafka 是一个分布式的、可分区的、可复制的消息系统。该有的功能基本都有,而且有自己的特色:
- 以 topic 为单位进行消息归纳
- 向 topic 发布消息的是 producer
- 从 topic 获取消息的是 consumer
- 集群方式运行,每个服务叫 broker
- 客户端和服务器通过 TCP 进行通信
在Kafka集群中,没有“中心主节点”的概念,集群中所有的服务器都是对等的,因此,可以在不做任何配置的更改的情况下实现服务器的的添加与删除,同样的消息的生产者和消费者也能够做到随意重启和机器的上下线。
对每个 topic 来说,Kafka 会对其进行分区,每个分区都由一系列有序的、不可变的消息组成,这些消息被连续的追加到分区中。分区中的每个消息都有一个连续的序列号叫做 offset,用来在分区中唯一的标识这个消息。
发布消息通常有两种模式:队列模式(queuing)和发布-订阅模式(publish-subscribe)。队列模式中,consumers 可以同时从服务端读取消息,每个消息只被其中一个 consumer 读到;发布-订阅模式中消息被广播到所有的 consumer 中。更常见的是,每个 topic 都有若干数量的 consumer 组,每个组都是一个逻辑上的『订阅者』,为了容错和更好的稳定性,每个组由若干 consumer 组成。这其实就是一个发布-订阅模式,只不过订阅者是个组而不是单个 consumer。
通过分区的概念,Kafka可以在多个consumer组并发的情况下提供较好的有序性和负载均衡。将每个分区分只分发给一个consumer组,这样一个分区就只被这个组的一个consumer消费,就可以顺序的消费这个分区的消息。因为有多个分区,依然可以在多个consumer组之间进行负载均衡。注意consumer组的数量不能多于分区的数量,也就是有多少分区就允许多少并发消费。
Kafka 只能保证一个分区之内消息的有序性,在不同的分区之间是不可以的,这已经可以满足大部分应用的需求。如果需要 topic 中所有消息的有序性,那就只能让这个 topic 只有一个分区,当然也就只有一个 consumer 组消费它。
单机配置
按照下列步骤即可(来自官网教程)
1. 下载 Kafka
- 下载
wget http://apache.01link.hk/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz
或者wget http://ftp.cuhk.edu.hk/pub/packages/apache.org/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz
(看哪个源比较快) - 解压
tar -xzf kafka_2.11-0.10.0.0.tgz
- 进入文件夹
cd kafka_2.11-0.10.0.0/
2. 启动服务
- 启动 ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties &
(利用&
放到后台方便继续操作) - 启动 Kafka
bin/kafka-server-start.sh config/server.properties &
3. 创建一个叫做 dawang 的 topic,它只有一个分区,一个副本
- 创建
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic dawang
- 查看
bin/kafka-topics.sh --list --zookeeper localhost:2181
- 还可以配置 broker 让它自动创建 topic
4. 发送消息。Kafka 使用一个简单的命令行producer,从文件中或者从标准输入中读取消息并发送到服务端。默认的每条命令将发送一条消息。
- 发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic dawang
(然后可以随意输入内容,回车可以发送,ctrl+c 退出)
5. 启动 consumer。可以读取消息并输出到标准输出:
- 接收消息
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic dawang --from-beginning
- 在一个终端中运行 consumer 命令行,另一个终端中运行 producer 命令行,就可以在一个终端输入消息,另一个终端读取消息。这两个命令都有自己的可选参数,可以在运行的时候不加任何参数可以看到帮助信息。
6. 搭建一个多个 broker 的集群,启动有 3 个 broker 组成的集群,这些 broker 节点也都在本机
首先复制一下配置文件:cp config/server.properties config/server-1.properties
和 cp config/server.properties config/server-2.properties
两个文件需要改动的内容为:
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
|
这里我们把 broker id, 端口号和日志地址配置成和之前不一样,然后我们启动这两个 broker:
bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &
|
然后创建一个复制因子为 3 的 topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic oh3topic
可以使用 describe
命令来显示 topic 详情
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic oh3topic
Topic:oh3topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: oh3topic Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
|
这里简单解释一下
- Leader 是给定分区的节点编号,每个分区的部分数据会随机指定不同的节点
- Replicas 是该日志会保存的复制
- Isr 表示正在同步的复制
我们也可以来看看之前的另一个 topic 的情况
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic dawang
Topic:dawang PartitionCount:1 ReplicationFactor:1 Configs:
Topic: dawang Partition: 0 Leader: 0 Replicas: 0 Isr: 0
|
最后我们可以按照同样的方法来生产和消费消息,例如
# 生产
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic oh3topic
# 消费
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic oh3topic
|
开俩终端就可以一边生产消息,一边消费消息了。
注意事项
如果要配置自定义端口,server.properties
中 listeners 一定要配置成为 IP 地址;如果配置为 localhost 或服务器的 hostname,在使用 java 发送数据时就会抛出异
# 创建 topic
bin/kafka-topics.sh --create --zookeeper bi03:2181 --replication-factor 1 --partitions 1 --topic logs
# 生产消息
bin/kafka-console-producer.sh --broker-list localhost:13647 --topic logs
# 消费消息
# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic logs
|
如果 Zookeeper 出现 fsync-ing the write ahead log in SyncThread:1 took 2243ms which will adversely effect operation latency. See the ZooKeeper troubleshooting guide
,是因为 FOLLOWER 在跟 LEADER 同步时,fsync 操作时间过长,导致超时。增加 tickTime
或者 initLimit
和 syncLimit
的值即可
集群配置
kafka 使用 ZooKeeper 用于管理、协调代理。每个 Kafka 代理通过 Zookeeper 协调其他 Kafka 代理。当 Kafka 系统中新增了代理或某个代理失效时,Zookeeper 服务将通知生产者和消费者。生产者与消费者据此开始与其他代理协调工作。
安装 Java
先给两台机子安装 Java
sudo add-apt-repository -y ppa:webupd8team/java
sudo apt-get update
sudo apt-get -y install oracle-java8-installer
|
更新 Hosts
这里用两台机器做例子(理论上最好是 3 台起步,偶数个不是不可以的,但是zookeeper集群是以宕机个数过半才会让整个集群宕机的,所以奇数个集群更佳),分别配置 /etc/hosts
文件为
127.0.0.1 localhost
10.1.1.164 bi03
10.1.1.44 bi02
|
修改 Zookeeper 配置文件
修改 config/zookeeper.properties
为
dataDir=/data/home/logger/kafka_2.11-0.10.0.0/zookeeper-logs/
clientPort=2181
# maxClientCnxns=0
tickTime=2000
initLimit=5
syncLimit=2
server.1=bi03:13645:13646
server.2=bi02:13645:13646
|
参数的意义为:
- initLimit: zookeeper集群中的包含多台 server,其中一台为 leader,集群中其余的 server 为 follower。initLimit 参数配置初始化连接时,follower 和 leader 之间的最长心跳时间。此时该参数设置为 5,说明时间限制为 5 倍 tickTime,即
5*2000=10000ms=10s
- syncLimit: 该参数配置 leader 和 follower 之间发送消息,请求和应答的最大时间长度。此时该参数设置为 2,说明时间限制为 2 倍 tickTime,即 4000ms
- server.X=A:B:C 其中 X 是一个数字, 表示这是第几号 server。A 是该 server 所在的 IP 地址。B 配置该 server 和集群中的 leader 交换消息所使用的端口。C 配置选举 leader 时所使用的端口。
给服务器编号
在 dataDir 目录下建立一个 myid 文件,分别为
# server.1
echo 1 > myid
# server.2
echo 2 > myid
|
启动 Zookeeper
然后在每台机子上启动 zookeeper 服务
bin/zookeeper-server-start.sh config/zookeeper.properties &
所有机子的 zookeeper 都启动之前会报错,这都是正常的
如果不想要任何输出
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
修改 Kafka 配置文件
修改 config/server.properties
,几个要改的部分是
# 允许删除 topic
delete.topic.enable=true
broker.id=0 # 这里不能重复
listeners=PLAINTEXT://bi03:13647 # 这里要配置成本机的 host name
# 这里需要配置成外网能够访问的地址及端口
advertised.listeners=PLAINTEXT://external.ip:8080
log.dirs=/data/home/logger/kafka_2.11-0.10.0.0/kafka-logs
num.partitions=2
zookeeper.connect=bi03:2181,bi02:2181
|
启动 Kafka
在每个节点上执行
bin/kafka-server-start.sh config/server.properties &
如果不想要任何输出
nohup bin/kafka-server-start.sh config/server.properties &
验证安装
创建一个 topic
bin/kafka-topics.sh --create --zookeeper bi03:2181,bi02:2181 --replication-factor 2 --partitions 1 --topic test
查看集群状态
bin/kafka-topics.sh --describe --zookeeper bi03:2181,bi02:2181 --topic test
生产消息,这里注意要生产到前面设置的监听端口,而不是 zookeeper 的端口
bin/kafka-console-producer.sh --broker-list bi03:13647,bi02:13647 --topic test
消费消息,这里注意是 zookeeper 的端口,而不是 kafka 的端口
bin/kafka-console-consumer.sh --zookeeper bi03:2181,bi02:2181 --from-beginning --topic test
显示 topic 列表
bin/kafka-topics.sh --zookeeper bi03:2181,bi02:2181 --list
删除 topic
bin/kafka-topics.sh --zookeeper bi03:2181,bi02:2181 --delete --topic hello
其他配置
Kafka 使用键值对的属性文件格式来进行配置,比如 config/server.properties
,具体的值可以从文件中读取,或者在代码中进行指定。最重要的三个属性是:
broker.id
: broker 的编号,不能相同log.dirs
: 日志保存的文件夹,默认为/tmp/kafka-logs
zookeeper.connect
: zookeeper 的 host
其他一些我觉得比较有用的属性为
auto.create.topics.enable
是否允许自动创建 topic,boolean 值,默认为true
auto.leader.rebalance.enable
是否允许 leader 进行自动平衡,boolean 值,默认为true
background.threads
后台进程数目,int 值,默认为 10 个compression.type
指定 topic 的压缩方式,string 值,可选有gzip
,snappy
,lz4
压缩方法uncompressed
不压缩producer
跟随 producer 的压缩方式
delete.topic.enable
是否允许删除 topic,boolean 值,默认为 false(主要用于控制 admin 界面中的控制)leader.imbalance.check.interval.seconds
检查是否平衡的时间间隔,long 值,默认为 300leader.imbalance.per.broker.percentage
允许的不平衡的百分比,超出则会进行重平衡,int 值,默认为 10log.flush.interval.messages
攒了多少条消息之后会把数据刷入磁盘,long 值,默认是 9223372036854775807log.flush.interval.ms
每条消息在保存到磁盘中前会在内存中待多久,单位毫秒,long 值,如果不设定,默认使用log.flush.scheduler.interval.ms
,也就是 9223372036854775807
更多的配置可以参考这里,以上的配置均针对 broker,因为目前我只用 broker 的部分
基本操作
所有的工具都可以在 bin/
文件夹下查看,如果不带任何参数,就会给出所有命令的列表说明,这里只简要说明一些常用的命令
创建和移除 topic
可以手动创建 topic,或在数据进来时自动创建不存在的 topic,如果是自动创建的话,可能需要根据这里来进行对应调整。
创建 topic
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name --partitions 20 --replication-factor 3 --config x=y
replication-factor 控制复制的份数,建议 2-3 份来兼顾容错和效率。partitions 控制该 topic 将被分区的数目,partitions 的数目最好不要超过服务器的个数(因为分区的意义是增加并行效率,而服务器数量决定了并行的数量,假设只有 2 台服务器,分 4 个区和 2 个区其实差别不大)。另外,topic 的名称不能超过 249 个字符
修改 topic
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --partitions 40
这里需要注意,即使修改了分区的个数,已有的数据也不会进行变动,Kafka 不会做任何自动重分布
增加配置
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --config x=y
移除配置
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --delete-config x
删除 topic
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name
这个需要 delete.topic.enable=true
,目前 Kafka 不支持减少 topic 的分区数目
优雅关闭
Kafka 会自动检测 broker 的状态并根据机器状态选举出新的 leader。但是如果需要进行配置更改停机的时候,我们就需要使用优雅关闭了,好处在于:
- 会把所有的日志同步到磁盘上,避免重启之后的日志恢复,减少重启时间
- 会在关闭前把以这台机为 leader 的分区数据迁移到其他节点,会减少不可用的时间
但是这个需要开启 controlled.shutdown.enable=true
。
刚重启之后的节点不是任何分区的 leader,所以这时候需要进行重新分配:
bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
这里需要开启 auto.leader.rebalance.enable=true
然后可以使用脚本 bin/kafka-server-stop.sh
注意,如果配置文件中没有 auto.leader.rebalance.enable=true
,就还需要重新平衡
深入理解
这里只是一部分摘录,更多内容可查阅参考链接(尤其是美团技术博客的那篇)
文件系统
Kafka 大量依赖文件系统去存储和缓存消息。而文件系统最终会放在硬盘上,不过不用担心,很多时候硬盘的快慢完全取决于使用它的方式。设计良好的硬盘架构可以和内存一样快。
所以与传统的将数据缓存在内存中然后刷到硬盘的设计不同,Kafka直接将数据写到了文件系统的日志中,因此也避开了 JVM 的劣势——Java 对象占用空间巨大,数据量增大后垃圾回收有困难。使用文件系统,即使系统重启了,也不需要刷新数据,也简化了维护数据一致性的逻辑。
对于主要用于日志处理的消息系统,数据的持久化可以简单的通过将数据追加到文件中实现,读的时候从文件中读就好了。这样做的好处是读和写都是 O(1) 的,并且读操作不会阻塞写操作和其他操作。这样带来的性能优势是很明显的,因为性能和数据的大小没有关系了。
既然可以使用几乎没有容量限制(相对于内存来说)的硬盘空间建立消息系统,就可以在没有性能损失的情况下提供一些一般消息系统不具备的特性。比如,一般的消息系统都是在消息被消费后立即删除,Kafka却可以将消息保存一段时间(比如一星期),这给consumer提供了很好的机动性和灵活性。
事务定义
数据传输的事务定义通常有以下三种级别:
- 最多一次: 消息不会被重复发送,最多被传输一次,但也有可能一次不传输。
- 最少一次: 消息不会被漏发送,最少被传输一次,但也有可能被重复传输.
- 精确的一次(Exactly once): 不会漏传输也不会重复传输,每个消息都传输被一次而且仅仅被传输一次,这是大家所期望的。
Kafka 的机制和 git 有点类似,有一个 commit 的概念,一旦提交且 broker 在工作,那么数据就不会丢失。如果 producer 发布消息时发生了网络错误,但又不确定实在提交之前发生的还是提交之后发生的,这种情况虽然不常见,但是必须考虑进去,现在Kafka版本还没有解决这个问题,将来的版本正在努力尝试解决。
并不是所有的情况都需要“精确的一次”这样高的级别,Kafka 允许 producer 灵活的指定级别。比如 producer 可以指定必须等待消息被提交的通知,或者完全的异步发送消息而不等待任何通知,或者仅仅等待 leader 声明它拿到了消息(followers没有必要)。
现在从 consumer 的方面考虑这个问题,所有的副本都有相同的日志文件和相同的offset,consumer 维护自己消费的消息的 offset。如果 consumer 崩溃了,会有另外一个 consumer 接着消费消息,它需要从一个合适的 offset 继续处理。这种情况下可以有以下选择:
- consumer 可以先读取消息,然后将 offset 写入日志文件中,然后再处理消息。这存在一种可能就是在存储 offset 后还没处理消息就 crash 了,新的 consumer 继续从这个 offset 处理,那么就会有些消息永远不会被处理,这就是上面说的『最多一次』
- consumer 可以先读取消息,处理消息,最后记录o ffset,当然如果在记录 offset 之前就 crash 了,新的 consumer 会重复的消费一些消息,这就是上面说的『最少一次』
- 『精确一次』可以通过将提交分为两个阶段来解决:保存了 offset 后提交一次,消息处理成功之后再提交一次。但是还有个更简单的做法:将消息的 offset 和消息被处理后的结果保存在一起。比如用 Hadoop ETL 处理消息时,将处理后的结果和 offset 同时保存在 HDFS 中,这样就能保证消息和 offser 同时被处理了
性能优化
Kafka 在提高效率方面做了很大努力。Kafka 的一个主要使用场景是处理网站活动日志,吞吐量是非常大的,每个页面都会产生好多次写操作。读方面,假设每个消息只被消费一次,读的量的也是很大的,Kafka 也尽量使读的操作更轻量化。
线性读写的情况下影响磁盘性能问题大约有两个方面:太多的琐碎的 I/O 操作和太多的字节拷贝。I/O 问题发生在客户端和服务端之间,也发生在服务端内部的持久化的操作中。
消息集(message set)
为了避免这些问题,Kafka 建立了消息集(message set)的概念,将消息组织到一起,作为处理的单位。以消息集为单位处理消息,比以单个的消息为单位处理,会提升不少性能。Producer 把消息集一块发送给服务端,而不是一条条的发送;服务端把消息集一次性的追加到日志文件中,这样减少了琐碎的 I/O 操作。consumer 也可以一次性的请求一个消息集。
另外一个性能优化是在字节拷贝方面。在低负载的情况下这不是问题,但是在高负载的情况下它的影响还是很大的。为了避免这个问题,Kafka 使用了标准的二进制消息格式,这个格式可以在 producer, broker 和 producer 之间共享而无需做任何改动。
zero copy
Broker 维护的消息日志仅仅是一些目录文件,消息集以固定队的格式写入到日志文件中,这个格式 producer 和 consumer 是共享的,这使得 Kafka 可以一个很重要的点进行优化:消息在网络上的传递。现代的 unix 操作系统提供了高性能的将数据从页面缓存发送到 socket 的系统函数,在 linux 中,这个函数是 sendfile
为了更好的理解 sendfile
的好处,我们先来看下一般将数据从文件发送到 socket 的数据流向:
- 操作系统把数据从文件拷贝内核中的页缓存中
- 应用程序从页缓存从把数据拷贝自己的内存缓存中
- 应用程序将数据写入到内核中 socket 缓存中
- 操作系统把数据从 socket 缓存中拷贝到网卡接口缓存,从这里发送到网络上。
这显然是低效率的,有 4 次拷贝和 2 次系统调用。sendfile
通过直接将数据从页面缓存发送网卡接口缓存,避免了重复拷贝,大大的优化了性能。
在一个多consumers的场景里,数据仅仅被拷贝到页面缓存一次而不是每次消费消息的时候都重复的进行拷贝。这使得消息以近乎网络带宽的速率发送出去。这样在磁盘层面你几乎看不到任何的读操作,因为数据都是从页面缓存中直接发送到网络上去了。
数据压缩
很多时候,性能的瓶颈并非CPU或者硬盘而是网络带宽,对于需要在数据中心之间传送大量数据的应用更是如此。当然用户可以在没有 Kafka 支持的情况下各自压缩自己的消息,但是这将导致较低的压缩率,因为相比于将消息单独压缩,将大量文件压缩在一起才能起到最好的压缩效果。
Kafka 采用了端到端的压缩:因为有『消息集』的概念,客户端的消息可以一起被压缩后送到服务端,并以压缩后的格式写入日志文件,以压缩的格式发送到 consumer,消息从 producer 发出到 consumer 拿到都被是压缩的,只有在 consumer 使用的时候才被解压缩,所以叫做『端到端的压缩』。Kafka支持GZIP和Snappy压缩协议。
每个kafka broker中配置文件server.properties默认必须配置的属性如下:
-
broker.id=0
-
num.network.threads=2
-
num.io.threads=8
-
socket.send.buffer.bytes=1048576
-
socket.receive.buffer.bytes=1048576
-
socket.request.max.bytes=104857600
-
log.dirs=/tmp/kafka-logs
-
num.partitions=2
-
log.retention.hours=168
-
log.segment.bytes=536870912
-
log.retention.check.interval.ms=60000
-
log.cleaner.enable=false
-
zookeeper.connect=localhost:2181
-
zookeeper.connection.timeout.ms=1000000
server.properties中所有配置参数说明(解释)如下列表:
参数 | 说明(解释) |
broker.id =0 | 每一个broker在集群中的唯一表示,要求是正数。当该服务器的IP地址发生改变时,broker.id没有变化,则不会影响consumers的消息情况 |
log.dirs=/data/kafka-logs | kafka数据的存放地址,多个地址的话用逗号分割,多个目录分布在不同磁盘上可以提高读写性能 /data/kafka-logs-1,/data/kafka-logs-2 |
port =9092 | broker server服务端口 |
message.max.bytes =6525000 | 表示消息体的最大大小,单位是字节 |
num.network.threads =4 | broker处理消息的最大线程数,一般情况下数量为cpu核数 |
num.io.threads =8 | broker处理磁盘IO的线程数,数值为cpu核数2倍 |
background.threads =4 | 一些后台任务处理的线程数,例如过期消息文件的删除等,一般情况下不需要去做修改 |
queued.max.requests =500 | 等待IO线程处理的请求队列最大数,若是等待IO的请求超过这个数值,那么会停止接受外部消息,应该是一种自我保护机制。 |
host.name | broker的主机地址,若是设置了,那么会绑定到这个地址上,若是没有,会绑定到所有的接口上,并将其中之一发送到ZK,一般不设置 |
socket.send.buffer.bytes=100*1024 | socket的发送缓冲区,socket的调优参数SO_SNDBUFF |
socket.receive.buffer.bytes =100*1024 | socket的接受缓冲区,socket的调优参数SO_RCVBUFF |
socket.request.max.bytes =100*1024*1024 | socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖 |
log.segment.bytes =1024*1024*1024 | topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖 |
log.roll.hours =24*7 | 这个参数会在日志segment没有达到log.segment.bytes设置的大小,也会强制新建一个segment会被 topic创建时的指定参数覆盖 |
log.cleanup.policy = delete | 日志清理策略选择有:delete和compact主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖 |
log.retention.minutes=300
或 log.retention.hours=24 |
数据文件保留多长时间, 存储的最大时间超过这个时间会根据log.cleanup.policy设置数据清除策略
log.retention.bytes和log.retention.minutes或log.retention.hours任意一个达到要求,都会执行删除 有2删除数据文件方式: 按照文件大小删除:log.retention.bytes 按照2中不同时间粒度删除:分别为分钟,小时 |
log.retention.bytes=-1 | topic每个分区的最大文件大小,一个topic的大小限制 = 分区数*log.retention.bytes。-1没有大小限log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖 |
log.retention.check.interval.ms=5minutes | 文件大小检查的周期时间,是否处罚 log.cleanup.policy中设置的策略 |
log.cleaner.enable=false | 是否开启日志清理 |
log.cleaner.threads = 2 | 日志清理运行的线程数 |
log.cleaner.io.max.bytes.per.second=None | 日志清理时候处理的最大大小 |
log.cleaner.dedupe.buffer.size=500*1024*1024 | 日志清理去重时候的缓存空间,在空间允许的情况下,越大越好 |
log.cleaner.io.buffer.size=512*1024 | 日志清理时候用到的IO块大小一般不需要修改 |
log.cleaner.io.buffer.load.factor =0.9 | 日志清理中hash表的扩大因子一般不需要修改 |
log.cleaner.backoff.ms =15000 | 检查是否处罚日志清理的间隔 |
log.cleaner.min.cleanable.ratio=0.5 | 日志清理的频率控制,越大意味着更高效的清理,同时会存在一些空间上的浪费,会被topic创建时的指定参数覆盖 |
log.cleaner.delete.retention.ms =1day | 对于压缩的日志保留的最长时间,也是客户端消费消息的最长时间,同log.retention.minutes的区别在于一个控制未压缩数据,一个控制压缩后的数据。会被topic创建时的指定参数覆盖 |
log.index.size.max.bytes =10*1024*1024 | 对于segment日志的索引文件大小限制,会被topic创建时的指定参数覆盖 |
log.index.interval.bytes =4096 | 当执行一个fetch操作后,需要一定的空间来扫描最近的offset大小,设置越大,代表扫描速度越快,但是也更好内存,一般情况下不需要搭理这个参数 |
log.flush.interval.messages=None
例如log.flush.interval.messages=1000 表示每当消息记录数达到1000时flush一次数据到磁盘 |
log文件”sync”到磁盘之前累积的消息条数,因为磁盘IO操作是一个慢操作,但又是一个”数据可靠性"的必要手段,所以此参数的设置,需要在"数据可靠性"与"性能"之间做必要的权衡.如果此值过大,将会导致每次"fsync"的时间较长(IO阻塞),如果此值过小,将会导致"fsync"的次数较多,这也意味着整体的client请求有一定的延迟.物理server故障,将会导致没有fsync的消息丢失. |
log.flush.scheduler.interval.ms =3000 | 检查是否需要固化到硬盘的时间间隔 |
log.flush.interval.ms = None
例如:log.flush.interval.ms=1000 表示每间隔1000毫秒flush一次数据到磁盘 |
仅仅通过interval来控制消息的磁盘写入时机,是不足的.此参数用于控制"fsync"的时间间隔,如果消息量始终没有达到阀值,但是离上一次磁盘同步的时间间隔达到阀值,也将触发. |
log.delete.delay.ms =60000 | 文件在索引中清除后保留的时间一般不需要去修改 |
log.flush.offset.checkpoint.interval.ms =60000 | 控制上次固化硬盘的时间点,以便于数据恢复一般不需要去修改 |
auto.create.topics.enable =true | 是否允许自动创建topic,若是false,就需要通过命令创建topic |
default.replication.factor =1 | 是否允许自动创建topic,若是false,就需要通过命令创建topic |
num.partitions =1 | 每个topic的分区个数,若是在topic创建时候没有指定的话会被topic创建时的指定参数覆盖 |
以下是kafka中Leader,replicas配置参数 | |
controller.socket.timeout.ms =30000 | partition leader与replicas之间通讯时,socket的超时时间 |
controller.message.queue.size=10 | partition leader与replicas数据同步时,消息的队列尺寸 |
replica.lag.time.max.ms =10000 | replicas响应partition leader的最长等待时间,若是超过这个时间,就将replicas列入ISR(in-sync replicas),并认为它是死的,不会再加入管理中 |
replica.lag.max.messages =4000 | 如果follower落后与leader太多,将会认为此follower[或者说partition relicas]已经失效
##通常,在follower与leader通讯时,因为网络延迟或者链接断开,总会导致replicas中消息同步滞后 ##如果消息之后太多,leader将认为此follower网络延迟较大或者消息吞吐能力有限,将会把此replicas迁移 ##到其他follower中. ##在broker数量较少,或者网络不足的环境中,建议提高此值. |
replica.socket.timeout.ms=30*1000 | follower与leader之间的socket超时时间 |
replica.socket.receive.buffer.bytes=64*1024 | leader复制时候的socket缓存大小 |
replica.fetch.max.bytes =1024*1024 | replicas每次获取数据的最大大小 |
replica.fetch.wait.max.ms =500 | replicas同leader之间通信的最大等待时间,失败了会重试 |
replica.fetch.min.bytes =1 | fetch的最小数据尺寸,如果leader中尚未同步的数据不足此值,将会阻塞,直到满足条件 |
num.replica.fetchers=1 | leader进行复制的线程数,增大这个数值会增加follower的IO |
replica.high.watermark.checkpoint.interval.ms =5000 | 每个replica检查是否将最高水位进行固化的频率 |
controlled.shutdown.enable =false | 是否允许控制器关闭broker ,若是设置为true,会关闭所有在这个broker上的leader,并转移到其他broker |
controlled.shutdown.max.retries =3 | 控制器关闭的尝试次数 |
controlled.shutdown.retry.backoff.ms =5000 | 每次关闭尝试的时间间隔 |
leader.imbalance.per.broker.percentage =10 | leader的不平衡比例,若是超过这个数值,会对分区进行重新的平衡 |
leader.imbalance.check.interval.seconds =300 | 检查leader是否不平衡的时间间隔 |
offset.metadata.max.bytes | 客户端保留offset信息的最大空间大小 |
kafka中zookeeper参数配置 | |
zookeeper.connect = localhost:2181 | zookeeper集群的地址,可以是多个,多个之间用逗号分割 hostname1:port1,hostname2:port2,hostname3:port3 |
zookeeper.session.timeout.ms=6000 | ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大 |
zookeeper.connection.timeout.ms =6000 | ZooKeeper的连接超时时间 |
zookeeper.sync.time.ms =2000 | ZooKeeper集群中leader和follower之间的同步实际那 |
参考链接
- Kafka学习整理六(server.properties配置实践)
- Apache Kafka
- Quick Start
- Kafka入门经典教程
- Apache kafka 工作原理介绍
- 事无巨细 Apache Kafka 0.9.0.1 集群环境搭建
- kafka集群搭建
- Kafka文件存储机制那些事
- kafka原理以及设计实现思想
- kafka设计原理介绍
- Kafka集群操作指南
- What is the actual role of ZooKeeper in Kafka?
本文地址: https://www.xiongge.club/biancheng/elk/1341.html
转载请注明:熊哥club → Kafka_2.11-0.10.0.1 详细使用指南【建议收藏】
©熊哥club,本站推荐使用的主机:阿里云,CDN建议使用七牛云。
关注微信公众号『熊哥club』
免费提供IT技术指导交流
关注博主不迷路~