Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Anda dapat menggunakan operasi DataFrame atau fungsi nilai tabel SQL untuk mengkueri data dan metadata status Streaming Terstruktur. Gunakan fungsi-fungsi ini untuk mengamati informasi status untuk kueri stateful Streaming Terstruktur, yang dapat berguna untuk pemantauan dan penelusuran kesalahan.
Anda harus memiliki akses baca ke jalur cek poin untuk kueri streaming agar dapat mengkueri data status dan metadata. Fungsi yang dijelaskan dalam artikel ini menyediakan akses baca-saja ke data status dan metadata. Anda hanya dapat menggunakan semantik baca batch untuk melakukan kueri informasi status.
Catatan
Anda tidak dapat mengkueri informasi status untuk Alur Deklaratif Lakeflow Spark, tabel streaming, atau tampilan materialisasi. Anda tidak dapat meminta kueri informasi status menggunakan komputasi tanpa server atau komputasi yang dikonfigurasi dengan mode akses standar.
Persyaratan
- Gunakan salah satu konfigurasi komputasi berikut:
- Databricks Runtime versi 16.3 dan lebih baru yang dikonfigurasikan pada sistem komputasi dengan mode akses standar.
- Databricks Runtime 14.3 LTS ke atas pada komputasi yang dikonfigurasi dengan mode akses terdedikasi atau tanpa isolasi.
- Akses baca ke jalur titik pemeriksaan yang digunakan oleh kueri streaming.
Membaca status penyimpanan Streaming Terstruktur
Anda dapat membaca informasi penyimpanan status untuk kueri Streaming Terstruktur yang dijalankan dalam Databricks Runtime yang didukung. Gunakan sintaks berikut:
Ular sawah
df = (spark.read
.format("statestore")
.load("/checkpoint/path"))
SQL
SELECT * FROM read_statestore('/checkpoint/path')
Parameter API pembaca kondisi
API pembaca status mendukung konfigurasi opsional berikut:
| Opsi | Jenis | Nilai bawaan | Deskripsi |
|---|---|---|---|
batchId |
Panjang | Batch ID terbaru | Mewakili batch target yang akan dibaca. Tentukan opsi ini untuk mengkueri informasi status untuk status kueri sebelumnya. Batch harus dikomitmenkan tetapi belum dibersihkan. |
operatorId |
Panjang | 0 | Mewakili operator target yang akan dibaca. Opsi ini digunakan saat kueri menggunakan beberapa operator stateful. |
storeName |
string | "BAWAAN" | Merupakan nama penyimpanan status target yang akan dibaca. Opsi ini digunakan saat operator stateful menggunakan beberapa instans penyimpanan status. Baik storeName atau joinSide harus ditentukan untuk gabungan stream-steam, tetapi tidak keduanya. |
joinSide |
String ("kiri" atau "kanan") | Mewakili sisi target tempat membaca. Opsi ini digunakan saat pengguna ingin membaca status dari gabungan stream-stream. | |
stateVarName |
string | Tidak ada | Nama variabel status yang akan dibaca sebagai bagian dari kueri ini. Nama variabel status adalah nama unik yang diberikan untuk setiap variabel dalam fungsi init yang digunakan oleh operator StatefulProcessor. Opsi ini adalah opsi yang diperlukan jika transformWithState operator digunakan. Opsi ini hanya berlaku untuk transformWithState operator dan diabaikan untuk operator lain. Tersedia di Databricks Runtime 16.2 ke atas. |
readRegisteredTimers |
Boolean | tidak benar | Atur ke true untuk membaca timer terdaftar yang digunakan dalam operator transformWithState. Opsi ini hanya berlaku untuk transformWithState operator dan diabaikan untuk operator lain. Tersedia di Databricks Runtime 16.2 ke atas. |
flattenCollectionTypes |
Boolean | benar | Jika true, meratakan rekaman yang dikembalikan untuk variabel status peta dan daftar. Jika false, rekaman dikembalikan menggunakan Spark SQL Array atau Map. Opsi ini hanya berlaku untuk transformWithState operator dan diabaikan untuk operator lain. Tersedia di Databricks Runtime 16.2 ke atas. |
Data yang dikembalikan memiliki skema berikut:
| Kolom | Jenis | Deskripsi |
|---|---|---|
key |
Struct (jenis lebih lanjut berasal dari kunci status) | Kunci untuk rekaman operator stateful di titik pemeriksaan status. |
value |
Struct (tipe lanjutan diturunkan dari nilai keadaan) | Nilai untuk catatan operator stateful di titik pemeriksaan status. |
partition_id |
Bilangan bulat | Partisi titik pemeriksaan status yang berisi catatan operator berbasis status. |
Lihat fungsi bernilai tabel read_statestore.
Membaca metadata status Streaming Terstruktur
Penting
Anda harus menjalankan kueri streaming pada Databricks Runtime 14.2 atau lebih tinggi untuk merekam metadata status. File metadata status tidak mengganggu kompatibilitas mundur. Jika Anda memilih untuk menjalankan kueri streaming pada Databricks Runtime 14.1 atau di bawahnya, file metadata status yang ada diabaikan dan tidak ada file metadata status baru yang ditulis.
Anda dapat membaca informasi metadata status untuk kueri Streaming Terstruktur yang dijalankan pada Databricks Runtime 14.2 atau yang lebih baru. Gunakan sintaks berikut:
Ular sawah
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
Data yang dikembalikan memiliki skema berikut:
| Kolom | Jenis | Deskripsi |
|---|---|---|
operatorId |
Bilangan bulat | ID angka bulat dari operator streaming yang bersifat stateful. |
operatorName |
Bilangan bulat | Nama operator stateful streaming. |
stateStoreName |
string | Nama penyimpanan status operator. |
numPartitions |
Bilangan bulat | Jumlah partisi penyimpanan status. |
minBatchId |
Panjang | ID batch terkecil yang tersedia untuk menanyakan keadaan. |
maxBatchId |
Panjang | ID batch maksimum yang tersedia untuk melakukan kueri status. |
Catatan
Nilai ID batch yang disediakan oleh minBatchId dan maxBatchId mencerminkan status pada saat titik pemeriksaan ditulis. Batch lama dibersihkan secara otomatis dengan eksekusi mikro-batch, sehingga nilai yang disediakan di sini tidak dijamin masih tersedia.
Lihat fungsi bernilai tabel read_state_metadata.
Contoh: Melakukan query pada satu sisi gabungan aliran-aliran
Gunakan sintaks berikut untuk mengkueri sisi kiri gabungan stream-stream:
Ular sawah
left_df = (spark.read
.format("statestore")
.option("joinSide", "left")
.load("/checkpoint/path"))
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
joinSide => 'left'
);
Contoh: Penyimpanan status kueri untuk aliran dengan beberapa operator stateful
Contoh ini menggunakan pembaca metadata status untuk mengumpulkan detail metadata kueri streaming dengan beberapa operator stateful, lalu menggunakan hasil metadata sebagai opsi untuk pembaca status.
Pembaca metadata status mengambil jalur titik pemeriksaan sebagai satu-satunya opsi, seperti dalam contoh sintaks berikut:
Ular sawah
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
Tabel berikut ini mewakili contoh output metadata penyimpanan status:
| operatorId | Nama Operator | stateStoreName | JumlahPartisi | minBatchId | maxBatchId |
|---|---|---|---|---|---|
| 0 | stateStoreSave | asali | 200 | 0 | 13 |
| 1 | menghapus duplikasi dalam Watermark | asali | 200 | 0 | 13 |
Untuk mendapatkan hasil untuk dedupeWithinWatermark operator, kueri pembaca status dengan operatorId opsi, seperti dalam contoh berikut:
Ular sawah
left_df = (spark.read
.format("statestore")
.option("operatorId", 1)
.load("/checkpoint/path"))
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
operatorId => 1
);