Aracılığıyla paylaş


Dış RDBMS tablosunu AUTO CDC kullanarak çoğaltma

Bu sayfa, veri boru hattında AUTO CDC API kullanarak dış bir ilişkisel veritabanı yönetim sistemi (RDBMS) tablosunun Azure Databricks'e nasıl çoğaltılacağını size gösterir. Öğreneceksiniz:

  • Kaynakları ayarlamak için yaygın modeller.
  • Mevcut verilerin once akışını kullanarak bir kerelik tam kopyasını gerçekleştirme.
  • Yeni değişiklikleri sürekli almak için bir change akışı nasıl kullanılır?

Bu düzen, yavaş değişen boyut (SCD) tabloları oluşturmak veya hedef tabloyu dış kayıt sistemiyle eşitlenmiş durumda tutmak için idealdir.

Başlamadan önce

Bu kılavuzda, kaynağınızdan aşağıdaki veri kümelerine erişiminiz olduğu varsayılır:

  • Bulut depolamadaki kaynak tablonun tam anlık görüntüsü. Bu veri kümesi ilk yük için kullanılır.
  • Aynı bulut depolama konumuna (örneğin, Debezium, Kafka veya günlük tabanlı CDC kullanılarak) doldurulan sürekli bir değişiklik akışı. Bu akış, devam eden AUTO CDC işlemin girişidir.

Kaynak görünümleri ayarlama

İlk olarak, hedef tabloyu bir bulut depolama yolundan rdbms_ordersorders_snapshot_pathdoldurmak için iki kaynak görünümü tanımlayın. Her ikisi de bulut depolamadaki ham veriler üzerinde akış görünümleri olarak oluşturulur. Verilerin AUTO CDC işleminde kullanılmadan önce yazılmasını gerektirmeyen veritabanı görünümlerinin kullanılması daha yüksek verimlilik sağlar.

  • İlk kaynak görünümü tam anlık görüntüdür (full_orders_snapshot)
  • İkincisi sürekli değişiklik akışıdır (rdbms_orders_change_feed).

Bu kılavuzdaki örneklerde kaynak olarak bulut depolama kullanılır, ancak akış tabloları tarafından desteklenen herhangi bir kaynağı kullanabilirsiniz.

full_orders_snapshot()

Bu adım, sipariş verilerinin ilk tam anlık görüntüsünü okuyan bir görünüme sahip bir işlem hattı oluşturur.

Piton

Aşağıdaki Python örneği:

  • spark.readStream Otomatik Yükleyici (format("cloudFiles")) ile kullanır
  • Tarafından tanımlanan bir dizinden JSON dosyalarını okur orders_snapshot_path
  • includeExistingFiles Belirtilen yolda zaten mevcut olan geçmiş verilerin işlendiğinden emin olmak için true ayarlar.
  • inferColumnTypes şemayı otomatik olarak çıkaracak şekilde true ayarlar
  • Tüm sütunları .select("\*") ile döndürür.
@dp.view()
def full_orders_snapshot():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.includeExistingFiles", "true")
        .option("cloudFiles.inferColumnTypes", "true")
        .load(orders_snapshot_path)
        .select("*")
    )

SQL

Aşağıdaki SQL örneği, seçenekleri dize anahtar-değer çiftlerinin haritası olarak geçirir. orders_snapshot_path bir SQL değişkeni olarak kullanılabilir olmalıdır (örneğin, işlem hattı parametreleri kullanarak tanımlanmış veya manuel olarak interpolate edilmiştir).

CREATE OR REFRESH VIEW full_orders_snapshot
AS SELECT *
FROM STREAM read_files("${orders_snapshot_path}", "json", map(
  "cloudFiles.includeExistingFiles", "true",
  "cloudFiles.inferColumnTypes", "true"
));

rdbms_orders_change_feed()

Bu adım, artımlı değişiklik verilerini (örneğin CDC günlüklerinden veya değişiklik tablolarından) okuyan ikinci bir görünüm oluşturur. Bu dosyadan orders_cdc_path okur ve CDC stilindeki JSON dosyalarının düzenli olarak bu yola bırakıldığını varsayar.

Piton

@dp.view()
def rdbms_orders_change_feed():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_cdc_path)

