Aracılığıyla paylaş


Yapılandırılmış Akış durumu bilgilerini okuma

Yapılandırılmış Akış durumu verilerini ve meta verilerini sorgulamak için DataFrame işlemlerini veya SQL tablo değeri işlevlerini kullanabilirsiniz. Yapılandırılmış Akış durum bilgisi olan sorgulara ilişkin durum bilgilerini gözlemlemek için bu işlevleri kullanın. Bu bilgiler izleme ve hata ayıklama için yararlı olabilir.

Durum verilerini veya meta verileri sorgulamak için akış sorgusunun denetim noktası yoluna okuma erişiminiz olmalıdır. Bu makalede açıklanan işlevler, durum verilerine ve meta verilerine salt okunur erişim sağlar. Durum bilgilerini sorgulamak için yalnızca toplu okuma semantiği kullanabilirsiniz.

Not

Lakeflow Spark Bildirimli İşlem Hatları, akış tabloları veya gerçekleştirilmiş görünümler için durum bilgilerini sorgulayamazsınız. Sunucusuz bilişim veya standart erişim moduyla yapılandırılmış bilişim kullanarak durum bilgilerini sorgulayamazsınız.

Gereksinimler

  • Aşağıdaki işlem yapılandırmalarından birini kullanın:
    • Standart erişim moduyla yapılandırılmış işlem üzerinde Databricks Runtime 16.3 ve üzeri.
    • Databricks Runtime 14.3 LTS ve üzeri, ayrılmış veya yalıtım olmadan erişim moduna göre yapılandırılmış işlem ortamında çalışır.
  • Akış sorgusu tarafından kullanılan denetim noktası yoluna okuma erişimi.

Yapılandırılmış Akış durum deposunu oku

Desteklenen herhangi bir Databricks Runtime'da yürütülen Yapılandırılmış Akış sorguları için durum deposu bilgilerini okuyabilirsiniz. Aşağıdaki sözdizimini kullanın:

Piton

df = (spark.read
  .format("statestore")
  .load("/checkpoint/path"))

SQL

SELECT * FROM read_statestore('/checkpoint/path')

Durum okuyucu API parametreleri

Durum okuyucu API'si aşağıdaki isteğe bağlı yapılandırmaları destekler:

Seçenek Tür Varsayılan değer Açıklama
batchId Uzun en son işlem kimliği Okunacak hedef toplu işlemi temsil eder. Sorgunun önceki bir durumu için durum bilgilerini sorgulamak için bu seçeneği belirtin. Toplu işlem gerçekleştirilmelidir ancak henüz temizlenmemelidir.
operatorId Uzun 0 Okunacak hedef işleci temsil eder. Bu seçenek, sorgu birden çok durum bilgisi olan işleç kullandığında kullanılır.
storeName Dize Varsayılan Okunacak hedef durum deposu adını temsil eder. Durumlu bir işleç birden fazla durum deposu örneği kullandığında bu seçenek kullanılır. Akış buharı birleşimi için storeName veya joinSide belirtilmelidir, ancak ikisi birden belirtilmez.
joinSide Dize ("sol" veya "sağ") Okunacak hedef tarafı temsil eder. Kullanıcılar, bir stream-stream birleşiminden durumu okumak istediklerinde bu seçenek kullanılır.
stateVarName Dize Hiç kimse Bu sorgunun bir parçası olarak okunacak durum değişkeni adı. Durum değişkeni adı, init operatörü tarafından kullanılan StatefulProcessor işlevinin transformWithState içindeki her değişkene verilen benzersiz isimdir. İşleç kullanılıyorsa transformWithState bu seçenek gerekli bir seçenektir. Bu seçenek yalnızca transformWithState işleci için geçerlidir ve diğer işleçler için göz ardı edilir. Databricks Runtime 16.2 ve üzerinde kullanılabilir.
readRegisteredTimers Boolean (Boole Mantığı) yanlış true öğesini, transformWithState işleci içinde kullanılan kayıtlı zamanlayıcıları okumak için ayarlayın. Bu seçenek yalnızca transformWithState işleci için geçerlidir ve diğer işleçler için göz ardı edilir. Databricks Runtime 16.2 ve üzerinde kullanılabilir.
flattenCollectionTypes Boolean (Boole Mantığı) doğru ise true, eşleme ve liste durumu değişkenleri için döndürülen kayıtları düz hale döndürür. ise false, kayıtlar spark SQL Array veya Mapkullanılarak döndürülür. Bu seçenek yalnızca transformWithState işleci için geçerlidir ve diğer işleçler için göz ardı edilir. Databricks Runtime 16.2 ve üzerinde kullanılabilir.

