Bagikan melalui


Ringkasan Streaming Apache Spark

Streaming Apache Spark menyediakan pemrosesan stream data pada kluster Microsoft Azure HDInsight Spark. Dengan jaminan bahwa setiap peristiwa input diproses persis sebanyak satu kali, bahkan jika kegagalan simpul terjadi. Spark Stream adalah pekerjaan jangka panjang yang menerima data input dari berbagai sumber, termasuk Azure Event Hubs. Juga: Azure IoT Hub, Apache Kafka, Apache Flume, Twitter, ZeroMQ, soket Protokol Kendali Transmisi mentah, atau dari pemantauan sistem file YARN Apache Hadoop. Tidak seperti proses yang semata-mata digerakkan oleh peristiwa, Spark Stream mengumpulkan data input ke dalam jendela waktu. Seperti irisan 2 detik, untuk kemudian mentransformasikan setiap batch data menggunakan peta, operasi pengurangan, penggabungan, dan ekstraksi. Spark Stream kemudian menulis data yang ditransformasikan ke sistem file, database, dasbor, dan konsol.

Pemrosesan Streaming dengan HDInsight dan Spark Streaming.

Aplikasi Spark Streaming harus menunggu sepersekian detik untuk mengumpulkan setiap micro-batch peristiwa sebelum mengirim batch tersebut untuk diproses. Sebaliknya, aplikasi yang digerakkan oleh peristiwa secepatnya memproses setiap peristiwa. Latensi Spark Streaming biasanya di bawah beberapa detik. Manfaat pendekatan mikro-batch adalah pemrosesan data yang lebih efisien dan perhitungan agregat yang lebih sederhana.

Memperkenalkan DStream

Spark Streaming mewakili stream berkelanjutan dari data yang masuk menggunakan stream yang didiskretkan yang disebut DStream. DStream dapat dibuat dari sumber input seperti Azure Event Hubs atau Kafka. Atau dengan menerapkan transformasi pada DStream lain.

DStream menyediakan lapisan abstraksi di atas data peristiwa mentah.

Mulailah dengan satu peristiwa, misalnya pembacaan suhu dari termostat yang tersambung. Ketika peristiwa ini sampai di aplikasi Spark Streaming Anda, peristiwa disimpan dengan cara yang dapat diandalkan, di mana peristiwa direplikasi pada beberapa simpul. Toleransi kesalahan ini memastikan bahwa kegagalan simpul tunggal apa pun tidak akan mengakibatkan hilangnya peristiwa Anda. Inti Spark menggunakan struktur data yang mendistribusikan data di beberapa simpul dalam kluster. Di mana setiap simpul umumnya mempertahankan datanya sendiri dalam memori untuk kinerja terbaik. Struktur data ini disebut set data terdistribusi tangguh (RDD).

Setiap RDD mewakili peristiwa yang dikumpulkan selama jangka waktu yang ditentukan pengguna yang disebut interval batch. Karena setiap interval batch akan berlalu, produksi RDD baru berisi semua data dari interval tersebut. Set RDD berkelanjutan dikumpulkan ke dalam DStream. Misalnya, jika interval batch adalah sepanjang satu detik, DStream Anda memancarkan batch setiap detik yang berisi satu RDD yang berisi semua data yang diserap selama detik tersebut. Ketika memproses DStream, peristiwa suhu muncul di salah satu batch ini. Aplikasi Spark Streaming memproses batch yang berisi peristiwa dan pada akhirnya bertindak pada data yang disimpan di setiap RDD.

Contoh DStream dengan Peristiwa Suhu.

Struktur aplikasi Spark Streaming

Aplikasi Spark Streaming adalah aplikasi jangka panjang yang menerima data dari sumber yang diserap. Menerapkan transformasi untuk memproses data, lalu mendorong data ke satu atau beberapa tujuan. Struktur aplikasi Spark Streaming memiliki bagian statik dan bagian dinamis. Bagian statik menentukan dari mana data berasal, pemrosesan apa yang harus dilakukan pada data. Dan ke mana tujuan hasilnya nanti. Bagian dinamis menjalankan aplikasi tanpa batas waktu, menunggu sinyal berhenti aktif.

Misalnya, aplikasi sederhana berikut menerima baris teks pada soket TCP dan menghitung berapa kali setiap kata muncul.

Menentukan aplikasi

Definisi logika aplikasi memiliki empat langkah:

  1. Membuat StreamingContext.
  2. Membuat DStream dari StreamingContext.
  3. Menerapkan transformasi ke DStream.
  4. Mengeluarkan hasil.

