Learning Flink (12): Kafka Connector

Flink supports Kafka both as a source and as a sink.

Dependencies

Edit the pom.xml file and add the following dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

Usage

Create a kafka.properties file under the resources directory with the following contents:

bootstrap.servers=127.0.0.1:9092
group.id=test
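These two keys (the broker address and the consumer group id) are the minimum the consumer below needs. The file is parsed by java.util.Properties, which reads simple key=value lines; a stdlib-only sketch (parsing the same two lines from a string rather than the classpath, for illustration) looks like:

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class KafkaProps {
    // Parse key=value configuration text the same way Properties.load
    // parses kafka.properties from the classpath.
    static Properties parse(String text) throws IOException {
        Properties p = new Properties();
        p.load(new StringReader(text));
        return p;
    }

    public static void main(String[] args) throws IOException {
        Properties p = parse("bootstrap.servers=127.0.0.1:9092\ngroup.id=test\n");
        System.out.println(p.getProperty("bootstrap.servers")); // 127.0.0.1:9092
        System.out.println(p.getProperty("group.id"));          // test
    }
}
```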

Consumer

Properties properties = new Properties();
try (InputStream in = getClass().getClassLoader().getResourceAsStream("kafka.properties")) {
    properties.load(in);
}
FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), properties);
// Resume from the consumer group's committed offsets (the default behavior).
consumer.setStartFromGroupOffsets();
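The consumer still has to be wired into a streaming job via addSource. A minimal end-to-end sketch, assuming a hypothetical topic name "input-topic" and a runnable Flink environment:

```java
import java.io.InputStream;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class KafkaSourceJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Load broker address and group id from the classpath.
        Properties properties = new Properties();
        try (InputStream in = KafkaSourceJob.class.getClassLoader()
                .getResourceAsStream("kafka.properties")) {
            properties.load(in);
        }

        // "input-topic" is a placeholder; use your own topic name.
        FlinkKafkaConsumer011<String> consumer =
                new FlinkKafkaConsumer011<>("input-topic", new SimpleStringSchema(), properties);
        consumer.setStartFromGroupOffsets();

        DataStream<String> stream = env.addSource(consumer);
        stream.print();

        env.execute("Kafka Source Example");
    }
}
```

Running this requires a reachable Kafka broker at the configured bootstrap.servers address.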

Producer

Properties properties = new Properties();
try (InputStream in = getClass().getClassLoader().getResourceAsStream("kafka.properties")) {
    properties.load(in);
}
FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(topic, new SimpleStringSchema(), properties);
// Attach each record's event timestamp to the Kafka message.
producer.setWriteTimestampToKafka(true);
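Symmetrically, the producer is attached to a stream with addSink. A minimal sketch, assuming a hypothetical topic name "output-topic":

```java
import java.io.InputStream;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

public class KafkaSinkJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Load Kafka connection settings from the classpath.
        Properties properties = new Properties();
        try (InputStream in = KafkaSinkJob.class.getClassLoader()
                .getResourceAsStream("kafka.properties")) {
            properties.load(in);
        }

        // "output-topic" is a placeholder; use your own topic name.
        FlinkKafkaProducer011<String> producer =
                new FlinkKafkaProducer011<>("output-topic", new SimpleStringSchema(), properties);
        producer.setWriteTimestampToKafka(true);

        // A toy in-memory stream; in practice this would be the job's real pipeline.
        DataStream<String> stream = env.fromElements("hello", "kafka");
        stream.addSink(producer);

        env.execute("Kafka Sink Example");
    }
}
```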
