Tutorial: Menggunakan Apache Kafka streams API - Azure HDInsight
Pelajari cara membuat aplikasi yang menggunakan Apache Kafka Streams API dan jalankan dengan Kafka di HDInsight.
Aplikasi yang digunakan dalam tutorial ini adalah penghitungan kata streaming. Aplikasi ini membaca data teks dari topik Kafka, mengekstrak kata-kata individual, dan kemudian menyimpan kata dan menghitung topik Kafka lainnya.
Pemrosesan aliran Kafka sering dilakukan menggunakan Apache Spark. Kafka versi 2.1.1 dan 2.4.1 (dalam HDInsight 4.0 dan 5.0) mendukung Kafka Aliran API. API ini memungkinkan Anda untuk mengubah stream data antara topik input dan output.
Untuk informasi selengkapnya tentang Kafka Streams, lihat dokumentasi Pengenalan Stream di Apache.org.
Dalam tutorial ini, Anda akan mempelajari cara:
- Memahami kode
- Membuat dan menggunakan aplikasi
- Mengonfigurasi topik Kafka
- Menjalankan kode
Prasyarat
Kafka pada kluster HDInsight 4.0 atau 5.0. Untuk mempelajari cara membuat Kafka di kluster HDInsight, lihat dokumen Mulai dengan Apache Kafka di HDInsight.
Selesaikan langkah-langkah dalam dokumen Apache Kafka Consumer and Producer API. Langkah-langkah dalam dokumen ini menggunakan contoh aplikasi dan topik yang dibuat dalam tutorial ini.
Java Developer Kit (JDK) versi 8 atau yang setara, seperti OpenJDK.
Apache Mavendipasang dengan benar menurut Apache. Maven merupakan sistem pembangunan proyek untuk proyek Java.
Klien SSH. Untuk informasi selengkapnya, lihat Menyambungkan ke HDInsight (Apache Hadoop) menggunakan SSH.
Memahami kode
Contoh aplikasi terletak di https://github.com/Azure-Samples/hdinsight-kafka-java-get-started, dalam subdirektori Streaming
. Aplikasi ini terdiri dari dua file:
pom.xml
: File ini mendefinisikan dependensi proyek, versi Java, dan metode pengemasan.Stream.java
: File ini mengimplementasikan logika streaming.
Pom.xml
Hal-hal penting yang perlu dipahami dalam file pom.xml
adalah:
Dependensi: Proyek ini bergantung pada Kafka Streams API, yang disediakan oleh paket
kafka-clients
. Kode XML berikut menentukan dependensi ini:<!-- Kafka client for producer/consumer operations --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency>
Entri
${kafka.version}
dinyatakan di bagian<properties>..</properties>
daripom.xml
dan dikonfigurasi ke versi Kafka dari kluster HDInsight.Plugin: Plugin Maven memberikan berbagai kemampuan. Dalam proyek ini, plugin berikut digunakan:
maven-compiler-plugin
: Digunakan untuk mengatur versi Java yang digunakan oleh proyek ke 8. HDInsight 4.0 dan 5.0 memerlukan Java 8.maven-shade-plugin
: Digunakan untuk menghasilkan uber jar yang berisi aplikasi ini, dan dependensi apa pun. Selain itu, digunakan untuk mengatur titik masuk aplikasi, sehingga Anda dapat langsung menjalankan file Jar tanpa harus menentukan kelas utama.
Stream.java
File Stream.java menggunakan Streams API untuk mengimplementasikan aplikasi hitungan kata. Hal ini membaca data dari topik Kafka yang dinamai test
dan menulis penghitungan kata menjadi topik bernama wordcounts
.
Kode berikut mendefinisikan aplikasi hitungan kata:
package com.microsoft.example;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import java.util.Arrays;
import java.util.Properties;
public class Stream
{
public static void main( String[] args ) {
Properties streamsConfig = new Properties();
// The name must be unique on the Kafka cluster
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-example");
// Brokers
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);
// SerDes for key and values
streamsConfig.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Serdes for the word and count
Serde<String> stringSerde = Serdes.String();
Serde<Long> longSerde = Serdes.Long();
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> sentences = builder.stream(stringSerde, stringSerde, "test");
KStream<String, Long> wordCounts = sentences
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, word) -> new KeyValue<>(word, word))
.countByKey("Counts")
.toStream();
wordCounts.to(stringSerde, longSerde, "wordcounts");
KafkaStreams streams = new KafkaStreams(builder, streamsConfig);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
Membuat dan menyebarkan contoh
Untuk membangun dan menyebarkan proyek ke Kafka Anda di kluster HDInsight, gunakan langkah-langkah berikut:
Atur direktori Anda saat ini ke lokasi direktori
hdinsight-kafka-java-get-started-master\Streaming
, lalu gunakan perintah berikut untuk membuat paket jar:mvn clean package
Perintah ini membuat paket di
target/kafka-streaming-1.0-SNAPSHOT.jar
.Ganti
sshuser
dengan pengguna SSH untuk kluster, dan ganticlustername
dengan nama kluster Anda. Masukkan perintah berikut untuk menyalin filekafka-streaming-1.0-SNAPSHOT.jar
ke kluster HDInsight Anda. Jika diminta, masukkan kata sandi untuk akun pengguna SSH.scp ./target/kafka-streaming-1.0-SNAPSHOT.jar sshuser@clustername-ssh.azurehdinsight.net:kafka-streaming.jar
Membuat topik Apache Kafka
Ganti
sshuser
dengan pengguna SSH untuk kluster, dan gantiCLUSTERNAME
dengan nama kluster Anda. Buka koneksi SSH ke kluster, dengan memasukkan perintah berikut. Jika diminta, masukkan kata sandi untuk akun pengguna SSH.ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
Instal jq, prosesor JSON baris perintah. Dari koneksi SSH yang terbuka, masukkan perintah berikut untuk memasang
jq
:sudo apt -y install jq
Siapkan variabel sandi. Ganti
PASSWORD
dengan kata sandi masuk kluster, lalu masukkan perintah:export PASSWORD='PASSWORD'
Ekstrak nama kluster yang di-cased dengan benar. Penulisan sebenarnya dari nama kluster mungkin berbeda dari yang Anda duga, tergantung pada cara kluster dibuat. Perintah ini mendapatkan casing aktual, lalu menyimpannya dalam variabel. Masukkan perintah berikut:
export CLUSTER_NAME=$(curl -u admin:$PASSWORD -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
Catatan
Jika Anda melakukan proses ini dari luar kluster, ada prosedur yang berbeda untuk menyimpan nama kluster. Pastikan nama kluster dalam huruf kecil dari portal Microsoft Azure. Kemudian, ganti nama kluster menjadi
<clustername>
dalam perintah berikut dan jalankan:export clusterName='<clustername>'
.Untuk mendapatkan host broker Kafka dan host Apache Zookeeper, gunakan perintah berikut. Saat diminta, masukkan kata sandi untuk akun admin kluster.
export KAFKAZKHOSTS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2); export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
Catatan
Perintah ini memerlukan akses Ambari. Jika kluster Anda berada di belakang NSG, jalankan perintah ini dari komputer yang dapat mengakses Ambari.
Untuk membuat topik yang digunakan oleh operasi streaming, gunakan perintah berikut:
Catatan
Anda mungkin melihat kesalahan bahwa topik
test
tersebut sudah ada. Tidak apa-apa, karena topik tersebut mungkin telah dibuat dalam tutorial Producer dan Consumer API./usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic test --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcounts --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
Topik digunakan untuk tujuan berikut:
test
: Topik ini adalah tempat catatan diterima. Aplikasi streaming membaca dari sini.wordcounts
: Topik ini adalah tempat aplikasi streaming menyimpan output-nya.RekeyedIntermediateTopic
: Topik ini digunakan untuk mempartisi ulang data karena hitungan diperbarui oleh operatorcountByKey
.wordcount-example-Counts-changelog
: Topik ini adalah status simpan yang digunakan oleh operasicountByKey
Kafka di HDInsight juga dapat dikonfigurasi untuk membuat topik secara otomatis. Untuk informasi selengkapnya, lihat dokumen Mengonfigurasi pembuatan topik otomatis.
Menjalankan kode
Untuk memulai aplikasi streaming sebagai proses latar belakang, gunakan perintah berikut:
java -jar kafka-streaming.jar $KAFKABROKERS $KAFKAZKHOSTS &
Anda mungkin mendapatkan peringatan tentang Apache
log4j
. Anda bisa mengabaikan peringatan ini.Untuk mengirim rekaman ke topik
test
ini, gunakan perintah berikut untuk memulai aplikasi produser:java -jar kafka-producer-consumer.jar producer test $KAFKABROKERS
Setelah produser selesai, gunakan perintah berikut untuk melihat informasi yang disimpan dalam topik
wordcounts
:/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic wordcounts --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --from-beginning
Parameter
--property
memberi tahu konsumen konsol untuk mencetak kunci (kata) bersama dengan hitungan (nilai). Parameter ini juga mengonfigurasi deserializer untuk digunakan saat membaca nilai-nilai ini dari Kafka.Outputnya mirip dengan teks berikut:
dwarfs 13635 ago 13664 snow 13636 dwarfs 13636 ago 13665 a 13803 ago 13666 a 13804 ago 13667 ago 13668 jumped 13640 jumped 13641
Parameter
--from-beginning
mengonfigurasi konsumen untuk memulai di awal catatan yang disimpan dalam topik. Jumlah bertambah setiap kali satu kata ditemukan, jadi topik berisi beberapa entri untuk setiap kata, dengan jumlah yang bertambah.Gunakan Ctrl + C untuk keluar dari produser. Lanjutkan menggunakan Ctrl + C untuk keluar dari aplikasi dan konsumen.
Untuk menghapus topik yang digunakan oleh operasi streaming, gunakan perintah berikut:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic test --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcounts --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
Membersihkan sumber daya
Untuk membersihkan sumber daya yang dibuat oleh tutorial ini, Anda dapat menghapus grup sumber daya terkait. Menghapus grup sumber daya juga menghapus kluster HDInsight terkait, dan sumber daya lain yang terkait dengan grup sumber daya.
Menghapus grup sumber daya menggunakan portal Microsoft Azure:
- Di portal Microsoft Azure, luaskan menu di sebelah kiri untuk membuka menu layanan, lalu pilihGrup Sumber Daya untuk menampilkan daftar grup sumber daya Anda.
- Cari grup sumber daya yang ingin dihapus, lalu klik kanan tombol Lainnya (...) di sisi kanan daftar.
- Pilih Hapus grup sumber daya, lalu konfirmasi.
Langkah berikutnya
Dalam dokumen ini, Anda belajar cara menggunakan Apache Kafka Streams API dengan Kafka di HDInsight. Gunakan hal-hal berikut ini untuk mempelajari selengkapnya tentang bekerja dengan Kafka.