SimpleConsumer(Kafka 0.9.0)

Kafka 的 0.9 版本引入了對 kafka 消費者的完全重新設計。如果你對舊的 SimpleConsumer(0.8.X)感興趣,請檢視此頁面 。如果你的 Kafka 安裝比 0.8.X 更新,則以下程式碼應該開箱即用。

配置和初始化

Kafka 0.9 不再支援 Java 6 或 Scala 2.9。如果你仍在使用 Java 6,請考慮升級到支援的版本。

首先,建立一個 maven 專案並在你的 pom 中新增以下依賴項:

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.9.0.1</version>
    </dependency>
</dependencies>

注意 :不要忘記更新最新版本的版本欄位(現在> 0.10)。

使用 Properties 物件初始化使用者。有許多屬性允許你微調消費者行為。以下是所需的最小配置:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName()); 

bootstrap-servers 是經紀人的初始列表,供消費者發現群集的其餘部分。這不需要是叢集中的所有伺服器:客戶端將從此列表中的代理確定完整的活動代理集。

deserializer 告訴消費者如何解釋/反序列化訊息鍵和值。在這裡,我們使用內建的 StringDeserializer

最後,group.id 對應於該客戶的消費者群體。請記住:消費者群體的所有消費者都會在他們之間分割訊息(kafka 充當訊息佇列),而來自不同消費者群體的消費者將獲得相同的訊息(kafka 表現得像釋出 - 訂閱系統)。

其他有用的屬性是:

  • auto.offset.reset:控制儲存在 Zookeeper 中的偏移量丟失或超出範圍時要執行的操作。可能的值是 latestearliest。任何其他東西都會丟擲異常;

  • enable.auto.commit:如果 true(預設),消費者偏移定期(見 auto.commit.interval.ms)儲存在後臺。將其設定為 false 並使用 auto.offset.reset=earliest - 是為了確定消費者從哪裡開始,以防沒有找到提交的偏移資訊。earliest 表示從指定的主題分割槽開始。latest 表示分割槽的最大可用承諾偏移量。但是,只要找到有效的抵消記錄,Kafka 消費者將始終從最後提交的抵消中恢復(即忽略 auto.offset.reset。最好的例子是當一個全新的消費者群體訂閱某個主題時。這是當它使用 auto.offset.reset 來確定是否從主題的開頭(最早)或結束(最新)開始。

  • session.timeout.ms:會話超時確保在消費者崩潰或網路分割槽將消費者與協調器隔離時將釋放鎖定。確實:

    當消費者組的一部分時,為每個消費者分配其已訂閱的主題的分割槽的子集。這基本上是對這些分割槽的組鎖定。只要鎖定被保持,組中的其他成員就無法從中讀取。當你的消費者健康時,這正是你想要的。這是避免重複消費的唯一方法。但是,如果消費者因機器或應用程式故障而死亡,則需要釋放該鎖定,以便可以將分割槽分配給健康的成員。資源

完整的屬性列表可在 http://kafka.apache.org/090/documentation.html#newconsumerconfigs 中找到

消費者建立和主題訂閱

獲得屬性後,建立消費者很容易:

KafkaConsumer<String, String> consumer = new KafkaConsumer<>( props );
consumer.subscribe( Collections.singletonList( "topic-example" ) );

訂閱後,消費者可以與組的其餘部分協調以獲得其分割槽分配。這一切都是在你開始使用資料時自動處理的。

基本民意調查

消費者需要能夠並行獲取資料,可能來自許多分割槽,因為很多主題可能分散在許多經紀人身上。幸運的是,這一切都是在你開始使用資料時自動處理的。要做到這一點,你需要做的就是迴圈呼叫 poll,消費者處理剩下的事情。

poll 從分配的分割槽返回一組(可能為空)訊息。

while( true ){
    ConsumerRecords<String, String> records = consumer.poll( 100 );
    if( !records.isEmpty() ){
        StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println );
    }
}

程式碼

基本的例子

這是你可以用來從 kafka 主題獲取訊息的最基本程式碼。

public class ConsumerExample09{

    public static void main( String[] args ){

        Properties props = new Properties();
        props.put( "bootstrap.servers", "localhost:9092" );
        props.put( "key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer" );
        props.put( "value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer" );
        props.put( "auto.offset.reset", "earliest" );
        props.put( "enable.auto.commit", "false" );
        props.put( "group.id", "octopus" );

        try( KafkaConsumer<String, String> consumer = new KafkaConsumer<>( props ) ){
            consumer.subscribe( Collections.singletonList( "test-topic" ) );

            while( true ){
                // poll with a 100 ms timeout
                ConsumerRecords<String, String> records = consumer.poll( 100 );
                if( records.isEmpty() ) continue;
                StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println );
            }
        }
    }
}

可執行的例子

消費者被設計為在自己的執行緒中執行。沒有外部同步的多執行緒使用是不安全的,嘗試可能不是一個好主意。

下面是一個簡單的 Runnable 任務,它初始化消費者,訂閱主題列表,並無限期地執行輪詢迴圈,直到外部關閉。

public class ConsumerLoop implements Runnable{
    private final KafkaConsumer<String, String> consumer;
    private final List<String> topics;
    private final int id;

    public ConsumerLoop( int id, String groupId, List<String> topics ){
        this.id = id;
        this.topics = topics;
        Properties props = new Properties();
        props.put( "bootstrap.servers", "localhost:9092");
        props.put( "group.id", groupId );
        props.put( "auto.offset.reset", "earliest" );
        props.put( "key.deserializer", StringDeserializer.class.getName() );
        props.put( "value.deserializer", StringDeserializer.class.getName() );
        this.consumer = new KafkaConsumer<>( props );
    }
    
    @Override
    public void run(){
        try{
            consumer.subscribe( topics );

            while( true ){
                ConsumerRecords<String, String> records = consumer.poll( Long.MAX_VALUE );
                StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println );
            }
        }catch( WakeupException e ){
            // ignore for shutdown 
        }finally{
            consumer.close();
        }
    }

    public void shutdown(){
        consumer.wakeup();
    }
}

請注意,我們在輪詢期間使用 Long.MAX_VALUE 的超時,因此它將無限期地等待新訊息。要正確關閉使用者,在結束應用程式之前呼叫其 shutdown() 方法很重要。

驅動程式可以像這樣使用它:

public static void main( String[] args ){

    int numConsumers = 3;
    String groupId = "octopus";
    List<String> topics = Arrays.asList( "test-topic" );

    ExecutorService executor = Executors.newFixedThreadPool( numConsumers );
    final List<ConsumerLoop> consumers = new ArrayList<>();

    for( int i = 0; i < numConsumers; i++ ){
        ConsumerLoop consumer = new ConsumerLoop( i, groupId, topics );
        consumers.add( consumer );
        executor.submit( consumer );
    }

    Runtime.getRuntime().addShutdownHook( new Thread(){
        @Override
        public void run(){
            for( ConsumerLoop consumer : consumers ){
                consumer.shutdown();
            }
            executor.shutdown();
            try{
                executor.awaitTermination( 5000, TimeUnit.MILLISECONDS );
            }catch( InterruptedException e ){
                e.printStackTrace();
            }
        }
    } );
}