Kafka 安装
Apache Kafka 最早是由 LinkedIn 开源出来的分布式消息系统,现在是 Apache 旗下的一个子项目,并且已经成为开源领域应用最广泛的消息系统之一。
Kafka 和传统的消息系统不同在于:
- Kafka 是一个分布式系统,易于向外扩展。
- 它同时为发布和订阅提供高吞吐量。
- 它支持多订阅者,当失败时能自动平衡消费者。
- 消息的持久化。
概念
- Producer:消息生产者,发布消息到 Kafka 集群的终端或服务。
- Broker:存储消息,是由多个 Server 组成的集群。
- Topic:每条发布到 Kafka 集群的消息属于的类别,即 Kafka 是面向 Topic 的。
- Partition:是物理上的概念,每个 Topic 包含一个或多个 Partition。Kafka 分配的单位是 Partition。
- Consumer:从 Kafka 集群中消费消息的终端或服务。
- Consumer Group:每个 consumer 都属于一个 consumer group,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费。
- Replica:Partition 的副本,保障 partition 的高可用。
- Leader:Replica 中的一个角色,Producer 和 Consumer 只跟 Leader 交互。
- Follower:Replica 中的一个角色,从 Leader 中复制数据。
- Controller:Kafka集群中的一个服务器,用来进行 Leader Election 以及各种 Failover。
- Zookeeper:Kafka 通过 Zookeeper 来存储集群的 meta 信息。
Producer 发布消息
写入方式
Producer 采用 push 模式将消息发布到 broker,每条消息都被 append 到 patition 中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 Kafka 吞吐率)。
消息路由
Producer 发送消息到 broker 时,会根据分区算法选择将其存储到哪一个 partition。其路由机制为:
1. 指定了 patition,则直接使用;
2. 未指定 patition,但指定 key,通过对 key 的 value 进行 hash 选出一个 patition
3. patition 和 key 都未指定,使用轮询选出一个 patition。s
写入流程
Producer 写入消息序列图如下所示:
流程说明:
1. producer 先从 zookeeper 的 “/brokers/.../state” 节点找到该 "partition" 的 leader
2. producer 将消息发送给该 leader
3. leader 将消息写入本地 log
4. followers 从 leader pull 消息,写入本地 log 后,leader 发送 ACK
5. leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset)并向 producer 发送 ACK
producer delivery guarantee
一般情况下存在三种情况:
1、At most once 消息可能会丢,但绝不会重复传输
2、At least one 消息绝不会丢,但可能会重复传输
3、Exactly once 每条消息肯定会被传输且仅传输一次
当 Producer 向 Broker 发送消息时,一旦这条消息被 commit,由于 Replication 的存在,它就不会丢。但是如果 Producer 发送数据给 Broker 后,遇到网络问题而造成通信中断,那 Producer 就无法判断该条消息是否已经 commit。虽然 Kafka 无法确定网络故障期间发生了什么,但是 Producer 可以生成一种类似于主键的东西,发生故障时幂等性的重试多次,这样就做到了 Exactly once,但目前还未实现。所以目前默认情况下一条消息从 Producer 到 Broker 是确保了 At least once,可通过设置 Producer 异步发送实现 At most once。
Broker 保存消息
存储方式
物理上把 Topic 分成一个或多个 Patition(对应 server.properties 中的 num.partitions=3 配置),每个 patition 物理上对应一个文件夹(该文件夹存储该 patition 的所有消息和索引文件)
存储策略
无论消息是否被消费,Kafka 都会保留所有消息。有两种策略可以删除旧数据:
1. 基于时间:log.retention.hours=168
2. 基于大小:log.retention.bytes=1073741824
需要注意的是,因为 Kafka 读取特定消息的时间复杂度为 O(1),即与文件大小无关,所以这里删除过期文件与提高 Kafka 性能无关。
Topic 创建与删除
创建 Topic
创建 Topic 的序列图如下所示:
流程说明:
- controller 在 ZooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被创建,则 controller 会通过 watch 得到该 topic 的 partition/replica 分配。
- controller从 /brokers/ids 读取当前所有可用的 broker 列表,对于 set_p 中的每一个 partition。
- 从分配给该 partition 的所有 replica(称为AR)中任选一个可用的 broker 作为新的 leader,并将AR设置为新的 ISR;
- 将新的 leader 和 ISR 写入 /brokers/topics/[topic]/partitions/[partition]/state。
- controller 通过 RPC 向相关的 broker 发送 LeaderAndISRRequest。
删除 Topic
删除 Topic 的序列图如下所示:
流程说明:
- controller 在 zooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被删除,则 controller 会通过 watch 得到该 topic 的 partition/replica 分配。
- 若 delete.topic.enable=false,结束;否则 controller 注册在 /admin/delete_topics 上的 watch 被 fire,controller 通过回调向对应的 broker 发送 StopReplicaRequest。
Consumer 消费消息
API
Consumer API
Kafka 提供了两套 Consumer API:
1、The high-level Consumer API
2、The SimpleConsumer API
其中 high-level Consumer API 提供了一个从 Kafka 消费数据的高层抽象,而 SimpleConsumer API 则需要开发人员更多的关注细节。
The high-level Consumer API
该 API 提供了 Consumer Group 的语义,一个消息只能被 Group 内的一个 Consumer 所消费,且 Consumer 消费消息时不关注 Offset,最后一个 Offset 由 Zookeeper 保存。
使用该 API 可以是多线程的应用,应当注意:
1、如果消费线程大于 patition 数量,则有些线程将收不到消息
2、如果 patition 数量大于线程数,则有些线程多收到多个 patition 的消息
3、如果一个线程消费多个 patition,则无法保证你收到的消息的顺序,而一个 patition 内的消息是有序的
The SimpleConsumer API
如果你想要对 Patition 有更多的控制权,那就应该使用该 API,比如:
1、多次读取一个消息
2、只消费一个 patition 中的部分消息
3、使用事务来保证一个消息仅被消费一次
但是使用此 API 时,Partition、Offset、Broker、Leader 等对你不再透明,需要自己去管理。你需要做大量的额外工作:
1、必须在应用程序中跟踪 offset,从而确定下一条应该消费哪条消息
2、应用程序需要通过程序获知每个 Partition 的 leader 是谁
3、需要处理 leader 的变更
使用该 API 的一般流程如下:
1、查找到一个“活着”的 broker,并且找出每个 partition 的 leader
2、找出每个 partition 的 follower
3、定义好请求,该请求应该能描述应用程序需要哪些数据
4、fetch 数据
5、识别 leader 的变化,并对之作出必要的响应
Consumer Group
注:只针对 high-level Consumer API 进行说明
Kafka 的分配单位是 Patition。每个 Consumer 都属于一个 Group,一个 Patition 只能被同一个 Group 内的一个 Consumer 所消费(也就保障了一个消息只能被 Group 内的一个 Consumer 所消费),但是多个 Group 可以同时消费这个 Partition。
Kafka 的设计目标之一就是同时实现离线处理和实时处理,根据这一特性,可以使用 Spark / Storm 这些实时处理系统对消息在线处理,同时使用 Hadoop 批处理系统进行离线处理,还可以将数据备份到另一个数据中心,只需要保证这三者属于不同的 Consumer Group。如下图所示:
消费方式
Consumer 采用 pull 模式从 Broker 中读取数据。
push 模式很难适应消费速率不同的消费者,因为消息发送速率是由 Broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 Consumer 来不及处理消息。典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 Consumer 的消费能力以适当的速率消费消息。
对于 Kafka 而言,pull 模式更合适,它可简化 Broker 的设计,Consumer 可自主控制消费消息的速率,同时 Consumer 可以自己控制消费方式– 即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。
Kafka 安装
上传解压(『caroly02』)
tar xf kafka_2.10-0.9.0.1.tgz -C /opt/caroly/
cd /opt/caroly
mv kafka_2.10-0.9.0.1/ kafka
修改配置文件(『caroly02』)
change 20:broker.id=0 change 116:zookeeper.connect=caroly02:2181,caroly03:2181,caroly04:2181
分发(『caroly02』)
cd /opt/caroly
scp -r kafka/ caroly03:`pwd` scp -r kafka/ caroly04:`pwd`
修改配置文件(『caroly03』)
vi /opt/caroly/kafka/config/server.properties
change 20:broker.id=1
修改配置文件(『caroly04』)
vi /opt/caroly/kafka/config/server.properties
change 20:broker.id=2
启动 ZK(『caroly02』『caroly03』『caroly04』)
zkServer.sh start
启动 kafka(『caroly02』『caroly03』『caroly04』)
cd /opt/caroly/kafka
./bin/kafka-server-start.sh ./config/server.properties