Sdílet prostřednictvím


Transformace dat pomocí dynamických tabulek Delta

Tento článek popisuje, jak můžete pomocí rozdílových živých tabulek deklarovat transformace datových sad a určit způsob zpracování záznamů pomocí logiky dotazu. Obsahuje také některé příklady běžných vzorů transformace, které můžou být užitečné při vytváření kanálů Delta Live Tables.

Datovou sadu můžete definovat pro jakýkoli dotaz, který vrací datový rámec. Jako transformace v kanálu Delta Live Tables můžete použít integrované operace Apache Sparku, funkce definované uživatelem, vlastní logiku a modely MLflow. Po ingestování dat do kanálu Delta Live Tables můžete definovat nové datové sady pro nadřazené zdroje a vytvářet nové streamované tabulky, materializovaná zobrazení a zobrazení.

Informace o efektivním provádění stavového zpracování pomocí rozdílových živých tabulek najdete v tématu Optimalizace stavového zpracování v rozdílových živých tabulkách s vodoznaky.

Kdy použít zobrazení, materializovaná zobrazení a streamované tabulky

Pokud chcete zajistit, aby vaše kanály byly efektivní a udržovatelné, vyberte při implementaci dotazů kanálu nejlepší typ datové sady.

Zvažte použití zobrazení v následujících případech:

  • Máte velký nebo složitý dotaz, který chcete rozdělit na snadněji spravované dotazy.
  • Chcete ověřit průběžné výsledky s využitím očekávání.
  • Chcete snížit náklady na úložiště a výpočetní prostředky a nevyžaduje materializaci výsledků dotazů. Vzhledem k tomu, že tabulky jsou materializované, vyžadují další výpočetní prostředky a prostředky úložiště.

Zvažte použití materializovaného zobrazení v následujících případech:

  • Tabulku spotřebovávají více podřízených dotazů. Vzhledem k tomu, že zobrazení se počítají na vyžádání, je zobrazení znovu vypočteno při každém dotazování zobrazení.
  • Tabulku spotřebovávají jiné kanály, úlohy nebo dotazy. Vzhledem k tomu, že zobrazení nejsou materializovaná, můžete je použít pouze ve stejném kanálu.
  • Chcete zobrazit výsledky dotazu během vývoje. Vzhledem k tomu, že tabulky jsou materializované a dají se zobrazit a dotazovat mimo kanál, může použití tabulek během vývoje pomoct ověřit správnost výpočtů. Po ověření převeďte dotazy, které nevyžadují materializaci do zobrazení.

Zvažte použití streamované tabulky v následujících případech:

  • Dotaz je definován proti zdroji dat, který se nepřetržitě nebo přírůstkově zvětšuje.
  • Výsledky dotazu by se měly vypočítat přírůstkově.
  • Pro kanál je požadovaná vysoká propustnost a nízká latence.

Poznámka:

streamované tabulky jsou vždy definovány proti zdrojům streamování. K instalaci aktualizací z informačních kanálů CDC můžete použít také zdroje APPLY CHANGES INTO streamování. Viz rozhraní API APPLY CHANGES: Zjednodušení zachytávání dat změn pomocí rozdílových živých tabulek.

Kombinování streamovaných tabulek a materializovaných zobrazení v jednom kanálu

streamované tabulky dědí záruky zpracování strukturovaného streamování Apache Sparku a jsou nakonfigurované tak, aby zpracovávaly dotazy ze zdrojů dat jen pro připojení, kde se nové řádky vždy vkládají do zdrojové tabulky, a ne upravovat.

Poznámka:

I když streamované tabulky ve výchozím nastavení vyžadují zdroje dat jen pro připojení, pokud je zdrojem streamování jiná streamovací tabulka, která vyžaduje aktualizace nebo odstranění, můžete toto chování přepsat příznakem skipChangeCommits.

Běžný model streamování zahrnuje ingestování zdrojových dat pro vytvoření počátečních datových sad v kanálu. Tyto počáteční datové sady se běžně označují jako bronzové tabulky a často provádějí jednoduché transformace.

