SimpleProducer(kafka 0.9)

配置和初始化

首先,建立一個 maven 專案並在你的 pom 中新增以下依賴項:

 <dependencies>
     <dependency>
         <groupId>org.apache.kafka</groupId>
         <artifactId>kafka-clients</artifactId>
         <version>0.9.0.1</version>
     </dependency>
 </dependencies>

使用 Properties 物件初始化生成器。有許多屬性允許你微調生產者行為。以下是所需的最小配置:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "simple-producer-XX");

bootstrap-servers 是一個或多個代理的初始列表,供生產者能夠發現群集的其餘部分。serializer 屬性告訴 Kafka 如何編碼訊息鍵和值。在這裡,我們將傳送字串訊息。雖然不是必需的,但始終建議設定 client.id:這使你可以輕鬆地將代理上的請求與建立它的客戶端例項相關聯。

其他有趣的屬性是:

props.put("acks", "all"); 
props.put("retries", 0);  
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);

你可以通過 acks 設定控制寫入 Kafka 的訊息永續性。預設值 1 需要分割槽負責人明確確認寫入成功。Kafka 提供的最強保證是 acks=all,它保證分割槽領導者不僅接受寫入,而且成功地複製到所有同步副本。你還可以使用值 0 來最大化吞吐量,但是你無法保證訊息已成功寫入代理的日誌,因為在這種情況下代理甚至不傳送響應。

retries(預設為> 0)確定生產者是否在失敗後嘗試重新傳送訊息。請注意,如果重試次數> 0,則可能會發生訊息重新排序,因為在後續寫入成功後可能會發生重試。

Kafka 生產商嘗試將已傳送的訊息分批收集以提高吞吐量。使用 Java 客戶端,你可以使用 batch.size 來控制每個訊息批處理的最大大小(以位元組為單位)。為了給批量填充更多時間,你可以使用 linger.ms 讓生產者延遲傳送。最後,可以使用 compression.type 設定啟用壓縮。

使用 buffer.memory 限制 Java 客戶端可用於收集未傳送訊息的總記憶體。當達到此限制時,生產者將在提出異常之前阻止其他傳送,直到 max.block.ms。此外,為避免將記錄無限期排隊,你可以使用 request.timeout.ms 設定超時。

完整的屬性列表可在此處獲得 。我建議你從 Confluent 閱讀這篇文章瞭解更多詳情。

傳送訊息

send() 方法是非同步的。呼叫時,它會將記錄新增到待處理記錄傳送的緩衝區中並立即返回。這允許生產者將各個記錄一起批處理以提高效率。

傳送的結果是 RecordMetadata,指定記錄傳送到的分割槽以及分配的偏移量。由於傳送呼叫是非同步的,因此它將為將分配給此記錄的 RecordMetadata 返回 Future。要查閱後設資料,你可以呼叫 get(),它將阻止直到請求完成或使用回撥。

// synchronous call with get()
RecordMetadata recordMetadata = producer.send( message ).get();
// callback with a lambda
producer.send( message, ( recordMetadata, error ) -> System.out.println(recordMetadata) );

程式碼

public class SimpleProducer{
    
    public static void main( String[] args ) throws ExecutionException, InterruptedException{
        Properties props = new Properties();

        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put( "client.id", "octopus" );

        String topic = "test-topic";

        Producer<String, String> producer = new KafkaProducer<>( props );

        for( int i = 0; i < 10; i++ ){
            ProducerRecord<String, String> message = new ProducerRecord<>( topic, "this is message " + i );
            producer.send( message );
            System.out.println("message sent.");
        }

        producer.close(); // don't forget this
    }
}