Udostępnij za pośrednictwem


Najlepsze rozwiązania dotyczące potoków deklaratywnych platformy Spark w usłudze Lakeflow

Na tej stronie opisano zalecane wzorce projektowania, budowania i obsługi potoków z Lakeflow Spark Declarative Pipelines. Zastosuj te wytyczne podczas uruchamiania nowego potoku lub ulepszania istniejącego.

Wybieranie odpowiedniego typu zestawu danych

Deklaratywne potoki Lakeflow Spark oferują trzy typy zestawów danych: tabele strumieniowe, zmaterializowane widoki i widoki tymczasowe. Wybranie odpowiedniego typu dla każdej warstwy potoku pozwala uniknąć niepotrzebnych kosztów obliczeniowych i ułatwia zrozumienie kodu.

Tabele przesyłania strumieniowego są właściwym wyborem do pozyskiwania danych oraz przekształceń strumieniowych o niskiej latencji. Każdy wiersz wejściowy jest odczytywany i przetwarzany tylko raz, co czyni go idealnym rozwiązaniem dla obciążeń tylko do dodawania, dużych ilości danych oraz przetwarzania sterowanego zdarzeniami z magazynu w chmurze lub systemu kolejkowania komunikatów.

Zmaterializowane widoki są właściwym wyborem dla złożonych przekształceń i zapytań analitycznych. Ich wyniki są wstępnie obliczane i aktualizowane przy użyciu odświeżania przyrostowego, więc zapytania względem nich są szybkie. Nie można bezpośrednio modyfikować danych w zmaterializowanym widoku — definicja zapytania kontroluje dane wyjściowe.

Widoki tymczasowe to widoki w zakresie potoku, które organizują logikę przekształcania bez zapisywania danych do magazynu. Użyj ich do wykonywania kroków pośrednich, które nie wymagają własnej tabeli.

Poniższa tabela zawiera podsumowanie, kiedy należy używać każdego typu:

Przypadek użycia Zalecany typ Powód
Pozyskiwanie danych z magazynu w chmurze lub z magistrali komunikatów Tabela strumieniowa Przetwarza każdy rekord raz; obsługuje duże obciążenia związane wyłącznie z dołączaniem danych.
Strumienie CDC (wstawianie, aktualizacje, usuwanie) Tabela strumieniowa Używany jako element docelowy APPLY CHANGES INTO dla uporządkowanego, deduplikowanego pozyskiwania CDC.
Złożone agregacje i sprzężenia Zmaterializowany widok Odświeżanie przyrostowe; pozwala uniknąć pełnej ponownej kompilacji w każdej aktualizacji.
Przyspieszanie zapytań pulpitu nawigacyjnego Zmaterializowany widok Wstępnie obliczone wyniki sprawiają, że zapytania są szybsze niż w przypadku nieprzetworzonych tabel.
Przekształcenia pośrednie (bez odbiorców dalszych) Widok tymczasowy Organizuje logikę przepływu danych bez ponoszenia kosztów przechowywania danych.

Aby uzyskać więcej informacji, zobacz Tabele strumieniowe, Zmaterializowane widoki oraz Koncepcje potoków deklaratywnych Spark Lakeflow.

Używanie deklaratywnej usługi CDC zamiast imperatywnej funkcji MERGE

Implementowanie przechwytywania danych zmian (CDC) za pomocą instrukcji imperatywnych SQL MERGE wymaga znacznego kodu niestandardowego w celu prawidłowego obsługi kolejności zdarzeń, deduplikacji, częściowych aktualizacji i ewolucji schematu. Każde z tych problemów należy rozwiązać niezależnie, a wynikowy kod jest trudny do utrzymania i testowania.

Lakeflow Spark Declarative Pipelines zapewnia instrukcję APPLY CHANGES INTO (SQL) oraz funkcję apply_changes() (Python), które deklaratywnie obsługują porządkowanie, deduplikację, zdarzenia poza kolejnością i ewolucję schematu. Opisujesz kształt strumienia zmian i tabelę docelową — potok obsługuje resztę. APPLY CHANGES INTO obsługuje zarówno typ SCD 1 (zastępowanie) i typ SCD 2 (zachowywanie historii).

Aby uzyskać więcej informacji, zobacz przechwytywanie zmian danych i migawki oraz AUTO CDC API: upraszczanie przechwytywania zmian danych za pomocą potoków.

