Share via


Aanbevolen procedures voor Lakeflow Spark-declaratieve pijplijnen

Op deze pagina worden aanbevolen patronen beschreven voor het ontwerpen, bouwen en gebruiken van pijplijnen met Lakeflow Spark-declaratieve pijplijnen. Pas deze richtlijnen toe bij het starten van een nieuwe pijplijn of het verbeteren van een bestaande pijplijn.

Het juiste gegevenssettype kiezen

Lakeflow Spark Declaratieve Pijplijnen biedt drie typen gegevenssets: streamingtabellen, materiële weergaven en tijdelijke weergaven. Het kiezen van het juiste type voor elke laag van uw pijplijn voorkomt onnodige rekenkosten en zorgt ervoor dat uw code gemakkelijk te beredeneren is.

Streamingtabellen zijn de juiste keuze voor gegevensopname en streamingtransformaties met lage latentie. Elke invoerrij wordt slechts eenmaal gelezen en verwerkt, waardoor deze ideaal zijn voor werkbelastingen met alleen toevoegbewerkingen, gegevens met een hoog volume en gebeurtenisgestuurde verwerking vanuit cloudopslag of berichtenbussen.

Gerealiseerde weergaven zijn de juiste keuze voor complexe transformaties en analytische query's. Hun resultaten worden vooraf berekend en up-to-date gehouden met incrementele vernieuwing, zodat de query's snel zijn. U kunt de gegevens niet rechtstreeks wijzigen in een gerealiseerde weergave. De querydefinitie bepaalt de uitvoer.

Tijdelijke weergaven zijn weergaven met pijplijnbereik die uw transformatielogica organiseren zonder gegevens naar opslag te materialiseren. Gebruik deze voor tussenliggende stappen die geen eigen tabel nodig hebben.

De volgende tabel geeft een overzicht van het gebruik van elk type:

Gebruiksituatie Aanbevolen type Reden
Opname vanuit cloudopslag of een berichtenbus Tabel voor streaming Verwerkt elke record één keer; verwerkt werkbelastingen met een hoog volume en alleen toevoegwerkbelastingen.
CDC-streams (invoegen, updates, verwijderen) Tabel voor streaming Wordt gebruikt als doel van APPLY CHANGES INTO voor geordend en ontdubbeld CDC-opname.
Complexe aggregaties en join-bewerkingen Gerealiseerde weergave Incrementeel vernieuwd; vermijdt volledige hercomputatie voor elke update.
Versnelling van dashboardquery’s Gerealiseerde weergave Met vooraf berekende resultaten verlopen queries sneller dan tegen onbewerkte tabellen.
Tussenliggende transformaties (geen downstreamlezers) Tijdelijke weergave Organiseert pijplijnlogica zonder dat er opslagkosten in rekening worden gebracht.

Voor meer informatie, zie Streaming-tabellen, materiële weergaven en Lakeflow Spark Declaratieve Pijplijnen-concepten.

Declaratieve CDC gebruiken in plaats van imperatieve MERGE

Het implementeren van change data capture (CDC) met imperatieve SQL-instructies MERGE vereist aanzienlijke aangepaste code om gebeurtenisvolgorde, ontdubbeling, gedeeltelijke updates en schemaontwikkeling correct te verwerken. Elk van deze problemen moet onafhankelijk worden opgelost en de resulterende code is moeilijk te onderhouden en te testen.

Lakeflow Spark Declarative Pijplijnen biedt de APPLY CHANGES INTO instructie (SQL) en de apply_changes() functie (Python), waarmee volgorde, ontdubbeling, out-of-order gebeurtenissen en schema-evolutie declaratief worden verwerkt. U beschrijft de vorm van de wijzigingengegevensstroom en de doeltabel — de overgebleven verwerking wordt door de pijplijn afgehandeld. APPLY CHANGES INTO ondersteunt zowel SCD-type 1 (overschrijven) als SCD Type 2 (behoud van geschiedenis).

Zie Change data capture en snapshots en The AUTO CDC API's: Gegevens vastleggen met pijplijnen vereenvoudigen voor meer informatie.

Gegevenskwaliteit afdwingen met verwachtingen

