Aracılığıyla paylaş


Lakeflow Spark Bildirimli İşlem Hatları için en iyi yöntemler

Bu sayfada, Lakeflow Spark Bildirimli İşlem Hatları ile işlem hatlarını tasarlamak, oluşturmak ve çalıştırmak için önerilen desenler açıklanmaktadır. Yeni bir işlem hattı başlatırken veya mevcut bir işlem hattını geliştirirken bu yönergeleri uygulayın.

Doğru veri kümesi türünü seçin

Lakeflow Spark Bildirimli İşlem Hatları üç veri kümesi türü sunar: akış tabloları, gerçekleştirilmiş görünümler ve geçici görünümler. İşlem hattınızın her katmanı için doğru türün seçilmesi gereksiz işlem maliyetlerini önler ve kodunuzun kolayca akılda kalmasını sağlar.

Akış tabloları , veri alımı ve düşük gecikme süreli akış dönüştürmeleri için doğru seçimdir. Her giriş satırı yalnızca bir kez okunur ve işlenir. Bu sayede bulut depolama veya ileti veri yollarından yalnızca ekleme iş yükleri, yüksek hacimli veriler ve olay odaklı işleme için idealdir.

Gerçekleştirilmiş görünümler , karmaşık dönüşümler ve analitik sorgular için doğru seçimdir. Sonuçları önceden hesaplanır ve artımlı yenileme kullanılarak güncel tutulur, bu nedenle bunlara yönelik sorgular hızlı olur. Gerçekleştirilmiş görünümdeki verileri doğrudan değiştiremezsiniz; sorgu tanımı çıkışı denetler.

Geçici görünümler, depolamaya veri aktarmadan dönüştürme mantığınızı düzenleyen işlem hattı kapsamına sahip görünümlerdir. Bunları kendi tablosuna ihtiyaç duymadan ara adımlar için kullanın.

Aşağıdaki tabloda her türün ne zaman kullanılacağı özetlemektedir:

Kullanım örneği Önerilen tür Nedeni
Bulut depolamadan veya ileti veri yolu'ndan alma Akış tablosu Her kaydı bir kez işler; yüksek hacimli ve yalnızca ekleme iş yüklerini işler.
CDC akışları (ekler, güncelleştirmeler, silmeler) Akış tablosu Sıralı ve çiftlenmiş verileri temizlenmiş CDC alımı için hedef olarak APPLY CHANGES INTO kullanılır.
Karmaşık toplamalar ve birleştirmeler Gerçekleştirilmiş görünüm Kademeli olarak güncellendi; her güncellemede tam hesaplama işlemini önler.
Pano sorgusu hızlandırma Gerçekleştirilmiş görünüm Önceden hesaplanan sonuçlar sorguları ham tablolara göre daha hızlı hale getirir.
Ara dönüşümler (aşağı akış okuyucu yok) Geçici görünüm depolama maliyetine neden olmadan işlem hattı mantığını düzenler.

Daha fazla bilgi için bkz . Akış tabloları, Gerçekleştirilmiş görünümler ve Lakeflow Spark Bildirimli İşlem Hatları kavramları.

Kesinlik temelli MERGE yerine bildirim temelli CDC kullanma

Kesinlik temelli SQL MERGE deyimleriyle değişiklik verisi yakalama (CDC) uygulamak için olay sıralamasını, yinelenenleri kaldırmayı, kısmi güncelleştirmeleri ve şema evrimini doğru şekilde işlemek için önemli bir özel kod gerekir. Bu endişelerin her biri bağımsız olarak çözülmelidir ve sonuçta elde edilen kodun korunması ve test edilmesi zordur.

Lakeflow Spark Bildirim Tabanlı İşlem Hatları, sıralama, yinelenenleri kaldırma, sıra dışı olaylar ve şema evrimi gibi işlemleri bildirimli olarak yöneten APPLY CHANGES INTO deyimi (SQL) ve apply_changes() işlevi (Python) sağlar. Değişiklik akışının ve hedef tablonun şeklini açıklarsınız; işlem hattı gerisini işler. APPLY CHANGES INTO hem SCD Tür 1 (üzerine yazma) hem de SCD Tür 2 'yi (geçmiş koruma) destekler.

