SimpleConsumer(Kafka 0.9.0)

Kafka 的 0.9 版本引入了对 kafka 消费者的完全重新设计。如果你对旧的 SimpleConsumer(0.8.X)感兴趣,请查看此页面 。如果你的 Kafka 安装比 0.8.X 更新,则以下代码应该开箱即用。

配置和初始化

Kafka 0.9 不再支持 Java 6 或 Scala 2.9。如果你仍在使用 Java 6,请考虑升级到支持的版本。

首先,创建一个 maven 项目并在你的 pom 中添加以下依赖项:

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.9.0.1</version>
    </dependency>
</dependencies>

注意 :不要忘记更新最新版本的版本字段(现在> 0.10)。

使用 Properties 对象初始化使用者。有许多属性允许你微调消费者行为。以下是所需的最小配置:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName()); 

bootstrap-servers 是经纪人的初始列表,供消费者发现群集的其余部分。这不需要是集群中的所有服务器:客户端将从此列表中的代理确定完整的活动代理集。

deserializer 告诉消费者如何解释/反序列化消息键和值。在这里,我们使用内置的 StringDeserializer

最后,group.id 对应于该客户的消费者群体。请记住:消费者群体的所有消费者都会在他们之间分割消息(kafka 充当消息队列),而来自不同消费者群体的消费者将获得相同的消息(kafka 表现得像发布 - 订阅系统)。

其他有用的属性是:

  • auto.offset.reset:控制存储在 Zookeeper 中的偏移量丢失或超出范围时要执行的操作。可能的值是 latestearliest。任何其他东西都会抛出异常;

  • enable.auto.commit:如果 true(默认),消费者偏移定期(见 auto.commit.interval.ms)保存在后台。将其设置为 false 并使用 auto.offset.reset=earliest - 是为了确定消费者从哪里开始,以防没有找到提交的偏移信息。earliest 表示从指定的主题分区开始。latest 表示分区的最大可用承诺偏移量。但是,只要找到有效的抵消记录,Kafka 消费者将始终从最后提交的抵消中恢复(即忽略 auto.offset.reset。最好的例子是当一个全新的消费者群体订阅某个主题时。这是当它使用 auto.offset.reset 来确定是否从主题的开头(最早)或结束(最新)开始。

  • session.timeout.ms:会话超时确保在消费者崩溃或网络分区将消费者与协调器隔离时将释放锁定。确实:

    当消费者组的一部分时,为每个消费者分配其已订阅的主题的分区的子集。这基本上是对这些分区的组锁定。只要锁定被保持,组中的其他成员就无法从中读取。当你的消费者健康时,这正是你想要的。这是避免重复消费的唯一方法。但是,如果消费者因机器或应用程序故障而死亡,则需要释放该锁定,以便可以将分区分配给健康的成员。资源

完整的属性列表可在 http://kafka.apache.org/090/documentation.html#newconsumerconfigs 中找到

消费者创建和主题订阅

获得属性后,创建消费者很容易:

KafkaConsumer<String, String> consumer = new KafkaConsumer<>( props );
consumer.subscribe( Collections.singletonList( "topic-example" ) );

订阅后,消费者可以与组的其余部分协调以获得其分区分配。这一切都是在你开始使用数据时自动处理的。

基本民意调查

消费者需要能够并行获取数据,可能来自许多分区,因为很多主题可能分散在许多经纪人身上。幸运的是,这一切都是在你开始使用数据时自动处理的。要做到这一点,你需要做的就是循环调用 poll,消费者处理剩下的事情。

poll 从分配的分区返回一组(可能为空)消息。

while( true ){
    ConsumerRecords<String, String> records = consumer.poll( 100 );
    if( !records.isEmpty() ){
        StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println );
    }
}

代码

基本的例子

这是你可以用来从 kafka 主题获取消息的最基本代码。

public class ConsumerExample09{

    public static void main( String[] args ){

        Properties props = new Properties();
        props.put( "bootstrap.servers", "localhost:9092" );
        props.put( "key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer" );
        props.put( "value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer" );
        props.put( "auto.offset.reset", "earliest" );
        props.put( "enable.auto.commit", "false" );
        props.put( "group.id", "octopus" );

        try( KafkaConsumer<String, String> consumer = new KafkaConsumer<>( props ) ){
            consumer.subscribe( Collections.singletonList( "test-topic" ) );

            while( true ){
                // poll with a 100 ms timeout
                ConsumerRecords<String, String> records = consumer.poll( 100 );
                if( records.isEmpty() ) continue;
                StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println );
            }
        }
    }
}

可运行的例子

消费者被设计为在自己的线程中运行。没有外部同步的多线程使用是不安全的,尝试可能不是一个好主意。

下面是一个简单的 Runnable 任务,它初始化消费者,订阅主题列表,并无限期地执行轮询循环,直到外部关闭。

public class ConsumerLoop implements Runnable{
    private final KafkaConsumer<String, String> consumer;
    private final List<String> topics;
    private final int id;

    public ConsumerLoop( int id, String groupId, List<String> topics ){
        this.id = id;
        this.topics = topics;
        Properties props = new Properties();
        props.put( "bootstrap.servers", "localhost:9092");
        props.put( "group.id", groupId );
        props.put( "auto.offset.reset", "earliest" );
        props.put( "key.deserializer", StringDeserializer.class.getName() );
        props.put( "value.deserializer", StringDeserializer.class.getName() );
        this.consumer = new KafkaConsumer<>( props );
    }
    
    @Override
    public void run(){
        try{
            consumer.subscribe( topics );

            while( true ){
                ConsumerRecords<String, String> records = consumer.poll( Long.MAX_VALUE );
                StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println );
            }
        }catch( WakeupException e ){
            // ignore for shutdown 
        }finally{
            consumer.close();
        }
    }

    public void shutdown(){
        consumer.wakeup();
    }
}

请注意,我们在轮询期间使用 Long.MAX_VALUE 的超时,因此它将无限期地等待新消息。要正确关闭使用者,在结束应用程序之前调用其 shutdown() 方法很重要。

驱动程序可以像这样使用它:

public static void main( String[] args ){

    int numConsumers = 3;
    String groupId = "octopus";
    List<String> topics = Arrays.asList( "test-topic" );

    ExecutorService executor = Executors.newFixedThreadPool( numConsumers );
    final List<ConsumerLoop> consumers = new ArrayList<>();

    for( int i = 0; i < numConsumers; i++ ){
        ConsumerLoop consumer = new ConsumerLoop( i, groupId, topics );
        consumers.add( consumer );
        executor.submit( consumer );
    }

    Runtime.getRuntime().addShutdownHook( new Thread(){
        @Override
        public void run(){
            for( ConsumerLoop consumer : consumers ){
                consumer.shutdown();
            }
            executor.shutdown();
            try{
                executor.awaitTermination( 5000, TimeUnit.MILLISECONDS );
            }catch( InterruptedException e ){
                e.printStackTrace();
            }
        }
    } );
}