Verwachtingen zijn waar/onwaar SQL-expressies die worden toegepast op elke rij die door een gegevensset wordt doorgegeven. Wanneer een rij niet aan de voorwaarde voldoet, reageert de pijplijn volgens het door u geconfigureerde schendingsbeleid. Verwachtingen verzenden metrische gegevens naar het gebeurtenislogboek van de pijplijn, ongeacht het beleid, zodat u trends in de gegevenskwaliteit in de loop van de tijd kunt bijhouden.

Een schendingsbeleid kiezen

Er zijn drie schendingsbeleidsregels beschikbaar. Kies degene die overeenkomt met uw tolerantie voor slechte gegevens:

  • waarschuwing (standaard): records die ongeldig zijn, worden naar de doeltabel geschreven en gemarkeerd in metingen. Gebruik dit beleid wanneer u alle gegevens wilt vastleggen, maar inzicht wilt krijgen in kwaliteitsproblemen.
  • drop: Records die niet geldig zijn, worden verwijderd voordat ze worden geschreven. Gebruik dit wanneer slechte rijen worden verwacht en niet downstream mag worden doorgegeven.
  • mislukt: de pijplijnupdate stopt op de eerste ongeldige record. Gebruik dit voor kritieke gegevens waarbij een slechte record een ernstig upstreamprobleem aangeeft.

In de volgende voorbeelden ziet u elk beleid dat wordt toegepast op een streamingtabel:

SQL

-- Warn: write invalid records but track them in metrics
CREATE OR REFRESH STREAMING TABLE orders_raw (
  CONSTRAINT valid_order_id EXPECT (order_id IS NOT NULL)
) AS SELECT * FROM STREAM read_files("/volumes/raw/orders", format => "json");

-- Drop: discard invalid records before writing
CREATE OR REFRESH STREAMING TABLE orders_clean (
  CONSTRAINT non_negative_amount EXPECT (amount >= 0) ON VIOLATION DROP ROW
) AS SELECT * FROM STREAM(orders_raw);

-- Fail: stop the pipeline on any invalid record
CREATE OR REFRESH STREAMING TABLE orders_critical (
  CONSTRAINT required_customer_id EXPECT (customer_id IS NOT NULL) ON VIOLATION FAIL UPDATE
) AS SELECT * FROM STREAM(orders_clean);

Python

from pyspark import pipelines as dp

# Warn: write invalid records but track them in metrics
@dp.table
@dp.expect("valid_order_id", "order_id IS NOT NULL")
def orders_raw():
    return spark.readStream.format("cloudFiles") \
        .option("cloudFiles.format", "json") \
        .load("/volumes/raw/orders")

# Drop: discard invalid records before writing
@dp.table
@dp.expect_or_drop("non_negative_amount", "amount >= 0")
def orders_clean():
    return spark.readStream.table("orders_raw")

# Fail: stop the pipeline on any invalid record
@dp.table
@dp.expect_or_fail("required_customer_id", "customer_id IS NOT NULL")
def orders_critical():
    return spark.readStream.table("orders_clean")

Ongeldige records in quarantaine plaatsen

Wanneer u afgekeurde records wilt behouden voor onderzoek in plaats van ze zonder melding te verwijderen, gebruikt u een quarantainepatroon. Routeer rijen die geen validatie uitvoeren naar een afzonderlijke streamingtabel met behulp van twee stromen: een die ongeldige rijen uit de hoofdtabel verwijdert en een seconde die alleen de ongeldige rijen naar een quarantainetabel schrijft. Hiermee kunt u slechte gegevens onderzoeken, corrigeren en opnieuw verwerken zonder uw schone gegevensset te besmetten.

Zie De aanbevelingen en geavanceerde patronen van verwachtingen voor een gedetailleerd voorbeeld van het quarantainepatroon.

Zie Gegevenskwaliteit beheren met pijplijnwachtingen voor meer informatie over verwachtingen.

Uw pijplijnen parameteriseren

