Savepoints

Configuration

The configuration lives in the file flink/conf/flink-conf.yaml (on Mac OS X with a Homebrew install, that is /usr/local/Cellar/apache-flink/1.1.3/libexec/conf/flink-conf.yaml).

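Flink < 1.2: the configuration is very similar to the checkpoint configuration (covered in a separate topic). The only difference is that an in-memory savepoint backend makes no sense, because savepoints need to persist after Flink shuts down.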

# Supported backends: filesystem, <class-name-of-factory>
savepoints.state.backend: filesystem
# Use "hdfs://" for HDFS setups, "file://" for UNIX/POSIX-compliant file systems,
# (or any local file system under Windows), or "S3://" for S3 file system.
# Note: must be accessible from the JobManager and all TaskManagers !
savepoints.state.backend.fs.checkpointdir: file:///tmp/flink-backend/savepoints

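Note: if you don't specify a backend, the default backend is jobmanager, which means your savepoints disappear as soon as the cluster shuts down. This is useful for debugging only.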

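Flink 1.2+: as explained in the corresponding JIRA ticket, allowing savepoints to be kept in the JobManager's memory makes little sense. Since Flink 1.2, savepoints are always stored in files. The configuration above has been replaced by: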

# Default savepoint target directory
state.savepoints.dir: hdfs:///flink/savepoints

Usage

Getting the job ID

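To trigger a savepoint, all you need is the job ID of the application. The job ID is printed on the command line when you launch the job, or it can be retrieved later with flink list: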

flink list
Retrieving JobManager.
Using address localhost/127.0.0.1:6123 to connect to JobManager.
------------------ Running/Restarting Jobs -------------------
17.03.2017 11:44:03 : 196b8ce6788d0554f524ba747c4ea54f : CheckpointExample (RUNNING)
No scheduled jobs.
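
When the job ID has to be picked up by a script rather than copied by hand, the listing can be filtered. The following is a minimal sketch, not part of the Flink CLI: the function name running_job_ids is hypothetical, and it assumes the "<date> <time> : <jobID> : <name> (<STATE>)" line layout shown in the output above, which may differ in other Flink versions.

```shell
#!/usr/bin/env bash

# Print the ID of every job that `flink list` reports as RUNNING.
# Reads the command output on stdin. Assumes job lines are laid out as
# "<date> <time> : <jobID> : <name> (RUNNING)", as in the sample above.
running_job_ids() {
    # Split each line on " : " and keep the second field (the job ID)
    # for lines that carry the (RUNNING) state marker.
    awk -F' : ' '/\(RUNNING\)/ && NF >= 3 { print $2 }'
}

# Usage (requires a reachable JobManager):
#   flink list | running_job_ids
```

With a single running job, the result can be captured with job_id=$(flink list | running_job_ids) and passed on to the savepoint commands.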

Triggering a savepoint

To trigger a savepoint, use flink savepoint <jobID>:

flink savepoint 196b8ce6788d0554f524ba747c4ea54f
Retrieving JobManager.
Using address /127.0.0.1:6123 to connect to JobManager.
Triggering savepoint for job 196b8ce6788d0554f524ba747c4ea54f.
Waiting for response...
Savepoint completed. Path: file:/tmp/flink-backend/savepoints/savepoint-a40111f915fc
You can resume your program from this savepoint with the run command.

Note that you can also pass a target directory as a second argument; it overrides the default directory defined in flink/conf/flink-conf.yaml.

In Flink 1.2+, you can also cancel a job and take a savepoint at the same time, using the -s option:

flink cancel -s 196b8ce6788d0554f524ba747c4ea54f # use default savepoints dir
flink cancel -s hdfs:///savepoints 196b8ce6788d0554f524ba747c4ea54f # specify target dir

Note: you can move a savepoint, but do not rename it!

Resuming from a savepoint

To resume from a specific savepoint, use the -s [savepoint-dir] option of the flink run command:

flink run -s /tmp/flink-backend/savepoints/savepoint-a40111f915fc app.jar
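
To script the trigger-then-resume cycle, the savepoint path can be read from the output of flink savepoint instead of being copied by hand. This is only a sketch: the helper name savepoint_path is hypothetical, and it relies on the "Savepoint completed. Path: ..." line printed by the Flink version shown in this topic; other versions may word that line differently.

```shell
#!/usr/bin/env bash

# Extract the savepoint path from the output of `flink savepoint <jobID>`.
# Reads the command output on stdin and prints only what follows
# "Savepoint completed. Path: "; prints nothing if that line is absent.
savepoint_path() {
    sed -n 's/^Savepoint completed\. Path: //p'
}

# Usage (requires a running cluster; app.jar as in the command above):
#   sp=$(flink savepoint "$job_id" | savepoint_path)
#   flink run -s "$sp" app.jar
```

An empty result means the savepoint did not complete, or the output format changed, so a script should check for that before calling flink run.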

Specifying operator UIDs

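To be able to resume from a savepoint after a code change, you must make sure that the new code uses the same UIDs for its operators. To assign a UID manually, call the .uid(<name>) function right after the operator: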

env
    .addSource(source)
    .uid(className + "-KafkaSource01")
    .rebalance()
    .keyBy((node) -> node.get("key").asInt())
    .flatMap(new StatefulMapper())
    .uid(className + "-StatefulMapper01")
    .print();