Apache Kafka 介紹

Apache Kafka 是一個分散式流媒體平臺。

意思是

1-It 允許你釋出和訂閱記錄流。在這方面,它類似於消​​息佇列或企業訊息傳遞系統。

2-It 允許你以容錯方式儲存記錄流。

3-It 允許你在記錄發生時處理它們。

它被用於兩大類應用:

1 - 構建實時流資料管道,可在系統或應用程式之間可靠地獲取資料

2 - 構建轉換或響應資料流的實時流應用程式

Kafka 控制檯指令碼對於基於 Unix 和 Windows 的平臺是不同的。在示例中,你可能需要根據平臺新增擴充套件。Linux:指令碼位於 bin/,副檔名為 .sh。Windows:指令碼位於 bin\windows\並具有 .bat 副檔名。

安裝

第 1 步: 下載程式碼並解壓縮:

tar -xzf kafka_2.11-0.10.1.0.tgz
cd kafka_2.11-0.10.1.0

第 2 步: 啟動伺服器。

以後可以刪除主題,開啟 server.properties 並將 delete.topic.enable 設定為 true。

Kafka 在很大程度上依賴於 zookeeper,所以你需要先啟動它。如果你沒有安裝它,你可以使用與 kafka 一起打包的便捷指令碼來獲得一個快速且髒的單節點 ZooKeeper 例項。

zookeeper-server-start config/zookeeper.properties
kafka-server-start config/server.properties

第 3 步: 確保一切正常執行

你現在應該讓 zookeeper 聽 localhost:2181localhost:6667 上的一個 kafka 經紀人。

建立一個主題

我們只有一個代理,因此我們建立一個沒有複製因子且只有一個分割槽的主題:

kafka-topics --zookeeper localhost:2181 \
    --create \
    --replication-factor 1 \
    --partitions 1 \
    --topic test-topic

檢查你的主題:

kafka-topics --zookeeper localhost:2181 --list
test-topic

kafka-topics --zookeeper localhost:2181 --describe --topic test-topic
Topic:test-topic    PartitionCount:1    ReplicationFactor:1 Configs:
Topic: test-topic   Partition: 0    Leader: 0   Replicas: 0 Isr: 0

傳送和接收訊息

啟動消費者:

kafka-console-consumer --bootstrap-server localhost:9092 --topic test-topic

在另一個終端上,啟動生產者併傳送一些訊息。預設情況下,該工具將每行作為單獨的訊息傳送給代理,而無需特殊編碼。寫一些行並用 CTRL + D 或 CTRL + C 退出:

kafka-console-producer --broker-list localhost:9092 --topic test-topic   
a message
another message
^D

訊息應出現在消費者的終端中。

停止卡夫卡

kafka-server-stop 

啟動多代理群集

以上示例僅使用一個代理。要設定真正的叢集,我們只需要啟動多個 kafka 伺服器。他們會自動協調自己。

步驟 1: 為避免衝突,我們為每個代理建立一個 server.properties 檔案,並更改 idportlogfile 配置屬性。

複製:

cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties

編輯每個檔案的屬性,例如:

vim config/server-1.properties
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/usr/local/var/lib/kafka-logs-1

vim config/server-2.properties
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/usr/local/var/lib/kafka-logs-2

第 2 步: 啟動三個經紀人:

    kafka-server-start config/server.properties &
    kafka-server-start config/server-1.properties &
    kafka-server-start config/server-2.properties &

建立複製主題

kafka-topics --zookeeper localhost:2181 --create --replication-factor 3 --partitions 1 --topic replicated-topic

kafka-topics --zookeeper localhost:2181 --describe --topic replicated-topic
Topic:replicated-topic  PartitionCount:1    ReplicationFactor:3 Configs:
Topic: replicated-topic Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0

這一次,有更多資訊:

  • leader 是負責給定分割槽的所有讀取和寫入的節點。每個節點將成為隨機選擇的分割槽部分的領導者。
  • replicas 是複製此分割槽日誌的節點列表,無論它們是否為領導者,或者即使它們當前處於活動狀態。
  • isr同步複製品的集合。這是副本列表的子集,該列表當前處於活躍狀態並且已經被領導者捕獲。

請注意,先前建立的主題保持不變。

測試容錯

將一些訊息釋出到新主題:

kafka-console-producer --broker-list localhost:9092 --topic replicated-topic
hello 1
hello 2
^C

殺死領導者(在我們的例子中為 1)。在 Linux 上:

ps aux | grep server-1.properties
kill -9 <PID>

在 Windows 上:

wmic process get processid,caption,commandline | find "java.exe" | find "server-1.properties" 
taskkill /pid <PID> /f

看看發生了什麼:

kafka-topics --zookeeper localhost:2181  --describe --topic replicated-topic
Topic:replicated-topic  PartitionCount:1    ReplicationFactor:3 Configs:
Topic: replicated-topic Partition: 0    Leader: 2   Replicas: 1,2,0 Isr: 2,0

領導層已經轉向經紀人 2 和 1 不再同步。但訊息仍然存在(使用消費者自己檢查)。

清理

刪除這兩個主題:

kafka-topics --zookeeper localhost:2181 --delete --topic test-topic
kafka-topics --zookeeper localhost:2181 --delete --topic replicated-topic