Pijplijnen hebben standaardcatalogus- en schema-instellingen, dus code die in dezelfde catalogus en hetzelfde schema leest en schrijft, werkt in omgevingen zonder parameters. Als uw pijplijn echter moet verwijzen naar een tweede catalogus of een tweede schema, bijvoorbeeld het lezen uit een gedeelde broncatalogus die verschilt tussen ontwikkeling en productie, voorkomt u dat deze namen rechtstreeks in uw broncode worden gecodeerd. In plaats daarvan definieert u deze als parameters voor pijplijnconfiguratie (sleutel-waardeparen die zijn ingesteld in de pijplijninstellingen) en verwijst u ernaar in uw code. Hierdoor kan één codebase correct worden uitgevoerd in omgevingen door de parameterwaarden te wisselen.

SQL

CREATE OR REFRESH MATERIALIZED VIEW transaction_summary AS
SELECT account_id, COUNT(txn_id) AS txn_count, SUM(amount) AS total_amount
FROM ${source_catalog}.sales.transactions
GROUP BY account_id;

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import count, sum

@dp.materialized_view
def transaction_summary():
    source_catalog = spark.conf.get("source_catalog")
    return spark.read.table(f"{source_catalog}.sales.transactions") \
        .groupBy("account_id") \
        .agg(
            count("txn_id").alias("txn_count"),
            sum("amount").alias("total_amount")
        )

Zie Parameters gebruiken met pijplijnen voor meer informatie.

De juiste pijplijnmodus kiezen voor elke omgeving

Ontwikkelings- en productie-updatemodi

Pijplijnen draaien in ofwel ontwikkelingsmodus of productiemodus. Kies de modus die overeenkomt met uw doel.

In de ontwikkelingsmodus gebruikt de pijplijn een langlopend cluster over updates en wordt er geen nieuwe poging uitgevoerd op fouten. Hierdoor wordt de iteratiecyclus versneld wanneer u pijplijncode ontwerpt en test, omdat u onmiddellijk foutdetails krijgt zonder te wachten op het opnieuw opstarten van het cluster.

In de productiemodus wordt het cluster onmiddellijk afgesloten nadat elke update is voltooid, waardoor de rekenkosten worden verlaagd. De pijplijn implementeert ook escalatie van herhaalde pogingen, inclusief het opnieuw opstarten van het cluster, om automatisch tijdelijke infrastructuurfouten af te handelen. Gebruik de productiemodus voor alle geplande pijplijnuitvoeringen.

Geactiveerd versus doorlopende pijplijnmodus

De geactiveerde modus verwerkt alle beschikbare gegevens en stopt vervolgens. Het is de juiste keuze voor de overgrote meerderheid van de pijplijnen: de pijplijnen die worden uitgevoerd volgens een planning (elk uur, dagelijks of op aanvraag) en geen versheid van gegevens van subminuten vereisen.

In de continue modus blijft het cluster actief en worden nieuwe gegevens verwerkt zodra deze binnenkomen. Dit is alleen geschikt wanneer voor uw use-case latentie in het bereik van seconden tot minuten is vereist. Omdat de continue modus een always-on-cluster vereist, is het aanzienlijk duurder dan de geactiveerde modus.

Zie Triggered versus continue pijplijnmodus en Pijplijnen configureren voor meer informatie.

Liquid clustering gebruiken voor gegevensindeling

Liquid clustering vervangt statische partitionering en ZORDER voor het optimaliseren van de gegevensindeling in Delta-tabellen. In tegenstelling tot partitionering, waarvoor u van tevoren een partitiekolom moet kiezen en gegevensscheefgroei kan veroorzaken wanneer waarden ongelijkmatig worden verdeeld, is dynamisch clusteren zelfregulerend, bestand tegen scheefgroei en incrementeel. Alleen de gegevens die opnieuw moeten worden georganiseerd, worden bij elke uitvoering herschreven.

Wijzig de clusterkolommen op elk gewenst moment zonder de volledige tabel opnieuw te schrijven naarmate querypatronen zich ontwikkelen.

Clusteringkolommen definiëren in de definitie van uw streamingtabel:

SQL

CREATE OR REFRESH STREAMING TABLE events
CLUSTER BY (event_date, region)
AS SELECT * FROM STREAM read_files("/volumes/raw/events", format => "parquet");

