Use Apache Flink on HDInsight on AKS with Azure Service Bus
Note
We will retire Azure HDInsight on AKS on January 31, 2025. Before January 31, 2025, you will need to migrate your workloads to Microsoft Fabric or an equivalent Azure product to avoid abrupt termination of your workloads. The remaining clusters on your subscription will be stopped and removed from the host.
Only basic support will be available until the retirement date.
Important
This feature is currently in preview. The Supplemental Terms of Use for Microsoft Azure Previews include more legal terms that apply to Azure features that are in beta, in preview, or otherwise not yet released into general availability. For information about this specific preview, see Azure HDInsight on AKS preview information. For questions or feature suggestions, please submit a request on AskHDInsight with the details and follow us for more updates on Azure HDInsight Community.
This article provides an overview and demonstration of the Apache Flink DataStream API on HDInsight on AKS with Azure Service Bus. The demonstration Flink job reads messages from an Azure Service Bus topic and writes them to Azure Data Lake Storage Gen2 (ADLS Gen2).
Prerequisites
- Flink Cluster 1.17.0 on HDInsight on AKS
- For this demonstration, we use a Windows VM as the Maven project development environment in the same virtual network as HDInsight on AKS.
- During creation of the Flink cluster, ensure that SSH access is selected. This enables you to access the cluster using Secure Shell (SSH).
- Set up an Azure Service Bus instance.
- To proceed with the integration, obtain the necessary connection string, topic name, and subscription name for your Azure Service Bus.
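Before wiring the connection string into the Flink job, it can help to sanity-check its shape. The helper below is a hypothetical sketch (it is not part of the Azure SDK and does not contact Azure); it only verifies that a SAS-based Service Bus connection string contains the parts used later in this article:

```java
public class ServiceBusConnectionCheck {
    // Hypothetical helper: checks that a SAS-based Service Bus connection string
    // contains the three parts this article's job expects. No network calls.
    static boolean looksLikeSasConnectionString(String cs) {
        return cs.startsWith("Endpoint=sb://")
                && cs.contains("SharedAccessKeyName=")
                && cs.contains("SharedAccessKey=");
    }

    public static void main(String[] args) {
        String cs = "Endpoint=sb://contososervicebus.servicebus.windows.net/;"
                + "SharedAccessKeyName=policy1;SharedAccessKey=<key>";
        System.out.println(looksLikeSasConnectionString(cs)); // prints "true"
    }
}
```

If the check fails at runtime, recheck the connection string copied from the Shared access policies blade of your Service Bus namespace.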
Develop Apache Flink job
This job reads messages from an Azure Service Bus topic and writes them to Azure Data Lake Storage Gen2 (ADLS Gen2).
Submit the JAR into Azure HDInsight on AKS with Flink
To initiate a job, transfer the JAR file into the webssh pod and submit the job using the following command:
bin/flink run -c contoso.example.ServiceBusToAdlsGen2 -j AzureServiceBusDemo-1.0-SNAPSHOT.jar
Job has been submitted with JobID fc5793361a914821c968b5746a804570
Confirm job submission on Flink UI
After submitting the job, access the Flink Dashboard UI and click on the running job for further details.
Sending message from Azure Service Bus Explorer
Navigate to the Service Bus Explorer on the Azure portal and send messages to the corresponding Service Bus.
Check job run details on Apache Flink UI
Review the running details of the job on the Flink UI for insights.
Confirm output file in ADLS gen2 on Portal
After successfully publishing messages, verify the output file generated by the Flink job in ADLS Gen2 storage on the Azure portal.
Source Code
In the POM.xml file, we define the project's dependencies using Maven, which keeps library and plugin management organized. Here's a snippet illustrating how to include the dependencies for this project, including Apache Flink:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>contoso.example</groupId>
<artifactId>AzureServiceBusDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.17.0</flink.version>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.26.0</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
<version>1.4.1</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-servicebus</artifactId>
<version>7.5.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
main function: ServiceBusToAdlsGen2.java
package contoso.example;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.time.Duration;
public class ServiceBusToAdlsGen2 {
public static void main(String[] args) throws Exception {
// Set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final String connectionString = "Endpoint=sb://contososervicebus.servicebus.windows.net/;SharedAccessKeyName=policy1;SharedAccessKey=<key>";
final String topicName = "topic1";
final String subName = "subscription1";
// Create a source function for Azure Service Bus
SessionBasedServiceBusSource sourceFunction = new SessionBasedServiceBusSource(connectionString, topicName, subName);
// Create a data stream using the source function
DataStream<String> stream = env.addSource(sourceFunction);
// Process the data (this is where you'd put your processing logic)
DataStream<String> processedStream = stream.map(value -> processValue(value));
processedStream.print();
// Sink to ADLS Gen2
String outputPath = "abfs://<container>@<account>.dfs.core.windows.net/data/ServiceBus/Topic1";
// String outputPath = "src/ServiceBugOutput/";
final FileSink<String> sink = FileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(2))
.withInactivityInterval(Duration.ofMinutes(3))
.withMaxPartSize(MemorySize.ofMebiBytes(5))
.build())
.build();
// Add the sink function to the processed stream
processedStream.sinkTo(sink);
// Execute the job
env.execute("ServiceBusToDataLakeJob");
}
private static String processValue(String value) {
// Implement your processing logic here
return value;
}
}
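The `processValue` method above is intentionally a pass-through. As an illustrative sketch only (the trim-and-uppercase rule is an assumed example, not part of the article's job), a concrete transformation might look like this standalone class:

```java
public class ProcessValueExample {
    // Example transformation: normalize whitespace and uppercase the message body.
    // Replace this with whatever processing your pipeline actually needs.
    static String processValue(String value) {
        return value.trim().toUpperCase();
    }

    public static void main(String[] args) {
        System.out.println(processValue("  hello from service bus  ")); // prints "HELLO FROM SERVICE BUS"
    }
}
```

The same method body can be dropped into `ServiceBusToAdlsGen2.processValue` without changing the rest of the pipeline, because the `map` operator only requires a `String -> String` function.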
input source: SessionBasedServiceBusSource.java
package contoso.example;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusSessionReceiverAsyncClient;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
public class SessionBasedServiceBusSource extends RichParallelSourceFunction<String> {
private final String connectionString;
private final String topicName;
private final String subscriptionName;
private volatile boolean isRunning = true;
private ServiceBusSessionReceiverAsyncClient sessionReceiver;
public SessionBasedServiceBusSource(String connectionString, String topicName, String subscriptionName) {
this.connectionString = connectionString;
this.topicName = topicName;
this.subscriptionName = subscriptionName;
}
@Override
public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
// Assign to the instance field (not a local variable) so cancel() can close this receiver
sessionReceiver = new ServiceBusClientBuilder()
.connectionString(connectionString)
.sessionReceiver()
.topicName(topicName)
.subscriptionName(subscriptionName)
.buildAsyncClient();
sessionReceiver.acceptNextSession()
.flatMapMany(session -> session.receiveMessages())
.doOnNext(message -> {
try {
ctx.collect(message.getBody().toString());
} catch (Exception e) {
System.out.printf("An error occurred: %s.", e.getMessage());
}
})
.doOnError(error -> System.out.printf("An error occurred: %s.", error.getMessage()))
.blockLast();
}
@Override
public void cancel() {
isRunning = false;
if (sessionReceiver != null) {
sessionReceiver.close();
}
}
}
Key components and functionality of the provided code
Main code: ServiceBusToAdlsGen2.java
This Java class, ServiceBusToAdlsGen2, orchestrates the entire Flink job for the DataStream API AzureServiceBusDemo and shows the integration between Apache Flink and Azure Service Bus.
Setting up the execution environment
The StreamExecutionEnvironment.getExecutionEnvironment() method sets up the execution environment for the Flink job.
Creating a source function for Azure Service Bus
A SessionBasedServiceBusSource object is created with the connection string, topic name, and subscription name for your Azure Service Bus. This object is a source function that can be used to create a data stream.
Creating a data stream
The env.addSource(sourceFunction) method creates a data stream from the source function. Each message from the Azure Service Bus topic becomes an element in this stream.
Processing the data
The stream.map(value -> processValue(value)) method processes each element in the stream. In this case, the processValue method is applied to each element. This is where you'd put your processing logic.
Creating a sink for Azure Data Lake Storage Gen2
A FileSink object is created with the output path and a SimpleStringEncoder. The withRollingPolicy method sets a rolling policy for the sink.
Adding the sink function to the processed stream
The processedStream.sinkTo(sink) method adds the sink function to the processed stream. Each processed element is written to a file in Azure Data Lake Storage Gen2.
Executing the job
Finally, the env.execute("ServiceBusToDataLakeJob") method executes the Flink job. It starts reading messages from the Azure Service Bus topic, processing them, and writing them to Azure Data Lake Storage Gen2.
Flink source function: SessionBasedServiceBusSource.java
This Flink source function, encapsulated in the SessionBasedServiceBusSource class, establishes a connection with Azure Service Bus, retrieves messages, and integrates with Apache Flink for parallel data processing. The following are key aspects of this source function:
Class Definition
The SessionBasedServiceBusSource class extends RichParallelSourceFunction<String>, which is a base class for implementing a parallel data source in Flink.
Instance Variables
The connectionString, topicName, and subscriptionName variables hold the connection string, topic name, and subscription name for your Azure Service Bus. The isRunning flag is used to control the execution of the source function. The sessionReceiver is an instance of ServiceBusSessionReceiverAsyncClient, which is used to receive messages from the Service Bus.
Constructor
The constructor initializes the instance variables with the provided values.
run() Method
This method is where the source function starts to emit data to Flink. It creates a ServiceBusSessionReceiverAsyncClient, accepts the next available session, and starts receiving messages from that session. Each message's body is then collected into the Flink source context.
cancel() Method
This method is called when the source function needs to be stopped. It sets the isRunning flag to false and closes the sessionReceiver.
Reference
- To learn more about Azure Service Bus, see What is Azure Service Bus?.
- For guidance on creating topics and sending messages, see Service Bus Explorer.
- Apache, Apache Flink, Flink, and associated open source project names are trademarks of the Apache Software Foundation (ASF).