Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
İşlev, create_auto_cdc_from_snapshot_flow veritabanı anlık görüntülerinden kaynak verilerini işlemek için Lakeflow Spark Bildirimsel İşlem Hatları'nın veri yakalama (CDC) işlevselliğini kullanan bir akış oluşturur. Bkz. CDC API ile AUTO CDC FROM SNAPSHOT nasıl uygulanır?.
Uyarı
Bu işlev önceki işlevinin apply_changes_from_snapshot()yerini alır. İki işlev aynı imzaya sahiptir. Databricks, yeni adı kullanmak için güncelleştirme önerir.
Önemli
Bu işlem için bir hedef akış tablonuz olmalıdır. Gerekli hedef tabloyu oluşturmak için create_streaming_table() işlevini kullanabilirsiniz. Aynı akış tablosunu hem create_auto_cdc_from_snapshot_flow() hem de create_auto_cdc_flow() ile hedefleyemezsiniz.
Sözdizimi
from pyspark import pipelines as dp
dp.create_auto_cdc_from_snapshot_flow(
target = "<target-table>",
source = Any,
keys = ["key1", "key2", "keyN"],
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
Uyarı
İşleme için AUTO CDC FROM SNAPSHOT varsayılan davranış, hedefte aynı anahtarlara sahip eşleşen bir kayıt olmadığında yeni bir satır eklemektir. Eşleşen bir kayıt varsa, yalnızca satırdaki değerlerden herhangi biri değişmişse güncelleştirilir. Hedefte anahtarları bulunan ancak kaynakta artık bulunmayan satırlar silinir.
Anlık görüntülerle CDC işleme hakkında daha fazla bilgi edinmek için bkz. AUTO CDC API'leri: İşlem hatları ile değişiklik verilerini yakalamayı basitleştirme. işlevini kullanma create_auto_cdc_from_snapshot_flow() örnekleri için düzenli anlık görüntü alımı ve geçmiş anlık görüntü alımı örneklerine bakın.
Parametreler
| Parametre | Türü | Description |
|---|---|---|
target |
str |
Gerekli. Güncelleştirilecek tablonun adı.
İşlevi yürütmeden create_auto_cdc_from_snapshot_flow() önce hedef tabloyu oluşturmak için create_streaming_table() işlevini kullanabilirsiniz. |
source |
str veya lambda function |
Gerekli. Bir tablonun adı veya düzenli aralıklarla anlık görüntülenecek görünüm ya da işlenecek anlık görüntü DataFrame'i ve anlık görüntü sürümünü döndüren bir Python lambda işlevi. Bkz. Bağımsız değişkeni uygulamasource. |
keys |
list |
Gerekli. Kaynak verilerdeki bir satırı benzersiz olarak tanımlayan sütun veya sütun bileşimi. Bu, hedef tablodaki belirli kayıtlara hangi CDC olaylarının uygulanacağını belirlemek için kullanılır. Şunları belirtebilirsiniz:
|
stored_as_scd_type |
str veya int |
Kayıtların SCD türü 1 veya SCD tür 2 olarak depolanması.
1 SCD türü 1 veya 2 SCD türü 2 için olarak ayarlayın. Varsayılan değer SCD tür 1'dir. |
track_history_column_list veya track_history_except_column_list |
list |
Hedef tablodaki geçmiş için izlenecek çıktı sütunlarının bir alt kümesi. İzlenecek sütunların tam listesini belirtmek için kullanın track_history_column_list . İzlemenin dışında tutulacak sütunları belirtmek için kullanın track_history_except_column_list . Değeri dize listesi olarak veya Spark SQL col() işlevleri olarak bildirebilirsiniz:
İşlevlere yönelik col() bağımsız değişkenler niteleyici içeremez. Örneğin, kullanabilirsiniz col(userId), ancak kullanamazsınız col(source.userId). Varsayılan değer, işleve hiçbir track_history_column_list veya track_history_except_column_list bağımsız değişken geçirilmediğinde hedef tabloya tüm sütunları eklemektir. |
source bağımsız değişkenini uygula
create_auto_cdc_from_snapshot_flow() işlevi, source bağımsız değişkenini içerir. Geçmiş anlık görüntüleri işlemek için, source bağımsız değişkeninin, işlenecek anlık görüntü verilerini içeren bir Python DataFrame ve bir anlık görüntü sürümü olmak üzere iki değer döndüren bir Python lambda işlevi olması beklenir.
Lambda işlevinin imzası aşağıdadır:
lambda Any => Optional[(DataFrame, Any)]
- Lambda işlevine verilen bağımsız değişken, en son işlenmiş anlık görüntü sürümüdür.
- Lambda işlevinin dönüş değeri veya iki değerden oluşan bir tanımlama grubudur
None: Tanımlama grubunun ilk değeri, işlenecek anlık görüntüyü içeren bir DataFrame'dir. Öbeğin ikinci değeri, anlık görüntünün mantıksal düzenini temsil eden anlık görüntü sürümüdür.
Lambda işlevini uygulayan ve çağıran bir örnek:
def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Tuple[Optional[int], DataFrame]:
if latest_snapshot_version is None:
return (spark.read.load("filename.csv"), 1)
else:
return None
create_auto_cdc_from_snapshot_flow(
# ...
source = next_snapshot_and_version,
# ...
)
Lakeflow Spark Bildirimli İşlem Hatları çalışma zamanı, işlevi içeren create_auto_cdc_from_snapshot_flow() işlem hattı her tetiklendiğinde aşağıdaki adımları gerçekleştirir:
- Sonraki anlık görüntüdeki DataFrame'i ve buna karşılık gelen anlık görüntü sürümünü yüklemek için
next_snapshot_and_versionişlevini çalıştırır. - DataFrame döndürülmezse çalıştırma sonlandırılır ve işlem hattı güncelleştirmesi tamamlandı olarak işaretlenir.
- Yeni anlık görüntüdeki değişiklikleri algılar ve bunları hedef tabloya artımlı olarak uygular.
- Sonraki anlık görüntüyü ve sürümünü yüklemek için 1. adıma döner.