Wymuszanie jakości danych przez ustalanie oczekiwań

Oczekiwania to wyrażenia SQL typu prawda/fałsz, które są stosowane do każdego wiersza przechodzącego przez zestaw danych. Gdy wiersz zakończy się niepowodzeniem, potok odpowiada zgodnie ze skonfigurowanymi zasadami naruszenia. Oczekiwania wysyłają metryki do dziennika zdarzeń w potoku bez względu na zasady, dzięki czemu można śledzić trendy jakości danych w czasie.

Wybierz politykę naruszenia

Dostępne są trzy polityki dotyczące naruszeń. Wybierz ten, który odpowiada tolerancji dla nieprawidłowych danych:

  • ostrzegaj (wartość domyślna): rekordy, które nie są prawidłowe, są zapisywane w tabeli docelowej i oznaczone w metrykach. Użyj tych zasad, gdy musisz przechwycić wszystkie dane, ale chcesz uzyskać wgląd w problemy z jakością.
  • drop: Rekordy, które nie są prawidłowe, są odrzucane przed zapisaniem. Użyj tej opcji, gdy oczekiwane są nieprawidłowe wiersze i nie powinny propagować podrzędnych.
  • awaria: aktualizacja rurociągu zatrzymuje się na pierwszym nieprawidłowym rekordzie. Użyj tej funkcji w przypadku danych krytycznych, w których dowolny zły rekord wskazuje poważny problem nadrzędny.

W poniższych przykładach przedstawiono poszczególne zasady zastosowane do tabeli przesyłania strumieniowego:

SQL

-- Warn: write invalid records but track them in metrics
CREATE OR REFRESH STREAMING TABLE orders_raw (
  CONSTRAINT valid_order_id EXPECT (order_id IS NOT NULL)
) AS SELECT * FROM STREAM read_files("/volumes/raw/orders", format => "json");

-- Drop: discard invalid records before writing
CREATE OR REFRESH STREAMING TABLE orders_clean (
  CONSTRAINT non_negative_amount EXPECT (amount >= 0) ON VIOLATION DROP ROW
) AS SELECT * FROM STREAM(orders_raw);

-- Fail: stop the pipeline on any invalid record
CREATE OR REFRESH STREAMING TABLE orders_critical (
  CONSTRAINT required_customer_id EXPECT (customer_id IS NOT NULL) ON VIOLATION FAIL UPDATE
) AS SELECT * FROM STREAM(orders_clean);

Python

from pyspark import pipelines as dp

# Warn: write invalid records but track them in metrics
@dp.table
@dp.expect("valid_order_id", "order_id IS NOT NULL")
def orders_raw():
    return spark.readStream.format("cloudFiles") \
        .option("cloudFiles.format", "json") \
        .load("/volumes/raw/orders")

# Drop: discard invalid records before writing
@dp.table
@dp.expect_or_drop("non_negative_amount", "amount >= 0")
def orders_clean():
    return spark.readStream.table("orders_raw")

# Fail: stop the pipeline on any invalid record
@dp.table
@dp.expect_or_fail("required_customer_id", "customer_id IS NOT NULL")
def orders_critical():
    return spark.readStream.table("orders_clean")

Przenieś nieprawidłowe rekordy do kwarantanny

Jeśli chcesz zachować porzucone rekordy na potrzeby badania, a nie dyskretnie je odrzucać, użyj wzorca kwarantanny. Przekierowanie wierszy, które nie przeszły weryfikacji, do oddzielnej tabeli strumieniowej za pomocą dwóch przepływów: jeden pomija nieprawidłowe wiersze z tabeli głównej, natomiast drugi zapisuje tylko nieprawidłowe wiersze w tabeli kwarantanny. Dzięki temu można badać, naprawiać i ponownie przetwarzać nieprawidłowe dane bez zakłócania czystego zestawu danych.

Aby zapoznać się ze szczegółowym przykładem wzorca kwarantanny, zobacz Zalecenia dotyczące oczekiwań i zaawansowane wzorce.

Aby uzyskać więcej informacji na temat oczekiwań, zobacz Zarządzanie jakością danych z wykorzystaniem oczekiwań dotyczących potoku.

Parametryzowanie przepływów pracy

