Bagikan melalui


Pulihkan dari kegagalan kueri Streaming Terstruktur dengan alur kerja

Streaming Terstruktur menyediakan toleransi kesalahan dan konsistensi data untuk kueri streaming; menggunakan alur kerja Azure Databricks, Anda dapat dengan mudah mengonfigurasi kueri Streaming Terstruktur untuk memulai ulang secara otomatis saat kegagalan. Dengan mengaktifkan titik pemeriksaan untuk kueri streaming, Anda dapat memulai ulang kueri setelah kegagalan. Kueri yang dimulai ulang berlanjut di mana yang gagal ditinggalkan.

Mengaktifkan titik pemeriksaan untuk kueri Streaming Terstruktur

Databricks menyarankan agar Anda selalu menentukan checkpointLocation opsi jalur penyimpanan cloud sebelum memulai kueri. Contohnya:

streamingDataFrame.writeStream
  .format("parquet")
  .option("path", "/path/to/table")
  .option("checkpointLocation", "/path/to/table/_checkpoint")
  .start()

Lokasi titik pemeriksaan ini mempertahankan semua informasi penting yang mengidentifikasi kueri. Setiap kueri harus memiliki lokasi titik pemeriksaan yang berbeda. Beberapa kueri tidak boleh memiliki lokasi yang sama. Untuk informasi selengkapnya, lihat Panduan Pemrograman Streaming Terstruktur.

Catatan

Meskipun checkpointLocation diperlukan untuk sebagian besar jenis sink output, beberapa sink, seperti sink memori, dapat secara otomatis menghasilkan lokasi titik pemeriksaan sementara ketika Anda tidak menyediakan checkpointLocation. Lokasi titik pemeriksaan sementara ini tidak memastikan toleransi kesalahan atau jaminan konsistensi data dan mungkin tidak dibersihkan dengan benar. Hindari potensi perangkap dengan selalu menentukan checkpointLocation.

Mengonfigurasi pekerjaan Streaming Terstruktur untuk memulai ulang kueri streaming saat gagal

Anda dapat membuat pekerjaan di Azure Databricks dengan notebook atau JAR yang memiliki kueri streaming Anda dan mengonfigurasinya ke:

  • Selalu buat kluster baru.
  • Selalu coba lagi ketika gagal.

Memulai ulang secara otomatis pada kegagalan pekerjaan sangat penting saat mengonfigurasi beban kerja streaming dengan evolusi skema. Evolusi skema berfungsi di Azure Databricks dengan meningkatkan kesalahan yang diharapkan ketika perubahan skema terdeteksi, lalu memproses data dengan benar menggunakan skema baru saat pekerjaan dimulai ulang. Databricks merekomendasikan untuk selalu mengonfigurasi tugas streaming yang berisi kueri dengan evolusi skema untuk memulai ulang secara otomatis dalam alur kerja Databricks.

Pekerjaan memiliki integrasi yang ketat dengan API Streaming Terstruktur dan dapat memantau semua kueri streaming yang aktif dalam proses. Konfigurasi ini memastikan bahwa jika ada bagian dari kueri yang gagal, pekerjaan secara otomatis mengakhiri eksekusi (bersama dengan semua kueri lainnya) dan memulai eksekusi baru di kluster baru. Ini menjalankan kembali buku catatan atau kode JAR dan memulai ulang semua kueri lagi. Ini adalah cara paling aman untuk kembali ke keadaan yang baik.

Catatan

  • Kegagalan dalam salah satu kueri streaming aktif menyebabkan proses aktif gagal dan mengakhiri semua kueri streaming lainnya.
  • Anda tidak perlu menggunakan streamingQuery.awaitTermination() atau spark.streams.awaitAnyTermination() di akhir notebook Anda. pekerjaan akan secara otomatis mencegah proses untuk selesai saat kueri streaming aktif.
  • Databricks merekomendasikan penggunaan pekerjaan alih-alih %run dan dbutils.notebook.run() saat mengatur notebook Streaming Terstruktur. Lihat Menjalankan buku catatan Databricks dari buku catatan lain.

Berikut ini adalah contoh konfigurasi pekerjaan yang direkomendasikan.

  • Kluster: Selalu atur ini untuk menggunakan kluster baru dan gunakan versi Spark terbaru (atau setidaknya versi 2.1). Kueri yang dimulai di Spark 2.1 ke atas dapat dipulihkan setelah kueri dan peningkatan versi Spark.
  • Pemberitahuan: Atur ini jika Anda menginginkan pemberitahuan email tentang kegagalan.
  • Jadwal: Jangan atur jadwal.
  • Batas waktu: Jangan tetapkan batas waktu. Kueri streaming berjalan untuk waktu yang lama tanpa batas waktu.
  • Maksimum jumlah eksekusi bersamaan: Atur ke 1. Hanya boleh ada satu instans dari setiap kueri yang aktif secara bersamaan.
  • Coba ulang: Atur ke Tidak batas.

Lihat Membuat dan menjalankan Pekerjaan Azure Databricks untuk memahami konfigurasi ini.

Pulihkan setelah perubahan dalam kueri Streaming Terstruktur

Ada batasan pada perubahan apa yang diizinkan dalam kueri streaming di antara beberapa coba ulang dari lokasi titik pemeriksaan yang sama. Berikut adalah beberapa perubahan yang tidak diizinkan atau efek perubahan tidak ditentukan dengan baik. Untuk semuanya:

  • Istilah diizinkan berarti Anda dapat melakukan perubahan yang ditentukan, tetapi apakah semantik efeknya didefinisikan dengan baik, itu tergantung pada kueri dan perubahannya.
  • Istilah tidak diizinkan berarti Anda tidak boleh melakukan perubahan yang ditentukan karena kueri yang dimulai ulang kemungkinan akan gagal dengan kesalahan yang tidak dapat diprediksi.
  • sdf mewakili DataFrame/Dataset streaming yang dihasilkan dengan sparkSession.readStream.