Naproti tomu konečné tabulky v kanálu, běžně označované jako zlaté tabulky, často vyžadují složité agregace nebo čtení ze zdrojů, které jsou cílem APPLY CHANGES INTO operace. Vzhledem k tomu, že tyto operace vytvářejí aktualizace spíše než připojení, nejsou podporovány jako vstupy do streamovaných tabulek. Tyto transformace jsou vhodnější pro materializovaná zobrazení.

Kombinováním streamovaných tabulek a materializovaných zobrazení do jednoho kanálu můžete kanál zjednodušit, vyhnout se nákladnému opakovanému příjmu dat nebo opětovnému zpracování nezpracovaných dat a mít plný výkon SQL k výpočtu složitých agregací přes efektivně zakódovanou a filtrovanou datovou sadu. Následující příklad ukazuje tento typ smíšeného zpracování:

Poznámka:

Tyto příklady používají k načtení souborů z cloudového úložiště automatický zavaděč. Pokud chcete načíst soubory s automatickým zavaděčem v kanálu s povoleným katalogem Unity, musíte použít externí umístění. Další informace o používání katalogu Unity s dynamickými tabulkami Delta najdete v tématu Použití katalogu Unity s kanály Delta Live Tables.

Python

@dlt.table
def streaming_bronze():
  return (
    # Since this is a streaming source, this table is incremental.
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://path/to/raw/data")
  )

@dlt.table
def streaming_silver():
  # Since we read the bronze table as a stream, this silver table is also
  # updated incrementally.
  return dlt.read_stream("streaming_bronze").where(...)

@dlt.table
def live_gold():
  # This table will be recomputed completely by reading the whole silver table
  # when it is updated.
  return dlt.read("streaming_silver").groupBy("user_id").count()

SQL

CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM cloud_files(
  "abfss://path/to/raw/data", "json"
)

CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(LIVE.streaming_bronze) WHERE...

CREATE OR REFRESH MATERIALIZED VIEW live_gold
AS SELECT count(*) FROM LIVE.streaming_silver GROUP BY user_id

Přečtěte si další informace o použití automatického zavaděče k efektivnímu čtení souborů JSON z úložiště Azure pro přírůstkové zpracování.

Statické spojení streamu

Statické spojení datových proudů jsou dobrou volbou při denormalizaci souvislého proudu dat jen pro připojení s primárně tabulkou statických dimenzí.

Při každé aktualizaci kanálu se nové záznamy ze streamu připojí k nejaktuálnějšímu snímku statické tabulky. Pokud jsou záznamy přidány nebo aktualizovány ve statické tabulce po zpracování odpovídajících dat z tabulky streamování, výsledné záznamy se nepřepočítávají, pokud se neprovede úplná aktualizace.

V kanálech nakonfigurovaných pro aktivované spuštění vrátí statická tabulka výsledky po spuštění aktualizace. V kanálech nakonfigurovaných pro průběžné spouštění se pokaždé, když tabulka zpracuje aktualizaci, dotazuje se nejnovější verze statické tabulky.

Následuje příklad statického spojení streamu:

Python

@dlt.table
def customer_sales():
  return dlt.read_stream("sales").join(dlt.read("customers"), ["customer_id"], "left")

SQL

CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(LIVE.sales)
  INNER JOIN LEFT LIVE.customers USING (customer_id)

Efektivní výpočet agregací

Streamované tabulky můžete použít k přírůstkové výpočtu jednoduchých distribuačních agregací, jako je počet, minimum, maximum nebo součet a algebraické agregace, jako jsou průměr nebo směrodatná odchylka. Databricks doporučuje přírůstkovou agregaci pro dotazy s omezeným počtem skupin, například dotaz s klauzulí GROUP BY country . Při každé aktualizaci se čtou jenom nová vstupní data.

Další informace o psaní dotazů Delta Live Tables, které provádějí přírůstkové agregace, najdete v tématu Provádění agregací s vodoznaky v otevírně.

Použití modelů MLflow v kanálu Delta Live Tables

Poznámka:

Pokud chcete používat modely MLflow v kanálu s podporou katalogu Unity, musí být váš kanál nakonfigurovaný tak, aby používal kanál preview . Pokud chcete kanál použít current , musíte nakonfigurovat kanál tak, aby se publikoval do metastoru Hive.

