我怎樣才能從一開始就閱讀主題

從一開始就有多種策略來閱讀主題。為了解釋這些,我們首先需要了解消費者初創公司會發生什麼。在啟動消費者時,會發生以下情況:

  1. 加入已配置的使用者組,該組觸發重新平衡並將分割槽分配給使用者
  2. 查詢已提交的偏移量(針對分配給消費者的所有分割槽)
  3. 對於具有有效偏移的所有分割槽,從此偏移量恢復
  4. 對於無效偏移的所有分割槽,根據 auto.offset.reset 配置引數設定起始偏移量

開始一個新的消費者群體

如果你想從頭開始處理主題,你可以簡單地啟動一個新的使用者組(即,選擇一個未使用的 group.id)並設定 auto.offset.reset = earliest。由於新組沒有已提交的偏移量,因此將觸發自動偏移重置,並且將從頭開始使用主題。注意,在消費者重新啟動時,如果再次使用相同的 group.id,它將不會再次從頭開始閱讀該主題,而是從它剩下的地方繼續。因此,對於此策略,每次要從頭開始閱讀主題時,都需要分配新的 group.id

重複使用相同的組 ID

為了避免每次想要從頭開始讀取主題時設定新的 group.id,你可以在第一次啟動消費者之前禁用自動提交(通過 enable.auto.commit = false)(使用未使用的 group.id 並設定 auto.offset.reset = earliest)。此外,你不應手動提交任何偏移。因為從不使用此策略提交偏移,所以在重新啟動時,使用者將再次從頭開始讀取主題。

但是,這種策略有兩個缺點:

  1. 它不是容錯的
  2. 組重新平衡無法按預期工作

(1)由於永遠不會提交偏移,因此在重新啟動時會以相同的方式處理失敗和停止的使用者。對於這兩種情況,主題將從一開始就被消費。 (2)因為 offset 永遠不會被提交,所以重新平衡新分配的分割槽從一開始就是消費者。

因此,此策略僅適用於具有單個消費者的消費者群體,並且僅應用於開發目的。

重用相同的組 ID 和提交

如果你想要容錯和/或在你的消費者組中使用多個消費者,則必須提交抵消。因此,如果你想從頭開始閱讀主題,則需要在消費者啟動時操作已提交的偏移量。為此,KafkaConsumer 提供了三種方法 seek()seekToBeginning()seekToEnd()。雖然 seek() 可以用於設定任意偏移,但是第二和第三種方法可以分別用於尋找分割槽的開頭或結尾。因此,在失敗和消費者重新開始尋求時將被省略並且消費者可以從其剩下的地方恢復。對於消費者停止和重新啟動,在你進入 poll() 迴圈之前,將明確呼叫 seekToBeginning()。請注意,seekXXX() 只能在消費者加入群組後使用 - 因此,在使用 seekXXX() 之前,需要進行虛擬民意調查

if (consumer-stop-and-restart-from-beginning) {
    consumer.poll(0); // dummy poll() to join consumer group
    consumer.seekToBeginning(...);
}

// now you can start your poll() loop
while (isRunning) {
    for (ConsumerRecord record : consumer.poll(0)) {
        // process a record
    }
}