Bagikan melalui


Tutorial: Gunakan Streaming Terstruktur Apache Spark dengan Apache Kafka di HDInsight

Tutorial ini menunjukkan cara menggunakan Apache Spark Structured Streaming untuk membaca dan menulis data dengan Apache Kafka di Azure HDInsight.

Spark Structured Streaming adalah mesin pemroses aliran yang dibangun di Spark SQL. Ini memungkinkan Anda untuk menyatakan komputasi streaming dengan cara yang sama seperti komputasi batch pada data statis.

Dalam tutorial ini, Anda akan mempelajari cara:

  • Menggunakan templat Azure Resource Manager untuk membuat kluster
  • Gunakan Spark Structured Streaming dengan Kafka

Bila Anda sudah menyelesaikan langkah-langkah dalam dokumen ini, jangan lupa hapus kluster untuk menghindari biaya berlebih.

Prasyarat

Penting

Langkah-langkah dalam dokumen ini memerlukan grup sumber daya Azure yang memuat Spark di HDInsight dan Kafka di kluster HDInsight. Kedua kluster ini terletak di dalam Azure Virtual Network, yang memungkinkan kluster Spark untuk langsung berkomunikasi dengan kluster Kafka.

Demi kenyamanan Anda, dokumen ini terhubung ke templat yang dapat membuat semua sumber daya Azure yang diperlukan.

Untuk informasi selengkapnya tentang penggunaan HDInsight di jaringan virtual, lihat dokumen Rencanakan jaringan virtual untuk HDInsight.

Structured Streaming dengan Apache Kafka

Spark Structured Streaming adalah mesin pemrosesan streaming yang dibuat di mesin Spark SQL. Ketika menggunakan Structured Streaming, Anda dapat menulis kueri streaming dengan cara yang sama saat Anda menulis kueri batch.

Cuplikan kode berikut menunjukkan pembacaan dari Kafka dan penyimpanan ke file. Yang pertama adalah operasi batch, sedangkan yang kedua adalah operasi streaming:

// Read a batch from Kafka
val kafkaDF = spark.read.format("kafka")
                .option("kafka.bootstrap.servers", kafkaBrokers)
                .option("subscribe", kafkaTopic)
                .option("startingOffsets", "earliest")
                .load()

// Select data and write to file
kafkaDF.select(from_json(col("value").cast("string"), schema) as "trip")
                .write
                .format("parquet")
                .option("path","/example/batchtripdata")
                .option("checkpointLocation", "/batchcheckpoint")
                .save()
// Stream from Kafka
val kafkaStreamDF = spark.readStream.format("kafka")
                .option("kafka.bootstrap.servers", kafkaBrokers)
                .option("subscribe", kafkaTopic)
                .option("startingOffsets", "earliest")
                .load()

// Select data from the stream and write to file
kafkaStreamDF.select(from_json(col("value").cast("string"), schema) as "trip")
                .writeStream
                .format("parquet")
                .option("path","/example/streamingtripdata")
                .option("checkpointLocation", "/streamcheckpoint")
                .start.awaitTermination(30000)

Pada kedua cuplikan, data dibaca dari Kafka dan ditulis ke file. Perbedaan dari contoh-contoh tersebut adalah:

Batch Streaming
read readStream
write writeStream
save start

Operasi streaming juga menggunakan awaitTermination(30000), yang menghentikan streaming setelah 30.000 mdtk.

Untuk menggunakan Structured Streaming dengan Kafka, proyek Anda harus memiliki dependensi paket org.apache.spark : spark-sql-kafka-0-10_2.11. Versi paket ini harus cocok dengan versi Spark di HDInsight. Untuk Spark 2.4 (tersedia di HDInsight 4.0), Anda dapat menemukan informasi dependensi untuk berbagai jenis proyek di https://search.maven.org/#artifactdetails%7Corg.apache.spark%7Cspark-sql-kafka-0-10_2.11%7C2.2.0%7Cjar.

Untuk Buku Catatan Jupyter yang digunakan dengan tutorial ini, sel berikut memuat dependensi paket ini:

%%configure -f
{
    "conf": {
        "spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0",
        "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.11"
    }
}

Membuat kluster

Apache Kafka di HDInsight tidak menyediakan akses ke broker Kafka melalui internet publik. Apa pun yang menggunakan Kafka harus ada di dalam jaringan virtual Azure yang sama. Dalam tutorial ini, kluster Kafka dan Spark ada di dalam jaringan virtual Azure yang sama.

Diagram berikut menunjukkan bagaimana komunikasi mengalir antara Spark dan Kafka:

Diagram of Spark and Kafka clusters in an Azure virtual network.

Catatan

Layanan Kafka terbatas pada komunikasi dalam jaringan virtual. Layanan lain pada kluster, seperti SSH dan Ambari, dapat diakses melalui internet. Untuk informasi selengkapnya tentang port publik yang tersedia lewat HDInsight, lihat Port dan URI yang digunakan oleh HDInsight.

Untuk membuat Azure Virtual Network, lalu mmebuat kluster Kafka dan Spark di dalamnya, gunakan langkah-langkah berikut:

  1. Gunakan tombol berikut untuk masuk ke Azure dan buka templat di portal Microsoft Azure.

    Deploy to Azure button for new cluster

    Templat Azure Resource Manager ada di https://raw.githubusercontent.com/Azure-Samples/hdinsight-spark-kafka-structured-streaming/master/azuredeploy.json.

    Templat ini membuat sumber daya berikut:

    • Kafka pada kluster HDInsight 4.0 atau 5.0.

    • Spark 2.4 atau 3.1 pada kluster HDInsight 4.0 atau 5.0.

    • Azure Virtual Network yang memuat kluster HDInsight.

      Penting

      Notebook streaming terstruktur yang digunakan dalam tutorial ini memerlukan Spark 2.4 atau 3.1 pada HDInsight 4.0 atau 5.0. Jika Anda menggunakan versi Spark yang lebih lama pada Microsoft Azure HDInsight, Anda menerima kesalahan saat menggunakan notebook tersebut.

  2. Gunakan informasi berikut ini untuk mengisi entri di bagian Templat Kustomisasi:

    Pengaturan Nilai
    Langganan Langganan Azure Anda
    Grup sumber daya Grup sumber daya yang berisi sumber daya.
    Lokasi Wilayah Azure tempat sumber daya dibuat.
    Nama Kluster Spark Nama kluster Spark tersebut. Enam karakter pertama harus berbeda dari nama kluster Kafka.
    Nama Kluster Kafka Nama kluster Kafka tersebut. Enam karakter pertama harus berbeda dari nama kluster Spark.
    Nama Pengguna Masuk Kluster Nama pengguna admin untuk kluster.
    Kata Sandi Masuk Kluster Kata sandi pengguna admin untuk kluster.
    Nama Pengguna SSH Pengguna SSH untuk membuat kluster.
    Kata Sandi SSH Kata sandi untuk pengguna SSH.

    Screenshot of the customized template.

  3. Baca Syarat dan Ketentuan, lalu pilih Saya menyetujui syarat dan ketentuan yang tercantum di atas.

  4. Pilih Beli.

Catatan

Pembuatan kluster dapat memakan waktu hingga 20 menit.

Gunakan Spark Structured Streaming

