Bagikan melalui


Membaca informasi status Streaming Terstruktur

Anda dapat menggunakan operasi DataFrame atau fungsi nilai tabel SQL untuk mengkueri data dan metadata status Streaming Terstruktur. Gunakan fungsi-fungsi ini untuk mengamati informasi status untuk kueri stateful Streaming Terstruktur, yang dapat berguna untuk pemantauan dan penelusuran kesalahan.

Anda harus memiliki akses baca ke jalur cek poin untuk kueri streaming agar dapat mengkueri data status dan metadata. Fungsi yang dijelaskan dalam artikel ini menyediakan akses baca-saja ke data status dan metadata. Anda hanya dapat menggunakan semantik baca batch untuk melakukan kueri informasi status.

Catatan

Anda tidak dapat mengkueri informasi status untuk Alur Deklaratif Lakeflow Spark, tabel streaming, atau tampilan materialisasi. Anda tidak dapat meminta kueri informasi status menggunakan komputasi tanpa server atau komputasi yang dikonfigurasi dengan mode akses standar.

Persyaratan

  • Gunakan salah satu konfigurasi komputasi berikut:
    • Databricks Runtime versi 16.3 dan lebih baru yang dikonfigurasikan pada sistem komputasi dengan mode akses standar.
    • Databricks Runtime 14.3 LTS ke atas pada komputasi yang dikonfigurasi dengan mode akses terdedikasi atau tanpa isolasi.
  • Akses baca ke jalur titik pemeriksaan yang digunakan oleh kueri streaming.

Membaca status penyimpanan Streaming Terstruktur

Anda dapat membaca informasi penyimpanan status untuk kueri Streaming Terstruktur yang dijalankan dalam Databricks Runtime yang didukung. Gunakan sintaks berikut:

Ular sawah

df = (spark.read
  .format("statestore")
  .load("/checkpoint/path"))

SQL

SELECT * FROM read_statestore('/checkpoint/path')

Parameter API pembaca kondisi

API pembaca status mendukung konfigurasi opsional berikut:

Opsi Jenis Nilai bawaan Deskripsi
batchId Panjang Batch ID terbaru Mewakili batch target yang akan dibaca. Tentukan opsi ini untuk mengkueri informasi status untuk status kueri sebelumnya. Batch harus dikomitmenkan tetapi belum dibersihkan.
operatorId Panjang 0 Mewakili operator target yang akan dibaca. Opsi ini digunakan saat kueri menggunakan beberapa operator stateful.
storeName string "BAWAAN" Merupakan nama penyimpanan status target yang akan dibaca. Opsi ini digunakan saat operator stateful menggunakan beberapa instans penyimpanan status. Baik storeName atau joinSide harus ditentukan untuk gabungan stream-steam, tetapi tidak keduanya.
joinSide String ("kiri" atau "kanan") Mewakili sisi target tempat membaca. Opsi ini digunakan saat pengguna ingin membaca status dari gabungan stream-stream.
stateVarName string Tidak ada Nama variabel status yang akan dibaca sebagai bagian dari kueri ini. Nama variabel status adalah nama unik yang diberikan untuk setiap variabel dalam fungsi init yang digunakan oleh operator StatefulProcessor. Opsi ini adalah opsi yang diperlukan jika transformWithState operator digunakan. Opsi ini hanya berlaku untuk transformWithState operator dan diabaikan untuk operator lain. Tersedia di Databricks Runtime 16.2 ke atas.
readRegisteredTimers Boolean tidak benar Atur ke true untuk membaca timer terdaftar yang digunakan dalam operator transformWithState. Opsi ini hanya berlaku untuk transformWithState operator dan diabaikan untuk operator lain. Tersedia di Databricks Runtime 16.2 ke atas.
flattenCollectionTypes Boolean benar Jika true, meratakan rekaman yang dikembalikan untuk variabel status peta dan daftar. Jika false, rekaman dikembalikan menggunakan Spark SQL Array atau Map. Opsi ini hanya berlaku untuk transformWithState operator dan diabaikan untuk operator lain. Tersedia di Databricks Runtime 16.2 ke atas.