Python

from pyspark import pipelines as dp

@dp.table(cluster_by=["event_date", "region"])
def events():
    return spark.readStream.format("cloudFiles") \
        .option("cloudFiles.format", "parquet") \
        .load("/volumes/raw/events")

Als u niet zeker weet op welke kolommen u wilt clusteren, kunt u CLUSTER BY AUTO gebruiken om Databricks de optimale clusteringkolommen te laten selecteren op basis van de werklast van uw query's.

Voor meer informatie, zie Streaming-tabellen en Gebruik Liquid Clustering voor tabellen.

Pijplijnen beheren met CI/CD- en Databricks-assetbundels

Versiebeheer voor de broncode van uw pijplijn en gebruik Databricks Asset Bundles om implementaties in verschillende omgevingen te beheren.

Zie Een door bron beheerde pijplijn maken, een pijplijn converteren naar een Databricks Asset Bundle-project en parameters gebruiken met pijplijnen voor meer informatie.

Pijplijncode opslaan in versiebeheer

Sla alle pijplijnbronbestanden (Python en SQL) naast uw bundelconfiguratie op in een Git-opslagplaats. Versiebeheer van het volledige project geeft u een volledige geschiedenis van wijzigingen, maakt samenwerking eenvoudiger en kunt u wijzigingen in een ontwikkelomgeving valideren voordat u ze naar productie promoveert.

Databricks raadt Databricks Asset Bundles aan voor het beheren van deze werkstroom. Een bundel definieert uw pijplijnconfiguratie in YAML naast uw broncode en met de databricks bundle CLI kunt u pijplijnen valideren, implementeren en uitvoeren vanuit uw terminal of een CI/CD-systeem.

Bundeldoelen gebruiken voor omgevingsisolatie

Bundels maken meerdere doelen mogelijk (bijvoorbeeld dev, staging, prod), elk met een eigen set onderdrukkingen voor catalogusnamen, clusterbeleid, meldingsadressen en andere instellingen. Combineer bundeldoelen met pijplijnparameters om de juiste omgevingsspecifieke waarden te injecteren tijdens de implementatie, waardoor uw broncode vrij blijft van omgevingsconstanten.

Een typische werkstroom ziet er als volgt uit:

  1. Een ontwikkelaar werkt aan een functievertakking en implementeert in een persoonlijke ontwikkelingspijplijn in een ontwikkelaarscatalogus.
  2. Bij samenvoegen met de hoofdvertakking wordt een CI-systeem uitgevoerd databricks bundle validate en databricks bundle deploy --target staging de pijplijn gevalideerd en geïmplementeerd in een faseringsomgeving.
  3. Zodra de tests zijn geslaagd, wordt het CI-systeem naar productie uitgerold met databricks bundle deploy --target prod.

Aanbevolen procedures voor streaming

Gebruik deze patronen om de status te beheren, late gegevens te beheren en streamingpijplijnen betrouwbaar te houden.

Zie Stateful verwerking optimaliseren met watermerken, Een pijplijn herstellen van een stroomcontrolepuntfout en historische gegevens invullen met pijplijnen voor meer informatie.

Gebruik watermerken voor stateful bewerkingen

Watermerken beperken de status die de pijplijn in het geheugen houdt tijdens stateful streamingbewerkingen, zoals gefensterde aggregaties en deduplicatie. Zonder een watermerk groeit de status onbeperkt doordat de pijplijn gegevens verzamelt voor elke mogelijke sleutel, wat uiteindelijk tot geheugenfouten leidt bij langdurig draaiende pijplijnen.

Een watermerk geeft een tijdstempelkolom en een tolerantiedrempel voor late gegevens op. Records die binnenkomen nadat de drempelwaarde is verstreken, worden verwijderd. Kies een drempelwaarde waarmee uw tolerantie voor late gegevens wordt afgeschreven van de geheugenkosten om die status open te houden.

In het volgende voorbeeld wordt een tumblingvensteraggregatie van één minuut berekend met een watermerk van drie minuten:

SQL

