如何提交抵消

KafkaConsumers 可以在后台自动提交偏移量(配置参数 enable.auto.commit = true),默认设置是什么。那些自动提交是在 poll() 内完成的( 通常在循环中调用 )。可以通过 auto.commit.interval.ms 配置应该提交的偏移频率。因为,自动提交嵌入在 poll() 中,poll() 由用户代码调用,所以此参数定义了提交间隔的下限。

作为自动提交的替代方法,也可以手动管理偏移。为此,应禁用自动提交(enable.auto.commit = false)。对于手动提交,KafkaConsumers 提供了两种方法,即 commitSync()commitAsync() 。如名称所示,commitSync() 是一个阻塞调用,在成功提交偏移后返回,而 commitAsync() 立即返回。如果你想知道提交是否成功,你可以提供一个回调处理程序(OffsetCommitCallback)方法参数。请注意,在两次提交调用中,消费者都会提交最新的 poll() 调用的偏移量。例如。让我们假设一个分区主题与一个消费者,最后一次调用 poll() 返回消息 4,5,6。在提交时,将提交偏移量 6,因为这是消费者客户端跟踪的最新偏移量。同时,commitSync()commitAsync() 都允许更多控制你想要提交的偏移量:如果使用允许你指定 Map<TopicPartition, OffsetAndMetadata> 的相应重载,则消费者将只提交指定的偏移量(即,

已提交偏移的语义

提交的偏移量表示已经处理了直到此偏移量的所有消息。因此,由于偏移量是连续数,因此提交偏移量 X 会隐式提交小于 X 的所有偏移量。因此,没有必要单独提交每个偏移量,并且一次提交多个偏移量,但只是提交最大的偏移量。

注意,通过设计,也可以提交比最后提交的偏移更小的偏移量。如果应该第二次读取消息,则可以这样做。

加工保证

使用自动提交提供至少一次处理语义。基本假设是,poll() 仅在所有先前传递的消息成功处理后才被调用。这确保了处理之后发生提交时不会丢失任何消息。如果消费者在提交之前失败,则从 Kafka 接收上次提交之后的所有消息并再次处理。但是,此重试可能会导致重复,因为最后一次 poll() 调用的某些消息可能已被处理但故障发生在自动提交调用之前。

如果需要最多一次处理语义,则必须禁用自动提交,并且应该在 poll() 之后直接执行手动 commitSync()。之后,消息得到处理。这确保了消息在处理之前提交,因此从不再次读取。当然,如果失败,某些消息可能会丢失。