Hadoop(十)Kafka 安装

Apache Kafka 最早是由 LinkedIn 开源出来的分布式消息系统,现在是 Apache 旗下的一个子项目,并且已经成为开源领域应用最广泛的消息系统之一。

Kafka 和传统的消息系统不同在于:

  • Kafka 是一个分布式系统,易于向外扩展。
  • 它同时为发布和订阅提供高吞吐量。
  • 它支持多订阅者,当失败时能自动平衡消费者。
  • 消息的持久化。

概念

Kafka 拓扑结构

  • Producer:消息生产者,发布消息到 Kafka 集群的终端或服务。
  • Broker:存储消息,是由多个 Server 组成的集群。
  • Topic:每条发布到 Kafka 集群的消息属于的类别,即 Kafka 是面向 Topic 的。
  • Partition:是物理上的概念,每个 Topic 包含一个或多个 Partition。Kafka 分配的单位是 Partition。
  • Consumer:从 Kafka 集群中消费消息的终端或服务。
  • Consumer Group:每个 consumer 都属于一个 consumer group,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费。
  • Replica:Partition 的副本,保障 partition 的高可用。
  • Leader:Replica 中的一个角色,Producer 和 Consumer 只跟 Leader 交互。
  • Follower:Replica 中的一个角色,从 Leader 中复制数据。
  • Controller:Kafka集群中的一个服务器,用来进行 Leader Election 以及各种 Failover。
  • Zookeeper:Kafka 通过 Zookeeper 来存储集群的 meta 信息。

Producer 发布消息

写入方式

Producer 采用 push 模式将消息发布到 broker,每条消息都被 append 到 patition 中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 Kafka 吞吐率)。


消息路由

Producer 发送消息到 broker 时,会根据分区算法选择将其存储到哪一个 partition。其路由机制为:

1. 指定了 patition,则直接使用;
2. 未指定 patition,但指定 key,通过对 key 的 value 进行 hash 选出一个 patition
3. patition 和 key 都未指定,使用轮询选出一个 patition。s

写入流程

Producer 写入消息序列图如下所示:

Producer 写入消息序列

流程说明:

1. producer 先从 zookeeper 的 “/brokers/.../state” 节点找到该 "partition" 的 leader
2. producer 将消息发送给该 leader
3. leader 将消息写入本地 log
4. followers 从 leader pull 消息,写入本地 log 后,leader 发送 ACK
5. leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset)并向 producer 发送 ACK

producer delivery guarantee

一般情况下存在三种情况:

1、At most once 消息可能会丢,但绝不会重复传输
2、At least one 消息绝不会丢,但可能会重复传输
3、Exactly once 每条消息肯定会被传输且仅传输一次

当 Producer 向 Broker 发送消息时,一旦这条消息被 commit,由于 Replication 的存在,它就不会丢。但是如果 Producer 发送数据给 Broker 后,遇到网络问题而造成通信中断,那 Producer 就无法判断该条消息是否已经 commit。虽然 Kafka 无法确定网络故障期间发生了什么,但是 Producer 可以生成一种类似于主键的东西,发生故障时幂等性的重试多次,这样就做到了 Exactly once,但目前还未实现。所以目前默认情况下一条消息从 Producer 到 Broker 是确保了 At least once,可通过设置 Producer 异步发送实现 At most once。


Broker 保存消息

存储方式

物理上把 Topic 分成一个或多个 Patition(对应 server.properties 中的 num.partitions=3 配置),每个 patition 物理上对应一个文件夹(该文件夹存储该 patition 的所有消息和索引文件)


存储策略

无论消息是否被消费,Kafka 都会保留所有消息。有两种策略可以删除旧数据:

1. 基于时间:log.retention.hours=168
2. 基于大小:log.retention.bytes=1073741824

需要注意的是,因为 Kafka 读取特定消息的时间复杂度为 O(1),即与文件大小无关,所以这里删除过期文件与提高 Kafka 性能无关。


Topic 创建与删除

创建 Topic

创建 Topic 的序列图如下所示:

Topic 创建

流程说明:

  • Controller 在 ZooKeeper 的 /brokers/topics 节点上注册 Watcher,当 Topic 被创建,则 Controller 会通过 Watch 得到该 Topic 的 Partition / Replica 分配。
  • Controller从 /brokers/ids 读取当前所有可用的 Broker 列表,对于 Set_p 中的每一个 Partition。
    • 从分配给该 Partition 的所有 Replica(称为AR)中任选一个可用的 Broker 作为新的 Leader,并将 AR 设置为新的 ISR;
    • 将新的 leader 和 ISR 写入 /brokers/topics/[topic]/partitions/[partition]/state。
  • Controller 通过 RPC 向相关的 Broker 发送 LeaderAndISRRequest。

删除 Topic

删除 Topic 的序列图如下所示:

Topic 删除

