Sambungkan aplikasi Apache Spark Anda dengan Azure Event Hubs

Tutorial ini memandu Anda menyambungkan aplikasi Spark ke Azure Event Hubs untuk streaming real time. Integrasi ini memungkinkan streaming tanpa harus mengubah klien protokol Anda, atau menjalankan klaster Kafka atau Zookeeper Anda sendiri. Tutorial ini membutuhkan Apache Spark v2.4+ dan Apache Kafka v2.0+.

Catatan

Sampel ini tersedia di GitHub

Dalam tutorial ini, Anda akan mempelajari cara:

  • Membuat namespace layanan Azure Event Hubs
  • Mengkloning proyek contoh
  • Jalankan Spark
  • Baca dari Event Hubs for Kafka
  • Tuliskan ke Event Hubs for Kafka

Prasyarat

Sebelum memulai tutorial ini, pastikan bahwa Anda memiliki:

Catatan

Adapter Spark-Kafka diperbarui untuk mendukung Kafka v2.0 pada Spark v2.4. Pada rilis Spark sebelumnya, adapter mendukung Kafka v0.10 dan versi yang lebih baru tetapi mengandalkan secara spesifik pada API Kafka v0.10. Karena Event Hubs for Kafka tidak mendukung Kafka v0.10, adapter Spark-Kafka dari versi Spark sebelum v2.4 tidak didukung oleh Event Hubs for Kafka Ecosystems.

Membuat namespace layanan Azure Event Hubs

Namespace layanan Azure Event Hubs diperlukan untuk mengirim dan menerima dari layanan Azure Event Hubs apa pun. Lihat Membuat pusat aktivitas untuk petunjuk membuat namespace layanan dan pusat aktivitas. Dapatkan string koneksi Azure Event Hubs dan nama domain yang sepenuhnya memenuhi syarat (FQDN) untuk digunakan nanti. Untuk petunjuk, lihat Mendapatkan string koneksi Azure Event Hubs.

Mengkloning proyek contoh

Lakukan kloning repositori Azure Event Hubs dan arahkan ke subfolder tutorials/spark:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/spark

Baca dari Event Hubs for Kafka

Dengan beberapa perubahan konfigurasi, Anda dapat mulai membaca dari Azure Event Hubs for Kafka. Perbarui BOOTSTRAP_SERVERSdan EH_SASL dengan detail dari namespace layanan Anda dan Anda dapat memulai streaming dengan Azure Event Hubs seperti yang Anda lakukan dengan Kafka. Untuk contoh kode lengkap, lihat file sparkConsumer.scala di GitHub.

//Read from your Event Hub!
val df = spark.readStream
    .format("kafka")
    .option("subscribe", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("kafka.request.timeout.ms", "60000")
    .option("kafka.session.timeout.ms", "30000")
    .option("kafka.group.id", GROUP_ID)
    .option("failOnDataLoss", "true")
    .load()

//Use dataframe like normal (in this example, write to console)
val df_write = df.writeStream
    .outputMode("append")
    .format("console")
    .start()

Jika Anda menerima kesalahan yang mirip dengan kesalahan berikut, tambahkan .option("spark.streaming.kafka.allowNonConsecutiveOffsets", "true") ke spark.readStream panggilan dan coba lagi.

IllegalArgumentException: requirement failed: Got wrong record for <spark job name> even after seeking to offset 4216 got offset 4217 instead. If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets 

Tuliskan ke Event Hubs for Kafka

Anda juga dapat menulis ke Azure Event Hubs dengan cara yang sama seperti Anda menulis ke Kafka. Jangan lupa untuk memperbarui konfigurasi Anda untuk mengubah BOOTSTRAP_SERVERS dan EH_SASL dengan informasi dari namespace layanan Azure Event Hubs Anda. Untuk contoh kode lengkap, lihat file sparkProducer.scala di GitHub.

df = /**Dataframe**/

//Write to your Event Hub!
df.writeStream
    .format("kafka")
    .option("topic", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("checkpointLocation", "./checkpoint")
    .start()

Langkah berikutnya

Untuk mempelajari selengkapnya tentang Azure Event Hubs dan Event Hubs for Kafka, lihat artikel berikut ini: