Aracılığıyla paylaş


CREATE STREAMING TABLE ... FLOW AUTO CDC

Şunlar için geçerlidir:onay işareti evet olarak işaretlenmiş Databricks SQL

Önemli

Bu özellik Beta sürümündedir. Databricks Runtime 17.3 ve üzerini gerektirir.

Kaynaktan FLOW AUTO CDC akış tablosuna değişiklik verisi yakalama (CDC) kayıtlarını işlemek için ile CREATE STREAMING TABLE yan tümcesini kullanın.

MERGE INTO Daha önce deyimi, Azure Databricks'te CDC kayıtlarını işlemek için yaygın olarak kullanılıyordu. Ancak, MERGE INTO sıra dışı kayıtlar nedeniyle yanlış sonuçlar verebilir veya kayıtları yeniden sıralamak için karmaşık mantık gerektirir.

AUTO CDC sıra dışı kayıtları otomatik olarak işleyerek CDC'yi basitleştirir. Kayıtları tanımlamak için anahtarlar, sıralama için bir sıra sütunu ve sonuçların SCD tür 1 (doğrudan güncelleştirmeler) veya SCD tür 2 (geçmiş izleme) olarak depolanıp depolanmayacağını belirtirsiniz.

Sözdizimi

CREATE OR REFRESH STREAMING TABLE table_name
FLOW AUTO CDC
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

ve UPDATE olayları için INSERT varsayılan davranış, kaynaktan CDC olayları eklemektir: hedef tablodaki belirtilen anahtarlarla eşleşen satırları güncelleştirin veya hedef tabloda eşleşen bir kayıt olmadığında yeni bir satır ekleyin. DELETE olayları için işleme APPLY AS DELETE WHEN koşuluyla belirtilebilir.

