Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Büyük miktarda veriyle ilgilenirken, veri kümesinin tamamını yeniden işlemek yerine yalnızca yeni ve değiştirilmiş kayıtları işleyebilen bir işlem hattına ihtiyacınız vardır. Buna artımlı ETL adı verilir. Databricks SQL'de, yordam kodu yazmadan veya el ile yenilemeleri zamanlamadan akış tablolarını ve gerçekleştirilmiş görünümleri kullanarak artımlı ETL işlem hatları oluşturabilirsiniz.
Bu öğretici, yaygın bir modelde size yol gösterir: ürün değişikliklerini zaman içinde izleme. Bir kaynak tablo oluşturur, değişiklik olaylarını yakalar, her ürünün tam geçmişini koruyan bir boyut tablosu oluşturur ve üstüne bir toplam raporlama katmanı eklersiniz.
Bu öğreticideki temel özellik şudur: AUTO CDC. Geleneksel bir depoda, ekleme, güncelleme ve silme olaylarını hedef tabloya birleştirmek için karmaşık MERGE INTO deyimler yazmanız gerekir. Bu yaklaşım, özellikle olaylar sıra dışı olduğunda hataya açıktır.
AUTO CDC bunu sizin için halleder. İş anahtarını, sıralama sütununu ve SCD Tür 1 (yalnızca en son değer) veya SCD Tür 2 (tam geçmiş) isteyip istemediğinizi bildirirsiniz ve Azure Databricks doğru birleştirme mantığını otomatik olarak uygular. CDC'ye genel bakış için bkz . AUTO CDC API'leri: İşlem hatlarıyla değişiklik verilerini yakalamayı basitleştirme.
Bu dersin sonunda şunlara sahip olacaksınız:
- Değişiklik Veri Akışı ile değişiklikleri izleyen bir kaynak tablo oluşturuldu.
- CDC olay akışını anlamak için ham değişiklik verileri incelendi.
- Bu olaylardan bir SCD Tür 2 boyut tablosu oluşturmak için
AUTO CDCkullanıldı. - İşlem hattında silme olayları artımlı olarak işlendi.
- Artımlı olarak toplu raporu koruyan malzemeleşmiş bir görünüm oluşturuldu.
- Değişikliklerin işlem hattı üzerinden otomatik olarak yayılması için yapılandırıldı
SCHEDULE REFRESH EVERY 1 DAY.
Gereksinimler
Bu öğreticiyi tamamlamak için aşağıdaki gereksinimleri karşılamanız gerekir:
- Unity Kataloğu etkinleştirilmiş Azure Databricks çalışma alanı.
- SQL ambarı (sunucusuz veya profesyonel).
- İşlem kaynağı oluşturma veya işlem kaynağınaerişim iznine sahip olun.
- Hesabınız için sunucusuz işlem etkinleştirildi. Bkz. Sınırlı bölgesel kullanılabilirliğe sahip özellikler.
1. Adım: Kataloğunuzu ve şemanızı ayarlama
Databricks SQL düzenleyicisini açın ve çalışma kataloğunuzu ve şemanızı ayarlayın. Seçtiğiniz katalog ve şema için USE izniniz olmalıdır:
USE CATALOG <your-catalog>;
USE SCHEMA <your-schema>;
2. Adım: Kaynak tablo oluşturma ve verileri yükleme
products (CDF) etkin olarak tablosu oluşturun. CDF, her ekleme, güncelleştirme ve silme işlemini sorgulanabilir değişiklik günlüğü olarak kaydeden bir Delta Lake özelliğidir. Bu, bir işlem kaynağı sisteminden alınan bir CDC akışına benzer, ancak değişiklikler dış bir günlükten ziyade doğrudan Delta tablosu içinde yakalanır. Aşağı akış işlem hattının kullanacağı değişiklik olaylarını oluşturmak için burada CDF kullanırsınız.
Tabloyu oluşturun ve ilk kayıtları yükleyin:
CREATE OR REPLACE TABLE products ( product_id INT, product_name STRING, category STRING, warehouse STRING ) TBLPROPERTIES (delta.enableChangeDataFeed = true); INSERT INTO products VALUES (1, 'Spoon', 'Cutlery', 'Seattle'), (2, 'Fork', 'Cutlery', 'Portland'), (3, 'Knife', 'Cutlery', 'Denver'), (4, 'Chair', 'Furniture', 'Austin'), (5, 'Table', 'Furniture', 'Chicago'), (6, 'Lamp', 'Lighting', 'Boston'), (7, 'Mug', 'Kitchenware', 'Seattle'), (8, 'Plate', 'Kitchenware', 'Atlanta'), (9, 'Bowl', 'Kitchenware', 'Dallas'), (10, 'Glass', 'Kitchenware', 'Phoenix');Yeni ürünler, ambar taşıma ve kategori yeniden ataması dahil olmak üzere yukarı akış değişikliklerinin simülasyonunu yapın:
INSERT INTO products VALUES (11, 'Napkin', 'Dining', 'San Francisco'), (12, 'Coaster', 'Dining', 'New York'); UPDATE products SET warehouse = 'Los Angeles' WHERE product_id = 1; UPDATE products SET category = 'Dining' WHERE product_id = 2;
3. Adım: Değişiklik veri akışını sorgulama
Aşağı akış işlem hattını oluşturmadan önce, AUTO CDC'ün neyi işleyeceğini anlayabilmek için ham değişiklik olaylarına bakmanız faydalı olur.
table_changes() işlevi CDF günlüğünü okur ve yakalanan her işlemi meta veri sütunlarıyla birlikte döndürür:
SELECT
product_id, product_name, warehouse,
_change_type, _commit_version
FROM table_changes('products', 1)
ORDER BY _commit_version, product_id;
Örneğin, Spoon'un üç olayı vardır: bir insert (Seattle), bir update_preimage (Seattle) ve bir update_postimage (Los Angeles).
Tek bir mantıksal değişikliğin (örneğin, Spoon'un farklı bir ambara taşınması) birden çok olay ürettiğine dikkat edin: ön görüntü ve postimage. Geleneksel bir depoda, bu olayların tümünü bir hedef tabloda uyumlu hale getirmek, ekleme, güncelleme ve silme işlemlerini ayrı mantıklar kullanarak yönetmek ve olayların doğru sırayla uygulandığından emin olmak için bir MERGE deyimi yazarsınız. Bu karmaşıklığı, bir sonraki adımda ortadan kaldıran tam olarak budur AUTO CDC.
4. Adım: ile SCD Tür 2 boyutu oluşturma AUTO CDC
Önemli
AUTO CDC
Beta aşamasında. Databricks Runtime 17.3 veya üzerini gerektirir.
Akış tablosu verileri artımlı olarak işler. Her yenilemede, son çalıştırmadan bu yana yalnızca yeni satırları okur, bu nedenle tam veri kümesini yeniden işlemesi gerekmez. Bu, yüksek hacimli veya sık değişen kaynaklar için uygun hale getirir.
AUTO CDC akış tablosunun üzerine değişiklik verisi yakalama işlemi ekler. Eklemeleri, güncelleştirmeleri ve silmeleri el ile işleyen bir MERGE INTO deyimi yazmak yerine, iş anahtarını ve sıralama sütununu bildirir ve Azure Databricks doğru mantığı uygulamasına izin verirsiniz.
AUTO CDC ayrıca, dağıtılmış sistemlerden gelen olayları veya çakışan zaman damgalarıyla toplu yükleri işlemek için kullanılırken MERGE INTO sık karşılaşılan bir sorun olan sıra dışı olayları otomatik olarak işler.
Aşağıdaki deyim, her ürünün tam sürüm geçmişini koruyan bir SCD Type 2 tablosu oluşturur. Her sürüm __START_AT ve __END_AT zaman damgalarını alır.
NULL in __END_AT geçerli sürümü işaretler.
CREATE OR REFRESH STREAMING TABLE products_history
SCHEDULE REFRESH EVERY 1 DAY
FLOW AUTO CDC
FROM STREAM products WITH (readChangeFeed = true)
KEYS (product_id)
APPLY AS DELETE WHEN _change_type = 'delete'
SEQUENCE BY _commit_timestamp
COLUMNS * EXCEPT (_change_type, _commit_version, _commit_timestamp)
STORED AS SCD TYPE 2;
-
SCHEDULE REFRESH EVERY 1 DAY: tabloyu günlük bir zamanlamaya göre yeniler. -
FLOW AUTO CDC: bunu CDC akışı olarak tanımlamaktadır. Azure Databricks ekleme, güncelleştirme ve silme semantiğini otomatik olarak uygular. -
KEYS (product_id): işletme anahtarı. Aynı anahtara sahip olaylar sürümlenmiş satırlar halinde birleştirilir. -
APPLY AS DELETE WHEN _change_type = 'delete': silme olayı geldiğinde geçerli sürümü kapatır. Bu, silme olayını tanımlayan koşulu tanımlamanızı sağlar. -
SEQUENCE BY _commit_timestamp: olay sıralamasını oluşturur. Sipariş dışı gelenleri doğru şekilde işler. -
STORED AS SCD TYPE 2: tüm geçmişi korur.AUTO CDChem SCD Tür 1'i hem de SCD Tür 2'i destekler.
Boyut tablosunu sorgula:
SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
ORDER BY product_id, __START_AT;
- Kaşık: iki versiyon. Seattle (kapalı,
__END_ATayar) ve Los Angeles (güncel,__END_AT = NULL). - Çatal: iki sürüm. Çatal bıçak takımı kategorisi (kapalı) ve Yemek kategorisi (güncel).
- Peçete ve Coaster: her biri bir sürüm (yeni eklendi,
__END_AT = NULL). - Diğer tüm ürünler: birer sürüm (
__END_AT = NULL).
5. Adım: İşlem hattı üzerinden silme işlemlerini gerçekleştirme
Artık kaynak tablodan silerek iki sonlandırılan ürünün benzetimini yapılsın:
DELETE FROM products WHERE product_id = 9;
DELETE FROM products WHERE product_id = 10;
Bu silme olayları CDF günlüğüne kaydedilir, ancak akış tablosu bunları henüz görmedi. Yeni olayları işlemek için akış tablosunu yenileyin:
REFRESH STREAMING TABLE products_history;
Silmelerin uygulandığını doğrulamak için boyut tablosunu sorgula:
SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
ORDER BY product_id, __START_AT;
Kase ve Cam artık __END_AT seti ile kapatıldı, sonlandırıldığı işaretlendi. Diğer tüm geçerli ürünler değişmeden kalır. Akış tablosu, önceki yenilemedeki eklemeleri ve güncelleştirmeleri yeniden işlemeden yalnızca yeni silme olaylarını işledi.
Adım 6: Karma maddeleştirilmiş görünüm oluşturma
Artık kaynak değişiklikleriyle güncel kalan bir boyut tablonuz olduğuna göre, en üste bir raporlama katmanı ekleyebilirsiniz.
Gerçekleştirilmiş görünüm, önceden hesaplanan sorgu sonuçlarını fiziksel tablo olarak depolar. Farklı olarak, normal bir görünümden okuma yaptığınızda sorguyu her seferinde yeniden yürüten, maddileştirilmiş bir görünüm sonuçları kalıcı olarak saklayarak, her yenilemede yalnızca yukarı akış değişikliklerinden etkilenen satırları yeniden hesaplar. Bu, sorgu performansının önemli olduğu panolar ve raporlar için uygun hale getirir.
CREATE OR REPLACE MATERIALIZED VIEW products_by_category
SCHEDULE REFRESH EVERY 1 DAY
AS
SELECT
category,
COUNT(*) AS active_products
FROM products_history
WHERE __END_AT IS NULL
GROUP BY category;
SCHEDULE REFRESH EVERY 1 DAY bu görünümün günlük zamanlamaya göre yenilendiğini gösterir. Akış tablosunda aynı zamanlamayla birlikte, artık kaynak tablodaki değişikliklerin boyut boyunca art arda ve her yenileme döngüsündeki toplamaya dönüştürüldüğü üç aşamalı bir işlem hattına sahipsiniz. Çalıştırılacak manuel yenileme yok.
SELECT * FROM products_by_category ORDER BY active_products DESC;
7. Adım: Uçtan uca kaskad doğrulaması
İşlem hattının tamamını art arda doğrulamak için kaynak tabloda bir değişiklik yapın:
UPDATE products SET warehouse = 'Seattle' WHERE product_id = 3;
Bıçak Denver'dan Seattle'a taşınır. Bu tek DML değişikliği, üç aşamanın birlikte nasıl çalıştığını gösteren tam bir işlem hattı kaskadını tetikler.
-
productsdeğişiklik olayını CDF aracılığıyla kaydeder. -
products_historyolayı işler ve Knife için yeni bir sürüm ekler. -
products_by_categoryyalnızca etkilenen Cutlery satırını yeniden hesaplar.
Doğrula
SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
WHERE product_id = 3
ORDER BY __START_AT;
SELECT * FROM products_by_category ORDER BY active_products DESC;
Temizleme
Bu öğretici tarafından oluşturulan kaynakları temizlemek için aşağıdaki SQL'i kullanın:
DROP MATERIALIZED VIEW IF EXISTS products_by_category;
DROP STREAMING TABLE IF EXISTS products_history;
DROP TABLE IF EXISTS products;