Aracılığıyla paylaş


create_auto_cdc_flow

İşlev, create_auto_cdc_flow() değişiklik veri akışından (CDF) kaynak verileri işlemek için Lakeflow Spark Deklaratif İşlem Hatları değişiklik verisi yakalama (CDC) işlevini kullanan bir akış oluşturur.

Uyarı

Bu işlev önceki işlevinin apply_changes()yerini alır. İki işlev aynı imzaya sahiptir. Databricks, yeni adı kullanmak için güncelleştirme önerir.

Önemli

Değişiklikleri uygulamak için bir hedef akış tablosu bildirmeniz gerekir. İsteğe bağlı olarak hedef tablonuzun şemasını belirtebilirsiniz. Hedef tablonun şemasını create_auto_cdc_flow() belirtirken, __START_AT ve __END_AT sütunlarını, sequence_by alanlarıyla aynı veri türüne sahip olacak şekilde eklemeniz gerekir.

Gerekli hedef tabloyu oluşturmak için işlem hattı Python arabiriminde create_streaming_table() işlevini kullanabilirsiniz.

Sözdizimi

from pyspark import pipelines as dp

dp.create_auto_cdc_flow(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = <bool>,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None,
  name = None,
  once = <bool>
)

İşleme için create_auto_cdc_flow ve INSERT olayları için UPDATE varsayılan davranış, kaynaktan CDC olayları eklemektir: hedef tablodaki belirtilen anahtarlarla eşleşen satırları güncelleştirin veya hedef tabloda eşleşen bir kayıt olmadığında yeni bir satır ekleyin. Olaylar için DELETE işlemi apply_as_deletes parametresi ile belirtilebilir.

Değişiklik akışıyla CDC işleme hakkında daha fazla bilgi edinmek için bkz. AUTO CDC API'leri: İşlem hatları ile değişiklik verilerini yakalamayı basitleştirme. işlevini kullanma create_auto_cdc_flow() örneği için bkz . Örnek: SCD tür 1 ve SCD tür 2, CDF kaynak verileriyle işleniyor.

Parametreler

Parametre Türü Description
target str Gerekli. Güncelleştirilecek tablonun adı. İşlevi yürütmeden create_auto_cdc_flow() önce hedef tabloyu oluşturmak için create_streaming_table() işlevini kullanabilirsiniz.
source str Gerekli. CDC kayıtlarını içeren veri kaynağı.
keys list Gerekli. Kaynak verilerdeki bir satırı benzersiz olarak tanımlayan sütun veya sütun bileşimi. Bu, hedef tablodaki belirli kayıtlara hangi CDC olaylarının uygulanacağını belirlemek için kullanılır. Şunları belirtebilirsiniz:
  • Dizelerin listesi: ["userId", "orderId"]
  • Spark SQL col() işlevlerinin listesi: [col("userId"), col("orderId")]. İşlevlere yönelik col() bağımsız değişkenler niteleyici içeremez. Örneğin, kullanabilirsiniz col(userId), ancak kullanamazsınız col(source.userId).
sequence_by str, col() veya struct() Gerekli. Kaynak verilerdeki CDC olaylarının mantıksal sırasını belirten sütun adları. Lakeflow Spark Bildirimli İşlem Hatları, sıra dışı gelen değişiklik olaylarını işlemek için bu sıralamayı kullanır. Belirtilen sütun sıralanabilir bir veri türü olmalıdır. Şunları belirtebilirsiniz:
  • Dize: "sequenceNum"
  • Spark SQL col() işlevi: col("sequenceNum"). İşlevlere yönelik col() bağımsız değişkenler niteleyici içeremez. Örneğin, kullanabilirsiniz col(userId), ancak kullanamazsınız col(source.userId).
  • struct() Bağlamaları kesmek için birden çok sütunu birleştiren: struct("timestamp_col", "id_col"), önce ilk yapı alanına göre, sonra bir bağlama varsa ikinci alana göre sıralanır ve bu şekilde devam eder.
