Tutorial: Gunakan Streaming Terstruktur Apache Spark dengan Apache Kafka di HDInsight
Tutorial ini menunjukkan cara menggunakan Apache Spark Structured Streaming untuk membaca dan menulis data dengan Apache Kafka di Azure HDInsight.
Spark Structured Streaming adalah mesin pemroses aliran yang dibangun di Spark SQL. Ini memungkinkan Anda untuk menyatakan komputasi streaming dengan cara yang sama seperti komputasi batch pada data statis.
Dalam tutorial ini, Anda akan mempelajari cara:
- Menggunakan templat Azure Resource Manager untuk membuat kluster
- Gunakan Spark Structured Streaming dengan Kafka
Bila Anda sudah menyelesaikan langkah-langkah dalam dokumen ini, jangan lupa hapus kluster untuk menghindari biaya berlebih.
Prasyarat
jq, prosesor JSON baris perintah. Lihat https://stedolan.github.io/jq/.
Terbiasa menggunakan Buku Catatan Jupyter dengan Spark di HDInsight. Untuk informasi selengkapnya, lihat dokumen Memuat data dan menjalankan kueri dengan Apache Spark di HDInsight.
Memahami bahasa pemrograman Scala. Kode yang digunakan dalam tutorial ini ditulis dalam Scala.
Terbiasa membuat topik Kafka. Untuk informasi selengkapnya, lihat dokumen Mulai cepat Apache Kafka di HDInsight.
Penting
Langkah-langkah dalam dokumen ini memerlukan grup sumber daya Azure yang memuat Spark di HDInsight dan Kafka di kluster HDInsight. Kedua kluster ini terletak di dalam Azure Virtual Network, yang memungkinkan kluster Spark untuk langsung berkomunikasi dengan kluster Kafka.
Demi kenyamanan Anda, dokumen ini terhubung ke templat yang dapat membuat semua sumber daya Azure yang diperlukan.
Untuk informasi selengkapnya tentang penggunaan HDInsight di jaringan virtual, lihat dokumen Rencanakan jaringan virtual untuk HDInsight.
Structured Streaming dengan Apache Kafka
Spark Structured Streaming adalah mesin pemrosesan streaming yang dibuat di mesin Spark SQL. Ketika menggunakan Structured Streaming, Anda dapat menulis kueri streaming dengan cara yang sama saat Anda menulis kueri batch.
Cuplikan kode berikut menunjukkan pembacaan dari Kafka dan penyimpanan ke file. Yang pertama adalah operasi batch, sedangkan yang kedua adalah operasi streaming:
// Read a batch from Kafka
val kafkaDF = spark.read.format("kafka")
.option("kafka.bootstrap.servers", kafkaBrokers)
.option("subscribe", kafkaTopic)
.option("startingOffsets", "earliest")
.load()
// Select data and write to file
kafkaDF.select(from_json(col("value").cast("string"), schema) as "trip")
.write
.format("parquet")
.option("path","/example/batchtripdata")
.option("checkpointLocation", "/batchcheckpoint")
.save()
// Stream from Kafka
val kafkaStreamDF = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", kafkaBrokers)
.option("subscribe", kafkaTopic)
.option("startingOffsets", "earliest")
.load()
// Select data from the stream and write to file
kafkaStreamDF.select(from_json(col("value").cast("string"), schema) as "trip")
.writeStream
.format("parquet")
.option("path","/example/streamingtripdata")
.option("checkpointLocation", "/streamcheckpoint")
.start.awaitTermination(30000)
Pada kedua cuplikan, data dibaca dari Kafka dan ditulis ke file. Perbedaan dari contoh-contoh tersebut adalah:
Batch | Streaming |
---|---|
read |
readStream |
write |
writeStream |
save |
start |
Operasi streaming juga menggunakan awaitTermination(30000)
, yang menghentikan streaming setelah 30.000 mdtk.
Untuk menggunakan Structured Streaming dengan Kafka, proyek Anda harus memiliki dependensi paket org.apache.spark : spark-sql-kafka-0-10_2.11
. Versi paket ini harus cocok dengan versi Spark di HDInsight. Untuk Spark 2.4 (tersedia di HDInsight 4.0), Anda dapat menemukan informasi dependensi untuk berbagai jenis proyek di https://search.maven.org/#artifactdetails%7Corg.apache.spark%7Cspark-sql-kafka-0-10_2.11%7C2.2.0%7Cjar.
Untuk Buku Catatan Jupyter yang digunakan dengan tutorial ini, sel berikut memuat dependensi paket ini:
%%configure -f
{
"conf": {
"spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0",
"spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.11"
}
}
Membuat kluster
Apache Kafka di HDInsight tidak menyediakan akses ke broker Kafka melalui internet publik. Apa pun yang menggunakan Kafka harus ada di dalam jaringan virtual Azure yang sama. Dalam tutorial ini, kluster Kafka dan Spark ada di dalam jaringan virtual Azure yang sama.
Diagram berikut menunjukkan bagaimana komunikasi mengalir antara Spark dan Kafka:
Catatan
Layanan Kafka terbatas pada komunikasi dalam jaringan virtual. Layanan lain pada kluster, seperti SSH dan Ambari, dapat diakses melalui internet. Untuk informasi selengkapnya tentang port publik yang tersedia lewat HDInsight, lihat Port dan URI yang digunakan oleh HDInsight.
Untuk membuat Azure Virtual Network, lalu mmebuat kluster Kafka dan Spark di dalamnya, gunakan langkah-langkah berikut:
Gunakan tombol berikut untuk masuk ke Azure dan buka templat di portal Microsoft Azure.
Templat Azure Resource Manager ada di https://raw.githubusercontent.com/Azure-Samples/hdinsight-spark-kafka-structured-streaming/master/azuredeploy.json.
Templat ini membuat sumber daya berikut:
Kafka pada kluster HDInsight 4.0 atau 5.0.
Spark 2.4 atau 3.1 pada kluster HDInsight 4.0 atau 5.0.
Azure Virtual Network yang memuat kluster HDInsight.
Penting
Notebook streaming terstruktur yang digunakan dalam tutorial ini memerlukan Spark 2.4 atau 3.1 pada HDInsight 4.0 atau 5.0. Jika Anda menggunakan versi Spark yang lebih lama pada Microsoft Azure HDInsight, Anda menerima kesalahan saat menggunakan notebook tersebut.
Gunakan informasi berikut ini untuk mengisi entri di bagian Templat Kustomisasi:
Pengaturan Nilai Langganan Langganan Azure Anda Grup sumber daya Grup sumber daya yang berisi sumber daya. Lokasi Wilayah Azure tempat sumber daya dibuat. Nama Kluster Spark Nama kluster Spark tersebut. Enam karakter pertama harus berbeda dari nama kluster Kafka. Nama Kluster Kafka Nama kluster Kafka tersebut. Enam karakter pertama harus berbeda dari nama kluster Spark. Nama Pengguna Masuk Kluster Nama pengguna admin untuk kluster. Kata Sandi Masuk Kluster Kata sandi pengguna admin untuk kluster. Nama Pengguna SSH Pengguna SSH untuk membuat kluster. Kata Sandi SSH Kata sandi untuk pengguna SSH. Baca Syarat dan Ketentuan, lalu pilih Saya menyetujui syarat dan ketentuan yang tercantum di atas.
Pilih Beli.
Catatan
Pembuatan kluster dapat memakan waktu hingga 20 menit.
Gunakan Spark Structured Streaming
Contoh ini menunjukkan cara penggunaan Spark Structured Streaming dengan Kafka di HDInsight. Ia menggunakan data perjalanan taksi, yang disediakan oleh New York City. Himpunan data yang digunakan oleh buku catatan ini diambil dari 2016 Green Taxi Trip Data.
Kumpulkan informasi host. Gunakan perintah curl dan jq di bawah ini untuk mendapatkan informasi Kafka ZooKeeper dan host broker Anda. Perintah tersebut dirancang untuk prompt perintah Windows, sedikit variasi akan diperlukan untuk lingkungan lain. Ganti
KafkaCluster
dengan nama kluster Kafka Anda, danKafkaPassword
dengan kata sandi masuk kluster. Ganti jugaC:\HDI\jq-win64.exe
dengan jalur aktual ke instalasi jq Anda. Masukkan perintah di prompt perintah Windows dan simpan output untuk digunakan di langkah-langkah berikutnya.REM Enter cluster name in lowercase set CLUSTERNAME=KafkaCluster set PASSWORD=KafkaPassword curl -u admin:%PASSWORD% -G "https://%CLUSTERNAME%.azurehdinsight.net/api/v1/clusters/%CLUSTERNAME%/services/ZOOKEEPER/components/ZOOKEEPER_SERVER" | C:\HDI\jq-win64.exe -r "["""\(.host_components[].HostRoles.host_name):2181"""] | join(""",""")" curl -u admin:%PASSWORD% -G "https://%CLUSTERNAME%.azurehdinsight.net/api/v1/clusters/%CLUSTERNAME%/services/KAFKA/components/KAFKA_BROKER" | C:\HDI\jq-win64.exe -r "["""\(.host_components[].HostRoles.host_name):9092"""] | join(""",""")"
Dari browser web, arahkan ke
https://CLUSTERNAME.azurehdinsight.net/jupyter
, di manaCLUSTERNAME
merupakan nama kluster Anda. Ketika diminta, masukkan info masuk kluster (admin) dan kata sandi yang Anda gunakan saat membuat kluster.Pilih Baru > Spark untuk membuat notebook.
Spark streaming memiliki batch mikro, yang berarti data datang sebagai batch dan pelaksana berjalan pada batch data. Jika pelaksana memiliki batas waktu siaga kurang dari waktu yang diperlukan untuk memproses batch, maka pelaksana akan terus ditambah dan dihapus. Jika batas waktu siaga pelaksana lebih besar dari durasi batch, pelaksana tidak pernah dihapus. Jadi sebaiknya Anda menonaktifkan alokasi dinamis dengan mengubah pengaturan spark.dynamicAllocation.enabled ke false ketika menjalankan aplikasi streaming.
Muat paket yang digunakan oleh Buku Catatan Jupyter dengan memasukkan informasi berikut ke dalam sel Buku Catatan. Jalankan perintah dengan menggunakan CTRL + ENTER.
%%configure -f { "conf": { "spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0", "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.11", "spark.dynamicAllocation.enabled": false } }
Buat topik Kafka. Edit perintah di bawah ini dengan mengganti
YOUR_ZOOKEEPER_HOSTS
dengan informasi host Zookeeper yang telah diekstrak pada langkah pertama. Masukkan perintah yang sudah diedit ke Buku Catatan Jupyter Anda untuk membuat topiktripdata
.%%bash export KafkaZookeepers="YOUR_ZOOKEEPER_HOSTS" /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic tripdata --zookeeper $KafkaZookeepers
Ambil data perjalanan taksi. Masukkan perintah di sel berikutnya untuk memuat data perjalanan taksi di New York City. Data dimuat ke dalam kerangka data lalu kerangka data tersebut ditampilkan sebagai output sel.
import spark.implicits._ // Load the data from the New York City Taxi data REST API for 2016 Green Taxi Trip Data val url="https://data.cityofnewyork.us/resource/pqfs-mqru.json" val result = scala.io.Source.fromURL(url).mkString // Create a dataframe from the JSON data val taxiDF = spark.read.json(Seq(result).toDS) // Display the dataframe containing trip data taxiDF.show()
Atur informasi host broker Kafka. Ganti
YOUR_KAFKA_BROKER_HOSTS
dengan informasi host broker yang Anda ekstrak di langkah 1. Masukkan perintah yang sudah diedit ke sel Buku Catatan Jupyter berikutnya.// The Kafka broker hosts and topic used to write to Kafka val kafkaBrokers="YOUR_KAFKA_BROKER_HOSTS" val kafkaTopic="tripdata" println("Finished setting Kafka broker and topic configuration.")
Kirim data ke Kafka. Dalam perintah berikut, bidang
vendorid
digunakan sebagai nilai kunci untuk pesan Kafka. Kunci tersebut digunakan oleh Kafka ketika mempartisi data. Semua bidang disimpan dalam pesan Kafka sebagai nilai untai JSON. Masukkan perintah berikut di Jupyter untuk menyimpan data ke Kafka menggunakan kueri batch.// Select the vendorid as the key and save the JSON string as the value. val query = taxiDF.selectExpr("CAST(vendorid AS STRING) as key", "to_JSON(struct(*)) AS value").write.format("kafka").option("kafka.bootstrap.servers", kafkaBrokers).option("topic", kafkaTopic).save() println("Data sent to Kafka")
Menyatakan skema. Perintah berikut menunjukkan cara menggunakan skema saat membaca data JSON dari kafka. Masukkan perintah di sel Jupyter berikutnya.
// Import bits useed for declaring schemas and working with JSON data import org.apache.spark.sql._ import org.apache.spark.sql.types._ import org.apache.spark.sql.functions._ // Define a schema for the data val schema = (new StructType).add("dropoff_latitude", StringType).add("dropoff_longitude", StringType).add("extra", StringType).add("fare_amount", StringType).add("improvement_surcharge", StringType).add("lpep_dropoff_datetime", StringType).add("lpep_pickup_datetime", StringType).add("mta_tax", StringType).add("passenger_count", StringType).add("payment_type", StringType).add("pickup_latitude", StringType).add("pickup_longitude", StringType).add("ratecodeid", StringType).add("store_and_fwd_flag", StringType).add("tip_amount", StringType).add("tolls_amount", StringType).add("total_amount", StringType).add("trip_distance", StringType).add("trip_type", StringType).add("vendorid", StringType) // Reproduced here for readability //val schema = (new StructType) // .add("dropoff_latitude", StringType) // .add("dropoff_longitude", StringType) // .add("extra", StringType) // .add("fare_amount", StringType) // .add("improvement_surcharge", StringType) // .add("lpep_dropoff_datetime", StringType) // .add("lpep_pickup_datetime", StringType) // .add("mta_tax", StringType) // .add("passenger_count", StringType) // .add("payment_type", StringType) // .add("pickup_latitude", StringType) // .add("pickup_longitude", StringType) // .add("ratecodeid", StringType) // .add("store_and_fwd_flag", StringType) // .add("tip_amount", StringType) // .add("tolls_amount", StringType) // .add("total_amount", StringType) // .add("trip_distance", StringType) // .add("trip_type", StringType) // .add("vendorid", StringType) println("Schema declared")
Pilih data dan mulai streaming. Perintah berikut menunjukkan cara mengambil data dari Kafka menggunakan kueri batch. Dan kemudian menulis hasilnya ke HDFS di kluster Spark. Dalam contoh ini,
select
mengambil pesan (bidang nilai) dari Kafka dan menerapkan skemanya ke dalamnya. Data kemudian ditulis ke HDFS (WASB atau ADL) dalam format parket. Masukkan perintah di sel Jupyter berikutnya.// Read a batch from Kafka val kafkaDF = spark.read.format("kafka").option("kafka.bootstrap.servers", kafkaBrokers).option("subscribe", kafkaTopic).option("startingOffsets", "earliest").load() // Select data and write to file val query = kafkaDF.select(from_json(col("value").cast("string"), schema) as "trip").write.format("parquet").option("path","/example/batchtripdata").option("checkpointLocation", "/batchcheckpoint").save() println("Wrote data to file")
Anda dapat memverifikasi bahwa file dibuat dengan memasukkan perintah di sel Jupyter berikutnya. Ia mencantumkan file-file dalam direktori
/example/batchtripdata
.%%bash hdfs dfs -ls /example/batchtripdata
Sementara contoh sebelumnya menggunakan kueri batch, perintah berikut ini menunjukkan cara melakukan hal yang sama menggunakan kueri streaming. Masukkan perintah di sel Jupyter berikutnya.
// Stream from Kafka val kafkaStreamDF = spark.readStream.format("kafka").option("kafka.bootstrap.servers", kafkaBrokers).option("subscribe", kafkaTopic).option("startingOffsets", "earliest").load() // Select data from the stream and write to file kafkaStreamDF.select(from_json(col("value").cast("string"), schema) as "trip").writeStream.format("parquet").option("path","/example/streamingtripdata").option("checkpointLocation", "/streamcheckpoint").start.awaitTermination(30000) println("Wrote data to file")
Jalankan sel berikut untuk memverifikasi bahwa file ditulis oleh kueri streaming.
%%bash hdfs dfs -ls /example/streamingtripdata
Membersihkan sumber daya
Untuk membersihkan sumber daya yang dibuat oleh tutorial ini, Anda dapat menghapus grup sumber daya terkait. Menghapus grup sumber daya juga akan menghapus kluster HDInsight terkait. Dan sumber daya lain yang berkaitan dengan grup sumber daya.
Menghapus grup sumber daya menggunakan portal Microsoft Azure:
- Di portal Microsoft Azure, perluas menu di sisi kiri untuk membuka menu layanan, dan pilih Grup Sumber Daya untuk menampilkan daftar grup sumber daya Anda.
- Cari grup sumber daya yang ingin dihapus, lalu klik kanan tombol Lainnya (...) di sisi kanan daftar.
- Pilih Hapus grup sumber daya, lalu konfirmasi.
Peringatan
Tagihan klaster HDInsight mulai dihitung setelah klaster dibuat dan akan berhenti saat klaster dihapus. Penagihan dihitung pro-rata per menit, sehingga Anda harus selalu menghapus kluster jika tidak digunakan lagi.
Menghapus Kafka di kluster HDInsight dapat menghapus data apa pun yang disimpan di Kafka.