Share via


HDInsight on AKS の Apache Flink® と Apache Kafka® 用の Azure Event Hubs を接続する

重要

現在、この機能はプレビュー段階にあります。 ベータ版、プレビュー版、または一般提供としてまだリリースされていない Azure の機能に適用されるその他の法律条項については、「Microsoft Azure プレビューの追加使用条件」に記載されています。 この特定のプレビューの詳細については、「Azure HDInsight on AKS のプレビュー情報」を参照してください。 質問や機能の提案については、詳細を記載した要求を AskHDInsight で送信してください。また、その他の更新については、Azure HDInsight コミュニティのフォローをお願いいたします。

Apache Flink のよく知られているユース ケースは、ストリーム分析です。 Apache Kafka を使用して取り込まれるデータ ストリームを使用する多くのユーザーが一般的に選択しています。 Flink と Kafka の一般的なインストールは、Flink ジョブで使用できるイベント ストリームが Kafka にプッシュされることから始まります。 Azure Event Hubs では、イベント ハブ上に Apache Kafka エンドポイントを提供します。これにより、ユーザーは Kafka プロトコルを使用してイベント ハブに接続できます。

この記事では、Azure Event HubsHDInsight on AKS の Apache Flink を接続する方法について説明し、以下について説明します

  • Event Hubs 名前空間を作成します
  • Apache Flink を使用して HDInsight on AKS クラスターを作成する
  • Flink プロデューサーを実行する
  • Apache Flink 用の Jar をパッケージ化する
  • ジョブの送信と検証

Event Hubs 名前空間と Event Hubs を作成する

  1. Event Hubs 名前空間と Event Hubs を作成するには、こちらを参照してください

    Event Hubs セットアップを示すスクリーンショット。

  1. 既存の HDInsight on AKS クラスター プールを使用して、Flink クラスターを作成できます

  2. bootstrap.serversproducer.config 情報を追加して、Flink プロデューサーを実行します

    bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
    client.id=FlinkExampleProducer
    sasl.mechanism=PLAIN
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
    
  3. {YOUR.EVENTHUBS.CONNECTION.STRING} を Event Hubs 名前空間への接続文字列に置き換えます。 接続文字列を取得する手順について詳しくは、「Event Hubs の接続文字列の取得」を参照してください。

    たとえば、 にします。

    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString"
    password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
    
  1. com.example.app をパッケージ化します。

       package contoso.example;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.io.FileReader;
    import java.util.Properties;
    
    public class AzureEventHubDemo {
    
       public static void main(String[] args) throws Exception {
           // 1. get stream execution environment
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
           ParameterTool parameters = ParameterTool.fromArgs(args);
           String input = parameters.get("input");
           Properties properties = new Properties();
           properties.load(new FileReader(input));
    
           // 2. generate stream input
           DataStream<String> stream = createStream(env);
    
           // 3. sink to eventhub
           KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties)
                  .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                          .setTopic("topic1")
                          .setValueSerializationSchema(new SimpleStringSchema())
                           .build())
                   .build();
    
           stream.sinkTo(sink);
    
           // 4. execute the stream
           env.execute("Produce message to Azure event hub");
       }
    
       public static DataStream<String> createStream(StreamExecutionEnvironment env){
           return env.generateSequence(0, 200)
                   .map(new MapFunction<Long, String>() {
                       @Override
                       public String map(Long in) {
                           return "FLINK PRODUCE " + in;
                       }
                   });
       }
    } 
    
  2. Flink プロデューサーを実行するスニペットを追加します。

    Event Hubs で Flink をテストする方法を示すスクリーンショット。

  3. コードが実行されると、イベントはトピック "topic1" に格納されます

    トピックに格納された Event Hubs を示すスクリーンショット。

リファレンス