Apache Kafka 最早是由 LinkedIn 开源出来的分布式消息系统,现在是 Apache 旗下的一个子项目,并且已经成为开源领域应用最广泛的消息系统之一。
Kafka 和传统的消息系统不同在于:
Producer 采用 push 模式将消息发布到 broker,每条消息都被 append 到 patition 中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 Kafka 吞吐率)。
Producer 发送消息到 broker 时,会根据分区算法选择将其存储到哪一个 partition。其路由机制为:
1. 指定了 patition,则直接使用;
2. 未指定 patition,但指定 key,通过对 key 的 value 进行 hash 选出一个 patition
3. patition 和 key 都未指定,使用轮询选出一个 patition。s
Producer 写入消息序列图如下所示:
流程说明:
1. producer 先从 zookeeper 的 “/brokers/.../state” 节点找到该 "partition" 的 leader
2. producer 将消息发送给该 leader
3. leader 将消息写入本地 log
4. followers 从 leader pull 消息,写入本地 log 后,leader 发送 ACK
5. leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset)并向 producer 发送 ACK
一般情况下存在三种情况:
1、At most once 消息可能会丢,但绝不会重复传输
2、At least one 消息绝不会丢,但可能会重复传输
3、Exactly once 每条消息肯定会被传输且仅传输一次
当 Producer 向 Broker 发送消息时,一旦这条消息被 commit,由于 Replication 的存在,它就不会丢。但是如果 Producer 发送数据给 Broker 后,遇到网络问题而造成通信中断,那 Producer 就无法判断该条消息是否已经 commit。虽然 Kafka 无法确定网络故障期间发生了什么,但是 Producer 可以生成一种类似于主键的东西,发生故障时幂等性的重试多次,这样就做到了 Exactly once,但目前还未实现。所以目前默认情况下一条消息从 Producer 到 Broker 是确保了 At least once,可通过设置 Producer 异步发送实现 At most once。
物理上把 Topic 分成一个或多个 Patition(对应 server.properties 中的 num.partitions=3 配置),每个 patition 物理上对应一个文件夹(该文件夹存储该 patition 的所有消息和索引文件)
无论消息是否被消费,Kafka 都会保留所有消息。有两种策略可以删除旧数据:
1. 基于时间:log.retention.hours=168
2. 基于大小:log.retention.bytes=1073741824
需要注意的是,因为 Kafka 读取特定消息的时间复杂度为 O(1),即与文件大小无关,所以这里删除过期文件与提高 Kafka 性能无关。
创建 Topic 的序列图如下所示:
流程说明:
删除 Topic 的序列图如下所示:
流程说明:
Kafka 提供了两套 Consumer API:
1、The high-level Consumer API
2、The SimpleConsumer API
其中 high-level Consumer API 提供了一个从 Kafka 消费数据的高层抽象,而 SimpleConsumer API 则需要开发人员更多的关注细节。
该 API 提供了 Consumer Group 的语义,一个消息只能被 Group 内的一个 Consumer 所消费,且 Consumer 消费消息时不关注 Offset,最后一个 Offset 由 Zookeeper 保存。
使用该 API 可以是多线程的应用,应当注意:
1、如果消费线程大于 patition 数量,则有些线程将收不到消息
2、如果 patition 数量大于线程数,则有些线程多收到多个 patition 的消息
3、如果一个线程消费多个 patition,则无法保证你收到的消息的顺序,而一个 patition 内的消息是有序的
如果你想要对 Patition 有更多的控制权,那就应该使用该 API,比如:
1、多次读取一个消息
2、只消费一个 patition 中的部分消息
3、使用事务来保证一个消息仅被消费一次
但是使用此 API 时,Partition、Offset、Broker、Leader 等对你不再透明,需要自己去管理。你需要做大量的额外工作:
1、必须在应用程序中跟踪 offset,从而确定下一条应该消费哪条消息
2、应用程序需要通过程序获知每个 Partition 的 leader 是谁
3、需要处理 leader 的变更
使用该 API 的一般流程如下:
1、查找到一个“活着”的 broker,并且找出每个 partition 的 leader
2、找出每个 partition 的 follower
3、定义好请求,该请求应该能描述应用程序需要哪些数据
4、fetch 数据
5、识别 leader 的变化,并对之作出必要的响应
注:只针对 high-level Consumer API 进行说明
Kafka 的分配单位是 Patition。每个 Consumer 都属于一个 Group,一个 Patition 只能被同一个 Group 内的一个 Consumer 所消费(也就保障了一个消息只能被 Group 内的一个 Consumer 所消费),但是多个 Group 可以同时消费这个 Partition。
Kafka 的设计目标之一就是同时实现离线处理和实时处理,根据这一特性,可以使用 Spark / Storm 这些实时处理系统对消息在线处理,同时使用 Hadoop 批处理系统进行离线处理,还可以将数据备份到另一个数据中心,只需要保证这三者属于不同的 Consumer Group。如下图所示:
Consumer 采用 pull 模式从 Broker 中读取数据。
push 模式很难适应消费速率不同的消费者,因为消息发送速率是由 Broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 Consumer 来不及处理消息。典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 Consumer 的消费能力以适当的速率消费消息。
对于 Kafka 而言,pull 模式更合适,它可简化 Broker 的设计,Consumer 可自主控制消费消息的速率,同时 Consumer 可以自己控制消费方式-- 即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。
上传解压(『caroly02』)
tar xf kafka_2.10-0.9.0.1.tgz -C /opt/caroly/
cd /opt/caroly
mv kafka_2.10-0.9.0.1/ kafka
修改配置文件(『caroly02』)
cd kafka/config/
vi server.properties
change 20:broker.id=0
change 116:zookeeper.connect=caroly02:2181,caroly03:2181,caroly04:2181
分发(『caroly02』)
cd /opt/caroly
scp -r kafka/ caroly03:`pwd`
scp -r kafka/ caroly04:`pwd`
修改配置文件(『caroly03』)
vi /opt/caroly/kafka/config/server.properties
change 20:broker.id=1
修改配置文件(『caroly04』)
vi /opt/caroly/kafka/config/server.properties
change 20:broker.id=2
启动 ZK(『caroly02』『caroly03』『caroly04』)
zkServer.sh start
启动 kafka(『caroly02』『caroly03』『caroly04』)
cd /opt/caroly/kafka
./bin/kafka-server-start.sh ./config/server.properties
本文由 caroly 创作,如果您觉得本文不错,请随意赞赏
采用 知识共享署名4.0 国际许可协议进行许可
本站文章除注明转载 / 出处外,均为本站原创或翻译,转载前请务必署名
原文链接:https://caroly.fun/archives/kafka安装
最后更新:2021-04-29 16:10:34
Update your browser to view this website correctly. Update my browser now