CREATE OR REFRESH STREAMING TABLE event_counts AS
SELECT window(event_time, '1 minute') AS time_window, region, COUNT(*) AS cnt
FROM STREAM(events_raw)
  WATERMARK event_time DELAY OF INTERVAL 3 MINUTES
GROUP BY time_window, region;

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import window

@dp.table
def event_counts():
    return (
        spark.readStream.table("events_raw")
            .withWatermark("event_time", "3 minutes")
            .groupBy(window("event_time", "1 minute"), "region")
            .count()
    )

Opmerking

Om ervoor te zorgen dat aggregaties incrementeel worden verwerkt in plaats van volledig opnieuw te compileren bij elke update, moet u een watermerk definiëren.

Inzicht in streamingstatus en volledig vernieuwen

Streaming-staat is incrementeel: de pijplijn bouwt en onderhoudt de staat door updates heen in plaats van telkens opnieuw te berekenen. Dit maakt stateful streaming efficiënt, maar betekent ook dat als u de logica van een stateful query wijzigt (bijvoorbeeld het wijzigen van een drempelwaarde voor watermerken of het wijzigen van aggregatiekolommen), de bestaande status niet meer compatibel is met de nieuwe logica. In dit geval moet u een volledige vernieuwing uitvoeren om alle historische gegevens opnieuw te verwerken met de nieuwe logica en de status helemaal opnieuw opbouwen.

Een volledige vernieuwing kan ook leiden tot gegevensverlies als de bron geen historische gegevens bewaart. Een Kafka-bron met een korte bewaarperiode kan bijvoorbeeld alleen de laatste paar minuten aan gegevens beschikbaar zijn op het moment van vernieuwen, wat resulteert in een tabel die veel minder gegevens bevat dan voorheen. Plan de logica voor opgeslagen query’s zorgvuldig, vooral voor streams met een hoge volume waarbij een volledige vernieuwing kostbaar is of waarbij de bron beperkte gegevensretentie heeft. Het gebruik van de medaillonarchitectuur helpt door bronzen tabellen te maken met minimale transformatie en stelt zilveren of gouden tabellen in staat om opnieuw te berekenen van de bronzen tabellen met volledige geschiedenis.

Stream-stream-koppelingen

Stream-stream-joins vereisen een watermerk aan beide zijden van de join en een tijdgebonden join-voorwaarde. Het tijdsinterval in de joinvoorwaarde vertelt de streaming-engine wanneer er geen verdere overeenkomsten mogelijk zijn, zodat de status kan worden verwijderd die niet meer kan worden vergeleken. Als u de watermerken of de tijdgebonden voorwaarde weglaat, groeit de status zonder grenzen.

In het volgende voorbeeld worden gebeurtenissen voor advertentie-impressies samengevoegd met klik-gebeurtenissen, waardoor de klik binnen drie minuten na de indruk moet plaatsvinden:

SQL

CREATE OR REFRESH STREAMING TABLE impression_clicks AS
SELECT imp.ad_id, imp.impression_time, clk.click_time
FROM STREAM(ad_impressions)
    WATERMARK impression_time DELAY OF INTERVAL 3 MINUTES AS imp
JOIN STREAM(user_clicks)
    WATERMARK click_time DELAY OF INTERVAL 3 MINUTES AS clk
ON imp.ad_id = clk.ad_id
  AND clk.click_time BETWEEN imp.impression_time
    AND imp.impression_time + INTERVAL 3 MINUTES;

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import expr

dp.create_streaming_table("impression_clicks")

@dp.append_flow(target="impression_clicks")
def join_impressions_and_clicks():
    impressions = spark.readStream.table("ad_impressions") \
        .withWatermark("impression_time", "3 minutes")
    clicks = spark.readStream.table("user_clicks") \
        .withWatermark("click_time", "3 minutes")
    return impressions.alias("imp").join(
        clicks.alias("clk"),
        expr("""
            imp.ad_id = clk.ad_id AND
            clk.click_time BETWEEN imp.impression_time AND imp.impression_time + INTERVAL 3 MINUTES
        """),
        "leftOuter"
    )

