Use Apache NiFi to consume processed Apache Kafka® topics from Apache Flink® and publish into ADLS Gen2

Important

This feature is currently in preview. The Supplemental Terms of Use for Microsoft Azure Previews include more legal terms that apply to Azure features that are in beta, in preview, or otherwise not yet released into general availability. For information about this specific preview, see Azure HDInsight on AKS preview information. For questions or feature suggestions, please submit a request on AskHDInsight with the details and follow us for more updates on Azure HDInsight Community.

Apache NiFi is a software project from the Apache Software Foundation designed to automate the flow of data between software systems. It supports powerful and scalable directed graphs of data routing, transformation, and system mediation logic.

For more information, see Apache NiFi.

In this document, we process streaming data by using HDInsight Kafka, perform some transformations on HDInsight Apache Flink on AKS, and then use Apache NiFi to consume these topics and write the contents into ADLS Gen2.

By combining the low-latency streaming features of Apache Flink and the dataflow capabilities of Apache NiFi, you can process events at high volume. This combination helps you trigger, enrich, and filter events to enhance the overall user experience. Both technologies complement each other with their strengths in event streaming and correlation.

Prerequisites

  • Flink cluster on HDInsight on AKS
  • Kafka cluster on HDInsight
    • Ensure the network settings are taken care of as described in Using Kafka on HDInsight, so that the HDInsight on AKS and HDInsight clusters are in the same VNet
  • For this demonstration, we're using a Windows VM as the Maven project development environment in the same VNet as HDInsight on AKS
  • For this demonstration, we're using an Ubuntu VM in the same VNet as HDInsight on AKS, with Apache NiFi 1.22.0 installed on it

Prepare HDInsight Kafka topic

For the purposes of this demonstration, we're using an HDInsight Kafka cluster. Let us prepare the HDInsight Kafka topic for the demo.

Note

Set up an HDInsight cluster with Apache Kafka 3.2, and replace the broker list with your own list before you get started.

Kafka 3.2.0

/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic click_events --bootstrap-server wn0-contsk:9092
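
Optionally, you can confirm the topic exists with the standard kafka-topics.sh options, using the same bootstrap server as in the create command:

/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --bootstrap-server wn0-contsk:9092
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --describe --topic click_events --bootstrap-server wn0-contsk:9092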

Set up Apache NiFi 1.22.0

For this demo, we install Apache NiFi 1.22.0 on an Ubuntu VM in the same VNet as HDInsight Flink on AKS. Alternatively, you can use your own NiFi setup.

Apache NiFi Downloads
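
As a rough sketch, assuming you download the NiFi 1.22.0 binary convenience archive from the Apache archive into /home/myvm (adjust the URL and paths for your environment):

wget https://archive.apache.org/dist/nifi/1.22.0/nifi-1.22.0-bin.zip
unzip nifi-1.22.0-bin.zip -d /home/myvm

After extracting, start NiFi from its bin directory: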

root@contosoubuntuvm:/home/myvm/nifi-1.22.0/bin# ./nifi.sh start

Java home: /home/myvm/jdk-18.0.1.1
NiFi home: /home/myvm/nifi-1.22.0

Bootstrap Config File: /home/myvm/nifi-1.22.0/conf/bootstrap.conf


root@contosoubuntuvm:/home/myvm/nifi-1.22.0/bin# jps
454421 NiFi
454467 Jps
454396 RunNiFi

Configure NiFi UI

Here, we configure the NiFi properties so that the UI can be accessed from outside the localhost VM.

$nifi_home/conf/nifi.properties

Screenshot showing how to define NiFi properties.
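
For example, if your instance uses the default HTTPS listener, you can change the web listener host from the localhost default so that the UI is reachable from another VM in the VNet. The property names below come from nifi.properties; the values are illustrative and should match your own host, port, and HTTP/HTTPS setup:

# nifi.properties (illustrative values)
nifi.web.https.host=0.0.0.0
nifi.web.https.port=8443
nifi.web.proxy.host=<ubuntu-vm-ip-or-hostname>:8443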

Let us develop the source code in a Maven project and build the jar.

SinkToKafka.java

package contoso.example;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class SinkToKafka {
    public static void main(String[] args) throws Exception {
        // 1. get stream env, update the broker-ips with your own
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String brokers = "<update-brokerip>:9092,<update-brokerip>:9092,<update-brokerip>:9092";   // Replace the broker list with your own

        // 2. event data source
        DataStreamSource<Event> stream = env.addSource(new ClickSource());

        DataStream<String> dataStream = stream.map(line-> {
            String str1 = line.toString();
            return str1;
        }).returns(Types.STRING);

        // 3. sink click events to  kafka
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(brokers)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("click_events")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

        dataStream.sinkTo(sink);
        env.execute("Sink click events to Kafka");
    }
}

Event.java

package contoso.example;

public class Event {

    public String user;
    public String url;
    public String ts;
    public Event() {
    }

    public Event(String user, String url, String ts) {
        this.user = user;
        this.url = url;
        this.ts = ts;
    }

    @Override
    public String toString(){
        return "\"" + ts + "\"" + "," + "\"" + user +  "\"" + ","  + "\"" + url + "\"";
    }
}

ClickSource.java

package contoso.example;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Calendar;
import java.util.Random;

public class ClickSource implements SourceFunction<Event> {
    // flag that controls the generation loop; volatile because cancel() is called from another thread
    private volatile boolean running = true;

    // generate click events until cancel() flips the flag
    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        // generate random record
        Random random = new Random();
        String[] users = {"Mary","Alice","Bob","Cary"};
        String[] urls = {"./home","./cart","./fav","./prod?id=100","./prod?id=10"};

        // loop generate
        while (running) {
            String user = users[random.nextInt(users.length)];
            String url = urls[random.nextInt(urls.length)];
            Long timestamp = Calendar.getInstance().getTimeInMillis();
            String ts = timestamp.toString();
            ctx.collect(new Event(user,url,ts));
            Thread.sleep(2000);   // emit roughly one event every two seconds, matching the sample output later in this article
        }
    }
    @Override
    public void cancel()
    {
        running = false;
    }
}

Maven pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>FlinkDemoKafka</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <kafka.version>3.2.0</kafka.version>
    </properties>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
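
With the assembly plugin configured above, building the project produces a fat jar. Assuming a standard Maven setup, for example:

mvn clean package

The resulting jar (for example, target/FlinkDemoKafka-1.0-SNAPSHOT.jar) is what you submit to the Flink cluster.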

Now, let's submit the streaming job built in the previous step to the Flink cluster.

Screenshot showing how to submit the streaming job from Flink UI.
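
Alternatively, if you prefer the Flink CLI to the Flink UI, a submission could look like the following sketch (the jar path is illustrative):

bin/flink run -c contoso.example.SinkToKafka /path/to/FlinkDemoKafka-1.0-SNAPSHOT.jar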

Check the topic on the Kafka cluster

Consume the topic to verify that the click events are flowing into Kafka.

root@hn0-contos:/home/sshuser# /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --topic click_events --bootstrap-server wn0-contos:9092
"1685939238525","Cary","./home"
"1685939240527","Bob","./fav"
"1685939242528","Cary","./prod?id=10"
"1685939244528","Mary","./prod?id=100"
"1685939246529","Alice","./fav"
"1685939248530","Mary","./cart"
"1685939250530","Mary","./prod?id=100"
"1685939252530","Alice","./prod?id=100"
"1685939254530","Alice","./prod?id=10"
"1685939256530","Cary","./prod?id=100"
"1685939258531","Mary","./prod?id=10"
"1685939260531","Cary","./home"
"1685939262531","Mary","./prod?id=10"
"1685939264531","Cary","./prod?id=100"
"1685939266532","Mary","./cart"
"1685939268532","Bob","./fav"
"1685939270532","Mary","./home"
"1685939272533","Cary","./fav"
"1685939274533","Alice","./cart"
"1685939276533","Bob","./prod?id=10"
"1685939278533","Mary","./cart"
"1685939280533","Alice","./fav"

Create flow on NiFi UI

Note

In this example, we use an Azure user-assigned managed identity as the credential for ADLS Gen2.

In this demonstration, we use an Apache NiFi instance installed on an Ubuntu VM. We access the NiFi web interface from a Windows VM. The Ubuntu VM needs a managed identity assigned to it and its network security group (NSG) rules configured.

To use managed identity authentication with the PutAzureDataLakeStorage processor in NiFi, ensure that the Ubuntu VM on which NiFi is installed has a managed identity assigned to it. If it doesn't, assign one to the VM, as shown in the sketch after the following screenshot.

Screenshot showing how to create a flow in Apache NiFi - Step 1.
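
As a sketch, assigning the user-assigned identity to the VM and granting it data access to the storage account can also be done with the Azure CLI; all resource names below are placeholders:

# assign an existing user-assigned managed identity to the Ubuntu VM
az vm identity assign --resource-group <resource-group> --name <ubuntu-vm-name> \
    --identities <user-assigned-identity-name-or-resource-id>

# grant the identity data access to the ADLS Gen2 account
az role assignment create --assignee <identity-client-id> \
    --role "Storage Blob Data Contributor" \
    --scope /subscriptions/<subscription-id>/resourceGroups/<resource-group>/providers/Microsoft.Storage/storageAccounts/<storage-account>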

Once you assign a managed identity to the Azure VM, make sure that the VM can connect to the IMDS (Instance Metadata Service) endpoint. The IMDS endpoint is available at the well-known IP address 169.254.169.254, shown in this example. Update your network security group rules to allow outbound traffic from the Ubuntu VM to this IP address.

Screenshot showing how to create a flow in Apache NiFi-Step2.
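
To verify that the VM can reach IMDS, you can issue the standard IMDS managed identity token request from the Ubuntu VM; the resource value below assumes Azure Storage is the target:

curl -s -H "Metadata: true" "http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=https://storage.azure.com/"

A JSON response containing an access_token indicates that the managed identity is usable from this VM.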

Run the flow:

Screenshot showing how to create a flow in Apache NiFi-Step3.

Use the following ConsumeKafka_2_0 processor properties settings:

Screenshot showing how to create a flow in Apache NiFi-Step4.

Screenshot showing how to create a flow in Apache NiFi-Step5.
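
For reference, the ConsumeKafka_2_0 properties used in this flow typically include the following; the values are illustrative and should match your own cluster and topic:

Kafka Brokers     <update-brokerip>:9092,<update-brokerip>:9092,<update-brokerip>:9092
Topic Name(s)     click_events
Group ID          <any-consumer-group-name>
Offset Reset      latest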

Use the following PutAzureDataLakeStorage processor properties settings:

Screenshot showing how to create a flow in Apache NiFi-Step6.
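
The PutAzureDataLakeStorage properties map the incoming FlowFiles to a location in ADLS Gen2; typical values look like the following (the container and directory names are placeholders):

ADLS Credentials    <ADLSCredentialsControllerService>
Filesystem Name     <your-filesystem-or-container>
Directory Name      <target-directory>
File Name           ${filename}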

Use the following PutAzureDataLakeStorage credential settings:

Screenshot showing how to create a flow in Apache NiFi-Step7.
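
In the ADLSCredentialsControllerService referenced above, the storage account is named and managed identity authentication is enabled; the exact property labels can vary slightly by NiFi version:

Storage Account Name         <your-adls-gen2-account>
Use Azure Managed Identity   true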

Let's check the output in ADLS Gen2

Validating the output in ADLS Gen2.
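
As a quick check from the command line, assuming the Azure CLI and data-plane permissions on the account, you can list the files written by NiFi; the names below are placeholders:

az storage fs file list --account-name <storage-account> --file-system <your-filesystem-or-container> --auth-mode login --output table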

Reference