Aracılığıyla paylaş


create_auto_cdc_from_snapshot_flow

İşlev, create_auto_cdc_from_snapshot_flow veritabanı anlık görüntülerinden kaynak verilerini işlemek için Lakeflow Spark Bildirimsel İşlem Hatları'nın veri yakalama (CDC) işlevselliğini kullanan bir akış oluşturur. Bkz. CDC API ile AUTO CDC FROM SNAPSHOT nasıl uygulanır?.

Uyarı

Bu işlev önceki işlevinin apply_changes_from_snapshot()yerini alır. İki işlev aynı imzaya sahiptir. Databricks, yeni adı kullanmak için güncelleştirme önerir.

Önemli

Bu işlem için bir hedef akış tablonuz olmalıdır. Gerekli hedef tabloyu oluşturmak için create_streaming_table() işlevini kullanabilirsiniz. Aynı akış tablosunu hem create_auto_cdc_from_snapshot_flow() hem de create_auto_cdc_flow() ile hedefleyemezsiniz.

Sözdizimi

from pyspark import pipelines as dp

dp.create_auto_cdc_from_snapshot_flow(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Uyarı

İşleme için AUTO CDC FROM SNAPSHOT varsayılan davranış, hedefte aynı anahtarlara sahip eşleşen bir kayıt olmadığında yeni bir satır eklemektir. Eşleşen bir kayıt varsa, yalnızca satırdaki değerlerden herhangi biri değişmişse güncelleştirilir. Hedefte anahtarları bulunan ancak kaynakta artık bulunmayan satırlar silinir.

Anlık görüntülerle CDC işleme hakkında daha fazla bilgi edinmek için bkz. AUTO CDC API'leri: İşlem hatları ile değişiklik verilerini yakalamayı basitleştirme. işlevini kullanma create_auto_cdc_from_snapshot_flow() örnekleri için düzenli anlık görüntü alımı ve geçmiş anlık görüntü alımı örneklerine bakın.

Parametreler

Parametre Türü Description
target str Gerekli. Güncelleştirilecek tablonun adı. İşlevi yürütmeden create_auto_cdc_from_snapshot_flow() önce hedef tabloyu oluşturmak için create_streaming_table() işlevini kullanabilirsiniz.
source str veya lambda function Gerekli. Bir tablonun adı veya düzenli aralıklarla anlık görüntülenecek görünüm ya da işlenecek anlık görüntü DataFrame'i ve anlık görüntü sürümünü döndüren bir Python lambda işlevi. Bkz. Bağımsız değişkeni uygulamasource.
keys list Gerekli. Kaynak verilerdeki bir satırı benzersiz olarak tanımlayan sütun veya sütun bileşimi. Bu, hedef tablodaki belirli kayıtlara hangi CDC olaylarının uygulanacağını belirlemek için kullanılır. Şunları belirtebilirsiniz:
  • Dizelerin listesi: ["userId", "orderId"]
  • Spark SQL col() işlevlerinin listesi: [col("userId"), col("orderId"]. İşlevlere yönelik col() bağımsız değişkenler niteleyici içeremez. Örneğin, kullanabilirsiniz col(userId), ancak kullanamazsınız col(source.userId).
stored_as_scd_type str veya int Kayıtların SCD türü 1 veya SCD tür 2 olarak depolanması. 1 SCD türü 1 veya 2 SCD türü 2 için olarak ayarlayın. Varsayılan değer SCD tür 1'dir.
track_history_column_list veya track_history_except_column_list list Hedef tablodaki geçmiş için izlenecek çıktı sütunlarının bir alt kümesi. İzlenecek sütunların tam listesini belirtmek için kullanın track_history_column_list . İzlemenin dışında tutulacak sütunları belirtmek için kullanın track_history_except_column_list . Değeri dize listesi olarak veya Spark SQL col() işlevleri olarak bildirebilirsiniz:
  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

İşlevlere yönelik col() bağımsız değişkenler niteleyici içeremez. Örneğin, kullanabilirsiniz col(userId), ancak kullanamazsınız col(source.userId). Varsayılan değer, işleve hiçbir track_history_column_list veya track_history_except_column_list bağımsız değişken geçirilmediğinde hedef tabloya tüm sütunları eklemektir.

source bağımsız değişkenini uygula

create_auto_cdc_from_snapshot_flow() işlevi, source bağımsız değişkenini içerir. Geçmiş anlık görüntüleri işlemek için, source bağımsız değişkeninin, işlenecek anlık görüntü verilerini içeren bir Python DataFrame ve bir anlık görüntü sürümü olmak üzere iki değer döndüren bir Python lambda işlevi olması beklenir.

Lambda işlevinin imzası aşağıdadır:

lambda Any => Optional[(DataFrame, Any)]
  • Lambda işlevine verilen bağımsız değişken, en son işlenmiş anlık görüntü sürümüdür.
  • Lambda işlevinin dönüş değeri veya iki değerden oluşan bir tanımlama grubudur None : Tanımlama grubunun ilk değeri, işlenecek anlık görüntüyü içeren bir DataFrame'dir. Öbeğin ikinci değeri, anlık görüntünün mantıksal düzenini temsil eden anlık görüntü sürümüdür.

Lambda işlevini uygulayan ve çağıran bir örnek:

def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Tuple[Optional[int], DataFrame]:
 if latest_snapshot_version is None:
   return (spark.read.load("filename.csv"), 1)
 else:
   return None

create_auto_cdc_from_snapshot_flow(
  # ...
  source = next_snapshot_and_version,
  # ...
)

Lakeflow Spark Bildirimli İşlem Hatları çalışma zamanı, işlevi içeren create_auto_cdc_from_snapshot_flow() işlem hattı her tetiklendiğinde aşağıdaki adımları gerçekleştirir:

  1. Sonraki anlık görüntüdeki DataFrame'i ve buna karşılık gelen anlık görüntü sürümünü yüklemek için next_snapshot_and_version işlevini çalıştırır.
  2. DataFrame döndürülmezse çalıştırma sonlandırılır ve işlem hattı güncelleştirmesi tamamlandı olarak işaretlenir.
  3. Yeni anlık görüntüdeki değişiklikleri algılar ve bunları hedef tabloya artımlı olarak uygular.
  4. Sonraki anlık görüntüyü ve sürümünü yüklemek için 1. adıma döner.