Daha fazla bilgi için bkz. Değişim veri yakalama ve anlık görüntüler ve AUTO CDC API'leri: İşlem hatları ile değişim veri yakalamayı basitleştirme.

Veri kalitesini beklentilerle sağlamak

Beklentiler, bir veri kümesinden geçen her satıra uygulanan true/false SQL ifadeleridir. Bir satır koşulda başarısız olduğunda, işlem hattı yapılandırdığınız ihlal ilkesine göre yanıt verir. Beklentiler, ilkeden bağımsız olarak ölçümleri işlem hattı olay günlüğüne yayar, böylece zaman içinde veri kalitesi eğilimlerini izleyebilirsiniz.

Bir ihlal politikasını seç

Üç ihlal ilkesi mevcuttur. Hatalı verilere olan toleransınızla eşleşeni seçin:

  • warn (varsayılan): Geçerli olmayan kayıtlar hedef tabloya yazılır ve ölçümlerde işaretlenir. Tüm verileri yakalamanız ancak kalite sorunlarına görünürlük sağlamak istediğinizde bu ilkeyi kullanın.
  • drop: Geçerli olmayan kayıtlar yazılmadan önce göz ardı edilir. Hatalı satırların beklendiği ve yayılmaması gerektiği durumlarda bunu kullanın.
  • başarısız: İşlem hattı güncelleştirmesi ilk geçersiz kayıtta durdurulur. Hatalı kayıtların ciddi bir yukarı akış sorununa işaret ettiği kritik veriler için bunu kullanın.

Aşağıdaki örneklerde bir akış tablosuna uygulanan her ilke gösterilmektedir:

SQL

-- Warn: write invalid records but track them in metrics
CREATE OR REFRESH STREAMING TABLE orders_raw (
  CONSTRAINT valid_order_id EXPECT (order_id IS NOT NULL)
) AS SELECT * FROM STREAM read_files("/volumes/raw/orders", format => "json");

-- Drop: discard invalid records before writing
CREATE OR REFRESH STREAMING TABLE orders_clean (
  CONSTRAINT non_negative_amount EXPECT (amount >= 0) ON VIOLATION DROP ROW
) AS SELECT * FROM STREAM(orders_raw);

-- Fail: stop the pipeline on any invalid record
CREATE OR REFRESH STREAMING TABLE orders_critical (
  CONSTRAINT required_customer_id EXPECT (customer_id IS NOT NULL) ON VIOLATION FAIL UPDATE
) AS SELECT * FROM STREAM(orders_clean);

Python

from pyspark import pipelines as dp

# Warn: write invalid records but track them in metrics
@dp.table
@dp.expect("valid_order_id", "order_id IS NOT NULL")
def orders_raw():
    return spark.readStream.format("cloudFiles") \
        .option("cloudFiles.format", "json") \
        .load("/volumes/raw/orders")

# Drop: discard invalid records before writing
@dp.table
@dp.expect_or_drop("non_negative_amount", "amount >= 0")
def orders_clean():
    return spark.readStream.table("orders_raw")

# Fail: stop the pipeline on any invalid record
@dp.table
@dp.expect_or_fail("required_customer_id", "customer_id IS NOT NULL")
def orders_critical():
    return spark.readStream.table("orders_clean")

Geçersiz kayıtları karantinaya al

Bırakılan kayıtları sessizce atmak yerine araştırma için korumak istediğinizde, bir karantina düzeni kullanın. İki akış kullanarak doğrulama başarısız olan satırları ayrı bir akış tablosuna yönlendirin: biri ana tablodan geçersiz satırlar bırakır, diğeri de karantina tablosuna yalnızca geçersiz satırları yazar. Bu, temiz veri kümenizi kirletmeden hatalı verileri araştırmanıza, düzeltmenize ve yeniden işlemenize olanak tanır.

Karantina deseninin ayrıntılı bir örneği için bkz . Beklenti önerileri ve gelişmiş desenler.

Beklentiler hakkında daha fazla bilgi için bkz. İşlem hattı beklentileriyle veri kalitesini yönetme.

İşlem hatlarınızı parametreleştirme

