英語で読む

次の方法で共有


Azure Service Bus で AKS 上の HDInsight で Apache Flink を使用する

重要

AKS 上の Azure HDInsight は、2025 年 1 月 31 日に廃止されました。 詳細を知るために、このお知らせ を確認してください

ワークロードの突然の終了を回避するには、ワークロードを Microsoft Fabric または同等の Azure 製品 に移行する必要があります。

重要

この機能は現在プレビュー段階です。 Microsoft Azure プレビューの 追加使用条件 には、ベータ版、プレビュー版、または一般公開されていない Azure 機能に適用される、より多くの法的条件が含まれています。 この特定のプレビューの詳細については、AKS プレビュー情報 Azure HDInsightを参照してください。 ご質問や機能のご提案は、AskHDInsight に詳細を添えてリクエストを送信してください。また、最新情報を得るために Azure HDInsight Communityをフォローしてください。

この記事では、Azure Service Bus 用 AKS 上の HDInsight 上の Apache Flink DataStream API の概要とデモについて説明します。 Flink ジョブのデモンストレーションは、Azure Service Bus からメッセージを読み取り、Azure Data Lake Storage Gen2 (ADLS Gen2) に書き込みます。

前提 条件

  • AKS 上の HDInsight での Flink クラスター 1.17.0 の
  • このデモでは、Windows VM を Maven プロジェクトとして使用し、AKS 上の HDInsight と同じ VNET で env を開発します。
  • Flink クラスターの 作成 中に、SSH アクセスが選択されていることを確認する必要があります。 これにより、Secure Shell (SSH) を使用してクラスターにアクセスできます。
  • Azure Service Bus インスタンスを設定します。
  • 統合を続行するには、Azure Service Busに必要な接続文字列、トピック名、サブスクリプション名を取得します。

このジョブは、Azure Service Bus からメッセージを読み取り、Data Lake Storage Gen2 (ADLS Gen2) に書き込みます。

ジョブを開始するには、次のコマンドを使用して JAR ファイルを webssh ポッドに転送し、ジョブを送信します。

bin/flink run -c contoso.example.ServiceBusToAdlsGen2 -j AzureServiceBusDemo-1.0-SNAPSHOT.jar
Job has been submitted with JobID fc5793361a914821c968b5746a804570

ジョブを送信したら、Flink ダッシュボード UI にアクセスし、実行中のジョブをクリックして詳細を確認します。

スクリーンショットは、Flink UI の確認ジョブを示しています。

Azure Service Bus Explorer からのメッセージの送信

Azure portal で Service Bus Explorer に移動し、対応する Service Bus にメッセージを送信します。

スクリーンショットは、Azure portal Service Bus Explorer からのメッセージの送信を示しています。

Flink UI でジョブの実行中の詳細を確認して、分析情報を確認します。

スクリーンショットは、Flink UI でジョブ実行の詳細を確認する方法を示しています。

ポータルの ADLS gen2 で出力ファイルを確認する

メッセージが正常に発行されたら、Azure portal 上の ADLS Gen2 ストレージの Flink ジョブによって生成された出力ファイルを確認します。

スクリーンショットは、ポータルの ADLS gen2 の出力ファイルの確認を示しています。

ソースコード

POM.xml ファイルでは、Maven を使用してプロジェクトの依存関係を定義し、ライブラリとプラグインの管理を円滑かつ整理します。 Apache Flink を含む依存関係など、プロジェクトの依存関係を含める方法を示すスニペットを次に示します。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>contoso.example</groupId>
    <artifactId>AzureServiceBusDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-core</artifactId>
            <version>1.26.0</version>
        </dependency>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-identity</artifactId>
            <version>1.4.1</version>
        </dependency>
<!--        <dependency>-->
<!--            <groupId>com.microsoft.azure</groupId>-->
<!--            <artifactId>azure-servicebus</artifactId>-->
<!--            <version>3.6.0</version>-->
<!--        </dependency>-->
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-messaging-servicebus</artifactId>
            <version>7.5.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

メイン関数: ServiceBusToAdlsGen2.java

package contoso.example;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;

public class ServiceBusToAdlsGen2 {

    public static void main(String[] args) throws Exception {

        // Set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        final String connectionString = "Endpoint=sb://contososervicebus.servicebus.windows.net/;SharedAccessKeyName=policy1;SharedAccessKey=<key>";
        final String topicName = "topic1";
        final String subName = "subscription1";

        // Create a source function for Azure Service Bus
        SessionBasedServiceBusSource sourceFunction = new SessionBasedServiceBusSource(connectionString, topicName, subName);

        // Create a data stream using the source function
        DataStream<String> stream = env.addSource(sourceFunction);

        // Process the data (this is where you'd put your processing logic)
        DataStream<String> processedStream = stream.map(value -> processValue(value));
        processedStream.print();

        // 3. sink to gen2
        String outputPath = "abfs://<container>@<account>.dfs.core.windows.net/data/ServiceBus/Topic1";
//        String outputPath = "src/ServiceBugOutput/";

        final FileSink<String> sink = FileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(2))
                                .withInactivityInterval(Duration.ofMinutes(3))
                                .withMaxPartSize(MemorySize.
                                        ofMebiBytes(5))
                                .build())
                .build();
        // Add the sink function to the processed stream
        processedStream.sinkTo(sink);

        // Execute the job
        env.execute("ServiceBusToDataLakeJob");
    }

    private static String processValue(String value) {
        // Implement your processing logic here
        return value;
    }
}

