Transformera data med Delta Live Tables

Den här artikeln beskriver hur du kan använda Delta Live Tables för att deklarera transformeringar på datauppsättningar och ange hur poster bearbetas via frågelogik. Den innehåller också några exempel på vanliga transformeringsmönster som kan vara användbara när du skapar Delta Live Tables-pipelines.

Du kan definiera en datauppsättning mot alla frågor som returnerar en DataFrame. Du kan använda inbyggda Apache Spark-åtgärder, UDF:er, anpassad logik och MLflow-modeller som transformeringar i din Delta Live Tables-pipeline. När data har matats in i din Delta Live Tables-pipeline kan du definiera nya datauppsättningar mot överordnade källor för att skapa nya strömmande tabeller, materialiserade vyer och vyer.

Information om hur du effektivt utför tillståndskänslig bearbetning med Delta Live Tables finns i Optimera tillståndskänslig bearbetning i Delta Live Tables med vattenstämplar.

När du ska använda vyer, materialiserade vyer och strömmande tabeller

För att säkerställa att dina pipelines är effektiva och underhållsbara väljer du den bästa datamängdstypen när du implementerar dina pipelinefrågor.

Överväg att använda en vy när:

  • Du har en stor eller komplex fråga som du vill dela upp i enklare att hantera frågor.
  • Du vill verifiera mellanliggande resultat med hjälp av förväntningar.
  • Du vill minska lagrings- och beräkningskostnaderna och inte kräva materialisering av frågeresultat. Eftersom tabeller materialiseras behöver de ytterligare beräknings- och lagringsresurser.

Överväg att använda en materialiserad vy när:

  • Flera underordnade frågor använder tabellen. Eftersom vyer beräknas på begäran beräknas vyn varje gång vyn efterfrågas.
  • Andra pipelines, jobb eller frågor använder tabellen. Eftersom vyer inte materialiseras kan du bara använda dem i samma pipeline.
  • Du vill visa resultatet av en fråga under utvecklingen. Eftersom tabeller materialiseras och kan visas och efterfrågas utanför pipelinen kan du använda tabeller under utvecklingen för att verifiera att beräkningarna är korrekta. När du har verifierat konverterar du frågor som inte kräver materialisering till vyer.

Överväg att använda en strömningstabell när:

  • En fråga definieras mot en datakälla som växer kontinuerligt eller inkrementellt.
  • Frågeresultat bör beräknas stegvis.
  • Högt dataflöde och låg svarstid önskas för pipelinen.

Kommentar

Strömmande tabeller definieras alltid mot strömmande källor. Du kan också använda strömmande källor med APPLY CHANGES INTO för att tillämpa uppdateringar från CDC-feeds. Se API:et TILLÄMPA ÄNDRINGAR: Förenkla insamlingen av ändringsdata i Delta Live Tables.

Kombinera strömmande tabeller och materialiserade vyer i en enda pipeline

Strömmande tabeller ärver bearbetningsgarantierna för Apache Spark Structured Streaming och är konfigurerade för att bearbeta frågor från tilläggsdatakällor, där nya rader alltid infogas i källtabellen i stället för att ändras.

Kommentar

Även om strömmande tabeller som standard kräver tilläggsdatakällor, kan du åsidosätta det här beteendet med flaggan skipChangeCommits när en strömmande källa är en annan strömmande tabell som kräver uppdateringar eller borttagningar.

Ett vanligt strömningsmönster omfattar inmatning av källdata för att skapa de första datauppsättningarna i en pipeline. Dessa inledande datauppsättningar kallas ofta bronstabeller och utför ofta enkla transformeringar.

De sista tabellerna i en pipeline, som ofta kallas guldtabeller , kräver däremot ofta komplicerade aggregeringar eller läsning från källor som är mål för en APPLY CHANGES INTO åtgärd. Eftersom dessa åtgärder skapar uppdateringar i stället för tillägg stöds de inte som indata till strömmande tabeller. Dessa transformeringar passar bättre för materialiserade vyer.