İşlem hatları varsayılan katalog ve şema ayarlarına sahiptir, bu nedenle aynı katalog ve şema içinde okuyan ve yazan kod, ortamlar arasında parametre olmadan çalışır. Ancak işlem hattınızın ikinci bir kataloğa veya şemaya başvurması gerekiyorsa (örneğin, geliştirme ve üretim arasında farklılık gösteren paylaşılan bir kaynak kataloğundan okuma), bu adları doğrudan kaynak kodunuzda sabit kodlamaktan kaçının. Bunun yerine, bunları işlem hattı yapılandırma parametreleri (işlem hattı ayarlarında ayarlanan anahtar-değer çiftleri) olarak tanımlayın ve kodunuzda bunlara başvurun. Bu, parametre değerlerini değiştirerek tek bir kod tabanının ortamlar arasında doğru şekilde çalışmasını sağlar.

SQL

CREATE OR REFRESH MATERIALIZED VIEW transaction_summary AS
SELECT account_id, COUNT(txn_id) AS txn_count, SUM(amount) AS total_amount
FROM ${source_catalog}.sales.transactions
GROUP BY account_id;

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import count, sum

@dp.materialized_view
def transaction_summary():
    source_catalog = spark.conf.get("source_catalog")
    return spark.read.table(f"{source_catalog}.sales.transactions") \
        .groupBy("account_id") \
        .agg(
            count("txn_id").alias("txn_count"),
            sum("amount").alias("total_amount")
        )

Daha fazla bilgi için bkz. İşlem hatlarıyla parametreleri kullanma.

Her ortam için doğru işlem hattı modunu seçme

Geliştirme ve üretim güncelleştirme modları

İşlem hatları geliştirme veya üretim güncelleştirme modunda çalışır. Hedefinizle eşleşen modu seçin.

Geliştirme modunda işlem hattı uzun süre çalışan bir kümeyi güncelleştirmeler arasında yeniden kullanıyor ve hatalarda yeniden denemez. Bu işlem hattı kodu yazarken ve test ederken yineleme döngüsünü hızlandırır çünkü kümenin yeniden başlatılmasını beklemeden hata ayrıntılarını hemen alırsınız.

Üretim modunda, her güncelleştirme tamamlandıktan sonra küme hemen kapatılır ve bu da işlem maliyetlerini azaltır. İşlem hattı, geçici altyapı hatalarını otomatik olarak işlemek için küme yeniden başlatmaları da dahil olmak üzere artan sayıda yeniden denemeler uygular. Tüm zamanlanmış işlem hattı çalıştırmaları için üretim modunu kullanın.

Tetiklenen ve sürekli işlem hattı modu karşılaştırması

Tetiklenen mod tüm kullanılabilir verileri işler ve ardından durur. İşlem hatlarının büyük çoğunluğu için doğru seçimdir: bir zamanlamaya göre (saatlik, günlük veya isteğe bağlı) çalışan ve alt dakikalık veri güncelliği gerektirmeyenler.

Sürekli mod kümeyi çalışır durumda tutar ve yeni verileri geldikçe işler. Yalnızca kullanım örneğinin saniyeler arası aralıkta gecikme süresi gerektirdiği durumlarda uygundur. Sürekli mod her zaman açık bir küme gerektirdiğinden tetiklenen moddan çok daha pahalıdır.

Daha fazla bilgi için bkz . Tetiklenen ve sürekli işlem hattı modu ve İşlem Hatlarını Yapılandırma.

Veri düzeni için sıvı kümeleme kullanma

Sıvı kümeleme, Delta tablolarındaki veri düzenini iyileştirmek için statik bölümlemenin ZORDER yerini alır. Bölümlemeden farklı olarak, önceden bir bölüm sütunu seçmenizi gerektirir ve değerler eşit olmayan şekilde dağıtıldığında veri dengesizliğine neden olabilir; sıvı kümeleme kendi kendine ayarlama, dengesizliğe dayanıklı ve artımlıdır; her çalıştırmada yalnızca yeniden düzenleme gerektiren veriler yeniden yazılır.

Sorgu desenleri geliştikçe tam tabloyu yeniden yazmadan kümeleme sütunlarını istediğiniz zaman değiştirin.

Akış tablosu tanımınızda kümeleme sütunlarını tanımlayın:

SQL

CREATE OR REFRESH STREAMING TABLE events
CLUSTER BY (event_date, region)
AS SELECT * FROM STREAM read_files("/volumes/raw/events", format => "parquet");

Python

from pyspark import pipelines as dp