Döndürülen veriler aşağıdaki şemaya sahiptir:

Sütun Tür Açıklama
key Yapı (durum anahtarından türetilen başka tür) Durum denetim noktasında, durumsal işleç kaydının anahtarı.
value Yapı (durum değerinden türetilen başka bir tür) Durum kontrol noktasında durum bilgisi olan işleç kaydının değeri.
partition_id Tamsayı Durum kontrol noktasının, durum bilgisi olan işleç kaydını içeren bölümü.

Bakınız read_statestore tablo değerli fonksiyon.

Yapılandırılmış Akış durumu meta verilerini okuma

Önemli

Durum meta verilerini kaydetmek için Databricks Runtime 14.2 veya üzerinde akış sorguları çalıştırmanız gerekir. Durum meta verileri dosyaları geriye dönük uyumluluğu bozmaz. Databricks Runtime 14.1 veya altında bir akış sorgusu çalıştırmayı seçerseniz, mevcut durum meta veri dosyaları yoksayılır ve yeni durum meta veri dosyaları yazılmaz.

Databricks Runtime 14.2 veya üzeri üzerinde çalıştırılan Yapılandırılmış Akış sorguları için durum meta verileri bilgilerini okuyabilirsiniz. Aşağıdaki sözdizimini kullanın:

Piton

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

Döndürülen veriler aşağıdaki şemaya sahiptir:

Sütun Tür Açıklama
operatorId Tamsayı Durumlu akış işlecinin tamsayı kimliği.
operatorName Tamsayı Durumlu akış operatörünün adı.
stateStoreName Dize Operatörün durum deposunun adı.
numPartitions Tamsayı Durum deposunun bölüm sayısı.
minBatchId Uzun Sorgulamak için mevcut olan en düşük toplu kimlik.
maxBatchId Uzun Durumu sorgulamak için kullanılabilir en yüksek toplu iş kimliği.

Not

minBatchId ve maxBatchId tarafından verilen yığın kimliği değerleri, denetim noktasının yazıldığı zamandaki durumu yansıtır. Eski toplu işlemler mikro toplu iş yürütmesi ile otomatik olarak temizlenir, bu nedenle burada sağlanan değerin hala kullanılabilir olması garanti değildir.

Bakınız read_state_metadata tablo değerli fonksiyon.

Örnek: Stream-stream join'un bir tarafını sorgulama

Stream-stream birleştirmesinin sol tarafını sorgulamak için aşağıdaki söz dizimini kullanın:

Piton

left_df = (spark.read
  .format("statestore")
  .option("joinSide", "left")
  .load("/checkpoint/path"))

SQL

SELECT * FROM read_statestore(
    '/checkpoint/path',
    joinSide => 'left'
);

Örnek: Durum bilgisi olan birden çok işleç içeren akış için sorgu durum deposu

Bu örneklerde durum bilgisi olan birden çok işleç içeren bir akış sorgusunun meta veri ayrıntılarını toplamak için durum meta veri okuyucusu kullanılır ve ardından meta veri sonuçları durum okuyucu için seçenekler olarak kullanılır.

Durum meta veri okuyucusu, aşağıdaki söz dizimi örneğinde olduğu gibi denetim noktası yolunu tek seçenek olarak alır:

Piton

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

Aşağıdaki tablo, durum deposu meta verilerinin örnek çıkışını temsil eder:

operatorId operatörAdı DurumDeposuAdı numPartitions minBatchId maxBatchId
0 stateStoreSave varsayılan 200 0 13 (on üç)
1 dedupeWithinWatermark varsayılan 200 0 13 (on üç)

operatörün dedupeWithinWatermark sonuçlarını almak için, aşağıdaki örnekte olduğu gibi durum okuyucusunu operatorId seçeneğiyle sorgulayın:

Piton

left_df = (spark.read
  .format("statestore")
  .option("operatorId", 1)
  .load("/checkpoint/path"))

SQL

SELECT * FROM read_statestore(
    '/checkpoint/path',
    operatorId => 1
);