消息队列小结二

kafka目录

在kafka当中, 每一个topic会有一个单独文件夹,这个文件夹存储在 {kafka_home}/config/server.properties中指定的log.dirs路径中。

在topic下会为每一个分区生成一个单独的文件夹,将这二者合并命名topicName-分区号, 例如topic1-0。

在每一个分区下又会有多个segment,,既然已经有多个分区了, 为什么要再进行划分为多个segment?因为:

  • 如果只存一个文件中, 文件会越来越大;
  • Kafka中的数据默认存储7天、每一天都会删除7天前的数据、 如果都存在一个文件当中、会不好删。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
创建topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test2

生成测试数据
bin/kafka-producer-perf-test.sh --topic test --num-records 100000 --record-size 1000 --producer-props bootstrap.servers=hadoop101:9092 --throughput 1000000000

查看log文件
bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log --print-data-log

baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1622516506987 size: 84 magic: 2 compresscodec: NONE crc: 2268928679 isvalid: true
| offset: 0 isValid: true crc: null keySize: -1 valueSize: 16 CreateTime: 1622516506987 baseOffset: 0 lastOffset: 0 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 84 magic: 2 compressType: NONE position: 0 sequence: -1 headerKeys: [] payload: ajlkdsfajkldfjad
baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 84 CreateTime: 1622516642240 size: 71 magic: 2 compresscodec: NONE crc: 427485767 isvalid: true
| offset: 1 isValid: true crc: null keySize: -1 valueSize: 3 CreateTime: 1622516642240 baseOffset: 1 lastOffset: 1 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 71 magic: 2 compressType: NONE position: 84 sequence: -1 headerKeys: [] payload: 哲

查看index文件
bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.index --print-data-log

Dumping /tmp/kafka-logs/test2-1/00000000000000000000.index
offset: 0 position: 0

kafka架构

Kafka Topic

  • Topic是已发布消息的类别名称
  • 发布和订阅数据必须指定Topic
  • Topic副本数量不大于Brokers个数,即不能多于服务器的数量
  • 创建的Topic存储在 {kafka_home}/config/server.properties 指定的log.dirs路径中

Kafka Producer

生产者将消息写入到Broker

  • Producer直接发送消息到Broker上的Leader Partition
  • Producer客户端自己根据分区策略(随机分配、自定义分区算法等)控制着消息被推送到哪些Partition
  • Producer 可以以同步和异步的方式发送消息

Batch推送提高效率

Producer 负责向 Kafka 主题发布(生产)消息,一个 Topic 可以有多个 Producer实例,其相互之间没有协作关系。Producer 的 send() 方法用于发送消息,参数ProducerRecord 封装了消息的内容:Topic、Partition、key、value 等信息。如果发送成功,返回的 RecordMetadata 中记录了消息的偏移量(Offset),如果发送失败就会重试或者抛出异常。如下图所示。
avatar
Producer 以 Batch 的方式推送数据可以极大的提高处理效率,Kafka Producer 可以将消息在内存中累计到一定数量后作为一个 Batch 发送请求,Batch 的数量大小可以人为进行干预,通过增加 Batch 的大小,可以减少网络请求和磁盘 IO 的次数,当然具体参数设置需要在效率和时效性方面做一个权衡。

Kafka Consumer

  1. 消费者通过订阅消息并消费消息
  • Offset的管理是基于消费组(group.id)的级别
  • 每个Partition只能由同一消费组内的一个Consumer来消费
  • 每个Consumer可以消费多个分区
  • 消费过的数据仍会保留在Kafka中
  • 消费者不能超过分区数量
  1. 消费模式
  • 队列:所有消费者在一个消费组内
  • 发布/订阅:所有消费者被分配到不同的消费组
  1. 提交方式
  • 自动提交:自动提交的优点是方便,但是可能会重复处理消息。
  • 手动提交:又分为同步提交commitSync和异步提交commitAsync。

avatar

Consumer 负责订阅(消费)主题并处理消息。Consumer 负责维护到 Broker 的 TCP 连接以便获取数据,在一个 Partition 中每一个记录的 Offset 是该记录的 唯一标识,即每一个 Offset 唯一标识当前 Partition 中的一条记录,同时 Offset 也可以标识 Consumer 在 Partition 中的位置(Position)。对 Consumer 来讲,这个 位置有两种含义:Current Offset 和 Committed Offset。

  1. Current Offset保存在 Consumer 客户端中,它表示 Consumer 希望收到的下 一条消息的序号。它仅仅在 poll()方法中使用,例如,Consumer 第一次调用 poll() 方法后收到了 20 条消息(offset:0-19),那么 Current Offset 就被设置为 20。这样 Consumer 下一次调用 poll()方法时,Kafka 就知道应该从序号为 20 的消息开始读 取。这样就能够保证每次 Consumer poll 消息时,都能够收到不重复的消息。
  2. Committed Offset保存在 Broker 上,它表示 Consumer 已经确认消费过的 消息的序号。主要通过 commitSync 和 commitAsync API 来操作。 例如,如果 Committed Offset 为 0,Consumer 通过 poll()方法收到 20 条消息 后,此时 Current Offset 就是 20,经过一系列的逻辑处理后,并没有调用 consumer.commitAsync()或 consumer.commitSync()来提交 Committed Offset,那么 此时 Committed Offset 依旧是 0,下一次 Consumer 重启后调用 poll()继续从 0 开 始消费。

