内置反序列化模式

SimpleStringSchemaSimpleStringSchema 将消息反序列化为字符串。如果你的消息有密钥,后者将被忽略。

new FlinkKafkaConsumer09<>(kafkaInputTopic, new SimpleStringSchema(), prop);

JSONDeserializationSchema

JSONDeserializationSchema 使用 jackson 反序列化 json 格式的消息并返回 com.fasterxml.jackson.databind.node.ObjectNode 对象的流。然后,你可以使用 .get("property") 方法访问字段。再次,键被忽略。

new FlinkKafkaConsumer09<>(kafkaInputTopic, new JSONDeserializationSchema(), prop);

JSONKeyValueDeserializationSchema

JSONKeyValueDeserializationSchema 与前一个非常相似,但处理带有 json 编码键和值的消息。

boolean fetchMetadata = true;
new FlinkKafkaConsumer09<>(kafkaInputTopic, new JSONKeyValueDeserializationSchema(fetchMetadata), properties);

返回的 ObjectNode 包含以下字段:

  • key:密钥中存在的所有字段
  • value:所有消息字段
  • (可选)metadata:公开消息的 offsetpartitiontopic(将 true 传递给构造函数以获取元数据)。

例如:

kafka-console-producer --broker-list localhost:9092 --topic json-topic \
    --property parse.key=true \
    --property key.separator=|
{"keyField1": 1, "keyField2": 2} | {"valueField1": 1, "valueField2" : {"foo": "bar"}}
^C

将解码为:

{
    "key":{"keyField1":1,"keyField2":2},
    "value":{"valueField1":1,"valueField2":{"foo":"bar"}},
    "metadata":{
        "offset":43,
        "topic":"json-topic",
        "partition":0
    }
}