Delen via


Code voor declaratieve pijplijnen voor Lakeflow Spark ontwikkelen met SQL

Lakeflow Spark-declaratieve pijplijnen (SDP) introduceert verschillende nieuwe SQL-trefwoorden en -functies voor het definiëren van gerealiseerde weergaven en streamingtabellen in pijplijnen. SQL-ondersteuning voor het ontwikkelen van pijplijnen bouwt voort op de basisbeginselen van Spark SQL en voegt ondersteuning toe voor structured streaming-functionaliteit.

Gebruikers die bekend zijn met PySpark DataFrames geven mogelijk de voorkeur aan het ontwikkelen van pijplijncode met Python. Python biedt ondersteuning voor uitgebreidere tests en bewerkingen die lastig te implementeren zijn met SQL, zoals metaprogrammeringsbewerkingen. Zie Pijplijncode ontwikkelen met Python.

Raadpleeg de Pipeline SQL-taalreferentie voor een volledige verwijzing naar de syntaxis van de pijplijn.

Basisbeginselen van SQL voor pijplijnontwikkeling

SQL-code waarmee pijplijngegevenssets worden gemaakt, maakt gebruik van de CREATE OR REFRESH syntaxis om gerealiseerde weergaven en streamingtabellen te definiëren op basis van queryresultaten.

Het STREAM trefwoord geeft aan of de gegevensbron waarnaar wordt verwezen in een SELECT-component moet worden gelezen met semantiek voor streaming.

Lees- en schrijfbewerkingen zijn standaard ingesteld op de catalogus en het schema dat is opgegeven tijdens de pijplijnconfiguratie. Zie De doelcatalogus en het schema instellen.

De broncode van de pijplijn verschilt van SQL-scripts: SDP evalueert alle gegevenssetdefinities voor alle broncodebestanden die zijn geconfigureerd in een pijplijn en bouwt een gegevensstroomgrafiek voordat query's worden uitgevoerd. De volgorde van query's die in de bronbestanden worden weergegeven, definieert de volgorde van code-evaluatie, maar niet de volgorde van de uitvoering van query's.

Een gerealiseerde weergave maken met SQL

In het volgende codevoorbeeld ziet u de basissyntaxis voor het maken van een gerealiseerde weergave met SQL:

CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

Een streamingtabel maken met SQL

In het volgende codevoorbeeld ziet u de basissyntaxis voor het maken van een streamingtabel met SQL. Bij het lezen van een bron voor een streamingtabel geeft het STREAM trefwoord aan dat streaming-semantiek voor de bron moet worden gebruikt. Gebruik het STREAM trefwoord niet bij het creëren van een materialized view.

CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;

Opmerking

Gebruik het trefwoord STREAM om streaming-semantiek te gebruiken om uit de bron te lezen. Als de leesbewerking een wijziging of verwijdering van een bestaande record tegenkomt, wordt er een fout gegenereerd. Het is het veiligst om te lezen uit statische of alleen bij te voegen bronnen. Als u gegevens wilt opnemen die wijzigingen doorvoeren, kunt u Python en de SkipChangeCommits optie gebruiken om fouten te verwerken.

Gegevens laden uit objectopslag

Pijplijnen ondersteunen het laden van gegevens uit alle indelingen die worden ondersteund door Azure Databricks. Zie opties voor gegevensindeling.

Opmerking

In deze voorbeelden wordt gebruik gemaakt van gegevens die via de automatisch aan uw werkruimte gekoppelde /databricks-datasets beschikbaar zijn. Databricks raadt aan volumepaden of cloud-URI's te gebruiken om te verwijzen naar gegevens die zijn opgeslagen in cloudobjectopslag. Zie Wat zijn Unity Catalog-volumes?

Databricks raadt aan om automatisch laden en streamingtabellen te gebruiken bij het configureren van incrementele opnameworkloads voor gegevens die zijn opgeslagen in de opslag van cloudobjecten. Zie Wat is Auto Loader?

SQL maakt gebruik van de read_files-functie om de functionaliteit voor automatisch laden aan te roepen. U moet ook het STREAM trefwoord gebruiken om een streaming-leesbewerking met read_fileste configureren.

Hier volgt een beschrijving van de syntaxis voor read_files in SQL:

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
  FROM STREAM read_files(
    "<file-path>",
    [<option-key> => <option_value>, ...]
  )

Opties voor Auto Loader zijn sleutel-waardeparen. Zie Optiesvoor meer informatie over ondersteunde indelingen en opties.

In het volgende voorbeeld wordt een streamingtabel gemaakt op basis van JSON-bestanden met behulp van automatisch laden:

CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT *
FROM STREAM read_files(
  "/databricks-datasets/retail-org/sales_orders",
  format => "json");

De functie read_files ondersteunt ook batch-semantiek om gerealiseerde weergaven te maken. In het volgende voorbeeld wordt batch-semantiek gebruikt om een JSON-map te lezen en een gerealiseerde weergave te maken:

CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT *
FROM read_files(
  "/databricks-datasets/retail-org/sales_orders",
  format => "json");

Gegevens valideren met verwachtingen

