Aracılığıyla paylaş


AUTO CDC INTO (işlem hatları)

Lakeflow Spark Bildirimli İşlem Hatları'nın değişiklik veri yakalama (CDC) işlevini kullanan bir akış oluşturmak için AUTO CDC ... INTO deyimini kullanın. Bu ifade, CDC kaynağındaki değişiklikleri okur ve bunları hedef akışa uygular.

Sözdizimi

CREATE OR REFRESH STREAMING TABLE table_name;

CREATE FLOW flow_name AS AUTO CDC INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

Diğer işlem hattı sorgularıyla aynı CONSTRAINT yan tümceyi kullanarak hedef için veri kalitesi kısıtlamaları tanımlarsınız. bkz. İşlem hattı beklentileriyle veri kalitesini yönetme.

INSERT ve UPDATE olayları için varsayılan davranış, kaynaktan UPDATE olaylarını güncelleştirmek veya eklemektir: hedef tabloda belirtilen anahtarla eşleşen satırları güncelleyin veya hedef tabloda eşleşen bir kayıt yoksa yeni bir satır ekleyin. DELETE olayları için işleme APPLY AS DELETE WHEN koşuluyla belirtilebilir.

Önemli

Değişiklikleri uygulamak için bir hedef akış tablosu bildirmeniz gerekir. İsteğe bağlı olarak hedef tablonuzun şemasını belirtebilirsiniz. SCD tür 2 tabloları için, hedef tablonun şemasını belirtirken, __START_AT sütunuyla aynı veri türüne sahip olan __END_AT ve sequence_by sütunlarını da eklemeniz gerekir.

Bkz AUTO CDC API'leri: İşlem hatlarıyla değişiklik verilerini yakalamayı basitleştirin.

Parametreler

  • flow_name

    Oluşturulacak akışın adı.

  • source

    Verilerin kaynağı. Kaynak bir akış kaynağı olmalıdır. Kaynaktan okumak üzere akış semantiğini kullanmak için STREAM anahtar sözcüğünü kullanın. Okuma işlemi var olan bir kayıtta bir değişiklik veya silme işlemiyle karşılaşırsa bir hata oluşur. Statik veya yalnızca ekleme kaynaklarından okumak en güvenlidir. Değişiklik taahhütleri içeren verileri içeri çekmek için Python'ı ve SkipChangeCommits seçeneğini hataları işlemek için kullanabilirsiniz.

    Akış verileri hakkında daha fazla bilgi için bkz. İşlem hatları ile veri dönüştürme.

  • KEYS

    Kaynak verilerdeki bir satırı benzersiz olarak tanımlayan sütun veya sütun bileşimi. Bu sütunlardaki değerler, hedef tablodaki belirli kayıtlara hangi CDC olaylarının uygulanacağını belirlemek için kullanılır.

    Sütunların birleşimini tanımlamak için virgülle ayrılmış sütun listesi kullanın.

    Bu ana tümce gereklidir.

  • IGNORE NULL UPDATES

    Hedef sütunların bir alt kümesini içeren güncelleştirmelerin alımına izin verir. CDC olayı mevcut bir satırla eşleştiğinde ve NULL GÜNCELLEŞTIRMELERİ YOKSAY belirtildiğinde, değeri null olan sütunlar hedefteki mevcut değerlerini korur. Bu, bir değeri olan null iç içe sütunlar için de geçerlidir.

    Bu yan tümce isteğe bağlıdır.

    Varsayılan ayar, null değerlerinin mevcut sütunların üzerine yazılmasıdır.

  • APPLY AS DELETE WHEN

    Bir CDC olayının ne zaman upsert yerine bir DELETE olarak ele alınacağı belirtir.

    SCD tip 2 kaynaklar için, sırası bozuk verileri işlemek amacıyla, silinen satır temel Delta tablosunda geçici olarak bir "mezar taşı" olarak tutulur ve meta veri deposunda bu "mezar taşlarını" filtreleyen bir görünüm oluşturulur. Bekletme aralığı pipelines.cdc.tombstoneGCThresholdInSeconds yapılandırılabilir.

    Bu yan tümce isteğe bağlıdır.

  • APPLY AS TRUNCATE WHEN

    Bir CDC olayının tam tablo TRUNCATEolarak ne zaman ele alınacağı belirtir. Bu yan tümce hedef tablonun tam kesilmesini tetiklediğinden, yalnızca bu işlevi gerektiren belirli kullanım örnekleri için kullanılmalıdır.

    APPLY AS TRUNCATE WHEN yan tümcesi yalnızca SCD türü 1 için desteklenir. SCD tür 2, kesme işlemini desteklemez.

    Bu yan tümce isteğe bağlıdır.

  • SEQUENCE BY

    Kaynak verilerdeki CDC olaylarının mantıksal sırasını belirten sütun adı. İşlem hattı işleme, sıralama dışı gelen değişiklik olaylarını işlemek için bu sıralamayı kullanır.

    Sıralama için birden çok sütun gerekiyorsa bir STRUCT ifade kullanın: önce ilk yapı alanına göre, sonra bir bağlama varsa ikinci alana göre sıralanır ve bu şekilde devam eder.

    Belirtilen sütunlar sıralanabilir veri türleri olmalıdır.

    Bu madde gereklidir.

  • COLUMNS

    Hedef tabloya eklenecek sütunların bir alt kümesini belirtir. Şunlardan birini yapabilirsiniz:

    • Eklenecek sütunların tam listesini belirtin: COLUMNS (userId, name, city).
    • Dışlanması gereken sütunların listesini belirtin: COLUMNS * EXCEPT (operation, sequenceNum)

    Bu yan tümce isteğe bağlıdır.

    Varsayılan değer, COLUMNS yan tümcesi belirtilmediğinde tüm sütunları hedef tabloya eklemektir.

  • STORED AS

    Kayıtların SCD türü 1 veya SCD tür 2 olarak depolanması.

    Bu yan tümce isteğe bağlıdır.

    Varsayılan değer SCD tür 1'dir.

  • TRACK HISTORY ON

    Belirtilen sütunlarda herhangi bir değişiklik olduğunda geçmiş kayıtları oluşturmak için çıkış sütunlarının bir alt kümesini belirtir. Şunlardan birini yapabilirsiniz:

    • İzlenen sütunların tam listesini belirtin: COLUMNS (userId, name, city).
    • İzlemenin dışında tutulacak sütunların listesini belirtin: COLUMNS * EXCEPT (operation, sequenceNum)

    Bu yan tümce isteğe bağlıdır. Varsayılan değer, herhangi bir değişiklik olduğunda tüm çıkış sütunlarının geçmişini izlemektir ve ile eşdeğerdir TRACK HISTORY ON *.

Örnekler

-- Create a streaming table, then use AUTO CDC to populate it:
CREATE OR REFRESH STREAMING TABLE target;

CREATE FLOW flow
AS AUTO CDC INTO
  target
FROM stream(cdc_data.users)
  KEYS (userId)
  APPLY AS DELETE WHEN operation = "DELETE"
  SEQUENCE BY sequenceNum
  COLUMNS * EXCEPT (operation, sequenceNum)
  STORED AS SCD TYPE 2
  TRACK HISTORY ON * EXCEPT (city);