Pipelines mają domyślne ustawienia katalogu i schematu, więc kod, który odczytuje i zapisuje w tym samym katalogu i schemacie, działa w różnych środowiskach bez konieczności użycia parametrów. Jeśli jednak potok musi odwoływać się do drugiego katalogu lub schematu — na przykład odczytywanie z udostępnionego katalogu źródłowego, które różni się między programowaniem a produkcją, unikaj kodowania tych nazw bezpośrednio w kodzie źródłowym. Zamiast tego zdefiniuj je jako parametry konfiguracji potoku (pary klucz-wartość ustawione w ustawieniach potoku) i odwołaj się do nich w kodzie. Dzięki temu pojedyncza baza kodu działa poprawnie w różnych środowiskach, zamieniając wartości parametrów.

SQL

CREATE OR REFRESH MATERIALIZED VIEW transaction_summary AS
SELECT account_id, COUNT(txn_id) AS txn_count, SUM(amount) AS total_amount
FROM ${source_catalog}.sales.transactions
GROUP BY account_id;

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import count, sum

@dp.materialized_view
def transaction_summary():
    source_catalog = spark.conf.get("source_catalog")
    return spark.read.table(f"{source_catalog}.sales.transactions") \
        .groupBy("account_id") \
        .agg(
            count("txn_id").alias("txn_count"),
            sum("amount").alias("total_amount")
        )

Aby uzyskać więcej informacji, zobacz Używanie parametrów z pipeline'ami.

Wybierz odpowiedni tryb potoku dla każdego środowiska

Tryby aktualizacji programowania i produkcji

Potoki działają w trybie deweloperskim lub produkcyjnym. Wybierz tryb pasujący do celu.

W trybie programowania potok ponownie używa długotrwałego klastra w ramach aktualizacji i nie ponawia próby w przypadku błędów. Przyspiesza to cykl iteracji podczas tworzenia i testowania kodu potoku, ponieważ szczegóły błędu są wyświetlane natychmiast bez oczekiwania na ponowne uruchomienie klastra.

W trybie produkcyjnym klaster jest zamykany natychmiast po zakończeniu każdej aktualizacji, co zmniejsza koszty obliczeń. Potok stosuje również eskalację prób ponawiania, w tym ponowne uruchamianie klastra, w celu automatycznego obsługiwania przejściowych awarii infrastruktury. Użyj trybu produkcyjnego dla wszystkich zaplanowanych przebiegów potoków.

Wyzwalany kontra tryb przetwarzania potokowego

Tryb wyzwalany przetwarza wszystkie dostępne dane, a następnie zatrzymuje się. Jest to właściwy wybór dla zdecydowanej większości rurkopłowów: tych, które są uruchamiane zgodnie z harmonogramem (co godzinę, codziennie lub na żądanie) i nie wymagają świeżości danych w czasie poniżej minuty.

Tryb ciągły utrzymuje klaster w trybie aktywnym i przetwarza nowe dane po ich nadejściu. Jest to odpowiednie tylko wtedy, gdy przypadek użycia wymaga opóźnienia w zakresie sekund do minut. Ze względu na to, że tryb ciągły wymaga zawsze włączonego klastra, jest znacznie droższy niż tryb wyzwalania.

Aby uzyskać więcej informacji, zobacz Wyzwalany i ciągły tryb potoku i Konfigurowanie potoków.

Używanie płynnego klastrowania dla układu danych

Klastrowanie dynamiczne zastępuje partycjonowanie statyczne i ZORDER dla optymalizacji układu danych w tabelach Delta. W przeciwieństwie do partycjonowania, które wymaga wcześniejszego wyboru kolumny partycji i może prowadzić do przechyłu danych, gdy wartości są nierównomiernie rozproszone, klastrowanie płynne jest automatycznie dostrajane, odporne na przechył i przyrostowe — tylko dane wymagające reorganizacji są przekształcane przy każdym uruchomieniu.

Zmień kolumny klastrowania w dowolnym momencie bez ponownego zapisywania pełnej tabeli w miarę rozwoju wzorców zapytań.

Zdefiniuj kolumny klastrowania w definicji tabeli przesyłania strumieniowego:

SQL

CREATE OR REFRESH STREAMING TABLE events
CLUSTER BY (event_date, region)
AS SELECT * FROM STREAM read_files("/volumes/raw/events", format => "parquet");

Python

from pyspark import pipelines as dp