Genom att blanda strömmande tabeller och materialiserade vyer i en enda pipeline kan du förenkla din pipeline, undvika kostsam återinmatning eller ombearbetning av rådata och ha den fulla kraften i SQL för att beräkna komplexa aggregeringar över en effektivt kodad och filtrerad datauppsättning. I följande exempel visas den här typen av blandad bearbetning:

Kommentar

I de här exemplen används automatisk inläsning för att läsa in filer från molnlagring. För att ladda filer med Auto Loader i en Unity Catalog-aktiverad pipeline måste du använda externa platser. Mer information om hur du använder Unity Catalog med Delta Live Tables finns i Använda Unity Catalog med dina Delta Live Tables-pipelines.

Python

@dlt.table
def streaming_bronze():
  return (
    # Since this is a streaming source, this table is incremental.
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://path/to/raw/data")
  )

@dlt.table
def streaming_silver():
  # Since we read the bronze table as a stream, this silver table is also
  # updated incrementally.
  return dlt.read_stream("streaming_bronze").where(...)

@dlt.table
def live_gold():
  # This table will be recomputed completely by reading the whole silver table
  # when it is updated.
  return dlt.read("streaming_silver").groupBy("user_id").count()

SQL

CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM cloud_files(
  "abfss://path/to/raw/data", "json"
)

CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(LIVE.streaming_bronze) WHERE...

CREATE OR REFRESH LIVE TABLE live_gold
AS SELECT count(*) FROM LIVE.streaming_silver GROUP BY user_id

Läs mer om hur du använder Auto Loader för att effektivt läsa JSON-filer från Azure Storage för inkrementell bearbetning.

Strömstatiska kopplingar

Stream-static-kopplingar är ett bra val när du avnormaliserar en kontinuerlig ström av tilläggsdata med en primärt statisk dimensionstabell.

Med varje pipelineuppdatering kopplas nya poster från strömmen till den senaste ögonblicksbilden av den statiska tabellen. Om poster läggs till eller uppdateras i den statiska tabellen efter att motsvarande data från strömningstabellen har bearbetats beräknas inte de resulterande posterna om såvida inte en fullständig uppdatering utförs.

I pipelines som konfigurerats för utlöst körning returnerar den statiska tabellen resultat när uppdateringen startades. I pipelines som konfigurerats för kontinuerlig körning, varje gång tabellen bearbetar en uppdatering, efterfrågas den senaste versionen av den statiska tabellen.

Följande är ett exempel på en ström-statisk koppling:

Python

@dlt.table
def customer_sales():
  return dlt.read_stream("sales").join(dlt.read("customers"), ["customer_id"], "left")

SQL

CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(LIVE.sales)
  INNER JOIN LEFT LIVE.customers USING (customer_id)

Beräkna aggregeringar effektivt

Du kan använda strömmande tabeller för att stegvis beräkna enkla fördelningsaggregeringar som antal, min, max eller summa och algebraiska aggregeringar som medelvärde eller standardavvikelse. Databricks rekommenderar inkrementell aggregering för frågor med ett begränsat antal grupper, till exempel en fråga med en GROUP BY country sats. Endast nya indata läse med varje uppdatering.

Mer information om hur du skriver Delta Live Tables-frågor som utför inkrementella aggregeringar finns i Utföra fönsterbaserade aggregeringar med vattenstämplar.

Använda MLflow-modeller i en Delta Live Tables-pipeline

Kommentar

Om du vill använda MLflow-modeller i en Unity Catalog-aktiverad pipeline måste din pipeline konfigureras för att använda preview kanalen. Om du vill använda current kanalen måste du konfigurera din pipeline för att publicera till Hive-metaarkivet.