又如,如果一个 Consumer 消费了 5 条消息(poll 并且成功 commitSync)之 后宕机了,重新启动之后它仍然能够从第 6 条消息开始消费,因为 Committed Offset 已经被 Kafka 记录为 5。

可以将多个 Consumer 设置为同一个 Consumer Group,组内的所有 Consumer 协调在一起来消费订阅主题的所有分区。但是每个分区只能由同一个消费组内的 一个 Consumer 来消费。很明显,Consumer Group 的作用是为了实现多个 Consumer 并行消费一个 Topic。

Kafka Message

avatar

  • header:消息头,固定长度
    offset:唯一确定每条消息在分区内的位置
    CRC32:用crc32校验消息
    magic:表示本次发布Kafka服务程序协议版本号
    attributes”:表示为独立版本、或标识压缩类型、或编码类型
  • body:消息体
    key:表示消息键,可选
    value bytes payload:表示实际消息数据

存储策略

无论消息是否被消费,kafka 都会保留所有消息。有两种策略可以删除旧数据:

  1. 基于时间:log.retention.hours=168
  2. 基于大小:log.retention.bytes=1073741824

因为 Kafka 读取特定消息的时间复杂度为 O(1),即与文件大小无关,所以这里删除过期文件与提高 Kafka 性能无关。

ZooKeeper在Kafka中的作用

  1. Broker注册并监控状态

znode:/brokers/ids,保存了所有 Broker id,实现对 Broker 的动态监控。

  1. Topic注册

znode:/brokers/topics,保存了所有 Topic。

  1. 生产者负载均衡

每个Broker启动时,都会完成Broker注册过程,生产者会通过该节点的变化来动态地感知到Broker服务器列表的变更

  1. offset维护

Kafka早期版本使用ZooKeeper为每个消费者存储offset,由于ZooKeeper写入性能较差,从0.10版本后,Kafka使用自己的内部主题维护offset

副本同步

Kafka 引入了 In-sync Replicas,也就是 ISR 副本集合。ISR 中的副本都是与 Leader 同步的副本,相反,不在 ISR 中的追随者副本就被认为是与 Leader 不同步的。ISR 不只是追随者副本集合,它必然包括 Leader 副本。甚至在某些情况下,ISR 只有 Leader 这一个副本。设置 ISR 主要是为了 Broker 宕掉之后,重新选举 Partition 的 Leader 时从 ISR 列表中选择,也就是说当 Leader 副本发生故障时,只有在 ISR 集合中的 Follower 副本才有资格被选举为新的Leader。

ISR 是 一个动态调整的集合 , 而 非 静 态 不 变 的 。 通 过 Broker 端replica.lag.time.max.ms 参数(Follower 副本能够落后 Leader 副本的最长时间间隔,默认值 10000)值来控制哪个追随者副本与 Leader 同步。只要一个 Follower 副本落后 Leader副本的时间不连续超过10秒,那么 Kafka 就认为该 Follower 副本与 Leader 是同步的,即使此时 Follower 副本中保存的消息明显少于 Leader 副本中的消息。如下图所示。
avatar

上图中,Follower1 与 Follower2 中的消息条数明显少于 Leader,但并不一定与Leader 不同步。Follower 副本唯一的工作就是不断地从 Leader副本拉取消息,然后写入到自己的提交日志中。如果这个同步过程的速度持续慢于 Leader副本的消息写入速度,那么在 replica.lag.time.max.ms 时间后,此 Follower 副本就会被认为是与 Leader 副本不同步的,因此不能再放入 ISR 中。此时,Kafka 会自动收缩 ISR 集合,将该副本踢出ISR。

值得注意的是,倘若该副本后面慢慢地追上了 Leader 的进度,那么它是能够重新被加回 ISR 的。

高吞吐

1.顺序读写

Kafka 的消息是不断追加到文件中的,这个特性使 Kafka 可以充分利用磁盘的顺序读写性能。顺序读写不需要硬盘磁头的寻道时间,只需很少的扇区旋转时间,所以速度远快于随机读写。

2.零拷贝

在 Linux kernel2.2 之后出现了一种叫做”零拷贝(zero-copy)”系统调用机制,就是跳过“用户缓冲区”的拷贝,建立一个磁盘空间和内存的直接映射,数据不再复制到“用户态缓冲区”。

零拷贝并不是不需要拷贝,而是减少不必要的拷贝次数。通常是说在 IO 读写过程中。“零拷贝技术”只用将磁盘文件的数据复制到页面缓存中一次,然后将数据从页面缓存直接发送到网络中。

avatar

3.分区

Kafka 的队列 topic 被分为了多个区 partition,每个 partition 又分为多个段 segment,所以一个队列中的消息实际上是保存在 N 多个片段文件中通过分段的方式,每次文件操作都是对一个小文件的操作,非常轻便,同时也增加了并行处理能力。

4.批量发送

Kafka 允许进行批量发送消息,先将消息缓存在内存中,然后一次请求批量发送出去比如可以指定缓存的消息达到某个量的时候就发出去,或者缓存了固定的时间后就发送出去如 100条消息就发送,或者每 5 秒发送一次这种策略将大大减少服务端的 I/O 次数。

5.数据压缩

Kafka 还支持对消息集合进行压缩,Producer 可以通过 GZIP 或 Snappy 格式对消息集合进行压缩压缩的好处就是减少传输的数据量,减轻对网络传输的压力。

6.Consumer 的负载均衡

当一个 group 中,有 consumer 加入或者离开时,会触发 partitions 均衡.均衡的最终目的,是提升topic 的并发消费能力。