Definisi ini bersifat statik, dan tidak ada data yang diproses sampai Anda menjalankan aplikasi.

Membuat StreamingContext

Buat StreamingContext dari SparkContext yang menunjuk ke kluster Anda. Ketika membuat StreamingContext, Anda menentukan ukuran batch dalam detik, misalnya:

import org.apache.spark._
import org.apache.spark.streaming._

val ssc = new StreamingContext(sc, Seconds(1))

Membuat DStream

Dengan instans StreamingContext, buat input DStream untuk sumber input Anda. Dalam hal ini, aplikasi mengawasi tampilan file baru di penyimpanan yang terpasang secara default.

val lines = ssc.textFileStream("/uploads/Test/")

Menerapkan transformasi

Anda melaksanakan pemrosesan dengan menerapkan transformasi pada DStream. Aplikasi ini menerima satu baris teks pada satu waktu dari file, yang membagi setiap baris menjadi kata-kata. Lalu menggunakan pola pengurangan peta untuk menghitung berapa kali setiap kata muncul.

val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

Mengeluarkan hasil

Dorong hasil transformasi ke sistem tujuan dengan menerapkan operasi output. Dalam hal ini, hasil dari setiap eksekusi melalui komputasi dicetak dalam output konsol.

wordCounts.print()

Jalankan aplikasi

Mulai aplikasi streaming dan jalankan sampai sinyal penghentian diterima.

ssc.start()
ssc.awaitTermination()

Untuk perincian tentang API Spark Stream, lihat Panduan Pemrograman Streaming Apache Spark.

Contoh aplikasi berikut ini berdiri sendiri, sehingga Anda dapat menjalankannya di dalam Jupyter Notebook. Contoh ini membuat sumber data tiruan di kelas DummySource yang menghasilkan nilai penghitung dan waktu saat ini dalam milidetik setiap lima detik. Objek StreamingContext baru memiliki interval batch 30 detik. Setiap kali batch dibuat, aplikasi streaming memeriksa RDD yang diproduksi. Lalu mengonversi RDD menjadi Spark DataFrame, dan membuat tabel sementara pada DataFrame.

class DummySource extends org.apache.spark.streaming.receiver.Receiver[(Int, Long)](org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_2) {

    /** Start the thread that simulates receiving data */
    def onStart() {
        new Thread("Dummy Source") { override def run() { receive() } }.start()
    }

    def onStop() {  }

    /** Periodically generate a random number from 0 to 9, and the timestamp */
    private def receive() {
        var counter = 0  
        while(!isStopped()) {
            store(Iterator((counter, System.currentTimeMillis)))
            counter += 1
            Thread.sleep(5000)
        }
    }
}

// A batch is created every 30 seconds
val ssc = new org.apache.spark.streaming.StreamingContext(spark.sparkContext, org.apache.spark.streaming.Seconds(30))

// Set the active SQLContext so that we can access it statically within the foreachRDD
org.apache.spark.sql.SQLContext.setActive(spark.sqlContext)

// Create the stream
val stream = ssc.receiverStream(new DummySource())

// Process RDDs in the batch
stream.foreachRDD { rdd =>

    // Access the SQLContext and create a table called demo_numbers we can query
    val _sqlContext = org.apache.spark.sql.SQLContext.getOrCreate(rdd.sparkContext)
    _sqlContext.createDataFrame(rdd).toDF("value", "time")
        .registerTempTable("demo_numbers")
}

// Start the stream processing
ssc.start()

Tunggu sekitar 30 detik setelah memulai aplikasi di atas. Kemudian, Anda bisa mengkueri DataFrame secara berkala untuk melihat set nilai saat ini yang ada dalam batch, misalnya dengan menggunakan kueri SQL ini:

%%sql
SELECT * FROM demo_numbers

Output yang dihasilkan terlihat seperti output berikut:

value waktu
10 1497314465256
11 1497314470272
12 1497314475289
13 1497314480310
14 1497314485327
15 1497314490346

Ada enam nilai, karena DummySource menciptakan nilai setiap 5 detik dan aplikasi memancarkan batch setiap 30 detik.

Jendela geser

Untuk melakukan penghitungan agregat pada DStream Anda selama beberapa periode waktu, misalnya untuk mendapatkan suhu rata-rata selama dua detik terakhir, gunakan operasi sliding window yang disertakan dengan Spark Streaming. Jendela geser memiliki durasi (panjang jendela) dan interval saat konten jendela dievaluasi (interval slide).

Jendela geser dapat menjadi tumpang tindih, misalnya, Anda dapat menentukan jendela dengan panjang dua detik, yang meluncur setiap satu detik. Tindakan ini artinya setiap kali Anda melakukan penghitungan agregasi, jendela akan menyertakan data dari satu detik terakhir dari jendela sebelumnya. Dan setiap data baru dalam satu detik berikutnya.