Du kan använda MLflow-tränade modeller i Delta Live Tables-pipelines. MLflow-modeller behandlas som transformeringar i Azure Databricks, vilket innebär att de agerar på en Spark DataFrame-indata och returnerar resultat som en Spark DataFrame. Eftersom Delta Live Tables definierar datauppsättningar mot DataFrames kan du konvertera Apache Spark-arbetsbelastningar som utnyttjar MLflow till Delta Live Tables med bara några rader kod. Mer information om MLflow finns i ML-livscykelhantering med MLflow.

Om du redan har en Python-notebook-fil som anropar en MLflow-modell kan du anpassa den här koden till Delta Live Tables med hjälp av dekoratören @dlt.table och se till att funktioner definieras för att returnera omvandlingsresultat. Delta Live Tables installerar inte MLflow som standard, så se till att du %pip install mlflow och importerar mlflow och dlt överst i notebook-filen. En introduktion till Delta Live Tables-syntax finns i Exempel: Mata in och bearbeta Data för New York-babynamn.

Utför följande steg för att använda MLflow-modeller i Delta Live Tables:

  1. Hämta körnings-ID:t och modellnamnet för MLflow-modellen. Körnings-ID och modellnamn används för att konstruera URI:n för MLflow-modellen.
  2. Använd URI:n för att definiera en Spark UDF för att läsa in MLflow-modellen.
  3. Anropa UDF i dina tabelldefinitioner för att använda MLflow-modellen.

I följande exempel visas den grundläggande syntaxen för det här mönstret:

%pip install mlflow

import dlt
import mlflow

run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

@dlt.table
def model_predictions():
  return dlt.read(<input-data>)
    .withColumn("prediction", loaded_model_udf(<model-features>))

Som ett fullständigt exempel definierar följande kod en Spark UDF med namnet loaded_model_udf som läser in en MLflow-modell som tränats på låneriskdata. De datakolumner som används för att göra förutsägelsen skickas som ett argument till UDF. loan_risk_predictions Tabellen beräknar förutsägelser för varje rad i loan_risk_input_data.

%pip install mlflow

import dlt
import mlflow
from pyspark.sql.functions import struct

run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

categoricals = ["term", "home_ownership", "purpose",
  "addr_state","verification_status","application_type"]

numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
  "revol_util", "total_acc", "credit_length_in_years"]

features = categoricals + numerics

@dlt.table(
  comment="GBT ML predictions of loan risk",
  table_properties={
    "quality": "gold"
  }
)
def loan_risk_predictions():
  return dlt.read("loan_risk_input_data")
    .withColumn('predictions', loaded_model_udf(struct(features)))

Behåll manuella borttagningar eller uppdateringar

Med Delta Live Tables kan du manuellt ta bort eller uppdatera poster från en tabell och utföra en uppdateringsåtgärd för att beräkna om underordnade tabeller.

Som standard beräknar Delta Live Tables tabellresultat baserat på indata varje gång en pipeline uppdateras, så du måste se till att den borttagna posten inte laddas om från källdata. Om du ställer in tabellegenskapen pipelines.reset.allowed för att false förhindra uppdateringar till en tabell, men förhindrar inte inkrementella skrivningar till tabellerna eller förhindrar att nya data flödar in i tabellen.

Följande diagram illustrerar ett exempel med två strömmande tabeller:

  • raw_user_table matar in rådata från en källa.
  • bmi_table beräknar inkrementellt BMI-poäng med hjälp av vikt och höjd från raw_user_table.

Du vill ta bort eller uppdatera användarposter manuellt från raw_user_table och kompilera bmi_tableom .

Behålla datadiagram

Följande kod visar hur du anger tabellegenskapen pipelines.reset.allowed till false för att inaktivera fullständig uppdatering för raw_user_table så att avsedda ändringar behålls över tid, men underordnade tabeller omberäknas när en pipelineuppdatering körs:

CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM cloud_files("/databricks-datasets/iot-stream/data-user", "csv");

CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(LIVE.raw_user_table);