@dp.table(cluster_by=["event_date", "region"])
def events():
    return spark.readStream.format("cloudFiles") \
        .option("cloudFiles.format", "parquet") \
        .load("/volumes/raw/events")

Jeśli nie masz pewności, które kolumny mają być klastrowane, użyj polecenia CLUSTER BY AUTO , aby umożliwić usłudze Databricks automatyczne wybieranie optymalnych kolumn klastrowania na podstawie obciążenia zapytania.

Aby uzyskać więcej informacji, zobacz Streaming tables (tabele przesyłane strumieniowo) i Use liquid clustering for tables (Używanie klastrowania Liquid dla tabel).

Zarządzanie potokami z CI/CD i pakietami zasobów Databricks

Kontrola wersji kodu źródłowego potoku i używanie Pakietów zasobów Databricks do zarządzania wdrożeniami w różnych środowiskach.

Aby uzyskać więcej informacji, zobacz Tworzenie potoku kontrolowanego przez źródło, Konwertowanie potoku na projekt pakietu zasobów usługi Databricks i Używanie parametrów z potokami.

Przechowuj kod potoku w kontroli wersji

Przechowuj wszystkie pliki źródłowe potoku (Python i SQL) wraz z konfiguracją pakietu w repozytorium Git. Kontrola wersji pełnego projektu zapewnia pełną historię zmian, ułatwia współpracę i umożliwia weryfikowanie zmian w środowisku deweloperskim przed podwyższeniem ich do środowiska produkcyjnego.

Databricks zaleca Databricks Asset Bundles do zarządzania tym przepływem pracy. Pakiet określa konfigurację potoku za pomocą YAML wraz z kodem źródłowym, a interfejs CLI (interfejs wiersza polecenia) umożliwia weryfikację, wdrażanie i uruchamianie potoków bezpośrednio z terminala lub z systemu ciągłej integracji/ciągłego wdrażania.

Użyj celów pakietu do izolacji środowiska

Pakiety umożliwiają wiele obiektów docelowych (na przykład dev, staging, prod), z których każdy ma własny zestaw modyfikacji dla nazw katalogów, zasad klastra, adresów powiadomień i innych ustawień. Połącz elementy docelowe pakietu z parametrami potoku, aby wstrzyknąć prawidłowe wartości specyficzne dla środowiska w czasie wdrażania, utrzymując kod źródłowy bez stałych środowiskowych.

Typowy przepływ pracy wygląda następująco:

  1. Deweloper pracuje nad gałęzią funkcjonalności, wdrażając w osobistą linię rozwoju w katalogu deweloperskim.
  2. Po połączeniu z główną gałęzią, system CI uruchamia databricks bundle validate i databricks bundle deploy --target staging w celu zweryfikowania i wdrożenia potoku CI w środowisku przejściowym.
  3. Po zakończeniu testowania system ciągłej integracji jest wdrażany w środowisku produkcyjnym za pomocą polecenia databricks bundle deploy --target prod.

Najlepsze rozwiązania dotyczące przesyłania strumieniowego

Wykorzystaj te wzorce do zarządzania stanami, kontrolowania danych pojawiających się z opóźnieniem i zachowania niezawodności potoków przesyłania strumieniowego.

Aby uzyskać więcej informacji, zobacz Optymalizowanie przetwarzania stanowego za pomocą znaków wodnych, Odzyskiwanie potoku po awarii punktu kontrolnego przesyłania strumieniowego i Uzupełnianie danych historycznych przy użyciu potoków.

Używanie znaków wodnych na potrzeby operacji stanowych

Znaki wodne wiążą stan, który potok danych przechowuje w pamięci podczas stanowych operacji przesyłania strumieniowego, takich jak agregacje okienne i deduplikacja. Bez znaku wodnego stan staje się nieograniczony, ponieważ potok danych gromadzi dane dla każdego możliwego klucza, co w końcu powoduje błędy wyczerpania pamięci w długotrwałych potokach danych.

Znak wodny określa kolumnę czasową i próg tolerancji dla opóźnionych danych. Rekordy, które docierają po przekroczeniu progu, zostaną porzucone. Wybierz próg, który równoważy tolerancję dla danych opóźnionych względem kosztu pamięci przechowywania tego stanu.

Poniższy przykład oblicza agregację jednominutowego okna wirowania z trzyminutowym znakiem wodnym:

SQL

