HDInsight on AKS の Apache Flink® と Apache Kafka® 用の Azure Event Hubs を接続する
重要
現在、この機能はプレビュー段階にあります。 ベータ版、プレビュー版、または一般提供としてまだリリースされていない Azure の機能に適用されるその他の法律条項については、「Microsoft Azure プレビューの追加使用条件」に記載されています。 この特定のプレビューの詳細については、「Azure HDInsight on AKS のプレビュー情報」を参照してください。 質問や機能の提案については、詳細を記載した要求を AskHDInsight で送信してください。また、その他の更新については、Azure HDInsight コミュニティのフォローをお願いいたします。
Apache Flink のよく知られているユース ケースは、ストリーム分析です。 Apache Kafka を使用して取り込まれるデータ ストリームを使用する多くのユーザーが一般的に選択しています。 Flink と Kafka の一般的なインストールは、Flink ジョブで使用できるイベント ストリームが Kafka にプッシュされることから始まります。 Azure Event Hubs では、イベント ハブ上に Apache Kafka エンドポイントを提供します。これにより、ユーザーは Kafka プロトコルを使用してイベント ハブに接続できます。
この記事では、Azure Event Hubs と HDInsight on AKS の Apache Flink を接続する方法について説明し、以下について説明します
- Event Hubs 名前空間を作成します
- Apache Flink を使用して HDInsight on AKS クラスターを作成する
- Flink プロデューサーを実行する
- Apache Flink 用の Jar をパッケージ化する
- ジョブの送信と検証
Event Hubs 名前空間と Event Hubs を作成する
Event Hubs 名前空間と Event Hubs を作成するには、こちらを参照してください
HDInsight on AKS で Flink クラスターを設定する
既存の HDInsight on AKS クラスター プールを使用して、Flink クラスターを作成できます
bootstrap.servers と
producer.config
情報を追加して、Flink プロデューサーを実行しますbootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 client.id=FlinkExampleProducer sasl.mechanism=PLAIN security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="$ConnectionString" \ password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
{YOUR.EVENTHUBS.CONNECTION.STRING}
を Event Hubs 名前空間への接続文字列に置き換えます。 接続文字列を取得する手順について詳しくは、「Event Hubs の接続文字列の取得」を参照してください。たとえば、 にします。
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Flink 用の JAR をパッケージ化する
com.example.app をパッケージ化します。
package contoso.example; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.io.FileReader; import java.util.Properties; public class AzureEventHubDemo { public static void main(String[] args) throws Exception { // 1. get stream execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); ParameterTool parameters = ParameterTool.fromArgs(args); String input = parameters.get("input"); Properties properties = new Properties(); properties.load(new FileReader(input)); // 2. generate stream input DataStream<String> stream = createStream(env); // 3. sink to eventhub KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties) .setRecordSerializer(KafkaRecordSerializationSchema.builder() .setTopic("topic1") .setValueSerializationSchema(new SimpleStringSchema()) .build()) .build(); stream.sinkTo(sink); // 4. execute the stream env.execute("Produce message to Azure event hub"); } public static DataStream<String> createStream(StreamExecutionEnvironment env){ return env.generateSequence(0, 200) .map(new MapFunction<Long, String>() { @Override public String map(Long in) { return "FLINK PRODUCE " + in; } }); } }
Flink プロデューサーを実行するスニペットを追加します。
コードが実行されると、イベントはトピック "topic1" に格納されます
リファレンス
- Apache Flink Web サイト
- Apache、Apache Kafka、Kafka、Apache Flink、Flink、関連するオープン ソース プロジェクト名は、Apache Software Foundation (ASF) の商標です。