Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Lakeflow Spark Declarative Pipelines (SDP) introduce diversi nuovi costrutti di codice Python per la definizione di viste materializzate e tabelle di streaming nelle pipeline. Il supporto python per lo sviluppo di pipeline si basa sulle nozioni di base del dataframe PySpark e delle API di streaming strutturato.
Per gli utenti che non hanno familiarità con Python e i dataframe, Databricks consiglia di usare l'interfaccia SQL. Vedere Sviluppare codice di pipeline dichiarative di Lakeflow Spark con SQL.
Per un riferimento completo alla sintassi Python di Lakeflow SDP, vedere Riferimento sul linguaggio Python per pipeline dichiarative di Lakeflow Spark.
Nozioni di base su Python per lo sviluppo di pipeline
Il codice Python che crea set di dati pipline deve restituire dataframe.
Tutte le API Python per pipeline dichiarative di Lakeflow Spark vengono implementate nel modulo pyspark.pipelines. Il codice della pipeline implementato con Python deve importare in modo esplicito il modulo pipelines all'inizio del codice sorgente Python. Negli esempi viene usato il comando di importazione seguente e viene usato dp negli esempi per fare riferimento a pipelines.
from pyspark import pipelines as dp
Annotazioni
Apache Spark™ include pipeline dichiarative a partire da Spark 4.1, disponibili tramite il pyspark.pipelines modulo. Databricks Runtime estende queste funzionalità open source con API e integrazioni aggiuntive per l'uso della produzione gestita.
Il codice scritto con il modulo open source pipelines viene eseguito senza modifiche in Azure Databricks. Le funzionalità seguenti non fanno parte di Apache Spark:
dp.create_auto_cdc_flowdp.create_auto_cdc_from_snapshot_flow@dp.expect(...)@dp.temporary_view
La pipeline legge e scrive per impostazione predefinita il catalogo e lo schema specificati durante la configurazione della pipeline. Vedere Impostare il catalogo e lo schema di destinazione.
Il codice Python specifico della pipeline differisce da altri tipi di codice Python in modo critico: il codice della pipeline Python non chiama direttamente le funzioni che eseguono l'inserimento e la trasformazione dei dati per creare set di dati. SDP interpreta invece le funzioni decorator del dp modulo in tutti i file di codice sorgente configurati in una pipeline e compila un grafico del flusso di dati.
Importante
Per evitare comportamenti imprevisti durante l'esecuzione della pipeline, non includere codice che potrebbe avere effetti collaterali nelle funzioni che definiscono i set di dati. Per ulteriori informazioni, consultare il riferimento Python .
Creare una vista materializzata o una tabella di streaming con Python
Usare @dp.table per creare una tabella di streaming dai risultati di una lettura in streaming. Usare @dp.materialized_view per creare una vista materializzata dai risultati di una lettura batch.
Per impostazione predefinita, i nomi della vista materializzata e della tabella di streaming sono derivati dai nomi delle funzioni. L'esempio di codice seguente illustra la sintassi di base per la creazione di una vista materializzata e di una tabella di streaming:
Annotazioni
Entrambe le funzioni fanno riferimento alla stessa tabella nel catalogo samples e usano la stessa funzione decoratore. Questi esempi evidenziano che l'unica differenza nella sintassi di base per le viste materializzate e le tabelle di streaming consiste nell'usare spark.read rispetto a spark.readStream.
Non tutte le origini dati supportano le letture in streaming. Alcune origini dati devono essere sempre elaborate con la semantica dello streaming.
from pyspark import pipelines as dp
@dp.materialized_view()
def basic_mv():
return spark.read.table("samples.nyctaxi.trips")
@dp.table()
def basic_st():
return spark.readStream.table("samples.nyctaxi.trips")
Opzionalmente, è possibile specificare il nome della tabella utilizzando l'argomento name nel decoratore @dp.table. L'esempio seguente illustra questo modello per una vista materializzata e una tabella di streaming:
from pyspark import pipelines as dp
@dp.materialized_view(name = "trips_mv")
def basic_mv():
return spark.read.table("samples.nyctaxi.trips")
@dp.table(name = "trips_st")
def basic_st():
return spark.readStream.table("samples.nyctaxi.trips")
Caricare dati dall'archiviazione di oggetti
Le pipeline supportano il caricamento dei dati da tutti i formati supportati da Azure Databricks. Vedere Opzioni di formato dati.
Annotazioni
Questi esempi utilizzano i dati disponibili sotto il /databricks-datasets, montato automaticamente nel tuo spazio di lavoro. Databricks consiglia di usare i percorsi del volume o gli URI cloud per fare riferimento ai dati archiviati nell'archiviazione di oggetti cloud. Consulta Che cosa sono i volumi di Unity Catalog?.
Databricks consiglia di usare il caricatore automatico e le tabelle di streaming quando si configurano carichi di lavoro di inserimento incrementali sui dati archiviati nell'archiviazione di oggetti cloud. Consulta Che cos'è il caricatore automatico?.
L'esempio seguente crea una tabella di streaming da file JSON usando il caricatore automatico:
from pyspark import pipelines as dp
@dp.table()
def ingestion_st():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
L'esempio seguente usa la semantica batch per leggere una directory JSON e creare una vista materializzata:
from pyspark import pipelines as dp
@dp.materialized_view()
def batch_mv():
return spark.read.format("json").load("/databricks-datasets/retail-org/sales_orders")
Convalidare i dati con le aspettative
È possibile usare le aspettative per impostare e applicare vincoli di qualità dei dati. Vedi Gestisci la qualità dei dati con le aspettative della pipeline.
Il codice seguente usa @dp.expect_or_drop per definire un'aspettativa denominata valid_data che elimina i record null durante l'inserimento dati:
from pyspark import pipelines as dp
@dp.table()
@dp.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders_valid():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
Esegui query su viste materializzate e tabelle di streaming definite nella tua pipeline
L'esempio seguente definisce quattro set di dati:
- Tabella di streaming denominata
ordersche carica i dati JSON. - Vista materializzata denominata
customersche carica i dati CSV. - Una vista materializzata denominata
customer_ordersche unisce i record dai set di datiordersecustomers, converte il timestamp dell'ordine in una data e seleziona i campicustomer_id,order_number,stateeorder_date. - Vista materializzata denominata
daily_orders_by_stateche aggrega il conteggio giornaliero degli ordini per ogni stato.
Annotazioni
Quando si eseguono query su viste o tabelle nella pipeline, è possibile specificare direttamente il catalogo e lo schema oppure usare le impostazioni predefinite configurate nella pipeline. In questo esempio, le tabelle orders, customerse customer_orders vengono scritte e lette dal catalogo e dallo schema predefiniti configurati per la pipeline.
La modalità di pubblicazione legacy usa lo schema LIVE per eseguire query su altre viste materializzate e tabelle di streaming definite nella pipeline. Nelle nuove pipeline la sintassi dello schema LIVE viene ignorata automaticamente. Visualizza lo schema IN DIRETTA (legacy) .
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.table()
@dp.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
@dp.materialized_view()
def customers():
return spark.read.format("csv").option("header", True).load("/databricks-datasets/retail-org/customers")
@dp.materialized_view()
def customer_orders():
return (spark.read.table("orders")
.join(spark.read.table("customers"), "customer_id")
.select("customer_id",
"order_number",
"state",
col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
)
)
@dp.materialized_view()
def daily_orders_by_state():
return (spark.read.table("customer_orders")
.groupBy("state", "order_date")
.count().withColumnRenamed("count", "order_count")
)
Creare tabelle in un ciclo di for
È possibile usare cicli di for Python per creare più tabelle a livello di codice. Ciò può essere utile quando si hanno molte origini dati o set di dati di destinazione che variano in base solo a pochi parametri, con conseguente minore quantità di codice totale per mantenere e diminuire la ridondanza del codice.
Il ciclo for valuta la logica in ordine seriale, ma una volta completata la pianificazione per i set di dati, la pipeline esegue la logica in parallelo.
Importante
Quando si usa questo modello per definire i set di dati, assicurarsi che l'elenco di valori passati al ciclo for sia sempre aggiuntivo. Se un set di dati definito in precedenza in una pipeline viene omesso da un'esecuzione futura della pipeline, tale set di dati viene eliminato automaticamente dallo schema di destinazione.
Nell'esempio seguente vengono create cinque tabelle che filtrano gli ordini dei clienti in base all'area. In questo caso, il nome dell'area viene usato per impostare il nome delle viste materializzate di destinazione e per filtrare i dati di origine. Le viste temporanee vengono usate per definire i join dalle tabelle di origine impiegate per creare le viste materializzate finali.
from pyspark import pipelines as dp
from pyspark.sql.functions import collect_list, col
@dp.temporary_view()
def customer_orders():
orders = spark.read.table("samples.tpch.orders")
customer = spark.read.table("samples.tpch.customer")
return (orders.join(customer, orders.o_custkey == customer.c_custkey)
.select(
col("c_custkey").alias("custkey"),
col("c_name").alias("name"),
col("c_nationkey").alias("nationkey"),
col("c_phone").alias("phone"),
col("o_orderkey").alias("orderkey"),
col("o_orderstatus").alias("orderstatus"),
col("o_totalprice").alias("totalprice"),
col("o_orderdate").alias("orderdate"))
)
@dp.temporary_view()
def nation_region():
nation = spark.read.table("samples.tpch.nation")
region = spark.read.table("samples.tpch.region")
return (nation.join(region, nation.n_regionkey == region.r_regionkey)
.select(
col("n_name").alias("nation"),
col("r_name").alias("region"),
col("n_nationkey").alias("nationkey")
)
)
# Extract region names from region table
region_list = spark.read.table("samples.tpch.region").select(collect_list("r_name")).collect()[0][0]
# Iterate through region names to create new region-specific materialized views
for region in region_list:
@dp.materialized_view(name=f"{region.lower().replace(' ', '_')}_customer_orders")
def regional_customer_orders(region_filter=region):
customer_orders = spark.read.table("customer_orders")
nation_region = spark.read.table("nation_region")
return (customer_orders.join(nation_region, customer_orders.nationkey == nation_region.nationkey)
.select(
col("custkey"),
col("name"),
col("phone"),
col("nation"),
col("region"),
col("orderkey"),
col("orderstatus"),
col("totalprice"),
col("orderdate")
).filter(f"region = '{region_filter}'")
)
Di seguito è riportato un esempio del grafico del flusso di dati per questa pipeline:
Risoluzione dei problemi: for ciclo crea molte tabelle con gli stessi valori
Il modello di esecuzione differita utilizzato dalle pipeline per valutare il codice Python richiede che la tua logica faccia riferimento direttamente ai singoli valori quando viene invocata la funzione decorata da @dp.materialized_view().
Nell'esempio seguente vengono illustrati due approcci corretti per definire le tabelle con un ciclo for. In entrambi gli esempi, a ogni nome di tabella dell'elenco tables viene fatto riferimento in modo esplicito all'interno della funzione decorata da @dp.materialized_view().
from pyspark import pipelines as dp
# Create a parent function to set local variables
def create_table(table_name):
@dp.materialized_view(name=table_name)
def t():
return spark.read.table(table_name)
tables = ["t1", "t2", "t3"]
for t_name in tables:
create_table(t_name)
# Call `@dp.materialized_view()` within a for loop and pass values as variables
tables = ["t1", "t2", "t3"]
for t_name in tables:
@dp.materialized_view(name=t_name)
def create_table(table_name=t_name):
return spark.read.table(table_name)
Nell'esempio seguente non fa riferimento ai valori correttamente. Questo esempio crea tabelle con nomi distinti, ma tutte le tabelle caricano i dati dall'ultimo valore nel ciclo for:
from pyspark import pipelines as dp
# Don't do this!
tables = ["t1", "t2", "t3"]
for t_name in tables:
@dp.materialized(name=t_name)
def create_table():
return spark.read.table(t_name)
Eliminare definitivamente i record da una vista materializzata o da una tabella di streaming
Per eliminare definitivamente i record da una vista materializzata o da una tabella di streaming con vettori di eliminazione abilitati, ad esempio per la conformità al GDPR, è necessario eseguire operazioni aggiuntive sulle tabelle Delta sottostanti dell'oggetto. Per garantire l'eliminazione dei record da una vista materializzata, vedere Eliminare definitivamente i record da una vista materializzata con vettori di eliminazione abilitati. Per garantire l'eliminazione dei record da una tabella di streaming, vedere Eliminare definitivamente i record da una tabella di streaming.