CREATE OR REFRESH STREAMING TABLE event_counts AS
SELECT window(event_time, '1 minute') AS time_window, region, COUNT(*) AS cnt
FROM STREAM(events_raw)
  WATERMARK event_time DELAY OF INTERVAL 3 MINUTES
GROUP BY time_window, region;

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import window

@dp.table
def event_counts():
    return (
        spark.readStream.table("events_raw")
            .withWatermark("event_time", "3 minutes")
            .groupBy(window("event_time", "1 minute"), "region")
            .count()
    )

Uwaga / Notatka

Aby upewnić się, że agregacje są przetwarzane przyrostowo, a nie w pełni ponownie przeliczane dla każdej aktualizacji, należy zdefiniować znacznik czasowy.

Omówienie stanu przesyłania strumieniowego i pełnego odświeżania

Stan przesyłania strumieniowego jest przyrostowy: potok tworzy i utrzymuje stan między aktualizacjami, a nie rekompiluje się od podstaw za każdym razem. To sprawia, że stanowe przesyłanie strumieniowe jest wydajne, ale oznacza to również, że jeśli zmienisz logikę zapytania stanowego (na przykład modyfikując próg znaku wodnego lub zmieniając kolumny agregacji), istniejący stan nie jest już zgodny z nową logiką. W takim przypadku należy wykonać pełne odświeżanie, aby ponownie przetworzyć wszystkie dane historyczne przy użyciu nowej logiki i ponownie skompilować stan od podstaw.

Pełne odświeżanie może również prowadzić do utraty danych, jeśli źródło nie zachowuje danych historycznych. Na przykład źródło danych Kafka z krótkim okresem retencji może mieć dostępne tylko ostatnie kilka minut danych w momencie odświeżania, na skutek czego tabela zawiera znacznie mniej danych niż przedtem. Uważnie zaplanuj zmiany logiki zapytań stanowych, szczególnie w przypadku strumieni o dużej ilości, w przypadku których pełne odświeżanie jest kosztowne lub gdy źródło ma ograniczone przechowywanie danych. Korzystanie z architektury medalionu pomaga w tworzeniu tabel brązowych z minimalną transformacją i umożliwia przeliczanie tabel srebrnych lub złotych na podstawie tabel brązowych z pełną historią.

Łączenia strumień-strumień

Sprzężenia strumień-strumień wymagają znaku wodnego po obu stronach sprzężenia oraz warunku sprzężenia ograniczonego czasowo. Interwał czasu w warunku sprzężenia informuje aparat przesyłania strumieniowego, gdy nie są możliwe dalsze dopasowania, co pozwala mu wykluczyć stan, który nie może być już zgodny. Jeśli pominięto znaki wodne lub warunek związany z czasem, stan rośnie bez ograniczeń.

Poniższy przykład łączy zdarzenia wyświetleń reklam z zdarzeniami kliknięć, co wymaga, aby kliknięcie nastąpiło w ciągu trzech minut od wyświetlenia:

SQL

CREATE OR REFRESH STREAMING TABLE impression_clicks AS
SELECT imp.ad_id, imp.impression_time, clk.click_time
FROM STREAM(ad_impressions)
    WATERMARK impression_time DELAY OF INTERVAL 3 MINUTES AS imp
JOIN STREAM(user_clicks)
    WATERMARK click_time DELAY OF INTERVAL 3 MINUTES AS clk
ON imp.ad_id = clk.ad_id
  AND clk.click_time BETWEEN imp.impression_time
    AND imp.impression_time + INTERVAL 3 MINUTES;

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import expr

dp.create_streaming_table("impression_clicks")

@dp.append_flow(target="impression_clicks")
def join_impressions_and_clicks():
    impressions = spark.readStream.table("ad_impressions") \
        .withWatermark("impression_time", "3 minutes")
    clicks = spark.readStream.table("user_clicks") \
        .withWatermark("click_time", "3 minutes")
    return impressions.alias("imp").join(
        clicks.alias("clk"),
        expr("""
            imp.ad_id = clk.ad_id AND
            clk.click_time BETWEEN imp.impression_time AND imp.impression_time + INTERVAL 3 MINUTES
        """),
        "leftOuter"
    )