@dp.table(cluster_by=["event_date", "region"])
def events():
    return spark.readStream.format("cloudFiles") \
        .option("cloudFiles.format", "parquet") \
        .load("/volumes/raw/events")

Hangi sütunların kümelenmesi gerektiğinden emin değilseniz Databricks'in sorgu iş yükünüz temelinde en uygun kümeleme sütunlarını otomatik olarak seçmesine izin vermek için kullanın CLUSTER BY AUTO .

Daha fazla bilgi için Akış tabloları ve Tablolar için sıvı kümeleme kullanma ifadelerine bkz.

CI/CD ve Databricks Varlık Paketleri ile işlem hatlarını yönetme

İşlem hattı kaynak kodunuzun sürümünü denetleyin ve ortamlar arasında dağıtımları yönetmek için Databricks Varlık Paketleri'ni kullanın.

Daha fazla bilgi için bkz. Kaynak denetimli işlem hattı oluşturma, İşlem hattını Databricks Varlık Paketi projesine dönüştürme ve İşlem hatlarıyla parametreleri kullanma.

İşlem hattı kodunu sürüm denetiminde depolama

Tüm işlem hattı kaynak dosyalarını (Python ve SQL) paket yapılandırmanızla birlikte bir Git deposunda depolayın. Projenin tamamının sürüm denetimi, değişikliklerin eksiksiz bir geçmişini sunar, işbirliğini kolaylaştırır ve geliştirme ortamındaki değişiklikleri üretime yükseltmeden önce doğrulamanıza olanak tanır.

Databricks, bu iş akışını yönetmek için Databricks Varlık Paketleri önerir. Paket, YAML'deki işlem hattı yapılandırmanızı kaynak kodunuzun yanı sıra tanımlar ve databricks bundle CLI, terminal veya CI/CD sisteminizden işlem hatlarını doğrulamanıza, dağıtmanıza ve çalıştırmanıza olanak tanır.

Ortam yalıtımı için paket hedeflerini kullanma

Paketler, katalog adları, küme ilkeleri, bildirim adresleri ve diğer ayarlar için kendi geçersiz kılma kümesine sahip birden çok hedefi (örneğin, dev, staging, prod) etkinleştirir. Kaynak kodunuzu ortam sabitlerinden uzak tutarak dağıtım zamanında ortama özgü doğru değerleri eklemek için paket hedeflerini işlem hattı parametreleriyle birleştirin.

Tipik bir iş akışı şöyle görünür:

  1. Geliştirici, geliştirici kataloğundaki kişisel bir geliştirme işlem hattına dağıtım yaparak bir özellik dalında çalışır.
  2. Ana dal ile birleştirme yapıldığında, bir CI sistemi databricks bundle validate ve databricks bundle deploy --target staging işlemlerini çalıştırarak işlem hattını doğrular ve hazırlık ortamına dağıtır.
  3. Testler başarıyla geçildikten sonra CI sistemi databricks bundle deploy --target prod ile üretime dağıtılır.

Akış için en iyi yöntemler

Durumu yönetmek, geç verileri denetlemek ve akış işlem hatlarının güvenilir kalmasını sağlamak için bu desenleri kullanın.

Daha fazla bilgi için bkz. Filigranlarla durum bilgisi olan işlemeyi optimize etme, Akış denetim noktası hatasından işlem hattını kurtarma ve İşlem hatlarıyla geçmiş verileri geri doldurma.

Durum bilgisi olan işlemler için filigranları kullanma

Filigranlar, pencereli toplamalar ve yinelenenleri kaldırma gibi durum bilgisi içeren akış işlemleri sırasında işlem hattının bellekte tuttuğu durumu belirler. Filigran olmadan, işlem hattı her olası anahtar için veri topladıkça durum sınırsız olarak büyür ve sonunda uzun süreli işlem hatlarında bellek yetersizliği hatalarına neden olur.

Filigran, zaman damgası sütununu ve geç veriler için tolerans eşiğini belirtir. Eşik geçtikten sonra gelen kayıtlar bırakılır. Geç verilere olan toleransınızı bu durumu açık tutmanın bellek maliyetine göre dengeleyen bir eşik seçin.

Aşağıdaki örnek, üç dakikalık filigranla bir dakikalık yuvarlanan pencere toplama işlemini hesaplar:

SQL