Contoh Jendela Awal dengan Peristiwa Suhu.

Contoh Jendela dengan Peristiwa Suhu Setelah Geser.

Contoh berikut memperbarui kode yang menggunakan DummySource, untuk mengumpulkan batch ke dalam jendela dengan durasi satu menit dan pergeseran satu menit.

class DummySource extends org.apache.spark.streaming.receiver.Receiver[(Int, Long)](org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_2) {

    /** Start the thread that simulates receiving data */
    def onStart() {
        new Thread("Dummy Source") { override def run() { receive() } }.start()
    }

    def onStop() {  }

    /** Periodically generate a random number from 0 to 9, and the timestamp */
    private def receive() {
        var counter = 0  
        while(!isStopped()) {
            store(Iterator((counter, System.currentTimeMillis)))
            counter += 1
            Thread.sleep(5000)
        }
    }
}

// A batch is created every 30 seconds
val ssc = new org.apache.spark.streaming.StreamingContext(spark.sparkContext, org.apache.spark.streaming.Seconds(30))

// Set the active SQLContext so that we can access it statically within the foreachRDD
org.apache.spark.sql.SQLContext.setActive(spark.sqlContext)

// Create the stream
val stream = ssc.receiverStream(new DummySource())

// Process batches in 1 minute windows
stream.window(org.apache.spark.streaming.Minutes(1)).foreachRDD { rdd =>

    // Access the SQLContext and create a table called demo_numbers we can query
    val _sqlContext = org.apache.spark.sql.SQLContext.getOrCreate(rdd.sparkContext)
    _sqlContext.createDataFrame(rdd).toDF("value", "time")
    .registerTempTable("demo_numbers")
}

// Start the stream processing
ssc.start()

Setelah menit pertama, ada 12 entri - enam entri dari masing-masing dua batch yang dikumpulkan di jendela.

value waktu
1 1497316294139
2 1497316299158
3 1497316304178
4 1497316309204
5 1497316314224
6 1497316319243
7 1497316324260
8 1497316329278
9 1497316334293
10 1497316339314
11 1497316344339
12 1497316349361

Fungsi jendela geser yang tersedia di API Spark Streaming mencakup jendela, countByWindow, reduceByWindow, dan countByValueAndWindow. Untuk perincian tentang fungsi-fungsi ini, lihat Transformasi pada DStreams.

Checkpointing

Untuk memberikan ketangguhan dan toleransi kesalahan, Spark Streaming mengandalkan titik pemeriksaan untuk memastikan bahwa pemrosesan streaming dapat berlanjut tanpa gangguan, bahkan saat menghadapi kegagalan simpul. Spark membuat titik pemeriksaan ke penyimpanan yang tahan lama (Azure Storage atau Data Lake Storage). Titik pemeriksaan ini menyimpan metadata aplikasi streaming seperti konfigurasi, dan operasi yang ditentukan oleh aplikasi. Juga, setiap batch yang mengantre tetapi belum diproses. Terkadang, titik pemeriksaan juga akan menyertakan penyimpanan data dalam RDD agar lebih cepat membangun kembali status data dari apa yang ada di RDD yang dikelola oleh Spark.

Menyebarkan aplikasi Spark Streaming

Anda biasanya membuat aplikasi Spark Streaming secara lokal ke dalam file JAR. Kemudian menyebarkannya ke Spark pada Microsoft Azure HDInsight dengan menyalin file JAR ke penyimpanan yang terpasang secara default. Anda dapat memulai aplikasi dengan API LIVY REST yang tersedia dari kluster Anda menggunakan operasi POST. Bodi POST menyertakan dokumen JSON yang menyediakan jalur ke JAR Anda. Dan nama kelas yang metode utamanya menentukan dan menjalankan aplikasi streaming, serta secara opsional kebutuhan sumber daya pekerjaan (seperti jumlah pelaksana, memori, dan inti). Selain itu, pengaturan konfigurasi apa pun yang diperlukan kode aplikasi Anda.

Menyebarkan aplikasi Spark Streaming.

Status semua aplikasi juga dapat diperiksa dengan permintaan GET terhadap titik akhir LIVY. Akhirnya, Anda dapat mengakhiri aplikasi yang sedang berjalan dengan mengeluarkan permintaan DELETE terhadap titik akhir LIVY. Untuk perincian tentang API LIVY, lihat Pekerjaan jarak jauh dengan Apache LIVY

Langkah berikutnya