KafkaConsumer 的例子

FlinkKafkaConsumer 让你使用一个或多个 kafka 主题的数据。

版本

要使用的消费者取决于你的 kafka 分布。

  • FlinkKafkaConsumer08:使用 Kafka 的旧 SimpleConsumer API。抵消由 Flink 处理并提交给 zookeeper。
  • FlinkKafkaConsumer09:使用 Kafka 的新 Consumer API,它自动处理偏移和重新平衡。
  • FlinkKafkaProducer010:此连接器支持带有时间戳的 Kafka 消息,用于生成和使用(对窗口操作很有用)。

用法

二进制文件不是 flink 核心的一部分,因此你需要导入它们:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.${kafka.version}_2.10</artifactId>
  <version>RELEASE</version>
</dependency>

构造函数有三个参数:

  • 要读取的一个或多个主题
  • 反序列化模式告诉 Flink 如何解释/解码消息
  • kafka 使用者配置属性。这些与常规卡夫卡消费者相同。最低要求是:
    • bootstrap.servers:以 ip:port 形式的逗号分隔的 Kafka 代理列表。对于版本 8,请改用 zookeeper.connect(zookeeper 服务器列表)
    • group.id:消费者群体的 ID(有关详细信息,请参阅 kafka 文档)

在 Java 中:

Properties properties = new Properties();
properties.put("group.id", "flink-kafka-example");
properties.put("bootstrap.servers", "localhost:9092");

DataStream<String> inputStream = env.addSource( 
        new FlinkKafkaConsumer09<>(
            kafkaInputTopic, new SimpleStringSchema(), properties));

在 scala 中:

val properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

inputStream = env.addSource(
        new FlinkKafkaConsumer08[String](
            "topic", new SimpleStringSchema(), properties))

在开发过程中,你可以使用 kafka 属性 enable.auto.commit=falseauto.offset.reset=earliest 在每次启动你的 pogram 时重新生成相同的数据。