CREATE OR REFRESH STREAMING TABLE event_counts AS
SELECT window(event_time, '1 minute') AS time_window, region, COUNT(*) AS cnt
FROM STREAM(events_raw)
  WATERMARK event_time DELAY OF INTERVAL 3 MINUTES
GROUP BY time_window, region;

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import window

@dp.table
def event_counts():
    return (
        spark.readStream.table("events_raw")
            .withWatermark("event_time", "3 minutes")
            .groupBy(window("event_time", "1 minute"), "region")
            .count()
    )

Uyarı

Toplamaların her güncelleştirmede tamamen yeniden derlenmek yerine artımlı olarak işlenmesini sağlamak için bir filigran tanımlamanız gerekir.

Veri akışı durumunu ve tam yenilemeyi anlama

Akış durumu artımlıdır: işlem hattı her seferinde sıfırdan yeniden derlemek yerine güncelleştirmeler arasında durumu oluşturur ve korur. Durum bilgisi olan akışı verimli hale getiren budur, ancak durum bilgisi olan bir sorgunun mantığını değiştirirseniz (örneğin, filigran eşiğini değiştirirseniz veya toplama sütunlarını değiştirirseniz), mevcut durumun artık yeni mantıkla uyumlu olmadığı anlamına gelir. Bu durumda, tüm geçmiş verileri yeni mantıkla yeniden işlemek ve durumu sıfırdan yeniden oluşturmak için tam yenileme gerçekleştirmeniz gerekir.

Kaynağın geçmiş verileri tutmaması durumunda tam yenileme veri kaybına da yol açabilir. Örneğin, saklama süresi kısa olan bir Kafka kaynağı, yenileme sırasında yalnızca son birkaç dakika veri kullanılabilir duruma gelebilir ve bu da öncekinden çok daha az veri içeren bir tabloya neden olabilir. Özellikle tam yenilemenin maliyetli olduğu veya kaynağın veri saklama süresinin sınırlı olduğu yüksek hacimli akışlar için, durumlu sorgu mantığındaki değişiklikleri dikkatle planlayın. Madalyon mimarisinin kullanılması, en az dönüşümle bronz tablolar oluşturulmasına yardımcı olur ve gümüş veya altın tabloların bronz tablolardan tam geçmişle yeniden hesap yapmasına olanak tanır.

Akış-akış eşleştirmeleri

Akış akışı birleşimleri, birleştirmenin her iki tarafında bir filigran ve zamana bağlı birleştirme koşulu gerektirir. Birleştirme koşulundaki zaman aralığı, başka eşleşme mümkün olmadığında akış altyapısına bildirir ve artık eşleştirilemeyecek durumu çıkarmasına olanak sağlar. Filigranları veya zamana bağlı koşulunu atlarsanız, durum sınırsız bir şekilde büyür.

Aşağıdaki örnek reklam gösterim olaylarını tıklama olaylarıyla birleştirerek tıklamanın gösterimden sonraki üç dakika içinde gerçekleşmesini gerektirir:

SQL

CREATE OR REFRESH STREAMING TABLE impression_clicks AS
SELECT imp.ad_id, imp.impression_time, clk.click_time
FROM STREAM(ad_impressions)
    WATERMARK impression_time DELAY OF INTERVAL 3 MINUTES AS imp
JOIN STREAM(user_clicks)
    WATERMARK click_time DELAY OF INTERVAL 3 MINUTES AS clk
ON imp.ad_id = clk.ad_id
  AND clk.click_time BETWEEN imp.impression_time
    AND imp.impression_time + INTERVAL 3 MINUTES;

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import expr

dp.create_streaming_table("impression_clicks")

@dp.append_flow(target="impression_clicks")
def join_impressions_and_clicks():
    impressions = spark.readStream.table("ad_impressions") \
        .withWatermark("impression_time", "3 minutes")
    clicks = spark.readStream.table("user_clicks") \
        .withWatermark("click_time", "3 minutes")
    return impressions.alias("imp").join(
        clicks.alias("clk"),
        expr("""
            imp.ad_id = clk.ad_id AND
            clk.click_time BETWEEN imp.impression_time AND imp.impression_time + INTERVAL 3 MINUTES
        """),
        "leftOuter"
    )

