Tutorial: Menggunakan Apache Kafka streams API - Azure HDInsight

Pelajari cara membuat aplikasi yang menggunakan Apache Kafka Streams API dan jalankan dengan Kafka di HDInsight.

Aplikasi yang digunakan dalam tutorial ini adalah penghitungan kata streaming. Aplikasi ini membaca data teks dari topik Kafka, mengekstrak kata-kata individual, dan kemudian menyimpan kata dan menghitung topik Kafka lainnya.

Pemrosesan aliran Kafka sering dilakukan menggunakan Apache Spark. Kafka versi 2.1.1 dan 2.4.1 (dalam HDInsight 4.0 dan 5.0) mendukung Kafka Aliran API. API ini memungkinkan Anda untuk mengubah stream data antara topik input dan output.

Untuk informasi selengkapnya tentang Kafka Streams, lihat dokumentasi Pengenalan Stream di Apache.org.

Dalam tutorial ini, Anda akan mempelajari cara:

  • Memahami kode
  • Membuat dan menggunakan aplikasi
  • Mengonfigurasi topik Kafka
  • Menjalankan kode

Prasyarat

Memahami kode

Contoh aplikasi terletak di https://github.com/Azure-Samples/hdinsight-kafka-java-get-started, dalam subdirektori Streaming. Aplikasi ini terdiri dari dua file:

  • pom.xml: File ini mendefinisikan dependensi proyek, versi Java, dan metode pengemasan.
  • Stream.java: File ini mengimplementasikan logika streaming.

Pom.xml

Hal-hal penting yang perlu dipahami dalam file pom.xml adalah:

  • Dependensi: Proyek ini bergantung pada Kafka Streams API, yang disediakan oleh paket kafka-clients. Kode XML berikut menentukan dependensi ini:

    <!-- Kafka client for producer/consumer operations -->
    <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
    </dependency>
    

    Entri ${kafka.version} dinyatakan di bagian <properties>..</properties> dari pom.xml dan dikonfigurasi ke versi Kafka dari kluster HDInsight.

  • Plugin: Plugin Maven memberikan berbagai kemampuan. Dalam proyek ini, plugin berikut digunakan:

    • maven-compiler-plugin: Digunakan untuk mengatur versi Java yang digunakan oleh proyek ke 8. HDInsight 4.0 dan 5.0 memerlukan Java 8.
    • maven-shade-plugin: Digunakan untuk menghasilkan uber jar yang berisi aplikasi ini, dan dependensi apa pun. Selain itu, digunakan untuk mengatur titik masuk aplikasi, sehingga Anda dapat langsung menjalankan file Jar tanpa harus menentukan kelas utama.

Stream.java

File Stream.java menggunakan Streams API untuk mengimplementasikan aplikasi hitungan kata. Hal ini membaca data dari topik Kafka yang dinamai test dan menulis penghitungan kata menjadi topik bernama wordcounts.

Kode berikut mendefinisikan aplikasi hitungan kata:

package com.microsoft.example;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.Arrays;
import java.util.Properties;

public class Stream
{
    public static void main( String[] args ) {
        Properties streamsConfig = new Properties();
        // The name must be unique on the Kafka cluster
        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-example");
        // Brokers
        streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);
        // SerDes for key and values
        streamsConfig.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // Serdes for the word and count
        Serde<String> stringSerde = Serdes.String();
        Serde<Long> longSerde = Serdes.Long();

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> sentences = builder.stream(stringSerde, stringSerde, "test");
        KStream<String, Long> wordCounts = sentences
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .map((key, word) -> new KeyValue<>(word, word))
                .countByKey("Counts")
                .toStream();
        wordCounts.to(stringSerde, longSerde, "wordcounts");

        KafkaStreams streams = new KafkaStreams(builder, streamsConfig);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Membuat dan menyebarkan contoh

Untuk membangun dan menyebarkan proyek ke Kafka Anda di kluster HDInsight, gunakan langkah-langkah berikut:

  1. Atur direktori Anda saat ini ke lokasi direktori hdinsight-kafka-java-get-started-master\Streaming, lalu gunakan perintah berikut untuk membuat paket jar:

    mvn clean package
    

    Perintah ini membuat paket di target/kafka-streaming-1.0-SNAPSHOT.jar.

  2. Ganti sshuser dengan pengguna SSH untuk kluster, dan ganti clustername dengan nama kluster Anda. Masukkan perintah berikut untuk menyalin file kafka-streaming-1.0-SNAPSHOT.jar ke kluster HDInsight Anda. Jika diminta, masukkan kata sandi untuk akun pengguna SSH.

    scp ./target/kafka-streaming-1.0-SNAPSHOT.jar sshuser@clustername-ssh.azurehdinsight.net:kafka-streaming.jar
    

Membuat topik Apache Kafka

  1. Ganti sshuser dengan pengguna SSH untuk kluster, dan ganti CLUSTERNAME dengan nama kluster Anda. Buka koneksi SSH ke kluster, dengan memasukkan perintah berikut. Jika diminta, masukkan kata sandi untuk akun pengguna SSH.

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Instal jq, prosesor JSON baris perintah. Dari koneksi SSH yang terbuka, masukkan perintah berikut untuk memasang jq:

    sudo apt -y install jq
    
