配置和設定

檢查點配置分兩步完成。首先,你需要選擇後端。然後,你可以在每個應用程式的基礎上指定檢查點的間隔和模式。

後端

可用的後端

儲存檢查點的位置取決於配置的後端:

  • MemoryStateBackend:記憶體狀態,備份到 JobManager 的/ ZooKeeper 的記憶體。應僅用於最小狀態(預設為最大 5 MB,例如用於儲存 Kafka 偏移)或測試和本地除錯。
  • FsStateBackend:狀態儲存在 TaskManagers 的記憶體中,狀態快照(即檢查點)儲存在檔案系統(HDFS,DS3,本地檔案系統……)中。鼓勵對大狀態或長視窗以及高可用性設定進行此設定。
  • RocksDBStateBackend:儲存 RocksDB 資料庫中的飛行中資料,該資料庫(預設情況下)儲存在 TaskManager 資料目錄中。在檢查點時,整個 RocksDB 資料庫被寫入檔案(如上所述)。與 FsStateBackend 相比,它允許更大的狀態(僅受磁碟空間與工作管理員記憶體大小的限制),但吞吐量將更低(資料不總是在記憶體中,必須從光碟載入)。

請注意,無論後端如何,後設資料(檢查點數量,本地化等)始終儲存在作業管理器記憶體中,並且在應用程式終止/取消後檢查點將不會保留

指定後端

你可以使用以下命令在程式的 main 方法中指定後端:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));

或者在 flink/conf/flink-conf.yaml 中設定預設後端:

# Supported backends: 
#  - jobmanager (MemoryStateBackend), 
#  - filesystem (FsStateBackend), 
#  - rocksdb (RocksDBStateBackend), 
#  - <class-name-of-factory>
state.backend: filesystem
    
# Directory for storing checkpoints in a Flink-supported filesystem
# Note: State backend must be accessible from the JobManager and all TaskManagers.
# Use "hdfs://" for HDFS setups, "file://" for UNIX/POSIX-compliant file systems, 
# "S3://" for S3 file system.
state.backend.fs.checkpointdir: file:///tmp/flink-backend/checkpoints

啟用檢查點

每個應用程式都需要明確啟用檢查點:

long checkpointInterval = 5000; // every 5 seconds

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(checkpointInterval);

你可以選擇指定檢查點模式。如果沒有,則預設為一次

env.enableCheckpointing(checkpointInterval, CheckpointingMode.AT_LEAST_ONCE);

檢查點模式定義了系統在出現故障時提供的一致性。當啟用檢查點時,重放資料流,以便重複丟失的處理部分。使用 EXACTLY_ONCE,系統繪製檢查點,使得恢復的行為就像操作符/函式恰好一次看到每條記錄一樣。使用 AT_LEAST_ONCE,檢查點以更簡單的方式繪製,通常在恢復時遇到一些重複。