Po dołączeniu strumienia do tabeli statycznej (złącze migawkowe) migawka tabeli statycznej jest odświeżana na początku każdej mikropartii. Oznacza to, że późno przychodzące rekordy wymiarów nie są stosowane retrospekcyjnie do faktów, które zostały już przetworzone. Jeśli wymagane jest działanie wsteczne, użyj zmaterializowanego widoku lub zmień strukturę potoku.

Optymalizowanie wydajności potoku

Zastosuj te techniki, aby zmniejszyć koszty obliczeń i przyspieszyć aktualizacje procesów potokowych.

Aby uzyskać więcej informacji, zobacz Materialized views (Zmaterializowane widoki) i Optimize stateful processing with watermarks (Optymalizowanie przetwarzania stanowego za pomocą znaków wodnych).

Unikaj małych plików

Wyzwalanie potoku zbyt często na źródle o niskiej częstotliwości danych zapisuje dużą liczbę małych plików w chmurowej pamięci masowej. Pliki w małym rozmiarze obniżają wydajność odczytu, ponieważ każdy plik wymaga oddzielnego wyszukiwania metadanych i zapytań wejścia/wyjścia, a interfejsy API magazynu w chmurze ograniczają wydajność operacji wylistowywania na dużą skalę. Aby tego uniknąć, należy wybrać interwał wyzwalacza zgodny z woluminem danych: uruchamianie wyzwalanych potoków zgodnie z harmonogramem, które pozwala na gromadzenie się znaczących ilości danych między aktualizacjami, a nie stale.

Obsługa niesymetryczności danych

Nierównomierność danych występuje, gdy wartości w kluczu łączenia lub grupowania są nierównomiernie rozłożone między partycje, powodując niewielką liczbę zadań do przetworzenia większości danych. Spowoduje to utworzenie hotspotów, które zwiększają czas aktualizacji od początku do końca. Użyj klastrowania cieczy, aby rozwiązać rozrzut w przechowywanych tabelach. W przypadku występowania skosu podczas obliczeń w locie, dodawaj losowy sufiks koszyka do wysoce skosowanych kluczy przed zgrupowaniem i agregacją w dwóch etapach.

Aby uzyskać więcej informacji, zobacz Używaj klastrowania cieczy do układu danych.

Użyj odświeżania przyrostowego dla zmaterializowanych widoków

Jeśli używasz zmaterializowanego widoku dla dużej agregacji, Lakeflow Spark Declarative Pipelines próbuje odświeżyć go przyrostowo — przetwarzając tylko zmiany wejściowe od ostatniej aktualizacji, zamiast obliczać ponownie cały zestaw wyników. Odświeżanie przyrostowe jest znacznie tańsze niż ponowne uruchamianie zapytania od podstaw w każdym wyzwalaczu potoku. Aby zmaksymalizować prawdopodobieństwo, że zmaterializowany widok może być odświeżany przyrostowo, napisz proste, deterministyczne zapytania agregacji i unikaj konstrukcji, które uniemożliwiają przetwarzanie przyrostowe, takie jak funkcje niedeterministyczne.

Zobacz Odświeżanie przyrostowe, aby uzyskać zmaterializowane widoki.

Optymalizowanie sprzężeń

W przypadku sprzężeń, gdzie jedna strona jest małą tabelą wymiarową, dodaj instrukcję broadcastu, aby poinformować Spark, aby zbroadcastować mniejszą tabelę do wszystkich wykonawców, zamiast wykonywać sprzężenie z tasowaniem.

SQL

CREATE OR REFRESH MATERIALIZED VIEW enriched_orders AS
SELECT o.*, /*+ BROADCAST(p) */ p.product_name, p.category
FROM orders o
JOIN products p ON o.product_id = p.product_id;

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import broadcast

@dp.materialized_view
def enriched_orders():
    orders = spark.read.table("orders")
    products = spark.read.table("products")
    return orders.join(broadcast(products), "product_id")

W przypadku sprzężeń zbliżeniowych szeregów czasowych (na przykład znalezienie najbliższego zdarzenia w zakresie czasu) użyj warunku połączenia zakresu i upewnij się, że po obu stronach znajduje się watermark w przypadku łączenia strumieni, lub rozważ wstępne grupowanie zdarzeń w koszyki czasowe przed ich połączeniem.

Monitorowanie potoków

