Yapılandırılmış Akış durumu bilgilerini okuma

Yapılandırılmış Akış durumu verilerini ve meta verilerini sorgulamak için DataFrame işlemlerini veya SQL tablo değeri işlevlerini kullanabilirsiniz. Yapılandırılmış Akış durum bilgisi olan sorgulara ilişkin durum bilgilerini gözlemlemek için bu işlevleri kullanın. Bu bilgiler izleme ve hata ayıklama için yararlı olabilir.

Durum verilerini veya meta verileri sorgulamak için akış sorgusunun denetim noktası yoluna okuma erişiminiz olmalıdır. Bu makalede açıklanan işlevler, durum verilerine ve meta verilerine salt okunur erişim sağlar. Durum bilgilerini sorgulamak için yalnızca toplu okuma semantiği kullanabilirsiniz.

Not

Lakeflow Spark Bildirimli İşlem Hatları, akış tabloları veya gerçekleştirilmiş görünümler için durum bilgilerini sorgulayamazsınız. Sunucusuz bilişim veya standart erişim moduyla yapılandırılmış bilişim kullanarak durum bilgilerini sorgulayamazsınız.

Gereksinimler

  • Aşağıdaki işlem yapılandırmalarından birini kullanın:
    • Standart erişim moduyla yapılandırılmış işlem üzerinde Databricks Runtime 16.3 ve üzeri.
    • Databricks Runtime 14.3 LTS ve üzeri, ayrılmış veya yalıtım olmadan erişim moduna göre yapılandırılmış işlem ortamında çalışır.
  • Akış sorgusu tarafından kullanılan denetim noktası yoluna okuma erişimi.

Yapılandırılmış Akış durum deposunu oku

Desteklenen herhangi bir Databricks Runtime'da yürütülen Yapılandırılmış Akış sorguları için durum deposu bilgilerini okuyabilirsiniz. Aşağıdaki sözdizimini kullanın:

Piton

df = (spark.read
  .format("statestore")
  .load("/checkpoint/path"))

Scala

val df = spark.read
  .format("statestore")
  .load("/checkpoint/path")

SQL

SELECT * FROM read_statestore('/checkpoint/path')

Durum okuyucu API'leri seçenekleri ve şeması

Biçim seçeneklerinin statestore tam listesi için bkz. Durum deposu.

Çıkış verileri aşağıdaki şemaya sahiptir:

Sütun Tür Açıklama
key Yapı (durum anahtarından türetilen başka tür) Durum denetim noktasında, durumsal işleç kaydının anahtarı.
value Yapı (durum değerinden türetilen başka bir tür) Durum kontrol noktasında durum bilgisi olan işleç kaydının değeri.
partition_id Tamsayı Durum kontrol noktasının, durum bilgisi olan işleç kaydını içeren bölümü.

Databricks Runtime 16.4 LTS ve üzeri sürümlerin readChangeFeed seçeneği olarak trueayarlandığında çıkış verileri aşağıdaki şemaya sahiptir:

Sütun Tür Açıklama
batch_id Uzun Durum değişikliğinin ait olduğu toplu işlem kimliği.
change_type Dize Toplu işlem tarafından uygulanan değişiklik türü: ekleme ve güncelleştirmeler için update, silme işlemleri için delete.
key Yapı (durum anahtarından türetilen başka tür) Durum denetim noktasında, durumsal işleç kaydının anahtarı.
value Yapı (durum değerinden türetilen başka bir tür) Durum kontrol noktasında durum bilgisi olan işleç kaydının değeri. null, change_type değeri delete olan kayıtlar için.
partition_id Tamsayı Durum kontrol noktasının, durum bilgisi olan işleç kaydını içeren bölümü.

Bakınız read_statestore tablo değerli fonksiyon.

Structured Streaming durum değişikliklerini okuyun

Databricks Runtime 16.4 LTS ve üzerinde kullanılabilir. Durumun tek bir mikro gruptaki tam durumunu görüntülemek yerine mikro gruplar arasında nasıl değiştiğini okumak için readChangeFeed değerini true olarak ayarlayın ve changeStartBatchId belirtin. İsteğe bağlı olarak belirtin changeEndBatchId. Seçeneklerin tam listesi için bkz. Durum deposu.