Wanneer u een stream koppelt aan een statische tabel (een momentopnamedeelname), wordt de momentopname van de statische tabel aan het begin van elke microbatch vernieuwd. Dit betekent dat latere dimensierecords niet met terugwerkende kracht worden toegepast op feiten die al zijn verwerkt. Als retroactieve toepassing is vereist, gebruikt u een gerealiseerde weergave of herstructureert u de pijplijn.

Pijplijnprestaties optimaliseren

Pas deze technieken toe om de rekenkosten te verlagen en pijplijnupdates te versnellen.

Zie Gerealiseerde weergaven en Stateful verwerking optimaliseren met watermerken voor meer informatie.

Kleine bestanden voorkomen

Als u een pijplijn te vaak activeert op een bron met een laag volume, wordt een groot aantal kleine bestanden naar de cloudopslag geschreven. Kleine bestanden verminderen de uitvoerprestaties omdat voor elk bestand een afzonderlijke zoekopdracht naar metadata en een I/O-roundtrip vereist is, en API's voor cloudopslag de bewerkingen bij opschaling beperken. U kunt dit voorkomen door een triggerinterval te kiezen dat overeenkomt met uw gegevensvolume: geactiveerde pijplijnen uitvoeren volgens een schema waarmee een zinvolle hoeveelheid gegevens kan worden verzameld tussen updates, in plaats van continu.

Omgaan met scheefheid van gegevens

Scheeftrekken van gegevens treedt op wanneer waarden in een join- of groupBy-sleutel ongelijk verdeeld zijn over partities, waardoor een klein aantal taken het merendeel van de gegevens verwerkt. Hiermee worden hotspots gecreëerd waardoor de end-to-end update tijd toeneemt. Gebruik vloeistofclustering om scheeftrekken in opgeslagen tabellen aan te pakken. Voor vervorming die optreedt tijdens tussentijdse berekening, randomiseer zeer vervormde sleutels door een willekeurig bucketachtervoegsel toe te voegen voordat ze in twee fasen worden gegroepeerd en samengevoegd.

Zie Liquid Clustering gebruiken voor gegevensindeling voor meer informatie.

Incrementeel vernieuwen gebruiken voor gerealiseerde weergaven

Wanneer u een gerealiseerde weergave gebruikt voor een grote aggregatie, probeert Lakeflow Spark-declaratieve pijplijnen deze incrementeel te vernieuwen. Alleen de upstreamwijzigingen worden verwerkt sinds de laatste update in plaats van de volledige resultatenset opnieuw te berekenen. Incrementele vernieuwing is aanzienlijk goedkoper dan het opnieuw uitvoeren van de query bij iedere pijplijnactivering. Om de kans te maximaliseren dat een gerealiseerde weergave incrementeel kan worden vernieuwd, schrijft u eenvoudige, deterministische aggregatiequery's en vermijdt u constructies die incrementele verwerking voorkomen, zoals niet-deterministische functies.

Zie Stapsgewijze vernieuwing voor gematerialiseerde weergaven.

Joins optimaliseren

Voor joins waarbij één kant een kleine dimensie tabel is, voegt u een broadcast hint toe om Spark te instrueren de kleinere tabel naar alle uitvoerders te broadcasten in plaats van een shuffle join uit te voeren.

SQL

CREATE OR REFRESH MATERIALIZED VIEW enriched_orders AS
SELECT o.*, /*+ BROADCAST(p) */ p.product_name, p.category
FROM orders o
JOIN products p ON o.product_id = p.product_id;

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import broadcast

@dp.materialized_view
def enriched_orders():
    orders = spark.read.table("orders")
    products = spark.read.table("products")
    return orders.join(broadcast(products), "product_id")

Voor nabijheidskoppelingen van tijdreeksen (bijvoorbeeld het vinden van de dichtstbijzijnde gebeurtenis binnen een tijdsbereik), gebruikt u een voorwaarde voor een bereik-koppeling en zorgt u ervoor dat beide zijden een watermerk hebben bij het samenvoegen van gegevensstromen, of overweeg het vooraf in klassen indelen van gebeurtenissen in tijdvakjes voordat u koppelt.

Uw pijplijnen bewaken

