你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

将适用于 Apache Kafka® 的 Azure 事件中心与 AKS 上的 Apache Flink® on HDInsight 相连接

注意

我们将于 2025 年 1 月 31 日停用 Azure HDInsight on AKS。 在 2025 年 1 月 31 日之前,你需要将工作负荷迁移到 Microsoft Fabric 或同等的 Azure 产品,以避免工作负荷突然终止。 订阅上的剩余群集会被停止并从主机中移除。

在停用日期之前,仅提供基本支持。

重要

此功能目前以预览版提供。 Microsoft Azure 预览版的补充使用条款包含适用于 beta 版、预览版或其他尚未正式发布的 Azure 功能的更多法律条款。 有关此特定预览版的信息,请参阅 Azure HDInsight on AKS 预览版信息。 如有疑问或功能建议,请在 AskHDInsight 上提交请求并附上详细信息,并关注我们以获取 Azure HDInsight Community 的更多更新。

Apache Flink 的已知用例是流分析。 许多用户普遍选择使用数据流,这些数据流是使用 Apache Kafka 引入的。 Flink 和 Kafka 的典型安装从推送到 Kafka 的事件流开始,Flink 作业可以使用该流。 Azure 事件中心在一个事件中心提供 Apache Kafka 终结点,使用户能够使用 Kafka 协议连接到事件中心。

本文介绍如何将 Azure 事件中心AKS 上的 Apache Flink on HDInsight 连接,并介绍以下内容

  • 创建事件中心命名空间
  • 使用 Apache Flink 在 AKS 群集上创建 HDInsight
  • 运行 Flink 制造者
  • Apache Flink 的包 Jar
  • 作业提交和验证

创建事件中心命名空间和事件中心

  1. 若要创建事件中心命名空间和事件中心,请参阅此处

    显示事件中心设置的屏幕截图。

  1. 在 AKS 群集池上使用现有的 HDInsight,可以创建 Flink 群集

  2. 运行 Flink 生成者,添加 bootstrap.serversproducer.config 信息

    bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
    client.id=FlinkExampleProducer
    sasl.mechanism=PLAIN
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
    
  3. {YOUR.EVENTHUBS.CONNECTION.STRING} 替换为事件中心命名空间的连接字符串。 有关获取连接字符串的说明,请参阅有关如何获取事件中心连接字符串的详细信息

    例如,

    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString"
    password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
    
  1. 包 com.example.app;

       package contoso.example;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.io.FileReader;
    import java.util.Properties;
    
    public class AzureEventHubDemo {
    
       public static void main(String[] args) throws Exception {
           // 1. get stream execution environment
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
           ParameterTool parameters = ParameterTool.fromArgs(args);
           String input = parameters.get("input");
           Properties properties = new Properties();
           properties.load(new FileReader(input));
    
           // 2. generate stream input
           DataStream<String> stream = createStream(env);
    
           // 3. sink to eventhub
           KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties)
                  .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                          .setTopic("topic1")
                          .setValueSerializationSchema(new SimpleStringSchema())
                           .build())
                   .build();
    
           stream.sinkTo(sink);
    
           // 4. execute the stream
           env.execute("Produce message to Azure event hub");
       }
    
       public static DataStream<String> createStream(StreamExecutionEnvironment env){
           return env.generateSequence(0, 200)
                   .map(new MapFunction<Long, String>() {
                       @Override
                       public String map(Long in) {
                           return "FLINK PRODUCE " + in;
                       }
                   });
       }
    } 
    
  2. 添加代码片段以运行 Flink 生成者。

    显示如何在事件中心测试 Flink 的屏幕截图。

  3. 执行代码后,事件将存储在主题“topic1”中

    显示主题中存储的事件中心的屏幕截图。

参考