SQL

Aşağıdaki SQL örneğinde ${orders_cdc_path} bir değişkendir ve işlem hattı ayarlarınızda bir değer ayarlanarak veya kodunuzda açıkça bir değişken ayarlanarak ilişkilendirme yapılabilir.

CREATE OR REFRESH VIEW rdbms_orders_change_feed
AS SELECT *
FROM STREAM read_files("${orders_cdc_path}", "json", map(
"cloudFiles.includeExistingFiles", "true",
"cloudFiles.inferColumnTypes", "true"
));

İlk hidrasyon (ilk akış)

Kaynaklar ayarlandıktan sonra mantık, AUTO CDC her iki kaynağı da bir hedef akış tablosuyla birleştirir. İlk olarak, AUTO CDC ve ONCE=TRUE tek seferlik bir işlem akışı kullanarak RDBMS tablosunun tüm içeriğini bir akış tablosuna kopyalayın. Bu, hedef tabloyu gelecekteki güncelleştirmelerde yeniden yürütmeye gerek kalmadan geçmiş verilerle hazırlar.

Piton

from pyspark import pipelines as dp

# Step 1: Create the target streaming table

dp.create_streaming_table("rdbms_orders")

# Step 2: Once Flow — Load initial snapshot of full RDBMS table

dp.create_auto_cdc_flow(
  flow_name = "initial_load_orders",
  once = True,  # one-time load
  target = "rdbms_orders",
  source = "full_orders_snapshot",  # e.g., ingested from JDBC into bronze
  keys = ["order_id"],
  sequence_by = "timestamp",
  stored_as_scd_type = "1"
)

SQL


-- Step 1: Create the target streaming table
CREATE OR REFRESH STREAMING TABLE rdbms_orders;

-- Step 2: Once Flow for initial snapshot
CREATE FLOW rdbms_orders_hydrate
AS AUTO CDC ONCE INTO rdbms_orders
FROM stream(full_orders_snapshot)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;

Akış once yalnızca bir kez yürütülür. İşlem hattı oluşturulduktan sonra full_orders_snapshot öğeye eklenen yeni dosyalar göz ardı edilir.

Önemli

Akış tablosunun rdbms_orders tam yenilenmesi, once akışını yeniden çalıştırır. Bulut depolamadaki ilk anlık görüntü verileri kaldırıldıysa, bu veri kaybına neden olur.

Sürekli değişiklik akışı (değişiklik akışı)

İlk anlık görüntü yüklendikten sonra, RDBMS'nin CDC beslemesindeki değişiklikleri sürekli olarak almak için başka bir AUTO CDC akışı kullanın. Bu, tablonuzu rdbms_orders eklemeler, güncelleştirmeler ve silmelerle güncel tutar.

Piton

from pyspark import pipelines as dp

# Step 3: Change Flow — Ingest ongoing CDC stream from source system

dp.create_auto_cdc_flow(
flow_name = "orders_incremental_cdc",
target = "rdbms_orders",
source = "rdbms_orders_change_feed", # e.g., ingested from Kafka or Debezium
keys = ["order_id"],
sequence_by = "timestamp",
stored_as_scd_type = "1"
)

SQL

-- Step 3: Continuous CDC ingestion
CREATE FLOW rdbms_orders_continuous
AS AUTO CDC INTO rdbms_orders
FROM stream(rdbms_orders_change_feed)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;

Değerlendirmeler

Yedek dolum eşzamanlılığı Akış once yalnızca hedef tablo tamamen yenilendiğinde yeniden çalışır.
Birden çok akış Düzeltmelerde, geç gelen verilerde veya alternatif akışlarda birleştirmek için birden çok değişiklik akışı kullanabilirsiniz, ancak tümünün bir şemayı ve anahtarları paylaşması gerekir.
Tam yenileme Akış tablosundaki rdbms_orders tam yenileme, once akışını yeniden çalıştırır. İlk bulut depolama konumu ilk anlık görüntü verilerinin ayıklamasını yapmışsa bu durum veri kaybına neden olabilir.
Akış yürütme sırası Akış yürütme sırası önemli değildir. Sonuç aynıdır.

Ek kaynaklar