入力ソース: SessionBasedServiceBusSource.java

package contoso.example;

import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusSessionReceiverAsyncClient;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class SessionBasedServiceBusSource extends RichParallelSourceFunction<String> {

    private final String connectionString;
    private final String topicName;
    private final String subscriptionName;
    private volatile boolean isRunning = true;
    private ServiceBusSessionReceiverAsyncClient sessionReceiver;

    public SessionBasedServiceBusSource(String connectionString, String topicName, String subscriptionName) {
        this.connectionString = connectionString;
        this.topicName = topicName;
        this.subscriptionName = subscriptionName;
    }

    @Override
    public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
        ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
                .connectionString(connectionString)
                .sessionReceiver()
                .topicName(topicName)
                .subscriptionName(subscriptionName)
                .buildAsyncClient();

        sessionReceiver.acceptNextSession()
                .flatMapMany(session -> session.receiveMessages())
                .doOnNext(message -> {
                    try {
                        ctx.collect(message.getBody().toString());
                    } catch (Exception e) {
                        System.out.printf("An error occurred: %s.", e.getMessage());
                    }
                })
                .doOnError(error -> System.out.printf("An error occurred: %s.", error.getMessage()))
                .blockLast();
    }

    @Override
    public void cancel() {
        isRunning = false;
        if (sessionReceiver != null) {
            sessionReceiver.close();
        }
    }
}

提供されたコードの主要なコンポーネントと機能

メイン コード: ServiceBusToAdlsgen2.java

この Java クラス ServiceBusToAdlsgen2DStreamAPI AzureServiceBusDemoの Flink ジョブ全体を調整し、Apache Flink と Azure Service Bus の間の堅牢な統合を示します。

  1. 実行環境の設定

    StreamExecutionEnvironment.getExecutionEnvironment() メソッドは、Flink ジョブの実行環境を設定するために使用されます。

  2. Azure Service Bus のソース関数の作成

    SessionBasedServiceBusSource オブジェクトは、Azure Service Bus の接続文字列、トピック名、サブスクリプション名を使用して作成されます。 このオブジェクトは、データ ストリームの作成に使用できるソース関数です。

  3. データ ストリーム の作成

    env.addSource(sourceFunction) メソッドは、ソース関数からデータ ストリームを作成するために使用されます。 Azure Service Bus トピックからの各メッセージは、このストリームの要素になります。

  4. データ の処理

    stream.map(value -> processValue(value)) メソッドは、ストリーム内の各要素を処理するために使用されます。 この場合、processValue メソッドが各要素に適用されます。 ここで処理ロジックを配置します。

  5. Azure Data Lake Storage Gen2 のシンクの作成

    出力パスと SimpleStringEncoderを使用して FileSink object が作成されます。 withRollingPolicy メソッドは、シンクのローリング ポリシーを設定するために使用されます。

  6. 処理されたストリーム にシンク関数を追加する

    processedStream.sinkTo(sink) メソッドは、処理されたストリームにシンク関数を追加するために使用されます。 処理された各要素は、Azure Data Lake Storage Gen2 のファイルに書き込まれます。

  7. ジョブ の実行

    最後に、env.execute("ServiceBusToDataLakeJob") メソッドを使用して Flink ジョブを実行します。 これにより、Azure Service Bus トピックからのメッセージの読み取り、処理、および Azure Data Lake Storage Gen2 への書き込みが開始されます。

この Flink ソース関数は、SessionBasedServiceBusSource.java クラス内にカプセル化され、Azure Service Bus との接続を確立し、メッセージを取得し、Apache Flink と統合して並列データ処理を行います。 このソース関数の主な側面を次に示します。

  1. クラス定義

    SessionBasedServiceBusSource クラスは、Flink で並列データ ソースを実装するための基本クラスである RichParallelSourceFunction<String>を拡張します。

  2. インスタンス変数

    connectionStringtopicName、および subscriptionName 変数は、Azure Service Bus の接続文字列、トピック名、サブスクリプション名を保持します。 isRunning フラグは、ソース関数の実行を制御するために使用されます。 sessionReceiver は、Service Bus からメッセージを受信するために使用される ServiceBusSessionReceiverAsyncClientのインスタンスです。

  3. コンストラクター

    コンストラクターは、指定された値を使用してインスタンス変数を初期化します。

  4. run() メソッド

    このメソッドは、ソース関数が Flink にデータを出力し始める場所です。 ServiceBusSessionReceiverAsyncClientを作成し、次に使用可能なセッションを受け入れ、そのセッションからのメッセージの受信を開始します。 その後、各メッセージの本文が Flink ソース コンテキストに収集されます。

  5. cancel() メソッド

    このメソッドは、ソース関数を停止する必要があるときに呼び出されます。 isRunning フラグを false に設定し、sessionReceiverを閉じます。

参考