流程说明:

  • Controller 在 zooKeeper 的 /brokers/topics 节点上注册 Watcher,当 Topic 被删除,则 Controller 会通过 Watch 得到该 Topic 的 Partition / Replica 分配。
  • 若 delete.topic.enable=false,结束;否则 Controller 注册在 /admin/delete_topics 上的 Watch 被 Fire,Controller 通过回调向对应的 Broker 发送 StopReplicaRequest。

Consumer 消费消息

API

Consumer API

Kafka 提供了两套 Consumer API:

1、The high-level Consumer API
2、The SimpleConsumer API

其中 high-level Consumer API 提供了一个从 Kafka 消费数据的高层抽象,而 SimpleConsumer API 则需要开发人员更多的关注细节。


The high-level Consumer API

该 API 提供了 Consumer Group 的语义,一个消息只能被 Group 内的一个 Consumer 所消费,且 Consumer 消费消息时不关注 Offset,最后一个 Offset 由 Zookeeper 保存。

使用该 API 可以是多线程的应用,应当注意:

1、如果消费线程大于 patition 数量,则有些线程将收不到消息
2、如果 patition 数量大于线程数,则有些线程多收到多个 patition 的消息
3、如果一个线程消费多个 patition,则无法保证你收到的消息的顺序,而一个 patition 内的消息是有序的

The SimpleConsumer API

如果你想要对 Patition 有更多的控制权,那就应该使用该 API,比如:

1、多次读取一个消息
2、只消费一个 patition 中的部分消息
3、使用事务来保证一个消息仅被消费一次

但是使用此 API 时,Partition、Offset、Broker、Leader 等对你不再透明,需要自己去管理。你需要做大量的额外工作:

1、必须在应用程序中跟踪 offset,从而确定下一条应该消费哪条消息
2、应用程序需要通过程序获知每个 Partition 的 leader 是谁
3、需要处理 leader 的变更

使用该 API 的一般流程如下:

1、查找到一个“活着”的 broker,并且找出每个 partition 的 leader
2、找出每个 partition 的 follower
3、定义好请求,该请求应该能描述应用程序需要哪些数据
4、fetch 数据
5、识别 leader 的变化,并对之作出必要的响应

Consumer Group

注:只针对 high-level Consumer API 进行说明

Kafka 的分配单位是 Patition。每个 Consumer 都属于一个 Group,一个 Patition 只能被同一个 Group 内的一个 Consumer 所消费(也就保障了一个消息只能被 Group 内的一个 Consumer 所消费),但是多个 Group 可以同时消费这个 Partition。

Kafka 的设计目标之一就是同时实现离线处理和实时处理,根据这一特性,可以使用 Spark / Storm 这些实时处理系统对消息在线处理,同时使用 Hadoop 批处理系统进行离线处理,还可以将数据备份到另一个数据中心,只需要保证这三者属于不同的 Consumer Group。如下图所示:

处理流程


消费方式

Consumer 采用 pull 模式从 Broker 中读取数据。

push 模式很难适应消费速率不同的消费者,因为消息发送速率是由 Broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 Consumer 来不及处理消息。典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 Consumer 的消费能力以适当的速率消费消息。

对于 Kafka 而言,pull 模式更合适,它可简化 Broker 的设计,Consumer 可自主控制消费消息的速率,同时 Consumer 可以自己控制消费方式-- 即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。


Kafka 安装

  • 上传解压(『caroly02』)

    • tar xf kafka_2.10-0.9.0.1.tgz -C /opt/caroly/
      
    • cd /opt/caroly
      
    • mv kafka_2.10-0.9.0.1/ kafka
      
  • 修改配置文件(『caroly02』)

    • change 20:broker.id=0
      change 116:zookeeper.connect=caroly02:2181,caroly03:2181,caroly04:2181
      
  • 分发(『caroly02』)

    • cd /opt/caroly
      
    • scp -r kafka/ caroly03:`pwd`
      scp -r kafka/ caroly04:`pwd`
      
  • 修改配置文件(『caroly03』)

    • vi /opt/caroly/kafka/config/server.properties
      
    • change 20:broker.id=1
      
  • 修改配置文件(『caroly04』)

    • vi /opt/caroly/kafka/config/server.properties
      
    • change 20:broker.id=2
      
  • 启动 ZK(『caroly02』『caroly03』『caroly04』)

    • zkServer.sh start
      
  • 启动 kafka(『caroly02』『caroly03』『caroly04』)

    • cd /opt/caroly/kafka
      
    • ./bin/kafka-server-start.sh ./config/server.properties
      

更新时间:2021-03-23 10:09:43

本文由 caroly 创作,如果您觉得本文不错,请随意赞赏
采用 知识共享署名4.0 国际许可协议进行许可
本站文章除注明转载 / 出处外,均为本站原创或翻译,转载前请务必署名
原文链接:https://caroly.fun/archives/kafka安装
最后更新:2021-03-23 10:09:43

评论

Your browser is out of date!

Update your browser to view this website correctly. Update my browser now

×