Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Yapılandırılmış Akış durumu verilerini ve meta verilerini sorgulamak için DataFrame işlemlerini veya SQL tablo değeri işlevlerini kullanabilirsiniz. Yapılandırılmış Akış durum bilgisi olan sorgulara ilişkin durum bilgilerini gözlemlemek için bu işlevleri kullanın. Bu bilgiler izleme ve hata ayıklama için yararlı olabilir.
Durum verilerini veya meta verileri sorgulamak için akış sorgusunun denetim noktası yoluna okuma erişiminiz olmalıdır. Bu makalede açıklanan işlevler, durum verilerine ve meta verilerine salt okunur erişim sağlar. Durum bilgilerini sorgulamak için yalnızca toplu okuma semantiği kullanabilirsiniz.
Not
Lakeflow Spark Bildirimli İşlem Hatları, akış tabloları veya gerçekleştirilmiş görünümler için durum bilgilerini sorgulayamazsınız. Sunucusuz bilişim veya standart erişim moduyla yapılandırılmış bilişim kullanarak durum bilgilerini sorgulayamazsınız.
Gereksinimler
- Aşağıdaki işlem yapılandırmalarından birini kullanın:
- Standart erişim moduyla yapılandırılmış işlem üzerinde Databricks Runtime 16.3 ve üzeri.
- Databricks Runtime 14.3 LTS ve üzeri, ayrılmış veya yalıtım olmadan erişim moduna göre yapılandırılmış işlem ortamında çalışır.
- Akış sorgusu tarafından kullanılan denetim noktası yoluna okuma erişimi.
Yapılandırılmış Akış durum deposunu oku
Desteklenen herhangi bir Databricks Runtime'da yürütülen Yapılandırılmış Akış sorguları için durum deposu bilgilerini okuyabilirsiniz. Aşağıdaki sözdizimini kullanın:
Piton
df = (spark.read
.format("statestore")
.load("/checkpoint/path"))
SQL
SELECT * FROM read_statestore('/checkpoint/path')
Durum okuyucu API parametreleri
Durum okuyucu API'si aşağıdaki isteğe bağlı yapılandırmaları destekler:
| Seçenek | Tür | Varsayılan değer | Açıklama |
|---|---|---|---|
batchId |
Uzun | en son işlem kimliği | Okunacak hedef toplu işlemi temsil eder. Sorgunun önceki bir durumu için durum bilgilerini sorgulamak için bu seçeneği belirtin. Toplu işlem gerçekleştirilmelidir ancak henüz temizlenmemelidir. |
operatorId |
Uzun | 0 | Okunacak hedef işleci temsil eder. Bu seçenek, sorgu birden çok durum bilgisi olan işleç kullandığında kullanılır. |
storeName |
Dize | Varsayılan | Okunacak hedef durum deposu adını temsil eder. Durumlu bir işleç birden fazla durum deposu örneği kullandığında bu seçenek kullanılır. Akış buharı birleşimi için storeName veya joinSide belirtilmelidir, ancak ikisi birden belirtilmez. |
joinSide |
Dize ("sol" veya "sağ") | Okunacak hedef tarafı temsil eder. Kullanıcılar, bir stream-stream birleşiminden durumu okumak istediklerinde bu seçenek kullanılır. | |
stateVarName |
Dize | Hiç kimse | Bu sorgunun bir parçası olarak okunacak durum değişkeni adı. Durum değişkeni adı, init operatörü tarafından kullanılan StatefulProcessor işlevinin transformWithState içindeki her değişkene verilen benzersiz isimdir. İşleç kullanılıyorsa transformWithState bu seçenek gerekli bir seçenektir. Bu seçenek yalnızca transformWithState işleci için geçerlidir ve diğer işleçler için göz ardı edilir. Databricks Runtime 16.2 ve üzerinde kullanılabilir. |
readRegisteredTimers |
Boolean (Boole Mantığı) | yanlış |
true öğesini, transformWithState işleci içinde kullanılan kayıtlı zamanlayıcıları okumak için ayarlayın. Bu seçenek yalnızca transformWithState işleci için geçerlidir ve diğer işleçler için göz ardı edilir. Databricks Runtime 16.2 ve üzerinde kullanılabilir. |
flattenCollectionTypes |
Boolean (Boole Mantığı) | doğru | ise true, eşleme ve liste durumu değişkenleri için döndürülen kayıtları düz hale döndürür. ise false, kayıtlar spark SQL Array veya Mapkullanılarak döndürülür. Bu seçenek yalnızca transformWithState işleci için geçerlidir ve diğer işleçler için göz ardı edilir. Databricks Runtime 16.2 ve üzerinde kullanılabilir. |
Döndürülen veriler aşağıdaki şemaya sahiptir:
| Sütun | Tür | Açıklama |
|---|---|---|
key |
Yapı (durum anahtarından türetilen başka tür) | Durum denetim noktasında, durumsal işleç kaydının anahtarı. |
value |
Yapı (durum değerinden türetilen başka bir tür) | Durum kontrol noktasında durum bilgisi olan işleç kaydının değeri. |
partition_id |
Tamsayı | Durum kontrol noktasının, durum bilgisi olan işleç kaydını içeren bölümü. |
Bakınız read_statestore tablo değerli fonksiyon.
Yapılandırılmış Akış durumu meta verilerini okuma
Önemli
Durum meta verilerini kaydetmek için Databricks Runtime 14.2 veya üzerinde akış sorguları çalıştırmanız gerekir. Durum meta verileri dosyaları geriye dönük uyumluluğu bozmaz. Databricks Runtime 14.1 veya altında bir akış sorgusu çalıştırmayı seçerseniz, mevcut durum meta veri dosyaları yoksayılır ve yeni durum meta veri dosyaları yazılmaz.
Databricks Runtime 14.2 veya üzeri üzerinde çalıştırılan Yapılandırılmış Akış sorguları için durum meta verileri bilgilerini okuyabilirsiniz. Aşağıdaki sözdizimini kullanın:
Piton
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
Döndürülen veriler aşağıdaki şemaya sahiptir:
| Sütun | Tür | Açıklama |
|---|---|---|
operatorId |
Tamsayı | Durumlu akış işlecinin tamsayı kimliği. |
operatorName |
Tamsayı | Durumlu akış operatörünün adı. |
stateStoreName |
Dize | Operatörün durum deposunun adı. |
numPartitions |
Tamsayı | Durum deposunun bölüm sayısı. |
minBatchId |
Uzun | Sorgulamak için mevcut olan en düşük toplu kimlik. |
maxBatchId |
Uzun | Durumu sorgulamak için kullanılabilir en yüksek toplu iş kimliği. |
Not
minBatchId ve maxBatchId tarafından verilen yığın kimliği değerleri, denetim noktasının yazıldığı zamandaki durumu yansıtır. Eski toplu işlemler mikro toplu iş yürütmesi ile otomatik olarak temizlenir, bu nedenle burada sağlanan değerin hala kullanılabilir olması garanti değildir.
Bakınız read_state_metadata tablo değerli fonksiyon.
Örnek: Stream-stream join'un bir tarafını sorgulama
Stream-stream birleştirmesinin sol tarafını sorgulamak için aşağıdaki söz dizimini kullanın:
Piton
left_df = (spark.read
.format("statestore")
.option("joinSide", "left")
.load("/checkpoint/path"))
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
joinSide => 'left'
);
Örnek: Durum bilgisi olan birden çok işleç içeren akış için sorgu durum deposu
Bu örneklerde durum bilgisi olan birden çok işleç içeren bir akış sorgusunun meta veri ayrıntılarını toplamak için durum meta veri okuyucusu kullanılır ve ardından meta veri sonuçları durum okuyucu için seçenekler olarak kullanılır.
Durum meta veri okuyucusu, aşağıdaki söz dizimi örneğinde olduğu gibi denetim noktası yolunu tek seçenek olarak alır:
Piton
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
Aşağıdaki tablo, durum deposu meta verilerinin örnek çıkışını temsil eder:
| operatorId | operatörAdı | DurumDeposuAdı | numPartitions | minBatchId | maxBatchId |
|---|---|---|---|---|---|
| 0 | stateStoreSave | varsayılan | 200 | 0 | 13 (on üç) |
| 1 | dedupeWithinWatermark | varsayılan | 200 | 0 | 13 (on üç) |
operatörün dedupeWithinWatermark sonuçlarını almak için, aşağıdaki örnekte olduğu gibi durum okuyucusunu operatorId seçeneğiyle sorgulayın:
Piton
left_df = (spark.read
.format("statestore")
.option("operatorId", 1)
.load("/checkpoint/path"))
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
operatorId => 1
);