ignore_null_updates bool Hedef sütunların bir alt kümesini içeren güncelleştirmelerin alımına izin verin. Bir CDC olayı mevcut bir satırla eşleştiğinde ve ignore_null_updates olduğunda, True sütunları hedefteki mevcut değerlerini korur. Bu, değeri nullolan iç içe sütunlar için de geçerlidir. ignore_null_updates False olduğunda, mevcut değerler null değerleriyle üzerine yazılır.
Varsayılan değer: False.
apply_as_deletes str veya expr() Bir CDC olayının ne zaman upsert yerine bir DELETE olarak ele alınacağı belirtir. Şunları belirtebilirsiniz:
  • Dize: "Operation = 'DELETE'"
  • Spark SQL expr() işlevi: expr("Operation = 'DELETE'")

Sıra dışı verileri işlemek için, silinen satır geçici olarak temel Delta Tablosu'nda bir mezar taşı olarak tutulur ve meta veri deposunda mezar taşlarını filtreleyen bir görünüm oluşturulur. Bekletme aralığı varsayılan olarak iki gündür ve tablo özelliğiyle pipelines.cdc.tombstoneGCThresholdInSeconds yapılandırılabilir.
apply_as_truncates str veya expr() Bir CDC olayının tam tablo TRUNCATEolarak ne zaman ele alınacağı belirtir. Şunları belirtebilirsiniz:
  • Dize: "Operation = 'TRUNCATE'"
  • Spark SQL expr() işlevi: expr("Operation = 'TRUNCATE'")

Bu yan tümce hedef tablonun tam kesilmesini tetiklediğinden, yalnızca bu işlevi gerektiren belirli kullanım örnekleri için kullanılmalıdır. apply_as_truncates parametresi yalnızca SCD türü 1 için desteklenir. SCD Tip 2, kısaltma işlemlerini desteklemez.
column_list veya except_column_list list Hedef tabloya eklenecek sütunların alt kümesi. Eklenecek sütunların tam listesini belirtmek için kullanın column_list . Dışlanması gereken sütunları belirtmek için kullanın except_column_list . Değeri dize listesi olarak veya Spark SQL col() işlevleri olarak bildirebilirsiniz:
  • column_list = ["userId", "name", "city"]
  • column_list = [col("userId"), col("name"), col("city")]
  • except_column_list = ["operation", "sequenceNum"]
  • except_column_list = [col("operation"), col("sequenceNum")

İşlevlere yönelik col() bağımsız değişkenler niteleyici içeremez. Örneğin, kullanabilirsiniz col(userId), ancak kullanamazsınız col(source.userId). Varsayılan değer, işleve hiçbir column_list veya except_column_list bağımsız değişken geçirilmediğinde hedef tabloya tüm sütunları eklemektir.
stored_as_scd_type str veya int Kayıtların SCD türü 1 veya SCD tür 2 olarak depolanması. 1 SCD türü 1 veya 2 SCD türü 2 için olarak ayarlayın. Varsayılan değer SCD tür 1'dir.
track_history_column_list veya track_history_except_column_list list Hedef tablodaki geçmiş için izlenecek çıktı sütunlarının bir alt kümesi. İzlenecek sütunların tam listesini belirtmek için kullanın track_history_column_list . İzlemenin dışında tutulacak sütunları belirtmek için kullanın track_history_except_column_list . Değeri dize listesi olarak veya Spark SQL col() işlevleri olarak bildirebilirsiniz:
  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

İşlevlere yönelik col() bağımsız değişkenler niteleyici içeremez. Örneğin, kullanabilirsiniz col(userId), ancak kullanamazsınız col(source.userId). Varsayılan değer, işleve hiçbir track_history_column_list veya track_history_except_column_list bağımsız değişken geçirilmediğinde hedef tabloya tüm sütunları eklemektir.
name str Akış adı. Sağlanmazsa, varsayılan olarak target ile aynı değeri kullanır.
once bool İsteğe bağlı olarak, akışı yedek doldurma gibi tek seferlik bir akış olarak tanımlayın. Kullanımı once=True , akışı iki şekilde değiştirir:
  • Geri dönüş değeri. streaming-query. Bu durumda, bir akış DataFrame değil, bir yığın DataFrame olmalıdır.
  • Akış varsayılan olarak bir kez çalıştırılır. Eğer işlem hattı eksiksiz bir yenilemeyle güncellenirse, ONCE akış verileri yeniden oluşturmak için tekrar çalıştırılır.