Bir akışı statik tabloya (anlık görüntü birleştirme) karşı birleştirdiğinizde, statik tablo anlık görüntüsü her mikrobatch'in başlangıcında yenilenir. Bu, geç gelen boyut kayıtlarının önceden işlenmiş olgulara geriye dönük olarak uygulanmadığı anlamına gelir. Geriye dönük uygulama gerekiyorsa malzemeleştirilmiş bir görünüm kullanın veya işlem hattını yeniden yapılandırın.

İşlem hattı performansını iyileştirme

İşlem maliyetlerini azaltmak ve işlem hattı güncelleştirmelerini hızlandırmak için bu teknikleri uygulayın.

Daha fazla bilgi için Materyalize edilmiş görünümler ve Filigranlarla durum bilgisi işlemeyi iyileştirme seçeneklerine bakın.

Küçük dosyalardan kaçının

Düşük hacimli bir kaynakta işlem hattının çok sık tetiklenmesi, bulut depolama alanına çok sayıda küçük dosya yazar. Her dosya ayrı bir meta veri araması ve G/Ç gidiş dönüş gerektirdiği ve bulut depolama API'leri büyük ölçekte listeleme işlemlerini kısıtladığı için küçük dosyalar okuma performansını düşürür. Bunu önlemek için, veri biriminizle eşleşen bir tetikleyici aralığı seçin: tetiklenen işlem hatlarını güncelleştirmeler arasında sürekli değil, anlamlı miktarda veri birikmesine izin veren bir zamanlamada çalıştırın.

Veri dengesizliklerini işleme

Bir join veya groupBy anahtarındaki değerler bölümler arasında eşit olmayan bir şekilde dağıtıldığında veri dengesizliği oluşur ve bu da az sayıdaki görevlerin verilerin çoğunu işlemesine neden olur. Bu, uçtan uca güncelleştirme süresini artıran etkin noktalar oluşturur. Depolanan tablolardaki dengesizliği gidermek için liquid clustering yöntemini kullanın. Uçuş içi hesaplama sırasında oluşan dengesizlik için, iki aşamada gruplandırma ve toplamadan önce rastgele bir demet soneki ekleyerek yüksek oranda eğriltilmiş anahtarları tuzla.

Daha fazla bilgi için bkz. Veri düzeni için sıvı kümeleme kullanma.

Materyalize edilmiş görünümler için artımlı yenileme kullanımı

Büyük bir toplama için gerçekleştirilmiş bir görünüm kullandığınızda, Lakeflow Spark Bildirimli İşlem Hatları bunu artımlı olarak yenilemeye çalışır; tam sonuç kümesini yeniden derlemek yerine yalnızca son güncelleştirmeden bu yana yapılan yukarı akış değişiklikleri işlenir. Artımlı yenileme, sorguyu her işlem hattı tetikleyicisinde sıfırdan yeniden çalıştırmaktan önemli ölçüde daha ucuzdur. Gerçekleştirilmiş görünümün artımlı olarak yenilenme olasılığını en üst düzeye çıkarmak için basit, belirleyici toplama sorguları yazın ve belirleyici olmayan işlevler gibi artımlı işlemeyi engelleyen yapılardan kaçının.

Bkz. gerçekleştirilmiş görünümler için artımlı yenileme.

Birleştirmeleri optimize etme

Bir tarafı küçük boyut tablosu olan birleşimler için Spark'a karışık birleştirme gerçekleştirmek yerine küçük tabloyu tüm yürütücülere yayınlamasını bildirmek için bir yayın ipucu ekleyin:

SQL

CREATE OR REFRESH MATERIALIZED VIEW enriched_orders AS
SELECT o.*, /*+ BROADCAST(p) */ p.product_name, p.category
FROM orders o
JOIN products p ON o.product_id = p.product_id;

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import broadcast

@dp.materialized_view
def enriched_orders():
    orders = spark.read.table("orders")
    products = spark.read.table("products")
    return orders.join(broadcast(products), "product_id")

Zaman serisi yakınlık birleşimleri için (örneğin, bir zaman aralığındaki en yakın olayı bulma), bir aralık birleştirme koşulu kullanın ve akışlar birleştirilirken her iki tarafın da zaman damgası olduğundan emin olun veya birleştirmeden önce olayları zaman demetlerine önceden ayırmayı göz önünde bulundurun.

İşlem hatlarınızı izleme

