Tutorial: Use the Apache Kafka Streams API in Azure HDInsight

Learn how to create an application that uses the Apache Kafka Streams API and run it with Kafka on HDInsight.

The application used in this tutorial is a streaming word count. It reads text data from a Kafka topic, extracts individual words, and then stores the word and count into another Kafka topic.

Kafka stream processing is often done using Apache Spark. Kafka versions 2.1.1 and 2.4.1 (in HDInsight 4.0 and 5.0) support the Kafka Streams API. This API allows you to transform data streams between input and output topics.

For more information on Kafka Streams, see the Intro to Streams documentation on Apache.org.

In this tutorial, you learn how to:

  • Understand the code
  • Build and deploy the application
  • Configure Kafka topics
  • Run the code

Prerequisites

Understand the code

The example application is located at https://github.com/Azure-Samples/hdinsight-kafka-java-get-started, in the Streaming subdirectory. The application consists of two files:

  • pom.xml: This file defines the project dependencies, Java version, and packaging methods.
  • Stream.java: This file implements the streaming logic.

Pom.xml

The important things to understand in the pom.xml file are:

  • Dependencies: This project relies on the Kafka Streams API, which is provided by the kafka-clients package. The following XML code defines this dependency:

    <!-- Kafka client for producer/consumer operations -->
    <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
    </dependency>
    

    The ${kafka.version} entry is declared in the <properties>..</properties> section of pom.xml, and is configured to the Kafka version of the HDInsight cluster.

  • Plugins: Maven plugins provide various capabilities. In this project, the following plugins are used:

    • maven-compiler-plugin: Used to set the Java version used by the project to 8. HDInsight 4.0 and 5.0 require Java 8.
    • maven-shade-plugin: Used to generate an uber jar that contains this application, and any dependencies. It's also used to set the entry point of the application, so that you can directly run the Jar file without having to specify the main class.

Stream.java

The Stream.java file uses the Streams API to implement a word count application. It reads data from a Kafka topic named test and writes the word counts into a topic named wordcounts.

The following code defines the word count application:

package com.microsoft.example;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.Arrays;
import java.util.Properties;

public class Stream
{
    public static void main( String[] args ) {
        Properties streamsConfig = new Properties();
        // The name must be unique on the Kafka cluster
        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-example");
        // Brokers
        streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);
        // SerDes for key and values
        streamsConfig.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // Serdes for the word and count
        Serde<String> stringSerde = Serdes.String();
        Serde<Long> longSerde = Serdes.Long();

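        // Build the topology: read sentences from the "test" topic
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> sentences = builder.stream(stringSerde, stringSerde, "test");
        KStream<String, Long> wordCounts = sentences
                // Split each sentence into lower-case words
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                // Re-key each record by the word itself
                .map((key, word) -> new KeyValue<>(word, word))
                // Count occurrences per key; "Counts" names the state store
                .countByKey("Counts")
                // Convert the table of counts back into a stream of updates
                .toStream();
        // Write each (word, count) update to the "wordcounts" topic
        wordCounts.to(stringSerde, longSerde, "wordcounts");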

        KafkaStreams streams = new KafkaStreams(builder, streamsConfig);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
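
To make the data flow of the topology above more concrete, the following is a minimal plain-Java sketch of the same steps: lower-case each sentence, split it on non-word characters, and emit an updated count each time a word is seen. It does not use the Kafka Streams API. The class name WordCountSketch and the helper countUpdates are hypothetical, and the sketch assumes that every count update is forwarded downstream. Unlike Stream.java, it also skips empty tokens.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; it does not use the Kafka Streams API.
class WordCountSketch {

    // Hypothetical helper: returns one "word<TAB>count" update per word occurrence,
    // in the order the words are encountered.
    static List<String> countUpdates(List<String> sentences) {
        Map<String, Long> counts = new LinkedHashMap<>();
        List<String> updates = new ArrayList<>();
        for (String sentence : sentences) {
            // Same split as Stream.java: lower-case, then split on runs of non-word characters
            for (String word : sentence.toLowerCase().split("\\W+")) {
                if (word.isEmpty()) {
                    continue; // assumption: skip the empty token a leading separator produces
                }
                // Increment the running count for this word and record the update
                long updated = counts.merge(word, 1L, Long::sum);
                updates.add(word + "\t" + updated);
            }
        }
        return updates;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("The quick fox", "the lazy dog jumped, the fox ran");
        countUpdates(input).forEach(System.out::println);
    }
}
```

Because an update is emitted for every occurrence, a word that appears several times produces several entries with increasing counts, which is the same pattern that the wordcounts topic shows.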

Build and deploy the example

To build and deploy the project to your Kafka on HDInsight cluster, use the following steps:

  1. Set your current directory to the location of the hdinsight-kafka-java-get-started-master\Streaming directory, and then use the following command to create a jar package:

    mvn clean package
    

    This command creates the package at target/kafka-streaming-1.0-SNAPSHOT.jar.

  2. Replace sshuser with the SSH user for your cluster, and replace clustername with the name of your cluster. Use the following command to copy the kafka-streaming-1.0-SNAPSHOT.jar file to your HDInsight cluster. If prompted, enter the password for the SSH user account.

    scp ./target/kafka-streaming-1.0-SNAPSHOT.jar sshuser@clustername-ssh.azurehdinsight.net:kafka-streaming.jar
    

Create Apache Kafka topics

  1. Replace sshuser with the SSH user for your cluster, and replace CLUSTERNAME with the name of your cluster. Open an SSH connection to the cluster by entering the following command. If prompted, enter the password for the SSH user account.

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Install jq, a command-line JSON processor. From the open SSH connection, enter the following command to install jq:

    sudo apt -y install jq
    
  3. Set up a password variable. Replace PASSWORD with the cluster login password, then enter the command:

    export PASSWORD='PASSWORD'
    
  4. Extract the correctly cased cluster name. The actual casing of the cluster name may be different than you expect, depending on how the cluster was created. This command obtains the actual casing, and then stores it in a variable. Enter the following command:

    export CLUSTER_NAME=$(curl -u admin:$PASSWORD -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    

    Note

    If you're doing this process from outside the cluster, there is a different procedure for storing the cluster name. Get the cluster name in lower case from the Azure portal. Then, substitute the cluster name for <clustername> in the following command and execute it: export CLUSTER_NAME='<clustername>'.

  5. To get the Kafka broker hosts and the Apache ZooKeeper hosts, use the following commands. They authenticate as the cluster login (admin) account with the PASSWORD variable set in step 3.

    export KAFKAZKHOSTS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
    
    export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
    

    Note

    These commands require Ambari access. If your cluster is behind an NSG, run these commands from a machine that can access Ambari.

  6. To create the topics used by the streaming operation, use the following commands:

    Note

    You may receive an error that the test topic already exists. This is OK, as it may have been created in the Producer and Consumer API tutorial.

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic test --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcounts --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
    

    The topics are used for the following purposes:

    • test: This topic is where records are received. The streaming application reads from here.
    • wordcounts: This topic is where the streaming application stores its output.
    • RekeyedIntermediateTopic: This topic is used to repartition data as the count is updated by the countByKey operator.
    • wordcount-example-Counts-changelog: This topic is the changelog that backs the state store used by the countByKey operation.

    Kafka on HDInsight can also be configured to automatically create topics. For more information, see the Configure automatic topic creation document.
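
Two details of the steps above can be sketched in plain Java. The first is how step 5 assembles a host list: the jq expression appends the port to each host name and joins the entries with commas, and cut keeps the first two. The second is where the changelog topic name comes from: Kafka Streams names a state store's changelog topic by combining the application ID, the store name, and the suffix -changelog. The class TopicSetupSketch, its method names, and the host names are hypothetical and serve only as illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java illustration; the class and method names are hypothetical.
class TopicSetupSketch {

    // Mirrors the jq + cut pipeline from step 5: keep at most `limit` hosts,
    // append the port to each one, and join them with commas.
    static String hostList(List<String> hosts, int port, int limit) {
        return hosts.stream()
                .limit(limit)
                .map(host -> host + ":" + port)
                .collect(Collectors.joining(","));
    }

    // Changelog topic naming: <application.id>-<store name>-changelog
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        // Placeholder host names, not real cluster nodes
        List<String> brokers = Arrays.asList(
                "broker0.example.internal",
                "broker1.example.internal",
                "broker2.example.internal");
        System.out.println(hostList(brokers, 9092, 2));

        // Application ID and store name as used in Stream.java
        System.out.println(changelogTopic("wordcount-example", "Counts"));
    }
}
```

With the application ID wordcount-example and the store name Counts from Stream.java, the second call yields the wordcount-example-Counts-changelog topic name used in step 6.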

Run the code

  1. To start the streaming application as a background process, use the following command:

    java -jar kafka-streaming.jar $KAFKABROKERS $KAFKAZKHOSTS &
    

    You may get a warning about Apache log4j. You can ignore this warning.

  2. To send records to the test topic, use the following command to start the producer application:

    java -jar kafka-producer-consumer.jar producer test $KAFKABROKERS
    
  3. Once the producer completes, use the following command to view the information stored in the wordcounts topic:

    /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic wordcounts --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --from-beginning
    

    The --property parameters tell the console consumer to print the key (word) along with the count (value). They also configure the deserializers to use when reading these values from Kafka.

    The output is similar to the following text:

    dwarfs  13635
    ago     13664
    snow    13636
    dwarfs  13636
    ago     13665
    a       13803
    ago     13666
    a       13804
    ago     13667
    ago     13668
    jumped  13640
    jumped  13641
    

    The parameter --from-beginning configures the consumer to start at the beginning of the records stored in the topic. The count increments each time a word is encountered, so the topic contains multiple entries for each word, with an increasing count.

  4. Use Ctrl + C to exit the producer. Continue using Ctrl + C to exit the application and the consumer.

  5. To delete the topics used by the streaming operation, use the following commands:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic test --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcounts --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
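
Step 3 above sets a LongDeserializer for the values because the counts are written with a Long serde, which stores each count as 8 big-endian bytes rather than as text. The following minimal sketch shows that encoding with the JDK's ByteBuffer, which uses big-endian byte order by default. The class name CountBytesSketch and its helper methods are hypothetical, and the sketch does not call any Kafka classes.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Plain-Java illustration; it does not call any Kafka classes.
class CountBytesSketch {

    // Encodes a count as 8 big-endian bytes (ByteBuffer's default byte order)
    static byte[] encodeCount(long count) {
        return ByteBuffer.allocate(Long.BYTES).putLong(count).array();
    }

    // Decodes 8 big-endian bytes back into a count
    static long decodeCount(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    public static void main(String[] args) {
        byte[] raw = encodeCount(13635L);
        // The raw bytes are binary, not the characters "13635"
        System.out.println("raw bytes: " + Arrays.toString(raw));
        System.out.println("decoded:   " + decodeCount(raw));
    }
}
```

Reading the same bytes with a string deserializer would show unreadable characters instead of the number, which is why the value deserializer has to match the serde used by the application.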
    

Clean up resources

To clean up the resources created by this tutorial, you can delete the resource group. Deleting the resource group also deletes the associated HDInsight cluster, and any other resources associated with the resource group.

To remove the resource group using the Azure portal:

  1. In the Azure portal, expand the menu on the left side to open the menu of services, and then choose Resource Groups to display the list of your resource groups.
  2. Locate the resource group to delete, and then right-click the More button (...) on the right side of the listing.
  3. Select Delete resource group, and then confirm.

Next steps

In this document, you learned how to use the Apache Kafka Streams API with Kafka on HDInsight. Use the following to learn more about working with Kafka.