Jenis perubahan dalam kueri Streaming Terstruktur

  • Perubahan pada jumlah atau jenis (sumber yang berbeda) dari sumber input: Ini tidak diizinkan.
  • Perubahan parameter sumber input: Terlebih ini diizinkan atau semantik perubahannya didefinisikan dengan baik atau tidak, itu tergantung pada sumber dan kueri. Berikut beberapa contohnya.
    • Penambahan, penghapusan, dan modifikasi batas tarif diperbolehkan:

      spark.readStream.format("kafka").option("subscribe", "article")
      

      ke

      spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
      
    • Perubahan pada artikel dan file langganan umumnya tidak diizinkan karena hasilnya tidak dapat diprediksi: spark.readStream.format("kafka").option("subscribe", "article") menjadi spark.readStream.format("kafka").option("subscribe", "newarticle")

  • Perubahan interval pemicu: Anda dapat mengubah pemicu antara batch inkremental dan interval waktu. Lihat Mengubah interval pemicu di antara eksekusi.
  • Perubahan jenis sink output: Diperbolehkan melakukan perubahan di antara beberapa kombinasi sink tertentu. Perlu diverifikasi berdasarkan kasus per kasus. Berikut beberapa contohnya.
    • Sink file ke sink Kafka diperbolehkan. Kafka hanya akan melihat data baru.
    • Sink Kafka ke sink file tidak diperbolehkan.
    • Sink Kafka diubah menjadi foreach, atau sebaliknya, diperbolehkan.
  • Perubahan parameter sink output: Terlebih ini diizinkan atau semantik perubahannya didefinisikan dengan baik atau tidak, itu tergantung pada sink dan kueri. Berikut beberapa contohnya.
    • Perubahan pada direktori output sink file tidak diperbolehkan: sdf.writeStream.format("parquet").option("path", "/somePath") menjadi sdf.writeStream.format("parquet").option("path", "/anotherPath")
    • Perubahan pada topik output diizinkan: sdf.writeStream.format("kafka").option("topic", "topic1") ke sdf.writeStream.format("kafka").option("topic", "topic2")
    • Perubahan pada sink foreach yang ditentukan pengguna (yaitu, kode ForeachWriter) diperbolehkan, tetapi semantik perubahannya tergantung pada kode.
  • Perubahan dalam proyeksi / filter / operasi seperti peta: Beberapa kasus diperbolehkan. Misalnya:
    • Penambahan / penghapusan filter diperbolehkan: sdf.selectExpr("a") menjadi sdf.where(...).selectExpr("a").filter(...).
    • Perubahan proyeksi dengan skema output yang sama diperbolehkan: sdf.selectExpr("stringColumn AS json").writeStream menjadi sdf.select(to_json(...).as("json")).writeStream.
    • Perubahan proyeksi dengan skema output yang berbeda diperbolehkan secara kondisional: sdf.selectExpr("a").writeStream menjadi sdf.selectExpr("b").writeStream hanya diperbolehkan jika sink output memungkinkan perubahan skema dari "a" ke "b".
  • Perubahan dalam operasi stateful: Beberapa operasi dalam kueri streaming perlu mempertahankan data status untuk terus memperbarui hasilnya. Structured Streaming secara otomatis memeriksa data status ke penyimpanan yang toleran terhadap kesalahan (misalnya, DBFS, penyimpanan Azure Blob) dan memulihkannya setelah restart. Namun, ini akan mengasumsikan bahwa skema data status tetap sama di seluruh restart. Ini berarti bahwa setiap perubahan (yaitu, penambahan, penghapusan, atau modifikasi skema) pada operasi stateful kueri streaming tidak diizinkan di antara proses restart. Berikut adalah daftar operasi stateful yang skemanya tidak boleh diubah antara restart untuk memastikan pemulihan status:
    • Agregasi streaming: Misalnya, sdf.groupBy("a").agg(...). Tidak diperbolehkan adanya perubahan pada jumlah atau jenis kunci pengelompokan atau agregat.
    • Deduplikasi streaming: Misalnya, sdf.dropDuplicates("a"). Tidak diperbolehkan adanya perubahan pada jumlah atau jenis kunci pengelompokan atau agregat.
    • Gabungan stream-stream: Misalnya, sdf1.join(sdf2, ...) (yaitu kedua input dihasilkan dengan sparkSession.readStream). Perubahan dalam skema atau kolom equi-join tidak diperbolehkan. Perubahan pada jenis gabungan (luar atau dalam) tidak diperbolehkan. Perubahan lain yang terjadi dalam kondisi bergabung tidaklah jelas.
    • Operasi stateful arbitrer: Misalnya, sdf.groupByKey(...).mapGroupsWithState(...) atau sdf.groupByKey(...).flatMapGroupsWithState(...). Setiap perubahan pada skema status yang ditentukan pengguna dan jenis batas waktu, tidak diperbolehkan. Setiap perubahan dalam fungsi pemetaan status yang ditentukan pengguna diperbolehkan, tetapi efek semantik dari perubahannya tergantung pada logika yang ditentukan pengguna. Jika Anda benar-benar ingin mendukung perubahan skema status, maka Anda dapat secara eksplisit mengode/mendekode struktur data status kompleks Anda menjadi byte dengan skema pengodean/pendekodean yang mendukung migrasi skema. Misalnya, jika Anda menyimpan status Anda sebagai byte yang dikodekan Avro, maka Anda dapat mengubah skema Avro-state-schema antara kueri dimulai ulang karena ini memulihkan status biner.