Dziennik zdarzeń potoku jest podstawowym prymitywem dotyczącym obserwowalności w potokach deklaratywnych Lakeflow Spark. Każde uruchomienie potoku zapisuje rekordy ustrukturyzowane w dzienniku zdarzeń obejmujące postęp wykonywania, wyniki oczekiwań dotyczących jakości danych, pochodzenie danych i szczegóły błędu. Dziennik zdarzeń to tabela Delta, którą można przeszukiwać bezpośrednio.

Aby wysłać zapytanie do dziennika zdarzeń bez znajomości podstawowej ścieżki magazynu, użyj funkcji typu tabela event_log() w udostępnionym klastrze lub magazynie SQL:

SELECT * FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
ORDER BY timestamp DESC
LIMIT 100;

Twórz pulpity nawigacyjne dotyczące jakości danych, wysyłając zapytanie do dziennika zdarzeń pod kątem metryk oczekiwań. Kolumna details zawiera zagnieżdżoną strukturę JSON z wynikami zaliczenia/niezaliczenia dla każdego ograniczenia, której można użyć do śledzenia trendów jakości w czasie i powiadamiania o regresjach.

W przypadku alertów opartych na zdarzeniach użyj hooków zdarzeń, aby uruchamiać niestandardowe webhooki lub usługi powiadomień (takie jak Slack lub PagerDuty), gdy potok przetwarzania ulegnie awarii lub gdy próg jakości danych zostanie przekroczony. Hooki zdarzeń to funkcje języka Python, które uruchamiają się w odpowiedzi na zdarzenia w potoku.

Aby uzyskać więcej informacji, zobacz Monitorowanie potoków, dziennik zdarzeń potoku i Definiowanie niestandardowego monitorowania potoków za pomocą punktów zaczepienia zdarzeń.

Korzystanie z obliczeń bezserwerowych

Databricks zaleca obliczenia bezserwerowe dla nowych rur. W przypadku bezserwerowej konfiguracji klastra nie ma ręcznej konfiguracji — usługa Databricks automatycznie zarządza infrastrukturą. Potoki bezserwerowe używają rozszerzonego skalowania automatycznego, które można skalować zarówno w poziomie (więcej funkcji wykonawczych), jak i pionowo (większy rozmiar funkcji wykonawczej) w odpowiedzi na zapotrzebowanie na obciążenia. Potoki bezserwerowe zawsze używają katalogu Unity, więc śledzenie zarządzania i pochodzenia jest domyślnie wbudowane.

Aby uzyskać więcej informacji, zobacz Skonfiguruj potok bezserwerowy.

Organizowanie potoków przy użyciu architektury medalonu

Architektura medalionu organizuje dane w trzy warstwy logiczne — brązowe, srebrne i złote — z których każda ma odrębny cel. Mapowanie typów zestawów danych w deklaratywnych potokach Lakeflow Spark do właściwej warstwy zapewnia jasność odpowiedzialności każdej warstwy i ułatwia obsługę potoków.

  • Brąz: użyj tabel przesyłania strumieniowego, aby pozyskiwać nieprzetworzone dane z magazynu w chmurze, magistrali komunikatów lub źródeł CDC. Tabele z brązu zachowują nieprzetworzone dane źródłowe z minimalną transformacją, dzięki czemu warstwy srebra lub złota mogą ponownie przetwarzać dane ze źródła w warstwie z brązu, jeśli wymagania się zmienią.
  • Silver: użyj tabel przesyłania strumieniowego do przekształceń na poziomie wiersza przyrostowego (filtrowanie, czyszczenie i analizowanie). Użyj zmaterializowanych widoków, gdy logika warstwy srebrnej obejmuje łączenie wzbogacające względem tabel wymiarów lub złożone agregacje, które mogą korzystać z przyrostowego odświeżania.
  • Złoto: użyj zmaterializowanych widoków, aby wstępnie obliczyć agregacje, metryki i podsumowania stosowane w pulpitach nawigacyjnych, narzędziach raportowania i dla konsumentów końcowych.

Oddzielanie pozyskiwania (brązowego) i transformacji (srebrnej i złotej) w odrębne potoki DAG, gdy tylko jest to możliwe. Oddzielenie warstw umożliwia niezależne planowanie, monitorowanie i rozwiązywanie problemów każdej warstwy, a awaria potoku przekształcania nie blokuje dopływu nowych danych do warstwy brązowej.

Aby uzyskać więcej informacji, zobacz Streaming tables and Materialized views ( Tabele przesyłania strumieniowego i zmaterializowane widoki).