配置和设置

检查点配置分两步完成。首先,你需要选择后端。然后,你可以在每个应用程序的基础上指定检查点的间隔和模式。

后端

可用的后端

存储检查点的位置取决于配置的后端:

  • MemoryStateBackend:内存状态,备份到 JobManager 的/ ZooKeeper 的内存。应仅用于最小状态(默认为最大 5 MB,例如用于存储 Kafka 偏移)或测试和本地调试。
  • FsStateBackend:状态保存在 TaskManagers 的内存中,状态快照(即检查点)存储在文件系统(HDFS,DS3,本地文件系统……)中。鼓励对大状态或长窗口以及高可用性设置进行此设置。
  • RocksDBStateBackend:保存 RocksDB 数据库中的飞行中数据,该数据库(默认情况下)存储在 TaskManager 数据目录中。在检查点时,整个 RocksDB 数据库被写入文件(如上所述)。与 FsStateBackend 相比,它允许更大的状态(仅受磁盘空间与任务管理器内存大小的限制),但吞吐量将更低(数据不总是在内存中,必须从光盘加载)。

请注意,无论后端如何,元数据(检查点数量,本地化等)始终存储在作业管理器内存中,并且在应用程序终止/取消后检查点将不会保留

指定后端

你可以使用以下命令在程序的 main 方法中指定后端:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));

或者在 flink/conf/flink-conf.yaml 中设置默认后端:

# Supported backends: 
#  - jobmanager (MemoryStateBackend), 
#  - filesystem (FsStateBackend), 
#  - rocksdb (RocksDBStateBackend), 
#  - <class-name-of-factory>
state.backend: filesystem
    
# Directory for storing checkpoints in a Flink-supported filesystem
# Note: State backend must be accessible from the JobManager and all TaskManagers.
# Use "hdfs://" for HDFS setups, "file://" for UNIX/POSIX-compliant file systems, 
# "S3://" for S3 file system.
state.backend.fs.checkpointdir: file:///tmp/flink-backend/checkpoints

启用检查点

每个应用程序都需要明确启用检查点:

long checkpointInterval = 5000; // every 5 seconds

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(checkpointInterval);

你可以选择指定检查点模式。如果没有,则默认为一次

env.enableCheckpointing(checkpointInterval, CheckpointingMode.AT_LEAST_ONCE);

检查点模式定义了系统在出现故障时提供的一致性。当激活检查点时,重放数据流,以便重复丢失的处理部分。使用 EXACTLY_ONCE,系统绘制检查点,使得恢复的行为就像操作符/函数恰好一次看到每条记录一样。使用 AT_LEAST_ONCE,检查点以更简单的方式绘制,通常在恢复时遇到一些重复。