Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
ETL işlem hatlarında akan verileri doğrulayan kalite kısıtlamaları uygulamak için beklentileri kullanın. Beklentiler, veri kalitesi ölçümleri hakkında daha fazla içgörü sağlar ve geçersiz kayıtları algılarken güncelleştirmelerde başarısız olmanıza veya kayıtları bırakmanıza olanak sağlar.
Bu makalede, söz dizimi örnekleri ve davranış seçenekleri de dahil olmak üzere beklentilere genel bir bakış sağlanır. Daha gelişmiş kullanım örnekleri ve önerilen en iyi yöntemler için bkz. Beklenti önerileri ve gelişmiş desenler.
Beklentiler nelerdir?
Beklentiler, sorgudan geçen her kayda veri kalitesi denetimleri uygulayan, işlem hattı üzerinden gerçekleştirilen maddi görünümler, akış tabloları ve görünüm oluşturma deyimlerinde isteğe bağlı şartlardır. Beklentiler, kısıtlamaları belirtmek için standart SQL Boole deyimlerini kullanır. Tek bir veri kümesi için birden çok beklentiyi birleştirebilir ve bir işlem hattındaki tüm veri kümesi bildirimlerinde beklentileri ayarlayabilirsiniz.
Aşağıdaki bölümlerde, bir beklentinin üç bileşeni tanıtılarak söz dizimi örnekleri sağlanır.
Beklenti adı
Her beklentinin, takip ve izleme için kullanılan bir tanımlayıcı olan bir adı olmalıdır. Doğrulanan ölçümleri bildiren bir ad seçin. Aşağıdaki örnek, yaş değerinin 0 ile 120 arasında olduğunu onaylamak için beklenti valid_customer_age
tanımlar:
Önemli
Belirli bir veri kümesi için bir beklenti adı benzersiz olmalıdır. Bir işlem hattındaki birden çok veri kümesinde beklentileri yeniden kullanabilirsiniz. bkz. Taşınabilir ve yeniden kullanılabilir beklentiler.
Piton
@dlt.table
@dlt.expect("valid_customer_age", "age BETWEEN 0 AND 120")
def customers():
return spark.readStream.table("datasets.samples.raw_customers")
SQL
CREATE OR REFRESH STREAMING TABLE customers(
CONSTRAINT valid_customer_age EXPECT (age BETWEEN 0 AND 120)
) AS SELECT * FROM STREAM(datasets.samples.raw_customers);
Değerlendirme kısıtlaması
Constraint yan tümcesi, her kayıt için doğru veya yanlış olarak değerlendirilmesi gereken bir SQL koşullu deyimidir. Kısıtlama, doğrulanan şeyin gerçek mantığını içerir. Bir kayıt bu koşulda başarısız olduğunda, beklenen durum tetiklenir.
Kısıtlamalar geçerli SQL söz dizimini kullanmalıdır ve aşağıdakileri içeremez:
- Özel Python işlevleri
- Dış hizmet çağrıları
- Diğer tablolara başvuran alt sorgular
Aşağıda, veri kümesi oluşturma deyimlerine eklenebilen kısıtlamaların örnekleri verilmiştir:
Piton
Python'da kısıtlama söz dizimi şöyledir:
@dlt.expect(<constraint-name>, <constraint-clause>)
Birden çok kısıtlama belirtilebilir:
@dlt.expect(<constraint-name>, <constraint-clause>)
@dlt.expect(<constraint2-name>, <constraint2-clause>)
Örnekler:
# Simple constraint
@dlt.expect("non_negative_price", "price >= 0")
# SQL functions
@dlt.expect("valid_date", "year(transaction_date) >= 2020")
# CASE statements
@dlt.expect("valid_order_status", """
CASE
WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
ELSE false
END
""")
# Multiple constraints
@dlt.expect("non_negative_price", "price >= 0")
@dlt.expect("valid_purchase_date", "date <= current_date()")
# Complex business logic
@dlt.expect(
"valid_subscription_dates",
"""start_date <= end_date
AND end_date <= current_date()
AND start_date >= '2020-01-01'"""
)
# Complex boolean logic
@dlt.expect("valid_order_state", """
(status = 'ACTIVE' AND balance > 0)
OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
""")
SQL
SQL'de bir kısıtlamanın söz dizimi şöyledir:
CONSTRAINT <constraint-name> EXPECT ( <constraint-clause> )
Birden çok kısıtlama virgülle ayrılmalıdır:
CONSTRAINT <constraint-name> EXPECT ( <constraint-clause> ),
CONSTRAINT <constraint2-name> EXPECT ( <constraint2-clause> )
Örnekler:
-- Simple constraint
CONSTRAINT non_negative_price EXPECT (price >= 0)
-- SQL functions
CONSTRAINT valid_date EXPECT (year(transaction_date) >= 2020)
-- CASE statements
CONSTRAINT valid_order_status EXPECT (
CASE
WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
ELSE false
END
)
-- Multiple constraints
CONSTRAINT non_negative_price EXPECT (price >= 0),
CONSTRAINT valid_purchase_date EXPECT (date <= current_date())
-- Complex business logic
CONSTRAINT valid_subscription_dates EXPECT (
start_date <= end_date
AND end_date <= current_date()
AND start_date >= '2020-01-01'
)
-- Complex boolean logic
CONSTRAINT valid_order_state EXPECT (
(status = 'ACTIVE' AND balance > 0)
OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
)
Geçersiz kayıt üzerinde işlem
Kayıt doğrulama denetiminde başarısız olduğunda ne olacağını belirlemek için bir eylem belirtmeniz gerekir. Aşağıdaki tabloda kullanılabilir eylemler açıklanmaktadır:
Eylem | SQL söz dizimi | Python söz dizimi | Sonuç |
---|---|---|---|
uyarı (varsayılan) | EXPECT |
dlt.expect |
Hedefe geçersiz kayıtlar yazılır. |
bırak | EXPECT ... ON VIOLATION DROP ROW |
dlt.expect_or_drop |
Veriler hedefe yazılmadan önce geçersiz kayıtlar atılır. Bırakılan kayıtların sayısı diğer veri kümesi ölçümleriyle birlikte günlüğe kaydedilir. |
başarısız | EXPECT ... ON VIOLATION FAIL UPDATE |
dlt.expect_or_fail |
Geçersiz kayıtlar güncelleştirmenin başarılı olmasını engelliyor. Yeniden işlemeden önce el ile müdahale gerekir. Bu beklenti tek bir akışın başarısız olmasına neden olur ve işlem hattınızdaki diğer akışların başarısız olmasına neden olmaz. |
Ayrıca, veri kaybetmeden veya süreci aksatmadan geçersiz kayıtları karantinaya almak için gelişmiş mantık uygulayabilirsiniz. Geçersiz kayıtları karantinaya alma sayfasını görün.
Beklentiler izleme ölçümleri
İşlem hattı kullanıcı arabiriminden warn
veya drop
eylemleri için izleme ölçümlerini görebilirsiniz.
fail
geçersiz bir kayıt algılandığında güncelleştirmenin başarısız olmasına neden olduğundan ölçümler kaydedilmez.
Beklenti ölçümlerini görüntülemek için aşağıdaki adımları tamamlayın:
- Azure Databricks çalışma alanınızın kenar çubuğunda İşler ve İşlem Hatları'na tıklayın.
- İşlem hattınızın adına tıklayın.
- Beklentileri tanımlanmış bir veri kümesine tıklayın.
- Sağ kenar çubuğunda Veri kalitesi sekmesini seçin.
Lakeflow Bildirimli İşlem Hatları olay günlüğünü sorgulayarak veri kalitesi ölçümlerini görüntüleyebilirsiniz. Olay günlüğünden veri kalitesini sorgulayın.
Geçersiz kayıtları tutma
Beklentilerin varsayılan davranışı geçersiz kayıtların korunmasıdır. Beklentiyi ihlal eden kayıtları tutmak ancak kısıtlamayı geçen veya başarısız olan kayıtların sayısını toplamak istediğinizde expect
işlecini kullanın. Beklentiyi ihlal eden kayıtlar, geçerli kayıtlarla birlikte hedef veri kümesine eklenir:
Piton
@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")
SQL
CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')
Geçersiz kayıtları bırakma
Geçersiz kayıtların daha fazla işlenmesini önlemek için expect_or_drop
işlecini kullanın. Beklentiyi ihlal eden kayıtlar hedef veri kümesinden bırakılır:
Piton
@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
SQL
CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW
Geçersiz kayıtlarda başarısız ol
Geçersiz kayıtlar kabul edilemez olduğunda, expect_or_fail
işlecini kullanarak kayıt doğrulama başarısız olduğunda yürütmeyi hemen durdurun. İşlem bir tablo güncelleştirmesiyse, sistem atomik olarak işlemi geri alır:
Piton
@dlt.expect_or_fail("valid_count", "count > 0")
SQL
CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE
Önemli
İşlem hattında tanımlanmış birden çok paralel akışınız varsa, tek bir akışın başarısız olması diğer akışların başarısız olmasına neden olmaz.
Beklenen başarısız güncelleştirmelerle ilgili sorunları giderme
İşlem hattı, bir beklenti ihlali nedeniyle başarısız olduğunda, işlem hattını yeniden çalıştırmadan önce geçersiz verileri doğru şekilde işlemek için işlem hattı kodunu düzeltmeniz gerekir.
İşlem hatlarının başarısız olması için yapılandırılan beklentiler, ihlalleri algılamak ve bildirmek için gereken bilgileri izlemek için dönüşümlerinizin Spark sorgu planını değiştirir. Bu bilgileri hangi giriş kaydının birçok sorgu için ihlale neden olduğunu belirlemek için kullanabilirsiniz. Lakeflow Bildirimli İşlem Hatları, bu tür ihlalleri raporlamak için özel bir hata mesajı sağlar. Aşağıda bir beklenti ihlali hata iletisi örneği verilmişti:
[EXPECTATION_VIOLATION.VERBOSITY_ALL] Flow 'sensor-pipeline' failed to meet the expectation. Violated expectations: 'temperature_in_valid_range'. Input data: '{"id":"TEMP_001","temperature":-500,"timestamp_ms":"1710498600"}'. Output record: '{"sensor_id":"TEMP_001","temperature":-500,"change_time":"2024-03-15 10:30:00"}'. Missing input data: false
Birden çok beklenti yönetimi
Uyarı
Hem SQL hem de Python tek bir veri kümesinde birden çok beklentiyi desteklese de, yalnızca Python birden çok beklentiyi gruplandırmanıza ve toplu eylemler belirtmenize olanak tanır.
Birden çok beklentiyi birlikte gruplandırabilir ve expect_all
, expect_all_or_drop
ve expect_all_or_fail
işlevlerini kullanarak toplu eylemleri belirtebilirsiniz.
Bu dekoratörler, bir Python sözlüğünü argüman olarak kabul eder; burada anahtar, beklenenin adı ve değer, beklenen sınırlamadır. İşlem hattınızdaki birden çok veri kümesinde aynı beklenti kümesini yeniden kullanabilirsiniz. Aşağıda, expect_all
Python işleçlerinin her birinin örnekleri gösterilmektedir:
valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}
@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
# Create a raw dataset
@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
# Create a cleaned and prepared dataset
@dlt.table
@dlt.expect_all_or_fail(valid_pages)
def customer_facing_data():
# Create cleaned and prepared to share the dataset
Sınırlama
- Yalnızca akış tabloları ve gerçekleştirilmiş görünümler beklentileri desteklediği için veri kalitesi ölçümleri yalnızca bu nesne türleri için desteklenir.
- Veri kalitesi ölçümleri şu durumlarda kullanılamaz:
- Sorguda hiçbir beklenti tanımlanmamıştır.
- Akış, beklentileri desteklemeyen bir işleç kullanır.
- Lakeflow Bildirimli İşlem Hatları havuzları gibi akış türü beklentileri desteklemez.
- Belirli bir akış yürütmesi için ilişkili akış tablosunda veya oluşturulmuş görünümde güncelleme yoktur.
- İşlem hattı yapılandırması,
pipelines.metrics.flowTimeReporter.enabled
gibi ölçümleri yakalamak için gerekli ayarları içermez.
- Bazı durumlarda bir
COMPLETED
akış ölçüm içermeyebilir. Bunun yerine, ölçümler her bir mikro toplu işlemde, durumuflow_progress
olan birRUNNING
olayında bildirilir.