  3. Siapkan variabel sandi. Ganti PASSWORD dengan kata sandi masuk kluster, lalu masukkan perintah:

    export PASSWORD='PASSWORD'
    
  4. Ekstrak nama kluster yang di-cased dengan benar. Penulisan sebenarnya dari nama kluster mungkin berbeda dari yang Anda duga, tergantung pada cara kluster dibuat. Perintah ini mendapatkan casing aktual, lalu menyimpannya dalam variabel. Masukkan perintah berikut:

    export CLUSTER_NAME=$(curl -u admin:$PASSWORD -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    

    Catatan

    Jika Anda melakukan proses ini dari luar kluster, ada prosedur yang berbeda untuk menyimpan nama kluster. Pastikan nama kluster dalam huruf kecil dari portal Microsoft Azure. Kemudian, ganti nama kluster menjadi <clustername> dalam perintah berikut dan jalankan: export clusterName='<clustername>'.

  5. Untuk mendapatkan host broker Kafka dan host Apache Zookeeper, gunakan perintah berikut. Saat diminta, masukkan kata sandi untuk akun admin kluster.

    export KAFKAZKHOSTS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
    
    export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
    

    Catatan

    Perintah ini memerlukan akses Ambari. Jika kluster Anda berada di belakang NSG, jalankan perintah ini dari komputer yang dapat mengakses Ambari.

  6. Untuk membuat topik yang digunakan oleh operasi streaming, gunakan perintah berikut:

    Catatan

    Anda mungkin melihat kesalahan bahwa topik test tersebut sudah ada. Tidak apa-apa, karena topik tersebut mungkin telah dibuat dalam tutorial Producer dan Consumer API.

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic test --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcounts --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
    

    Topik digunakan untuk tujuan berikut:

    • test: Topik ini adalah tempat catatan diterima. Aplikasi streaming membaca dari sini.
    • wordcounts: Topik ini adalah tempat aplikasi streaming menyimpan output-nya.
    • RekeyedIntermediateTopic: Topik ini digunakan untuk mempartisi ulang data karena hitungan diperbarui oleh operator countByKey.
    • wordcount-example-Counts-changelog: Topik ini adalah status simpan yang digunakan oleh operasi countByKey

    Kafka di HDInsight juga dapat dikonfigurasi untuk membuat topik secara otomatis. Untuk informasi selengkapnya, lihat dokumen Mengonfigurasi pembuatan topik otomatis.

Menjalankan kode

  1. Untuk memulai aplikasi streaming sebagai proses latar belakang, gunakan perintah berikut:

    java -jar kafka-streaming.jar $KAFKABROKERS $KAFKAZKHOSTS &
    

    Anda mungkin mendapatkan peringatan tentang Apache log4j. Anda bisa mengabaikan peringatan ini.

  2. Untuk mengirim rekaman ke topik test ini, gunakan perintah berikut untuk memulai aplikasi produser:

    java -jar kafka-producer-consumer.jar producer test $KAFKABROKERS
    
  3. Setelah produser selesai, gunakan perintah berikut untuk melihat informasi yang disimpan dalam topik wordcounts:

    /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic wordcounts --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --from-beginning
    

    Parameter --property memberi tahu konsumen konsol untuk mencetak kunci (kata) bersama dengan hitungan (nilai). Parameter ini juga mengonfigurasi deserializer untuk digunakan saat membaca nilai-nilai ini dari Kafka.

    Outputnya mirip dengan teks berikut:

    dwarfs  13635
    ago     13664
    snow    13636
    dwarfs  13636
    ago     13665
    a       13803
    ago     13666
    a       13804
    ago     13667
    ago     13668
    jumped  13640
    jumped  13641
    

    Parameter --from-beginning mengonfigurasi konsumen untuk memulai di awal catatan yang disimpan dalam topik. Jumlah bertambah setiap kali satu kata ditemukan, jadi topik berisi beberapa entri untuk setiap kata, dengan jumlah yang bertambah.

  4. Gunakan Ctrl + C untuk keluar dari produser. Lanjutkan menggunakan Ctrl + C untuk keluar dari aplikasi dan konsumen.

  5. Untuk menghapus topik yang digunakan oleh operasi streaming, gunakan perintah berikut:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic test --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcounts --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
    

Membersihkan sumber daya

Untuk membersihkan sumber daya yang dibuat oleh tutorial ini, Anda dapat menghapus grup sumber daya terkait. Menghapus grup sumber daya juga menghapus kluster HDInsight terkait, dan sumber daya lain yang terkait dengan grup sumber daya.

Menghapus grup sumber daya menggunakan portal Microsoft Azure:

  1. Di portal Microsoft Azure, luaskan menu di sebelah kiri untuk membuka menu layanan, lalu pilihGrup Sumber Daya untuk menampilkan daftar grup sumber daya Anda.
  2. Cari grup sumber daya yang ingin dihapus, lalu klik kanan tombol Lainnya (...) di sisi kanan daftar.
  3. Pilih Hapus grup sumber daya, lalu konfirmasi.

Langkah berikutnya

Dalam dokumen ini, Anda belajar cara menggunakan Apache Kafka Streams API dengan Kafka di HDInsight. Gunakan hal-hal berikut ini untuk mempelajari selengkapnya tentang bekerja dengan Kafka.