Data yang dikembalikan memiliki skema berikut:

Kolom Jenis Deskripsi
key Struct (jenis lebih lanjut berasal dari kunci status) Kunci untuk rekaman operator stateful di titik pemeriksaan status.
value Struct (tipe lanjutan diturunkan dari nilai keadaan) Nilai untuk catatan operator stateful di titik pemeriksaan status.
partition_id Bilangan bulat Partisi titik pemeriksaan status yang berisi catatan operator berbasis status.

Lihat fungsi bernilai tabel read_statestore.

Membaca metadata status Streaming Terstruktur

Penting

Anda harus menjalankan kueri streaming pada Databricks Runtime 14.2 atau lebih tinggi untuk merekam metadata status. File metadata status tidak mengganggu kompatibilitas mundur. Jika Anda memilih untuk menjalankan kueri streaming pada Databricks Runtime 14.1 atau di bawahnya, file metadata status yang ada diabaikan dan tidak ada file metadata status baru yang ditulis.

Anda dapat membaca informasi metadata status untuk kueri Streaming Terstruktur yang dijalankan pada Databricks Runtime 14.2 atau yang lebih baru. Gunakan sintaks berikut:

Ular sawah

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

Data yang dikembalikan memiliki skema berikut:

Kolom Jenis Deskripsi
operatorId Bilangan bulat ID angka bulat dari operator streaming yang bersifat stateful.
operatorName Bilangan bulat Nama operator stateful streaming.
stateStoreName string Nama penyimpanan status operator.
numPartitions Bilangan bulat Jumlah partisi penyimpanan status.
minBatchId Panjang ID batch terkecil yang tersedia untuk menanyakan keadaan.
maxBatchId Panjang ID batch maksimum yang tersedia untuk melakukan kueri status.

Catatan

Nilai ID batch yang disediakan oleh minBatchId dan maxBatchId mencerminkan status pada saat titik pemeriksaan ditulis. Batch lama dibersihkan secara otomatis dengan eksekusi mikro-batch, sehingga nilai yang disediakan di sini tidak dijamin masih tersedia.

Lihat fungsi bernilai tabel read_state_metadata.

Contoh: Melakukan query pada satu sisi gabungan aliran-aliran

Gunakan sintaks berikut untuk mengkueri sisi kiri gabungan stream-stream:

Ular sawah

left_df = (spark.read
  .format("statestore")
  .option("joinSide", "left")
  .load("/checkpoint/path"))

SQL

SELECT * FROM read_statestore(
    '/checkpoint/path',
    joinSide => 'left'
);

Contoh: Penyimpanan status kueri untuk aliran dengan beberapa operator stateful

Contoh ini menggunakan pembaca metadata status untuk mengumpulkan detail metadata kueri streaming dengan beberapa operator stateful, lalu menggunakan hasil metadata sebagai opsi untuk pembaca status.

Pembaca metadata status mengambil jalur titik pemeriksaan sebagai satu-satunya opsi, seperti dalam contoh sintaks berikut:

Ular sawah

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

Tabel berikut ini mewakili contoh output metadata penyimpanan status:

operatorId Nama Operator stateStoreName JumlahPartisi minBatchId maxBatchId
0 stateStoreSave asali 200 0 13
1 menghapus duplikasi dalam Watermark asali 200 0 13

Untuk mendapatkan hasil untuk dedupeWithinWatermark operator, kueri pembaca status dengan operatorId opsi, seperti dalam contoh berikut:

Ular sawah

left_df = (spark.read
  .format("statestore")
  .option("operatorId", 1)
  .load("/checkpoint/path"))

SQL

SELECT * FROM read_statestore(
    '/checkpoint/path',
    operatorId => 1
);