分享方式:


使用適用於 Apache Kafka® 的 Azure 事件中樞,在 AKS 上 HDInsight 上連接 Apache Flink®

注意

AKS 上的 Azure HDInsight 將於 2025 年 1 月 31 日退場。 請於 2025 年 1 月 31 日之前,將工作負載移轉至 Microsoft Fabric 或對等的 Azure 產品,以免工作負載突然終止。 訂用帳戶中剩餘的叢集將會停止,並會從主機移除。

在淘汰日期之前,只有基本支援可用。

重要

此功能目前為預覽功能。 Microsoft Azure 預覽版增補使用規定包含適用於 Azure 功能 (搶鮮版 (Beta)、預覽版,或尚未正式發行的版本) 的更多法律條款。 若需此特定預覽版的相關資訊,請參閱 Azure HDInsight on AKS 預覽版資訊。 如有問題或功能建議,請在 AskHDInsight 上提交要求並附上詳細資料,並且在 Azure HDInsight 社群上追蹤我們以獲得更多更新資訊。

Apache Flink 的已知使用案例是串流分析。 許多使用者使用資料流的熱門選擇,會使用 Apache Kafka 來擷取這些資料流。 Flink 和 Kafka 的一般安裝會從正在推送至 Kafka 的事件串流開始,然後由 Flink 作業使用。 Azure 事件中樞在事件中樞上提供一個 Apache Kafka 端點,可讓使用者使用 Kafka 通訊協定連線到事件中樞。

在本文中,我們會探索如何將 Azure 事件中樞在 AKS 上 HDInsight 上的 Apache Flink 連接,並說明下列內容

  • 建立事件中樞命名空間
  • 使用 Apache Flink 建立在 AKS 上的 HDInsight 叢集
  • 執行 Flink 生產者
  • 適用於 Apache Flink 的套件 Jar
  • 作業提交和驗證

建立事件中樞命名空間和事件中樞

  1. 若要建立事件中樞命名空間和事件中樞,請參閱此處

    顯示事件中樞設定的螢幕擷取畫面。

  1. 您可以使用現有在 AKS 上 HDInsight 叢集集區,建立 Flink 叢集

  2. 執行會新增 bootstrap.serversproducer.config 資訊的 Flink 產生者

    bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
    client.id=FlinkExampleProducer
    sasl.mechanism=PLAIN
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
    
  3. {YOUR.EVENTHUBS.CONNECTION.STRING} 取代為事件中樞命名空間的連接字串。 如需有關取得連接字串的指示,請參閱如何取得事件中樞連接字串的詳細資料。

    例如,

    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString"
    password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
    
  1. 封裝 com.example.app;

       package contoso.example;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.io.FileReader;
    import java.util.Properties;
    
    public class AzureEventHubDemo {
    
       public static void main(String[] args) throws Exception {
           // 1. get stream execution environment
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
           ParameterTool parameters = ParameterTool.fromArgs(args);
           String input = parameters.get("input");
           Properties properties = new Properties();
           properties.load(new FileReader(input));
    
           // 2. generate stream input
           DataStream<String> stream = createStream(env);
    
           // 3. sink to eventhub
           KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties)
                  .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                          .setTopic("topic1")
                          .setValueSerializationSchema(new SimpleStringSchema())
                           .build())
                   .build();
    
           stream.sinkTo(sink);
    
           // 4. execute the stream
           env.execute("Produce message to Azure event hub");
       }
    
       public static DataStream<String> createStream(StreamExecutionEnvironment env){
           return env.generateSequence(0, 200)
                   .map(new MapFunction<Long, String>() {
                       @Override
                       public String map(Long in) {
                           return "FLINK PRODUCE " + in;
                       }
                   });
       }
    } 
    
  2. 新增程式碼片段以執行 Flink 產生者。

    顯示如何在事件中樞測試 Flink 的螢幕擷取畫面。

  3. 執行程式碼之後,事件會儲存在主題 "topic1"

    顯示儲存在主題中的事件中樞的螢幕擷取畫面。

參考