你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
将适用于 Apache Kafka® 的 Azure 事件中心与 AKS 上的 Apache Flink® on HDInsight 相连接
注意
我们将于 2025 年 1 月 31 日停用 Azure HDInsight on AKS。 在 2025 年 1 月 31 日之前,你需要将工作负荷迁移到 Microsoft Fabric 或同等的 Azure 产品,以避免工作负荷突然终止。 订阅上的剩余群集会被停止并从主机中移除。
在停用日期之前,仅提供基本支持。
重要
此功能目前以预览版提供。 Microsoft Azure 预览版的补充使用条款包含适用于 beta 版、预览版或其他尚未正式发布的 Azure 功能的更多法律条款。 有关此特定预览版的信息,请参阅 Azure HDInsight on AKS 预览版信息。 如有疑问或功能建议,请在 AskHDInsight 上提交请求并附上详细信息,并关注我们以获取 Azure HDInsight Community 的更多更新。
Apache Flink 的已知用例是流分析。 许多用户普遍选择使用数据流,这些数据流是使用 Apache Kafka 引入的。 Flink 和 Kafka 的典型安装从推送到 Kafka 的事件流开始,Flink 作业可以使用该流。 Azure 事件中心在一个事件中心提供 Apache Kafka 终结点,使用户能够使用 Kafka 协议连接到事件中心。
本文介绍如何将 Azure 事件中心与 AKS 上的 Apache Flink on HDInsight 连接,并介绍以下内容
- 创建事件中心命名空间
- 使用 Apache Flink 在 AKS 群集上创建 HDInsight
- 运行 Flink 制造者
- Apache Flink 的包 Jar
- 作业提交和验证
创建事件中心命名空间和事件中心
若要创建事件中心命名空间和事件中心,请参阅此处
在 AKS 上的 HDInsight 上设置 Flink 群集
在 AKS 群集池上使用现有的 HDInsight,可以创建 Flink 群集
运行 Flink 生成者,添加 bootstrap.servers 和
producer.config
信息bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 client.id=FlinkExampleProducer sasl.mechanism=PLAIN security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="$ConnectionString" \ password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
将
{YOUR.EVENTHUBS.CONNECTION.STRING}
替换为事件中心命名空间的连接字符串。 有关获取连接字符串的说明,请参阅有关如何获取事件中心连接字符串的详细信息。例如,
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
打包适用于 Flink 的 JAR
包 com.example.app;
package contoso.example; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.io.FileReader; import java.util.Properties; public class AzureEventHubDemo { public static void main(String[] args) throws Exception { // 1. get stream execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); ParameterTool parameters = ParameterTool.fromArgs(args); String input = parameters.get("input"); Properties properties = new Properties(); properties.load(new FileReader(input)); // 2. generate stream input DataStream<String> stream = createStream(env); // 3. sink to eventhub KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties) .setRecordSerializer(KafkaRecordSerializationSchema.builder() .setTopic("topic1") .setValueSerializationSchema(new SimpleStringSchema()) .build()) .build(); stream.sinkTo(sink); // 4. execute the stream env.execute("Produce message to Azure event hub"); } public static DataStream<String> createStream(StreamExecutionEnvironment env){ return env.generateSequence(0, 200) .map(new MapFunction<Long, String>() { @Override public String map(Long in) { return "FLINK PRODUCE " + in; } }); } }
添加代码片段以运行 Flink 生成者。
执行代码后,事件将存储在主题“topic1”中
参考
- Apache Flink 网站
- Apache、Apache Kafka、Kafka、Apache Flink、Flink 和关联的开源项目名称是 Apache Software Foundation (ASF) 的商标。