İşlem hattı olay günlüğü, Lakeflow Spark Bildirimli İşlem Hatlarında birincil gözlemlenebilirlik temelidir. Her işlem hattı çalıştırması, yürütme ilerleme durumunu, veri kalitesi beklentisi sonuçlarını, veri kökenini ve hata ayrıntılarını kapsayan yapılandırılmış kayıtları olay günlüğüne yazar. Olay günlüğü, doğrudan sorgulayabileceğiniz bir Delta tablosudur.

Temel alınan depolama yolunu bilmeden olay günlüğünü sorgulamak için paylaşılan bir kümede event_log() veya SQL ambarı üzerinde tablo değerli işlevini kullanın:

SELECT * FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
ORDER BY timestamp DESC
LIMIT 100;

Beklenen ölçümler için olay günlüğünü sorgulayarak veri kalitesi panoları oluşturun. details sütunu, zaman içindeki kalite eğilimlerini izlemek ve regresyonlar konusunda uyarı vermek için kullanabileceğiniz, her kısıtlama için geçiş/başarısız sayılarına sahip iç içe geçmiş bir JSON yapısı içerir.

Olay temelli uyarılar için, işlem hattı başarısız olduğunda veya veri kalitesi eşiği ihlal edildiğinde özel web kancalarını veya bildirim hizmetlerini (Slack veya PagerDuty gibi) tetikleme amacıyla olay kancalarını kullanın. Olay kancaları, işlem hattı olaylarına yanıt olarak çalışan Python işlevleridir.

Daha fazla bilgi için bkz. İşlem hatlarını izleme, İşlem hattı olay günlüğü ve Olay kancalarıyla işlem hatlarının özel izlemesini tanımlama.

Sunucusuz işlem kullanma

Databricks, yeni işlem hatları için sunucusuz işlem önerir. Sunucusuz olduğunda el ile küme yapılandırması yoktur; Databricks altyapıyı otomatik olarak yönetir. Sunucusuz işlem hatları, iş yükü taleplerine yanıt olarak hem yatay olarak (daha fazla yürütücü) hem de dikey olarak (daha büyük yürütücü boyutu) ölçeklenebilen gelişmiş otomatik ölçeklendirme kullanır. Sunucusuz işlem hatları her zaman Unity Kataloğu'nu kullandığı için idare ve köken izleme varsayılan olarak yerleşiktir.

Daha fazla bilgi için bkz. Sunucusuz işlem hattı yapılandırma.

Madalyon mimarisiyle işlem hatlarını düzenleme

Madalyon mimarisi, verileri bronz, gümüş ve altın olmak üzere her birinin ayrı bir amacı olan üç mantıksal katman halinde düzenler. Lakeflow Spark Bildirimli İşlem Hatları veri kümesi türlerini doğru katmana eşlemek, her katmanın sorumluluklarını açık tutar ve işlem hatlarının bakımını kolaylaştırır.

  • Bronz: Bulut depolama, ileti veri yolları veya CDC kaynaklarından ham verileri almak için akış tablolarını kullanın. Bronz tablolar ham kaynak verilerini minimum dönüşümle koruyarak gümüş veya altın katmanların, gereksinimler değişirse bronz katmandaki kaynaktan yeniden işlenmesini mümkün hale getirir.
  • Silver: Artımlı satır düzeyi dönüştürmeler (filtreleme, temizleme ve ayrıştırma) için akış tablolarını kullanın. Gümüş katmanlı mantık boyut tablolarına veya artımlı yenilemeden yararlanan karmaşık toplamalara karşı zenginleştirme birleşimleri içerdiğinde gerçekleştirilmiş görünümleri kullanın.
  • Gold: Panolara, raporlama araçlarına ve aşağı akış tüketicilerine sunulacak toplamaları, metrikleri ve özetleri önceden hesaplamak için materyalize edilmiş görünümleri kullanın.

Mümkün olduğunca alım (bronz) ve dönüştürmeyi (gümüş ve altın) ayrı işlem hattı DAG'lerine ayırın. Katmanları ayırma, her katmanı bağımsız olarak zamanlamanıza, izlemenize ve sorun gidermenize olanak tanır ve dönüştürme işlem hattındaki bir hata, yeni verilerin bronza inmesini engellemez.

Daha fazla bilgi için bkz . Akış tabloları ve Gerçekleştirilmiş görünümler.