Use Apache Flink on HDInsight on AKS with Azure Service Bus

Note

We will retire Azure HDInsight on AKS on January 31, 2025. Before January 31, 2025, you will need to migrate your workloads to Microsoft Fabric or an equivalent Azure product to avoid abrupt termination of your workloads. The remaining clusters on your subscription will be stopped and removed from the host.

Only basic support will be available until the retirement date.

Important

This feature is currently in preview. The Supplemental Terms of Use for Microsoft Azure Previews include more legal terms that apply to Azure features that are in beta, in preview, or otherwise not yet released into general availability. For information about this specific preview, see Azure HDInsight on AKS preview information. For questions or feature suggestions, please submit a request on AskHDInsight with the details and follow us for more updates on Azure HDInsight Community.

This article provides an overview and demonstration of the Apache Flink DataStream API on HDInsight on AKS with Azure Service Bus. The demonstration Flink job reads messages from an Azure Service Bus topic and writes them to Azure Data Lake Storage Gen2 (ADLS Gen2).

Prerequisites

  • Flink Cluster 1.17.0 on HDInsight on AKS
  • For this demonstration, use a Windows VM as the Maven project development environment in the same virtual network as HDInsight on AKS.
  • During creation of the Flink cluster, ensure that SSH access is selected so that you can access the cluster using Secure Shell (SSH).
  • Set up an Azure Service Bus instance.
  • To proceed with the integration, obtain the necessary connection string, topic name, and subscription name for your Azure Service Bus.
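
The sample code later in this article hardcodes the connection string, topic name, and subscription name for brevity; a hypothetical alternative is to pass them to the job as program arguments. A minimal sketch (the argument order is an assumption for this sketch):

```java
// Hypothetical helper: read the Service Bus connection details from
// program arguments instead of hardcoding them in the job.
// Assumed argument order: connection string, topic name, subscription name.
public class ServiceBusArgs {
    final String connectionString;
    final String topicName;
    final String subscriptionName;

    ServiceBusArgs(String[] args) {
        if (args.length < 3) {
            throw new IllegalArgumentException(
                    "Usage: <connection-string> <topic-name> <subscription-name>");
        }
        this.connectionString = args[0];
        this.topicName = args[1];
        this.subscriptionName = args[2];
    }

    public static void main(String[] args) {
        ServiceBusArgs parsed = new ServiceBusArgs(new String[] {
                "Endpoint=sb://contososervicebus.servicebus.windows.net/;<...>",
                "topic1",
                "subscription1"
        });
        System.out.println(parsed.topicName + "/" + parsed.subscriptionName);
    }
}
```

Keeping the shared access key out of source control also makes it easier to rotate the key without rebuilding the JAR.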


To initiate a job, transfer the JAR file into the webssh pod and submit the job using the following command:

bin/flink run -c contoso.example.ServiceBusToAdlsGen2 -j AzureServiceBusDemo-1.0-SNAPSHOT.jar
Job has been submitted with JobID fc5793361a914821c968b5746a804570

After submitting the job, access the Flink Dashboard UI and click on the running job for further details.

Screenshot shows confirm job on Flink UI.

Sending message from Azure Service Bus Explorer

Navigate to the Service Bus Explorer on the Azure portal and send messages to the corresponding Service Bus.
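
As an alternative to the portal, you can also send test messages programmatically with the azure-messaging-servicebus client already declared in the POM. A minimal sketch, assuming the topic name and session ID shown; because the demo source is session-based, each message must carry a session ID:

```java
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusSenderClient;

public class SendTestMessage {
    public static void main(String[] args) {
        // Replace with your own namespace connection string.
        String connectionString = "Endpoint=sb://contososervicebus.servicebus.windows.net/;SharedAccessKeyName=policy1;SharedAccessKey=<key>";

        ServiceBusSenderClient sender = new ServiceBusClientBuilder()
                .connectionString(connectionString)
                .sender()
                .topicName("topic1")
                .buildClient();

        ServiceBusMessage message = new ServiceBusMessage("test message from code");
        // The demo source uses session receivers, so a session ID is required.
        message.setSessionId("session1");

        sender.sendMessage(message);
        sender.close();
    }
}
```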

Screenshot shows sending message from Azure portal Service Bus Explorer.

Review the running details of the job on the Flink UI for insights.

Screenshot shows checking job running details on Flink UI.

Confirm output file in ADLS Gen2 on the Azure portal

After successfully publishing messages, verify the output file generated by the Flink job in ADLS Gen2 storage on the Azure portal.

Screenshot shows confirm output file in ADLS Gen2 on the Azure portal.

Source Code

In the POM.xml file, we define the project's dependencies using Maven, ensuring organized management of libraries and plugins. Here's a snippet illustrating how to include dependencies for a project that uses Apache Flink:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>contoso.example</groupId>
    <artifactId>AzureServiceBusDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-core</artifactId>
            <version>1.26.0</version>
        </dependency>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-identity</artifactId>
            <version>1.4.1</version>
        </dependency>
<!--        <dependency>-->
<!--            <groupId>com.microsoft.azure</groupId>-->
<!--            <artifactId>azure-servicebus</artifactId>-->
<!--            <version>3.6.0</version>-->
<!--        </dependency>-->
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-messaging-servicebus</artifactId>
            <version>7.5.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Main function: ServiceBusToAdlsGen2.java

package contoso.example;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;

public class ServiceBusToAdlsGen2 {

    public static void main(String[] args) throws Exception {

        // Set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        final String connectionString = "Endpoint=sb://contososervicebus.servicebus.windows.net/;SharedAccessKeyName=policy1;SharedAccessKey=<key>";
        final String topicName = "topic1";
        final String subName = "subscription1";

        // Create a source function for Azure Service Bus
        SessionBasedServiceBusSource sourceFunction = new SessionBasedServiceBusSource(connectionString, topicName, subName);

        // Create a data stream using the source function
        DataStream<String> stream = env.addSource(sourceFunction);

        // Process the data (this is where you'd put your processing logic)
        DataStream<String> processedStream = stream.map(value -> processValue(value));
        processedStream.print();

        // 3. sink to gen2
        String outputPath = "abfs://<container>@<account>.dfs.core.windows.net/data/ServiceBus/Topic1";
//        String outputPath = "src/ServiceBugOutput/";

        final FileSink<String> sink = FileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(2))
                                .withInactivityInterval(Duration.ofMinutes(3))
                                .withMaxPartSize(MemorySize.ofMebiBytes(5))
                                .build())
                .build();
        // Add the sink function to the processed stream
        processedStream.sinkTo(sink);

        // Execute the job
        env.execute("ServiceBusToDataLakeJob");
    }

    private static String processValue(String value) {
        // Implement your processing logic here
        return value;
    }
}

Input source: SessionBasedServiceBusSource.java

package contoso.example;

import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusSessionReceiverAsyncClient;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class SessionBasedServiceBusSource extends RichParallelSourceFunction<String> {

    private final String connectionString;
    private final String topicName;
    private final String subscriptionName;
    private volatile boolean isRunning = true;
    private ServiceBusSessionReceiverAsyncClient sessionReceiver;

    public SessionBasedServiceBusSource(String connectionString, String topicName, String subscriptionName) {
        this.connectionString = connectionString;
        this.topicName = topicName;
        this.subscriptionName = subscriptionName;
    }

    @Override
    public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
        // Assign to the instance field so that cancel() can close the client.
        this.sessionReceiver = new ServiceBusClientBuilder()
                .connectionString(connectionString)
                .sessionReceiver()
                .topicName(topicName)
                .subscriptionName(subscriptionName)
                .buildAsyncClient();

        sessionReceiver.acceptNextSession()
                .flatMapMany(session -> session.receiveMessages())
                .doOnNext(message -> {
                    try {
                        ctx.collect(message.getBody().toString());
                    } catch (Exception e) {
                        System.out.printf("An error occurred: %s.", e.getMessage());
                    }
                })
                .doOnError(error -> System.out.printf("An error occurred: %s.", error.getMessage()))
                .blockLast();
    }

    @Override
    public void cancel() {
        isRunning = false;
        if (sessionReceiver != null) {
            sessionReceiver.close();
        }
    }
}

Key components and functionality of the provided code

Main code: ServiceBusToAdlsGen2.java

This Java class, ServiceBusToAdlsGen2, orchestrates the entire Flink job for the DataStream API AzureServiceBusDemo and shows the integration between Apache Flink and Azure Service Bus.

  1. Setting up the execution environment

    The StreamExecutionEnvironment.getExecutionEnvironment() method is used to set up the execution environment for the Flink job.

  2. Creating a source function for Azure Service Bus

    A SessionBasedServiceBusSource object is created with the connection string, topic name, and subscription name for your Azure Service Bus. This object is a source function that can be used to create a data stream.

  3. Creating a data stream

    The env.addSource(sourceFunction) method is used to create a data stream from the source function. Each message from the Azure Service Bus topic becomes an element in this stream.

  4. Processing the data

    The stream.map(value -> processValue(value)) method is used to process each element in the stream. In this case, the processValue method is applied to each element. This is where you’d put your processing logic.

  5. Creating a sink for Azure Data Lake Storage Gen2

    A FileSink object is created with the output path and a SimpleStringEncoder. The withRollingPolicy method is used to set a rolling policy for the sink.

  6. Adding the sink function to the processed stream

    The processedStream.sinkTo(sink) method is used to add the sink function to the processed stream. Each processed element is written to a file in Azure Data Lake Storage Gen2.

  7. Executing the job

    Finally, the env.execute("ServiceBusToDataLakeJob") method executes the Flink job. The job reads messages from the Azure Service Bus topic, processes them, and writes them to Azure Data Lake Storage Gen2.
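
Item 4 above leaves processValue as a pass-through; a hypothetical bit of processing logic (the tag format is an assumption for this sketch) could look like:

```java
// Hypothetical processing logic for the map step: trim the raw
// Service Bus payload and prepend a tag so sink files are easy to scan.
public class ProcessValueExample {
    static String processValue(String value) {
        return "[servicebus] " + value.trim();
    }

    public static void main(String[] args) {
        System.out.println(processValue("  order-42 created  "));
        // prints: [servicebus] order-42 created
    }
}
```

Any stateless String-to-String transformation can be dropped into processValue without changing the rest of the pipeline.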

This Flink source function, encapsulated in the SessionBasedServiceBusSource.java class, establishes a connection with Azure Service Bus, retrieves messages, and integrates with Apache Flink for parallel data processing. The following are key aspects of this source function:

  1. Class Definition

    The SessionBasedServiceBusSource class extends RichParallelSourceFunction<String>, which is a base class for implementing a parallel data source in Flink.

  2. Instance Variables

    The connectionString, topicName, and subscriptionName variables hold the connection string, topic name, and subscription name for your Azure Service Bus. The isRunning flag controls the execution of the source function, and sessionReceiver is an instance of ServiceBusSessionReceiverAsyncClient, which is used to receive messages from the Service Bus.

  3. Constructor

    The constructor initializes the instance variables with the provided values.

  4. run() Method

    This method is where the source function starts to emit data to Flink. It creates a ServiceBusSessionReceiverAsyncClient, accepts the next available session, and starts receiving messages from that session. Each message’s body is then collected into the Flink source context.

  5. cancel() Method

    This method is called when the source function needs to be stopped. It sets the isRunning flag to false and closes the sessionReceiver.

Reference