V kanálech Delta Live Tables můžete používat modely natrénované pomocí MLflow. Modely MLflow se v Azure Databricks považují za transformace, což znamená, že pracují se vstupem datového rámce Sparku a vrací výsledky jako datový rámec Sparku. Vzhledem k tomu, že rozdílové živé tabulky definují datové sady proti datovým rámcům, můžete převést úlohy Apache Sparku, které využívají MLflow na tabulky Delta Live Tables s pouhými několika řádky kódu. Další informace o MLflow najdete v tématu Správa životního cyklu ML pomocí MLflow.

Pokud už máte poznámkový blok Pythonu, který volá model MLflow, můžete tento kód přizpůsobit rozdílovým živým tabulkám pomocí dekorátoru @dlt.table a zajistit, aby funkce byly definovány tak, aby vracely výsledky transformace. Delta Live Tables ve výchozím nastavení neinstaluje MLflow, proto se ujistěte, že jste %pip install mlflow je a importujete a naimportujete mlflow do dlt horní části poznámkového bloku. Úvod do syntaxe rozdílových živých tabulek najdete v tématu Implementace kanálu Delta Live Tables s Pythonem.

Pokud chcete používat modely MLflow v Delta Live Tables, proveďte následující kroky:

  1. Získejte ID spuštění a název modelu MLflow. ID spuštění a název modelu se používají k vytvoření identifikátoru URI modelu MLflow.
  2. Pomocí identifikátoru URI definujte UDF Sparku pro načtení modelu MLflow.
  3. Zavolejte UDF v definicích tabulky, aby se použil model MLflow.

Následující příklad ukazuje základní syntaxi pro tento vzor:

%pip install mlflow

import dlt
import mlflow

run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

@dlt.table
def model_predictions():
  return dlt.read(<input-data>)
    .withColumn("prediction", loaded_model_udf(<model-features>))

Jako úplný příklad následující kód definuje UDF Sparku s názvem loaded_model_udf , který načte model MLflow natrénovaný na data úvěrového rizika. Datové sloupce použité k předpovědím se předávají jako argument funkce definované uživatelem. Tabulka vypočítá předpovědi pro každý řádek v loan_risk_input_datatabulce loan_risk_predictions .

%pip install mlflow

import dlt
import mlflow
from pyspark.sql.functions import struct

run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

categoricals = ["term", "home_ownership", "purpose",
  "addr_state","verification_status","application_type"]

numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
  "revol_util", "total_acc", "credit_length_in_years"]

features = categoricals + numerics

@dlt.table(
  comment="GBT ML predictions of loan risk",
  table_properties={
    "quality": "gold"
  }
)
def loan_risk_predictions():
  return dlt.read("loan_risk_input_data")
    .withColumn('predictions', loaded_model_udf(struct(features)))

Zachování ručních odstranění nebo aktualizací

Delta Live Tables umožňuje ručně odstranit nebo aktualizovat záznamy z tabulky a provést operaci aktualizace pro přepočítání podřízených tabulek.

Delta Live Tables ve výchozím nastavení přepočítá výsledky tabulky na základě vstupních dat při každé aktualizaci kanálu, takže je nutné zajistit, aby odstraněný záznam nebyl znovu načten ze zdrojových dat. pipelines.reset.allowed Nastavením vlastnosti tabulky zabráníte false aktualizacím tabulky, ale nezabráníte přírůstkovým zápisům do tabulek nebo zabráníte tomu, aby se do tabulky přetékala nová data.

Následující diagram znázorňuje příklad pomocí dvou streamovaných tabulek:

  • raw_user_table ingestuje nezpracovaná uživatelská data ze zdroje.
  • bmi_table přírůstkově vypočítá skóre BMI pomocí váhy a výšky z raw_user_table.

Chcete ručně odstranit nebo aktualizovat záznamy uživatele z raw_user_table a znovu zkompilovat bmi_table.

Zachování datového diagramu

Následující kód ukazuje nastavení pipelines.reset.allowed vlastnosti tabulky tak, aby false byla zakázána úplná aktualizace raw_user_table tak, aby se požadované změny uchovály v průběhu času, ale podřízené tabulky se přepočítají při spuštění aktualizace kanálu:

CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM cloud_files("/databricks-datasets/iot-stream/data-user", "csv");

CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(LIVE.raw_user_table);