Contoh ini menunjukkan cara penggunaan Spark Structured Streaming dengan Kafka di HDInsight. Ia menggunakan data perjalanan taksi, yang disediakan oleh New York City. Himpunan data yang digunakan oleh buku catatan ini diambil dari 2016 Green Taxi Trip Data.

  1. Kumpulkan informasi host. Gunakan perintah curl dan jq di bawah ini untuk mendapatkan informasi Kafka ZooKeeper dan host broker Anda. Perintah tersebut dirancang untuk prompt perintah Windows, sedikit variasi akan diperlukan untuk lingkungan lain. Ganti KafkaCluster dengan nama kluster Kafka Anda, dan KafkaPassword dengan kata sandi masuk kluster. Ganti juga C:\HDI\jq-win64.exe dengan jalur aktual ke instalasi jq Anda. Masukkan perintah di prompt perintah Windows dan simpan output untuk digunakan di langkah-langkah berikutnya.

    REM Enter cluster name in lowercase
    
    set CLUSTERNAME=KafkaCluster
    set PASSWORD=KafkaPassword
    
    curl -u admin:%PASSWORD% -G "https://%CLUSTERNAME%.azurehdinsight.net/api/v1/clusters/%CLUSTERNAME%/services/ZOOKEEPER/components/ZOOKEEPER_SERVER" | C:\HDI\jq-win64.exe -r "["""\(.host_components[].HostRoles.host_name):2181"""] | join(""",""")"
    
    curl -u admin:%PASSWORD% -G "https://%CLUSTERNAME%.azurehdinsight.net/api/v1/clusters/%CLUSTERNAME%/services/KAFKA/components/KAFKA_BROKER" | C:\HDI\jq-win64.exe -r "["""\(.host_components[].HostRoles.host_name):9092"""] | join(""",""")"
    
  2. Dari browser web, arahkan ke https://CLUSTERNAME.azurehdinsight.net/jupyter, di mana CLUSTERNAME merupakan nama kluster Anda. Ketika diminta, masukkan info masuk kluster (admin) dan kata sandi yang Anda gunakan saat membuat kluster.

  3. Pilih Baru > Spark untuk membuat notebook.

  4. Spark streaming memiliki batch mikro, yang berarti data datang sebagai batch dan pelaksana berjalan pada batch data. Jika pelaksana memiliki batas waktu siaga kurang dari waktu yang diperlukan untuk memproses batch, maka pelaksana akan terus ditambah dan dihapus. Jika batas waktu siaga pelaksana lebih besar dari durasi batch, pelaksana tidak pernah dihapus. Jadi sebaiknya Anda menonaktifkan alokasi dinamis dengan mengubah pengaturan spark.dynamicAllocation.enabled ke false ketika menjalankan aplikasi streaming.

    Muat paket yang digunakan oleh Buku Catatan Jupyter dengan memasukkan informasi berikut ke dalam sel Buku Catatan. Jalankan perintah dengan menggunakan CTRL + ENTER.

    %%configure -f
    {
        "conf": {
            "spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0",
            "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.11",
            "spark.dynamicAllocation.enabled": false
        }
    }
    
  5. Buat topik Kafka. Edit perintah di bawah ini dengan mengganti YOUR_ZOOKEEPER_HOSTS dengan informasi host Zookeeper yang telah diekstrak pada langkah pertama. Masukkan perintah yang sudah diedit ke Buku Catatan Jupyter Anda untuk membuat topik tripdata.

    %%bash
    export KafkaZookeepers="YOUR_ZOOKEEPER_HOSTS"
    
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic tripdata --zookeeper $KafkaZookeepers
    
  6. Ambil data perjalanan taksi. Masukkan perintah di sel berikutnya untuk memuat data perjalanan taksi di New York City. Data dimuat ke dalam kerangka data lalu kerangka data tersebut ditampilkan sebagai output sel.

    import spark.implicits._
    
    // Load the data from the New York City Taxi data REST API for 2016 Green Taxi Trip Data
    val url="https://data.cityofnewyork.us/resource/pqfs-mqru.json"
    val result = scala.io.Source.fromURL(url).mkString
    
    // Create a dataframe from the JSON data
    val taxiDF = spark.read.json(Seq(result).toDS)
    
    // Display the dataframe containing trip data
    taxiDF.show()
    
  7. Atur informasi host broker Kafka. Ganti YOUR_KAFKA_BROKER_HOSTS dengan informasi host broker yang Anda ekstrak di langkah 1. Masukkan perintah yang sudah diedit ke sel Buku Catatan Jupyter berikutnya.

    // The Kafka broker hosts and topic used to write to Kafka
    val kafkaBrokers="YOUR_KAFKA_BROKER_HOSTS"
    val kafkaTopic="tripdata"
    
    println("Finished setting Kafka broker and topic configuration.")
    
  8. Kirim data ke Kafka. Dalam perintah berikut, bidang vendorid digunakan sebagai nilai kunci untuk pesan Kafka. Kunci tersebut digunakan oleh Kafka ketika mempartisi data. Semua bidang disimpan dalam pesan Kafka sebagai nilai untai JSON. Masukkan perintah berikut di Jupyter untuk menyimpan data ke Kafka menggunakan kueri batch.

    // Select the vendorid as the key and save the JSON string as the value.
    val query = taxiDF.selectExpr("CAST(vendorid AS STRING) as key", "to_JSON(struct(*)) AS value").write.format("kafka").option("kafka.bootstrap.servers", kafkaBrokers).option("topic", kafkaTopic).save()
    
    println("Data sent to Kafka")
    
  9. Menyatakan skema. Perintah berikut menunjukkan cara menggunakan skema saat membaca data JSON dari kafka. Masukkan perintah di sel Jupyter berikutnya.

    // Import bits useed for declaring schemas and working with JSON data
    import org.apache.spark.sql._
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions._
    
    // Define a schema for the data
    val schema = (new StructType).add("dropoff_latitude", StringType).add("dropoff_longitude", StringType).add("extra", StringType).add("fare_amount", StringType).add("improvement_surcharge", StringType).add("lpep_dropoff_datetime", StringType).add("lpep_pickup_datetime", StringType).add("mta_tax", StringType).add("passenger_count", StringType).add("payment_type", StringType).add("pickup_latitude", StringType).add("pickup_longitude", StringType).add("ratecodeid", StringType).add("store_and_fwd_flag", StringType).add("tip_amount", StringType).add("tolls_amount", StringType).add("total_amount", StringType).add("trip_distance", StringType).add("trip_type", StringType).add("vendorid", StringType)
    // Reproduced here for readability
    //val schema = (new StructType)
    //   .add("dropoff_latitude", StringType)
    //   .add("dropoff_longitude", StringType)
    //   .add("extra", StringType)
    //   .add("fare_amount", StringType)
    //   .add("improvement_surcharge", StringType)
    //   .add("lpep_dropoff_datetime", StringType)
    //   .add("lpep_pickup_datetime", StringType)
    //   .add("mta_tax", StringType)
    //   .add("passenger_count", StringType)
    //   .add("payment_type", StringType)
    //   .add("pickup_latitude", StringType)
    //   .add("pickup_longitude", StringType)
    //   .add("ratecodeid", StringType)
    //   .add("store_and_fwd_flag", StringType)
    //   .add("tip_amount", StringType)
    //   .add("tolls_amount", StringType)
    //   .add("total_amount", StringType)
    //   .add("trip_distance", StringType)
    //   .add("trip_type", StringType)
    //   .add("vendorid", StringType)
    
    println("Schema declared")
    
  10. Pilih data dan mulai streaming. Perintah berikut menunjukkan cara mengambil data dari Kafka menggunakan kueri batch. Dan kemudian menulis hasilnya ke HDFS di kluster Spark. Dalam contoh ini, select mengambil pesan (bidang nilai) dari Kafka dan menerapkan skemanya ke dalamnya. Data kemudian ditulis ke HDFS (WASB atau ADL) dalam format parket. Masukkan perintah di sel Jupyter berikutnya.

    // Read a batch from Kafka
    val kafkaDF = spark.read.format("kafka").option("kafka.bootstrap.servers", kafkaBrokers).option("subscribe", kafkaTopic).option("startingOffsets", "earliest").load()
    
    // Select data and write to file
    val query = kafkaDF.select(from_json(col("value").cast("string"), schema) as "trip").write.format("parquet").option("path","/example/batchtripdata").option("checkpointLocation", "/batchcheckpoint").save()
    
    println("Wrote data to file")
    
  11. Anda dapat memverifikasi bahwa file dibuat dengan memasukkan perintah di sel Jupyter berikutnya. Ia mencantumkan file-file dalam direktori /example/batchtripdata.

    %%bash
    hdfs dfs -ls /example/batchtripdata
    
  12. Sementara contoh sebelumnya menggunakan kueri batch, perintah berikut ini menunjukkan cara melakukan hal yang sama menggunakan kueri streaming. Masukkan perintah di sel Jupyter berikutnya.

    // Stream from Kafka
    val kafkaStreamDF = spark.readStream.format("kafka").option("kafka.bootstrap.servers", kafkaBrokers).option("subscribe", kafkaTopic).option("startingOffsets", "earliest").load()
    
    // Select data from the stream and write to file
    kafkaStreamDF.select(from_json(col("value").cast("string"), schema) as "trip").writeStream.format("parquet").option("path","/example/streamingtripdata").option("checkpointLocation", "/streamcheckpoint").start.awaitTermination(30000)
    println("Wrote data to file")
    
  13. Jalankan sel berikut untuk memverifikasi bahwa file ditulis oleh kueri streaming.

    %%bash
    hdfs dfs -ls /example/streamingtripdata
    

Membersihkan sumber daya

Untuk membersihkan sumber daya yang dibuat oleh tutorial ini, Anda dapat menghapus grup sumber daya terkait. Menghapus grup sumber daya juga akan menghapus kluster HDInsight terkait. Dan sumber daya lain yang berkaitan dengan grup sumber daya.

Menghapus grup sumber daya menggunakan portal Microsoft Azure:

  1. Di portal Microsoft Azure, perluas menu di sisi kiri untuk membuka menu layanan, dan pilih Grup Sumber Daya untuk menampilkan daftar grup sumber daya Anda.
  2. Cari grup sumber daya yang ingin dihapus, lalu klik kanan tombol Lainnya (...) di sisi kanan daftar.
  3. Pilih Hapus grup sumber daya, lalu konfirmasi.

Peringatan

Tagihan klaster HDInsight mulai dihitung setelah klaster dibuat dan akan berhenti saat klaster dihapus. Penagihan dihitung pro-rata per menit, sehingga Anda harus selalu menghapus kluster jika tidak digunakan lagi.

Menghapus Kafka di kluster HDInsight dapat menghapus data apa pun yang disimpan di Kafka.