Het gebeurtenislogboek van de pijplijn is het primaire waarneembaarheidsprimitief in Lakeflow Spark-declaratieve pijplijnen. Elke pijplijnuitvoering schrijft gestructureerde records naar het gebeurtenislogboek over de voortgang van de uitvoering, resultaten van de gegevenskwaliteit, gegevensherkomst en foutdetails. Het gebeurtenislogboek is een Delta-tabel die u rechtstreeks kunt opvragen.

Als u een query wilt uitvoeren op het gebeurtenislogboek zonder het onderliggende opslagpad te kennen, gebruikt u de event_log() tabelwaardefunctie in een gedeeld cluster of SQL Warehouse:

SELECT * FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
ORDER BY timestamp DESC
LIMIT 100;

Bouw dashboards voor gegevenskwaliteit door een query uit te voeren op het gebeurtenislogboek voor de verwachte metrische gegevens. De details kolom bevat een geneste JSON-structuur met pass/fail-tellingen voor elke beperking, die u kunt gebruiken voor het bijhouden van kwaliteitstrends in de loop van de tijd en waarschuwingen voor regressies.

Voor gebeurtenisgestuurde waarschuwingen gebruikt u gebeurtenishook om aangepaste webhooks of meldingsservices (zoals Slack of PagerDuty) te activeren wanneer een pijplijn mislukt of wanneer een drempelwaarde voor gegevenskwaliteit wordt overschreden. Gebeurtenishooks zijn Python-functies die worden uitgevoerd als reactie op pijplijn-gebeurtenissen.

Zie voor meer informatie pijplijnen bewaken, pijplijn-evenementlogboek en aangepaste bewaking van pijplijnen definiëren met gebeurtenishooks.

Serverloze rekenkracht gebruiken

Databricks raadt serverloze berekeningen aan voor nieuwe pijplijnen. Met serverloos is er geen handmatige clusterconfiguratie. Databricks beheert de infrastructuur automatisch. Serverloze pijplijnen maken gebruik van verbeterde automatische schaalaanpassing die zowel horizontaal (meer uitvoerders) als verticaal (grotere uitvoerders) kan worden geschaald als reactie op de vraag naar werkbelastingen. Serverloze pijplijnen maken altijd gebruik van Unity Catalog, dus governance en herkomsttracering zijn standaard ingebouwd.

Zie Een serverloze pijplijn configureren voor meer informatie.

Pijplijnen organiseren met de medaille-architectuur

Met de medaillestructuur worden gegevens ingedeeld in drie logische lagen , brons, zilver en goud , elk met een duidelijk doel. Door Lakeflow Spark-gegevenssettypen declaratieve pijplijnen toe te wijzen aan de juiste laag, blijven de verantwoordelijkheden van elke laag duidelijk en kunnen pijplijnen eenvoudiger worden onderhouden.

  • Brons: Gebruik streamingtabellen om onbewerkte gegevens op te nemen uit cloudopslag, berichtenbussen of CDC-bronnen. Brontabellen behouden de onbewerkte brongegevens met minimale transformatie, waardoor zilver- of goudlagen opnieuw kunnen worden verwerkt vanuit de bron in de bronlaag als de vereisten veranderen.
  • Zilver: Gebruik streamingtabellen voor incrementele transformaties op rijniveau (filteren, opschonen en parseren). Gebruik gematerialiseerde weergaven wanneer er logica op de silver-layer plaatsvindt die verrijkingskoppelingen tegen dimensietabellen of complexe aggregaties omvat, die profiteren van incrementele verversing.
  • Gold: Gebruik gerealiseerde weergaven om aggregaties, metrische gegevens en samenvattingen vooraf te berekenen voor dashboards, rapportagehulpprogramma's en downstreamgebruikers.

Afzonderlijke opname (brons) en transformatie (zilver en goud) in afzonderlijke pijplijn-DAG's waar mogelijk. Door de lagen los te koppelen, kunt u elke laag onafhankelijk plannen, bewaken en oplossen. Een fout in een transformatiepijplijn blokkeert niet dat nieuwe gegevens in brons kunnen landen.

Zie Streamingtabellen en gematerialiseerde weergaven voor meer informatie.