Parametreler

  • source

    Verilerin kaynağı. Kaynak bir akış kaynağı olmalıdır. Akış semantiğini kullanarak kaynaktan okumak için STREAM anahtar sözcüğünü kullanın. Okuma işlemi var olan bir kayıtta bir değişiklik veya silme işlemiyle karşılaşırsa bir hata oluşur. Statik veya yalnızca ekleme kaynaklarından okumak en güvenlidir.

    Akış verileri hakkında daha fazla bilgi için bkz. İşlem hatları ile veri dönüştürme.

  • KEYS

    Kaynak verilerdeki bir satırı benzersiz olarak tanımlayan sütun veya sütun bileşimi. Bu sütunlardaki değerler, hedef tablodaki belirli kayıtlara hangi CDC olaylarının uygulanacağını belirlemek için kullanılır.

    Sütunların birleşimini tanımlamak için virgülle ayrılmış sütun listesi kullanın.

    Bu madde gereklidir.

  • IGNORE NULL UPDATES

    Hedef sütunların bir alt kümesini içeren güncelleştirmelerin alımına izin verir. CDC olayı mevcut bir satırla eşleştiğinde ve IGNORE NULL UPDATES belirtildiğinde, değeri olan null sütunlar hedefteki mevcut değerlerini korur. Bu, bir değeri olan null iç içe sütunlar için de geçerlidir.

    Bu yan tümce isteğe bağlıdır.

    Varsayılan ayar, null değerlerinin mevcut sütunların üzerine yazılmasıdır.

  • APPLY AS DELETE WHEN

    Bir CDC olayının ne zaman upsert yerine bir DELETE olarak ele alınacağı belirtir.

    SCD tip 2 kaynaklar için, sırası bozuk verileri işlemek amacıyla, silinen satır temel Delta tablosunda geçici olarak bir "mezar taşı" olarak tutulur ve meta veri deposunda bu "mezar taşlarını" filtreleyen bir görünüm oluşturulur. Bekletme aralığı pipelines.cdc.tombstoneGCThresholdInSeconds yapılandırılabilir.

    Bu yan tümce isteğe bağlıdır.

  • APPLY AS TRUNCATE WHEN

    Bir CDC olayının tam tablo TRUNCATEolarak ne zaman ele alınacağı belirtir. Bu yan tümce hedef tablonun tam kesilmesini tetiklediğinden, yalnızca bu işlevi gerektiren belirli kullanım örnekleri için kullanılmalıdır.

    APPLY AS TRUNCATE WHEN yan tümcesi yalnızca SCD türü 1 için desteklenir. SCD tür 2, kesme işlemini desteklemez.

    Bu yan tümce isteğe bağlıdır.

  • SEQUENCE BY

    Kaynak verilerdeki CDC olaylarının mantıksal sırasını belirten sütun adı. İşlem hattı işleme, sıralama dışı gelen değişiklik olaylarını işlemek için bu sıralamayı kullanır.

    Sıralama için birden çok sütun gerekiyorsa bir STRUCT ifade kullanın: önce ilk yapı alanına göre, sonra bir bağlama varsa ikinci alana göre sıralanır ve bu şekilde devam eder.

    Belirtilen sütunlar sıralanabilir veri türleri olmalıdır.

    Bu madde gereklidir.

  • COLUMNS

    Hedef tabloya eklenecek sütunların bir alt kümesini belirtir. Şunlardan birini yapabilirsiniz:

    • Eklenecek sütunların tam listesini belirtin: COLUMNS (userId, name, city).
    • Dışlanması gereken sütunların listesini belirtin: COLUMNS * EXCEPT (operation, sequenceNum)

    Bu yan tümce isteğe bağlıdır.

    Varsayılan değer, COLUMNS yan tümcesi belirtilmediğinde tüm sütunları hedef tabloya eklemektir.

  • STORED AS

    Kayıtların SCD türü 1 veya SCD tür 2 olarak depolanması.

    Bu yan tümce isteğe bağlıdır.

    Varsayılan değer SCD tür 1'dir.

  • TRACK HISTORY ON

    Belirtilen sütunlarda herhangi bir değişiklik olduğunda geçmiş kayıtları oluşturmak için çıkış sütunlarının bir alt kümesini belirtir. Şunlardan birini yapabilirsiniz:

    • İzlenen sütunların tam listesini belirtin: COLUMNS (userId, name, city).
    • İzlemenin dışında tutulacak sütunların listesini belirtin: COLUMNS * EXCEPT (operation, sequenceNum)

    Bu yan tümce isteğe bağlıdır. Varsayılan değer, herhangi bir değişiklik olduğunda tüm çıkış sütunlarının geçmişini izlemektir ve ile eşdeğerdir TRACK HISTORY ON *.

Örnekler

-- SCD type 1: apply CDC changes with direct updates (no history)
> CREATE OR REFRESH STREAMING TABLE target
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  FLOW AUTO CDC
  FROM stream(cdc_data.users)
  KEYS (userId)
  SEQUENCE BY sequenceNum
  STORED AS SCD TYPE 1;

-- SCD type 2: retain a history of changes, with delete handling
> CREATE OR REFRESH STREAMING TABLE target
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  FLOW AUTO CDC
  FROM stream(cdc_data.users)
  KEYS (userId)
  APPLY AS DELETE WHEN operation = "DELETE"
  SEQUENCE BY sequenceNum
  COLUMNS * EXCEPT (operation, sequenceNum)
  STORED AS SCD TYPE 2;

-- SCD type 2 with history tracking on specific columns
> CREATE OR REFRESH STREAMING TABLE target
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  FLOW AUTO CDC
  FROM stream(cdc_data.users)
  KEYS (userId)
  APPLY AS DELETE WHEN operation = "DELETE"
  SEQUENCE BY sequenceNum
  COLUMNS * EXCEPT (operation, sequenceNum)
  STORED AS SCD TYPE 2
  TRACK HISTORY ON * EXCEPT (city);