Örneğin, 2 toplu işinden en son kaydedilen toplu işine kadar olan durum değişikliklerini okumak için:

Piton

df = (spark.read
  .format("statestore")
  .option("readChangeFeed", True)
  .option("changeStartBatchId", 2)
  .load("<checkpointLocation>")
)

Scala

val df = spark.read
  .format("statestore")
  .option("readChangeFeed", true)
  .option("changeStartBatchId", 2)
  .load("<checkpointLocation>")

SQL

SELECT * FROM read_statestore(
    '<checkpointLocation>',
    readChangeFeed => true,
    changeStartBatchId => 2
);

Çıkış şeması ek batch_id ve change_type sütunlar içerir. Şemanın tamamı için bkz. Durum okuyucu API seçenekleri ve şeması.

Yapılandırılmış Akış durumu meta verilerini okuma

Databricks Runtime 14.3 LTS veya üzerinde kullanılabilir. Yapılandırılmış Akış sorguları için durum meta verileri bilgilerini okuyabilirsiniz:

Piton

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

Scala

val df = spark.read
  .format("state-metadata")
  .load("<checkpointLocation>")

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

Döndürülen veriler aşağıdaki şemaya sahiptir:

Sütun Tür Açıklama
operatorId Tamsayı Durumlu akış işlecinin tamsayı kimliği.
operatorName Dize Durumlu akış operatörünün adı.
stateStoreName Dize Operatörün durum deposunun adı.
numPartitions Tamsayı Durum deposunun bölüm sayısı.
minBatchId Uzun Sorgulamak için mevcut olan en düşük toplu kimlik.
maxBatchId Uzun Durumu sorgulamak için kullanılabilir en yüksek toplu iş kimliği.

Not

minBatchId ve maxBatchId tarafından verilen yığın kimliği değerleri, denetim noktasının yazıldığı zamandaki durumu yansıtır. Eski toplu işlemler mikro toplu iş yürütmesi ile otomatik olarak temizlenir, bu nedenle burada sağlanan değerin hala kullanılabilir olması garanti değildir.

Bakınız read_state_metadata tablo değerli fonksiyon.

Örnek: Stream-stream join'un bir tarafını sorgulama

Stream-stream birleştirmesinin sol tarafını sorgulamak için aşağıdaki söz dizimini kullanın:

Piton

left_df = (spark.read
  .format("statestore")
  .option("joinSide", "left")
  .load("/checkpoint/path"))

Scala

val leftDf = spark.read
  .format("statestore")
  .option("joinSide", "left")
  .load("/checkpoint/path")

SQL

SELECT * FROM read_statestore(
    '/checkpoint/path',
    joinSide => 'left'
);

Örnek: Durum bilgisi olan birden çok işleç içeren akış için sorgu durum deposu

Bu örneklerde durum bilgisi olan birden çok işleç içeren bir akış sorgusunun meta veri ayrıntılarını toplamak için durum meta veri okuyucusu kullanılır ve ardından meta veri sonuçları durum okuyucu için seçenekler olarak kullanılır.

Durum meta veri okuyucusu, aşağıdaki söz dizimi örneğinde olduğu gibi denetim noktası yolunu tek seçenek olarak alır:

Piton

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

Scala

val df = spark.read
  .format("state-metadata")
  .load("<checkpointLocation>")

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

Aşağıdaki tablo, durum deposu meta verilerinin örnek çıkışını temsil eder:

operatorId operatörAdı DurumDeposuAdı numPartitions minBatchId maxBatchId
0 stateStoreSave varsayılan 200 0 13 (on üç)
1 dedupeWithinWatermark varsayılan 200 0 13 (on üç)

operatörün dedupeWithinWatermark sonuçlarını almak için, aşağıdaki örnekte olduğu gibi durum okuyucusunu operatorId seçeneğiyle sorgulayın:

Piton

left_df = (spark.read
  .format("statestore")
  .option("operatorId", 1)
  .load("/checkpoint/path"))

Scala

val leftDf = spark.read
  .format("statestore")
  .option("operatorId", 1)
  .load("/checkpoint/path")

SQL

SELECT * FROM read_statestore(
    '/checkpoint/path',
    operatorId => 1
);