Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Tento článek popisuje, jak můžete pomocí kanálů deklarovat transformace datových sad a určit způsob zpracování záznamů pomocí logiky dotazu. Obsahuje také příklady běžných transformačních vzorů pro vytváření kanálů.
Datovou sadu můžete definovat pro jakýkoli dotaz, který vrací datový rámec. Jako transformace v deklarativních kanálech Lakeflow Spark můžete použít vestavěné operace Apache Spark, uživatelské definované funkce (UDF), vlastní logiku a modely MLflow. Po nahrání dat do vašeho kanálu můžete definovat nové datové sady proti vstupním zdrojům a vytvářet nové streamované tabulky, materializovaná zobrazení a pohledy.
Informace o efektivním provádění stavového zpracování v kanálu najdete v tématu Optimalizace stavového zpracování pomocí vodoznaků.
Kdy je vhodné použít zobrazení, materializovaná zobrazení a streamované tabulky
Při implementaci dotazů v rámci datového toku vyberte nejlepší typ datové sady, aby byla zajištěna jejich efektivita a udržovatelnost.
Zvažte použití zobrazení k provedení následujících kroků:
- Rozdělte velký nebo složitý dotaz, který chcete použít k jednodušší správě dotazů.
- Ověřte průběžné výsledky pomocí očekávání.
- Snižte náklady na úložiště a výpočetní prostředky pro výsledky, které nemusíte uchovávat. Vzhledem k tomu, že tabulky jsou materializované, vyžadují další výpočetní prostředky a prostředky úložiště.
Zvažte použití materializovaného zobrazení v následujících případech:
- Tabulku spotřebovávají více podřízených dotazů. Vzhledem k tomu, že zobrazení se počítají na vyžádání, je zobrazení znovu vypočteno při každém dotazování zobrazení.
- Tabulku spotřebovávají jiné kanály, úlohy nebo dotazy. Vzhledem k tomu, že zobrazení nejsou materializovaná, můžete je použít pouze ve stejném potrubí.
- Chcete zobrazit výsledky dotazu během vývoje. Vzhledem k tomu, že tabulky jsou materializované a dají se zobrazit a dotazovat mimo kanál, může použití tabulek během vývoje pomoct ověřit správnost výpočtů. Po ověření převeďte dotazy, které nevyžadují materializaci do zobrazení.
Zvažte použití streamované tabulky v následujících případech:
- Dotaz je definován proti zdroji dat, který se nepřetržitě nebo přírůstkově zvětšuje.
- Výsledky dotazu by se měly vypočítat přírůstkově.
- Kanál potřebuje vysokou propustnost a nízkou latenci.
Poznámka:
Streamované tabulky jsou vždy definovány vzhledem ke zdrojům streamování. K aplikaci aktualizací z informačních kanálů CDC můžete také použít streamovací zdroje s AUTO CDC ... INTO. Viz rozhraní API AUTO CDC: Zjednodušte zachytávání změn dat pomocí pipelin.
Vyloučení tabulek z cílového schématu
Pokud je nutné vypočítat zprostředkující tabulky, které nejsou určené pro externí spotřebu, můžete zabránit jejich publikování do schématu pomocí klíčového slova TEMPORARY. Dočasné tabulky stále ukládají a zpracovávají data podle sémantiky deklarativních kanálů Sparku Lakeflow, ale neměly by být přístupné mimo aktuální kanál. Dočasná tabulka přetrvává po celou dobu životnosti datového potrubí, které ji vytvoří. K deklaraci dočasných tabulek použijte následující syntaxi:
SQL
CREATE TEMPORARY STREAMING TABLE temp_table
AS SELECT ... ;
Python
@dp.table(
temporary=True)
def temp_table():
return ("...")
Kombinování streamovaných tabulek a materializovaných zobrazení v jednom kanálu
Streamované tabulky přebírají záruky zpracování z Apache Spark Structured Streaming a jsou nakonfigurované tak, aby zpracovávaly dotazy ze zdrojů dat pouze pro připojení, kde se nové řádky vždy vkládají do zdrojové tabulky, spíše než upraveny.
Poznámka:
Ačkoli streamované tabulky ve výchozím nastavení vyžadují zdroje dat pouze pro přidávání, pokud je zdrojem dat jiná streamovací tabulka, která vyžaduje aktualizace nebo odstranění, můžete toto chování přepsat pomocí příznaku "skipChangeCommits".
Běžný model streamování zahrnuje zpracování zdrojových dat k vytvoření počátečních datových sad v rámci zpracování. Tyto počáteční datové sady se běžně označují jako bronzové tabulky a často provádějí jednoduché transformace.
Naproti tomu konečné tabulky v datovém kanálu, běžně označované jako zlaté tabulky, často vyžadují složité agregace nebo čtení z cílů AUTO CDC ... INTO operace. Vzhledem k tomu, že tyto operace inherentně vytvářejí aktualizace místo přidávání, nejsou podporovány jako vstupy do streamovaných tabulek. Tyto transformace jsou vhodnější pro materializovaná zobrazení.
Kombinováním streamovaných tabulek a materializovaných zobrazení do jednoho kanálu můžete kanál zjednodušit, vyhnout se nákladnému opakovanému příjmu dat nebo opětovnému zpracování nezpracovaných dat a mít plný výkon SQL k výpočtu složitých agregací přes efektivně zakódovanou a filtrovanou datovou sadu. Následující příklad ukazuje tento typ smíšeného zpracování:
Poznámka:
Tyto příklady používají Auto Loader k načtení souborů z cloudového úložiště. Pokud chcete načíst soubory s funkcí Auto Loader v kanálu s povoleným katalogem Unity, musíte použít externí umístění. Další informace o používání katalogu Unity s kanály najdete v tématu Použití katalogu Unity s kanály.
Python
@dp.table
def streaming_bronze():
return (
# Since this is a streaming source, this table is incremental.
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("abfss://path/to/raw/data")
)
@dp.table
def streaming_silver():
# Since we read the bronze table as a stream, this silver table is also
# updated incrementally.
return spark.readStream.table("streaming_bronze").where(...)
@dp.materialized_view
def live_gold():
# This table will be recomputed completely by reading the whole silver table
# when it is updated.
return spark.read.table("streaming_silver").groupBy("user_id").count()
SQL
CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM STREAM read_files(
"abfss://path/to/raw/data",
format => "json"
)
CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(streaming_bronze) WHERE...
CREATE OR REFRESH MATERIALIZED VIEW live_gold
AS SELECT count(*) FROM streaming_silver GROUP BY user_id
Přečtěte si další informace o používání Auto Loaderu k přírůstkovému načítání souborů JSON z úložiště Azure.
statické sloučení streamů
Spojení typu proud-statická tabulka jsou dobrou volbou při denormalizaci souvislého proudu pouze přidávaných dat s především statickou tabulkou dimenzí.
Při každé aktualizaci potrubí se nové záznamy z datového proudu připojí k nejaktuálnějšímu snímku statické tabulky. Pokud jsou záznamy přidány nebo aktualizovány ve statické tabulce po zpracování odpovídajících dat z tabulky streamování, výsledné záznamy se nepřepočítávají, pokud se neprovede úplná aktualizace.
V kanálech nakonfigurovaných pro aktivované spuštění vrátí statická tabulka výsledky po spuštění aktualizace. V pipelinech nakonfigurovaných pro průběžné spouštění se při každém zpracování aktualizace dotazuje nejnovější verze statické tabulky.
Následuje příklad stream-statického spojení:
Python
@dp.table
def customer_sales():
return spark.readStream.table("sales").join(spark.read.table("customers"), ["customer_id"], "left")
SQL
CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(sales)
INNER JOIN LEFT customers USING (customer_id)
Efektivní výpočet agregací
Streamované tabulky můžete použít k přírůstkové výpočtu jednoduchých distribuačních agregací, jako je počet, minimum, maximum nebo součet a algebraické agregace, jako jsou průměr nebo směrodatná odchylka. Databricks doporučuje přírůstkovou agregaci pro dotazy s omezeným počtem skupin, jako je například dotaz s klauzulí GROUP BY country. Při každé aktualizaci se čtou jenom nová vstupní data.
Další informace o psaní dotazů deklarativních kanálů Sparku Lakeflow, které provádějí přírůstkové agregace, najdete v tématu Provádění okenních agregací s časovými vodoznaky.
Použití modelů MLflow v deklarativních kanálech Sparku Lakeflow
Poznámka:
Pokud chcete používat modely MLflow v kanálu s podporou katalogu Unity, musí být váš kanál nakonfigurovaný tak, aby používal kanál preview. Pokud chcete použít kanál current, musíte nakonfigurovat kanál tak, aby se publikoval do metastoru Hive.
V pipelinech můžete použít modely natrénované pomocí MLflow. Modely MLflow se v Azure Databricks považují za transformace, což znamená, že pracují se vstupem datového rámce Sparku a vrací výsledky jako datový rámec Sparku. Vzhledem k tomu, že deklarativní kanály Sparku Lakeflow definují datové sady proti datovým rámcům, můžete převést úlohy Apache Sparku, které používají MLflow, na kanály s pouhými několika řádky kódu. Další informace o MLflow najdete v tématu MLflow pro životní cyklus modelu ML.
Pokud už máte skript Pythonu, který volá model MLflow, můžete tento kód přizpůsobit pipeline pomocí dekorátoru @dp.table nebo @dp.materialized_view a zajistit, aby funkce byly definovány tak, aby vracely výsledky transformace. Deklarativní kanály Lakeflow Spark ve výchozím nastavení MLflow neinstalují, proto ověřte, že jste nainstalovali knihovny MLFlow s %pip install mlflow a na začátek svého zdrojového kódu jste naimportovali mlflow a dp. Úvod do syntaxe kanálu najdete v tématu Vývoj kódu kanálu pomocí Pythonu.
Pokud chcete v kanálech používat modely MLflow, proveďte následující kroky:
- Získejte ID spuštění a název modelu MLflow. ID spuštění a název modelu se používají k vytvoření URI modelu MLflow.
- Pomocí identifikátoru URI definujte UDF Sparku pro načtení modelu MLflow.
- Zavolejte UDF v definicích tabulky, aby se použil model MLflow.
Následující příklad ukazuje základní syntaxi pro tento vzor:
%pip install mlflow
from pyspark import pipelines as dp
import mlflow
run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
@dp.materialized_view
def model_predictions():
return spark.read.table(<input-data>)
.withColumn("prediction", loaded_model_udf(<model-features>))
Jako úplný příklad definuje následující kód UDF Sparku pojmenovaný loaded_model_udf, který načte model MLflow natrénovaný na data úvěrového rizika. Datové sloupce použité k předpovědi jsou předávány jako argument do uživatelsky definované funkce (UDF). Tabulka loan_risk_predictions vypočítá předpovědi pro každý řádek v loan_risk_input_data.
%pip install mlflow
from pyspark import pipelines as dp
import mlflow
from pyspark.sql.functions import struct
run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
categoricals = ["term", "home_ownership", "purpose",
"addr_state","verification_status","application_type"]
numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
"revol_util", "total_acc", "credit_length_in_years"]
features = categoricals + numerics
@dp.materialized_view(
comment="GBT ML predictions of loan risk",
table_properties={
"quality": "gold"
}
)
def loan_risk_predictions():
return spark.read.table("loan_risk_input_data")
.withColumn('predictions', loaded_model_udf(struct(features)))
Zachovat ruční smazání nebo aktualizace
Deklarativní kanály Sparku pro Lakeflow umožňují ručně odstranit nebo aktualizovat záznamy z tabulky a provést operaci aktualizace pro opětovné dokončování podřízených tabulek.
Ve výchozím nastavení kanály přepočítávají výsledky tabulky na základě vstupních dat při každé aktualizaci, takže je nutné zajistit, aby se odstraněný záznam znovu nenačítal ze zdrojových dat. Nastavením vlastnosti tabulky pipelines.reset.allowed na false zabráníte aktualizacím tabulky, ale nezabráníte přírůstkovým zápisům do tabulek nebo novým datům v toku do tabulky.
Následující diagram znázorňuje příklad pomocí dvou streamovaných tabulek:
-
raw_user_tableingestuje nezpracovaná uživatelská data ze zdroje. -
bmi_tablepřírůstkově vypočítá skóre BMI pomocí váhy a výšky zraw_user_table.
Chcete ručně odstranit nebo aktualizovat záznamy uživatelů z raw_user_table a znovu zkompilovat bmi_table.
Následující kód ukazuje nastavení vlastnosti tabulky pipelines.reset.allowed na false tak, aby zakázal úplnou aktualizaci pro raw_user_table, aby zamýšlené úpravy byly zachovány v čase, ale podřízené tabulky se přepočítají, když je spuštěna aktualizace datového toku.
CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM STREAM read_files("/databricks-datasets/iot-stream/data-user", format => "csv");
CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(raw_user_table);