我怎样才能从一开始就阅读主题

从一开始就有多种策略来阅读主题。为了解释这些,我们首先需要了解消费者初创公司会发生什么。在启动消费者时,会发生以下情况:

  1. 加入已配置的使用者组,该组触发重新平衡并将分区分配给使用者
  2. 查找已提交的偏移量(针对分配给消费者的所有分区)
  3. 对于具有有效偏移的所有分区,从此偏移量恢复
  4. 对于无效偏移的所有分区,根据 auto.offset.reset 配置参数设置起始偏移量

开始一个新的消费者群体

如果你想从头开始处理主题,你可以简单地启动一个新的使用者组(即,选择一个未使用的 group.id)并设置 auto.offset.reset = earliest。由于新组没有已提交的偏移量,因此将触发自动偏移重置,并且将从头开始使用主题。注意,在消费者重新启动时,如果再次使用相同的 group.id,它将不会再次从头开始阅读该主题,而是从它剩下的地方继续。因此,对于此策略,每次要从头开始阅读主题时,都需要分配新的 group.id

重复使用相同的组 ID

为了避免每次想要从头开始读取主题时设置新的 group.id,你可以在第一次启动消费者之前禁用自动提交(通过 enable.auto.commit = false)(使用未使用的 group.id 并设置 auto.offset.reset = earliest)。此外,你不应手动提交任何偏移。因为从不使用此策略提交偏移,所以在重新启动时,使用者将再次从头开始读取主题。

但是,这种策略有两个缺点:

  1. 它不是容错的
  2. 组重新平衡无法按预期工作

(1)由于永远不会提交偏移,因此在重新启动时会以相同的方式处理失败和停止的使用者。对于这两种情况,主题将从一开始就被消费。 (2)因为 offset 永远不会被提交,所以重新平衡新分配的分区从一开始就是消费者。

因此,此策略仅适用于具有单个消费者的消费者群体,并且仅应用于开发目的。

重用相同的组 ID 和提交

如果你想要容错和/或在你的消费者组中使用多个消费者,则必须提交抵消。因此,如果你想从头开始阅读主题,则需要在消费者启动时操作已提交的偏移量。为此,KafkaConsumer 提供了三种方法 seek()seekToBeginning()seekToEnd()。虽然 seek() 可以用于设置任意偏移,但是第二和第三种方法可以分别用于寻找分区的开头或结尾。因此,在失败和消费者重新开始寻求时将被省略并且消费者可以从其剩下的地方恢复。对于消费者停止和重新启动,在你进入 poll() 循环之前,将明确调用 seekToBeginning()。请注意,seekXXX() 只能在消费者加入群组后使用 - 因此,在使用 seekXXX() 之前,需要进行虚拟民意调查

if (consumer-stop-and-restart-from-beginning) {
    consumer.poll(0); // dummy poll() to join consumer group
    consumer.seekToBeginning(...);
}

// now you can start your poll() loop
while (isRunning) {
    for (ConsumerRecord record : consumer.poll(0)) {
        // process a record
    }
}