U kunt verwachtingen gebruiken om beperkingen voor gegevenskwaliteit in te stellen en af te dwingen. Zie Gegevenskwaliteit beheren met de verwachtingen van pijplijnen.

De volgende code definieert een verwachting met de naam valid_data waarmee records worden verwijderd die null zijn tijdens gegevensopname:

CREATE OR REFRESH STREAMING TABLE orders_valid(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

Gematerialiseerde weergaven en streamingtabellen opvragen die in uw pijplijn zijn gedefinieerd

In het volgende voorbeeld worden vier gegevenssets gedefinieerd:

  • Een streamingtabel met de naam orders waarmee JSON-gegevens worden geladen.
  • Een gerealiseerde weergave met de naam customers waarmee CSV-gegevens worden geladen.
  • Een gerealiseerde weergave genaamd customer_orders die records uit de orders- en customers-gegevenssets koppelt, de order-tijdstempel naar een datum omzet en de velden customer_id, order_number, stateen order_date selecteert.
  • Een gerealiseerde weergave met de naam daily_orders_by_state waarmee het dagelijkse aantal orders voor elke status wordt geaggregeerd.

Opmerking

Wanneer u query's uitvoert op weergaven of tabellen in uw pijplijn, kunt u de catalogus en het schema rechtstreeks opgeven of kunt u de standaardinstellingen gebruiken die zijn geconfigureerd in uw pijplijn. In dit voorbeeld worden de orders, customersen customer_orders tabellen geschreven en gelezen uit de standaardcatalogus en het standaardschema dat is geconfigureerd voor uw pijplijn.

Traditionele publicatiemodus maakt gebruik van het LIVE schema om een query uit te voeren op andere gematerialiseerde weergaven en streamingtabellen die zijn gedefinieerd in uw pijplijn. In nieuwe pijplijnen wordt de syntaxis van het LIVE schema stilletjes genegeerd. Zie LIVE-schema (verouderd).

CREATE OR REFRESH STREAMING TABLE orders(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
  c.customer_id,
  o.order_number,
  c.state,
  date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;

Een persoonlijke tabel definiëren

U kunt de PRIVATE-clausule gebruiken bij het maken van een gematerialiseerde weergave of een streamingtabel. Wanneer u een persoonlijke tabel maakt, maakt u de tabel, maar maakt u de metagegevens voor de tabel niet. Met PRIVATE de component wordt SDP geïnstrueerd om een tabel te maken die beschikbaar is voor de pijplijn, maar die niet buiten de pijplijn mag worden geopend. Om de verwerkingstijd te verminderen, wordt een privétabel bewaard gedurende de levensduur van de pijplijn die deze maakt, en niet slechts één update.

Privétabellen kunnen dezelfde naam hebben als tabellen in de catalogus. Als u een niet-gekwalificeerde naam opgeeft voor een tabel in een pijplijn, wordt de persoonlijke tabel gebruikt als er zowel een persoonlijke tabel als een catalogustabel met die naam is.

Privétabellen werden eerder aangeduid als tijdelijke tabellen.

Records definitief verwijderen uit een gerealiseerde weergave of streamingtabel

Als u records definitief wilt verwijderen uit een streamingtabel waarvoor verwijderingsvectoren zijn ingeschakeld, zoals voor AVG-naleving, moeten aanvullende bewerkingen worden uitgevoerd op de onderliggende Delta-tabellen van het object. Zie Records definitief verwijderen uit een streamingtabelom ervoor te zorgen dat records uit een streamingtabel worden verwijderd.

Gematerialiseerde weergaven weerspiegelen altijd de gegevens in de onderliggende tabellen wanneer ze opnieuw opgebouwd worden. Als u gegevens in een gerealiseerde weergave wilt verwijderen, moet u de gegevens uit de bron verwijderen en de gerealiseerde weergave vernieuwen.

Waarden parameteriseren die worden gebruikt bij het declareren van tabellen of weergaven met SQL

Gebruik SET om een configuratiewaarde op te geven in een query die een tabel of weergave declareert, inclusief Spark-configuraties. Elke tabel of weergave die u in een bronbestand definieert nadat de SET instructie toegang heeft tot de gedefinieerde waarde. Spark-configuraties die zijn opgegeven met behulp van de SET-instructie, worden gebruikt bij het uitvoeren van de Spark-query voor een tabel of weergave na de SET-instructie. Als u een configuratiewaarde in een query wilt lezen, gebruikt u de syntaxis van tekenreeksinterpolatie ${}. In het volgende voorbeeld wordt een Spark-configuratiewaarde met de naam startDate ingesteld en wordt die waarde in een query gebruikt:

SET startDate='2025-01-01';

CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}

Als u meerdere configuratiewaarden wilt opgeven, gebruikt u een afzonderlijke SET-instructie voor elke waarde.

Beperkingen

De PIVOT component wordt niet ondersteund. De pivot-bewerking in Spark vereist het gretige laden van invoergegevens om het uitvoerschema te berekenen. Deze mogelijkheid wordt niet ondersteund in pijplijnen.

Opmerking

De CREATE OR REFRESH LIVE TABLE syntaxis voor het maken van een gerealiseerde weergave is afgeschaft. Gebruik in plaats daarvan CREATE OR REFRESH MATERIALIZED VIEW.