Azure Service Bus で AKS 上の HDInsight で Apache Flink を使用する
重要
AKS 上の Azure HDInsight は、2025 年 1 月 31 日に廃止されました。 詳細を知るために、このお知らせ を確認してください。
ワークロードの突然の終了を回避するには、ワークロードを Microsoft Fabric または同等の Azure 製品 に移行する必要があります。
重要
この機能は現在プレビュー段階です。 Microsoft Azure プレビューの 追加使用条件 には、ベータ版、プレビュー版、または一般公開されていない Azure 機能に適用される、より多くの法的条件が含まれています。 この特定のプレビューの詳細については、AKS プレビュー情報 Azure HDInsightを参照してください。 ご質問や機能のご提案は、AskHDInsight に詳細を添えてリクエストを送信してください。また、最新情報を得るために Azure HDInsight Communityをフォローしてください。
この記事では、Azure Service Bus 用 AKS 上の HDInsight 上の Apache Flink DataStream API の概要とデモについて説明します。 Flink ジョブのデモンストレーションは、Azure Service Bus からメッセージを読み取り、Azure Data Lake Storage Gen2 (ADLS Gen2) に書き込みます。
- AKS 上の HDInsight での Flink クラスター 1.17.0 の
- このデモでは、Windows VM を Maven プロジェクトとして使用し、AKS 上の HDInsight と同じ VNET で env を開発します。
- Flink クラスターの 作成 中に、SSH アクセスが選択されていることを確認する必要があります。 これにより、Secure Shell (SSH) を使用してクラスターにアクセスできます。
- Azure Service Bus インスタンスを設定します。
- 統合を続行するには、Azure Service Busに必要な接続文字列、トピック名、サブスクリプション名を取得します。
このジョブは、Azure Service Bus からメッセージを読み取り、Data Lake Storage Gen2 (ADLS Gen2) に書き込みます。
ジョブを開始するには、次のコマンドを使用して JAR ファイルを webssh ポッドに転送し、ジョブを送信します。
bin/flink run -c contoso.example.ServiceBusToAdlsGen2 -j AzureServiceBusDemo-1.0-SNAPSHOT.jar
Job has been submitted with JobID fc5793361a914821c968b5746a804570
ジョブを送信したら、Flink ダッシュボード UI にアクセスし、実行中のジョブをクリックして詳細を確認します。
Azure portal で Service Bus Explorer に移動し、対応する Service Bus にメッセージを送信します。
Flink UI でジョブの実行中の詳細を確認して、分析情報を確認します。
メッセージが正常に発行されたら、Azure portal 上の ADLS Gen2 ストレージの Flink ジョブによって生成された出力ファイルを確認します。
POM.xml ファイルでは、Maven を使用してプロジェクトの依存関係を定義し、ライブラリとプラグインの管理を円滑かつ整理します。 Apache Flink を含む依存関係など、プロジェクトの依存関係を含める方法を示すスニペットを次に示します。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>contoso.example</groupId>
<artifactId>AzureServiceBusDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.17.0</flink.version>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.26.0</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
<version>1.4.1</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>com.microsoft.azure</groupId>-->
<!-- <artifactId>azure-servicebus</artifactId>-->
<!-- <version>3.6.0</version>-->
<!-- </dependency>-->
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-servicebus</artifactId>
<version>7.5.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
メイン関数: ServiceBusToAdlsGen2.java
package contoso.example;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.time.Duration;
public class ServiceBusToAdlsGen2 {
public static void main(String[] args) throws Exception {
// Set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final String connectionString = "Endpoint=sb://contososervicebus.servicebus.windows.net/;SharedAccessKeyName=policy1;SharedAccessKey=<key>";
final String topicName = "topic1";
final String subName = "subscription1";
// Create a source function for Azure Service Bus
SessionBasedServiceBusSource sourceFunction = new SessionBasedServiceBusSource(connectionString, topicName, subName);
// Create a data stream using the source function
DataStream<String> stream = env.addSource(sourceFunction);
// Process the data (this is where you'd put your processing logic)
DataStream<String> processedStream = stream.map(value -> processValue(value));
processedStream.print();
// 3. sink to gen2
String outputPath = "abfs://<container>@<account>.dfs.core.windows.net/data/ServiceBus/Topic1";
// String outputPath = "src/ServiceBugOutput/";
final FileSink<String> sink = FileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(2))
.withInactivityInterval(Duration.ofMinutes(3))
.withMaxPartSize(MemorySize.
ofMebiBytes(5))
.build())
.build();
// Add the sink function to the processed stream
processedStream.sinkTo(sink);
// Execute the job
env.execute("ServiceBusToDataLakeJob");
}
private static String processValue(String value) {
// Implement your processing logic here
return value;
}
}
入力ソース: SessionBasedServiceBusSource.java
package contoso.example;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusSessionReceiverAsyncClient;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
public class SessionBasedServiceBusSource extends RichParallelSourceFunction<String> {
private final String connectionString;
private final String topicName;
private final String subscriptionName;
private volatile boolean isRunning = true;
private ServiceBusSessionReceiverAsyncClient sessionReceiver;
public SessionBasedServiceBusSource(String connectionString, String topicName, String subscriptionName) {
this.connectionString = connectionString;
this.topicName = topicName;
this.subscriptionName = subscriptionName;
}
@Override
public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
.connectionString(connectionString)
.sessionReceiver()
.topicName(topicName)
.subscriptionName(subscriptionName)
.buildAsyncClient();
sessionReceiver.acceptNextSession()
.flatMapMany(session -> session.receiveMessages())
.doOnNext(message -> {
try {
ctx.collect(message.getBody().toString());
} catch (Exception e) {
System.out.printf("An error occurred: %s.", e.getMessage());
}
})
.doOnError(error -> System.out.printf("An error occurred: %s.", error.getMessage()))
.blockLast();
}
@Override
public void cancel() {
isRunning = false;
if (sessionReceiver != null) {
sessionReceiver.close();
}
}
}
この Java クラス ServiceBusToAdlsgen2
、DStreamAPI AzureServiceBusDemo
の Flink ジョブ全体を調整し、Apache Flink と Azure Service Bus の間の堅牢な統合を示します。
実行環境の設定
StreamExecutionEnvironment.getExecutionEnvironment()
メソッドは、Flink ジョブの実行環境を設定するために使用されます。Azure Service Bus のソース関数の作成
SessionBasedServiceBusSource
オブジェクトは、Azure Service Bus の接続文字列、トピック名、サブスクリプション名を使用して作成されます。 このオブジェクトは、データ ストリームの作成に使用できるソース関数です。データ ストリーム の作成
env.addSource(sourceFunction)
メソッドは、ソース関数からデータ ストリームを作成するために使用されます。 Azure Service Bus トピックからの各メッセージは、このストリームの要素になります。データ の処理
stream.map(value -> processValue(value))
メソッドは、ストリーム内の各要素を処理するために使用されます。 この場合、processValue
メソッドが各要素に適用されます。 ここで処理ロジックを配置します。Azure Data Lake Storage Gen2 のシンクの作成
出力パスと
SimpleStringEncoder
を使用してFileSink object
が作成されます。withRollingPolicy
メソッドは、シンクのローリング ポリシーを設定するために使用されます。処理されたストリーム にシンク関数を追加する
processedStream.sinkTo(sink)
メソッドは、処理されたストリームにシンク関数を追加するために使用されます。 処理された各要素は、Azure Data Lake Storage Gen2 のファイルに書き込まれます。ジョブ の実行
最後に、
env.execute("ServiceBusToDataLakeJob")
メソッドを使用して Flink ジョブを実行します。 これにより、Azure Service Bus トピックからのメッセージの読み取り、処理、および Azure Data Lake Storage Gen2 への書き込みが開始されます。
この Flink ソース関数は、SessionBasedServiceBusSource.java
クラス内にカプセル化され、Azure Service Bus との接続を確立し、メッセージを取得し、Apache Flink と統合して並列データ処理を行います。 このソース関数の主な側面を次に示します。
クラス定義
SessionBasedServiceBusSource
クラスは、Flink で並列データ ソースを実装するための基本クラスであるRichParallelSourceFunction<String>
を拡張します。インスタンス変数
connectionString
、topicName
、およびsubscriptionName
変数は、Azure Service Bus の接続文字列、トピック名、サブスクリプション名を保持します。 isRunning フラグは、ソース関数の実行を制御するために使用されます。sessionReceiver
は、Service Bus からメッセージを受信するために使用されるServiceBusSessionReceiverAsyncClient
のインスタンスです。コンストラクター
コンストラクターは、指定された値を使用してインスタンス変数を初期化します。
run() メソッド
このメソッドは、ソース関数が Flink にデータを出力し始める場所です。
ServiceBusSessionReceiverAsyncClient
を作成し、次に使用可能なセッションを受け入れ、そのセッションからのメッセージの受信を開始します。 その後、各メッセージの本文が Flink ソース コンテキストに収集されます。cancel() メソッド
このメソッドは、ソース関数を停止する必要があるときに呼び出されます。 isRunning フラグを false に設定し、
sessionReceiver
を閉じます。
- Azure Service Bus の詳細については、「Azure Service Bus とは」を参照してください。.
- トピックの作成に関するガイダンスについては、Service Bus Explorerを参照してください。
- Apache、Apache Flink、Flink、および関連するオープンソースプロジェクト名は、Apache Software Foundation (ASF)の商標です。