Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
È possibile caricare dati da qualsiasi origine dati supportata da Apache Spark in Azure Databricks usando le pipeline. È possibile definire set di dati (tabelle e viste) in Pipeline dichiarative di Lakeflow Spark su qualsiasi query che restituisca un dataframe Spark, inclusi i dataframe di streaming e Pandas per i dataframe Spark. Per le attività di inserimento dati, Databricks consiglia di usare le tabelle di streaming per la maggior parte dei casi d'uso. Le tabelle di streaming sono valide per l'inserimento di dati dall'archiviazione di oggetti cloud tramite il caricatore automatico o da bus di messaggi come Kafka.
Annotazioni
- Non tutte le origini dati supportano SQL per l'inserimento. È possibile combinare origini SQL e Python nelle pipeline per usare Python dove è necessario e SQL per altre operazioni nella stessa pipeline.
- Per informazioni dettagliate sull'uso delle librerie non incluse in pacchetti nelle pipeline dichiarative di Lakeflow Spark per impostazione predefinita, vedere Gestire le dipendenze Python per le pipeline.
- Per informazioni generali sull'inserimento in Azure Databricks, vedere Connettori Standard in Lakeflow Connect.
Gli esempi seguenti illustrano alcuni modelli comuni.
Caricare da una tabella esistente
Caricare dati da qualsiasi tabella esistente in Azure Databricks. È possibile trasformare i dati usando una query o caricare la tabella per un'ulteriore elaborazione nella pipeline.
L'esempio seguente legge i dati da una tabella esistente:
Pitone
@dp.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
)
SQL
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
Caricare file dall'archiviazione di oggetti cloud
Databricks consiglia di usare il caricatore automatico nelle pipeline per la maggior parte delle attività di inserimento dati dall'archiviazione di oggetti cloud o da file in un volume di Unity Catalog. Il caricatore automatico e le pipeline sono progettati per caricare in modo incrementale e idempotente i dati in continua crescita man mano che arrivano nell'archiviazione cloud.
Vedere Che cos'è il caricatore automatico? e Caricare i dati dall'archivio oggetti.
L'esempio seguente legge i dati dall'archiviazione cloud usando il caricatore automatico:
Pitone
@dp.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json")
)
SQL
CREATE OR REFRESH STREAMING TABLE sales
AS SELECT *
FROM STREAM read_files(
'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
format => "json"
);
Gli esempi seguenti usano Il caricatore automatico per creare set di dati da file CSV in un volume del catalogo Unity:
Pitone
@dp.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/Volumes/my_catalog/retail_org/customers/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
"/Volumes/my_catalog/retail_org/customers/",
format => "csv"
)
Annotazioni
- Se si usa Il caricatore automatico con le notifiche dei file ed è necessario eseguire un aggiornamento completo per la pipeline o la tabella di streaming, è necessario pulire manualmente le risorse. È possibile usare il CloudFilesResourceManager in un notebook per eseguire la pulizia.
- Per caricare i file con Auto Loader in una pipeline abilitata per Unity Catalog, è necessario usare percorsi esterni. Per altre informazioni sull'uso di Unity Catalog con le pipeline, vedere Usare Il catalogo unity con le pipeline.
Caricare dati da un bus di messaggi
È possibile configurare le pipeline per acquisire i dati dai bus di messaggi. Databricks consiglia di usare tabelle di streaming con esecuzione continua e scalabilità automatica avanzata per fornire l'inserimento più efficiente per il caricamento a bassa latenza dai bus di messaggi. Vedere Ottimizzare l'utilizzo del cluster delle pipeline dichiarative di Lakeflow Spark con scalabilità automatica.
Ad esempio, il codice seguente configura una tabella di streaming per inserire dati da Kafka, usando la funzione read_kafka :
Pitone
from pyspark import pipelines as dp
@dp.table
def kafka_raw():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka_server:9092")
.option("subscribe", "topic1")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_raw AS
SELECT *
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'topic1'
);
Per acquisire dati da altre sorgenti del bus di messaggi, vedere:
- Read_kinesis
- Argomento pub/sub: read_pubsub
- Pulsar: read_pulsar
Caricare dati da Hub eventi di Azure
Hub eventi di Azure è un servizio di streaming di dati che fornisce un'interfaccia compatibile con Apache Kafka. È possibile usare il connettore Structured Streaming Kafka, incluso nel runtime delle pipeline dichiarative di Lakeflow Spark, per caricare messaggi da Hub eventi di Azure. Per altre informazioni sul caricamento e l'elaborazione dei messaggi da Hub eventi di Azure, vedere Usare Hub eventi di Azure come origine dati della pipeline.
Caricare dati da sistemi esterni
Le pipeline dichiarative di Lakeflow Spark supportano il caricamento di dati da qualsiasi origine dati supportata da Azure Databricks. Vedere Connettersi a origini dati e servizi esterni. È possibile anche caricare dati esterni utilizzando Lakehouse Federation per le origini dati supportate . Poiché Lakehouse Federation richiede Databricks Runtime 13.3 LTS o versione successiva, per usare Lakehouse Federation la pipeline deve essere configurata per usare il canale di anteprima .
Alcune origini dati non hanno supporto equivalente in SQL. Se non è possibile usare Lakehouse Federation con una di queste origini dati, è possibile usare Python per inserire dati dall'origine. È possibile aggiungere file di origine Python e SQL alla stessa pipeline. L'esempio seguente dichiara una vista materializzata per accedere allo stato corrente dei dati in una tabella PostgreSQL remota:
import dp
@dp.table
def postgres_raw():
return (
spark.read
.format("postgresql")
.option("dbtable", table_name)
.option("host", database_host_url)
.option("port", 5432)
.option("database", database_name)
.option("user", username)
.option("password", password)
.load()
)
Caricare set di dati statici o di piccole dimensioni dall'archiviazione di oggetti cloud
È possibile caricare set di dati statici o di piccole dimensioni usando la sintassi di caricamento di Apache Spark. Le pipeline dichiarative di Lakeflow Spark supportano tutti i formati di file supportati da Apache Spark in Azure Databricks. Per un elenco completo, vedere le opzioni di formato dati .
Gli esempi seguenti illustrano il caricamento di JSON per creare una tabella:
Pitone
@dp.table
def clickstream_raw():
return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))
SQL
CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM read_files(
"/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
)
Annotazioni
La funzione SQL read_files è comune a tutti gli ambienti SQL in Azure Databricks. È il modello consigliato per l'accesso diretto ai file usando SQL nelle pipeline. Per altre informazioni, vedere Opzioni .
Caricare dati da un'origine dati personalizzata Python
Le origini dati personalizzate python consentono di caricare i dati in formati personalizzati. È possibile scrivere codice da cui leggere e scrivere in un'origine dati esterna specifica o sfruttare il codice Python esistente nei sistemi esistenti per leggere i dati dai propri sistemi interni. Per altri dettagli sullo sviluppo di origini dati Python, vedere Origini dati personalizzate PySpark.
Per utilizzare un'origine dati personalizzata con Python per caricare i dati in una pipeline, registrala con un nome di formato, ad esempio my_custom_datasource, quindi leggervi da esso.
from pyspark import pipelines as dp
# Assume `my_custom_datasource` is a custom Python custom data
# source that supports both batch and streaming reads, and has
# been registered using `spark.dataSource.register`.
# This creates a materialized view
@dp.table(name = "read_from_batch")
def read_from_batch():
return spark.read.format("my_custom_datasource").load()
# This creates a streaming table
@dp.table(name = "read_from_streaming")
def read_from_streaming():
return spark.readStream.format("my_custom_datasource").load()
Configurare una tabella di streaming per ignorare le modifiche in una tabella di streaming di origine
Annotazioni
- Il flag
skipChangeCommitsfunziona solo conspark.readStreamusando la funzioneoption(). Non è possibile usare questo flag in una funzionedp.read_stream(). - Non è possibile usare il
skipChangeCommitsflag quando la tabella di streaming di origine è definita come destinazione di una funzione create_auto_cdc_flow().
Per impostazione predefinita, le tabelle di streaming richiedono fonti a sola aggiunta. Quando una tabella di streaming usa un'altra tabella di streaming come origine e la tabella di streaming di origine richiede aggiornamenti o eliminazioni, ad esempio l'elaborazione del GDPR "diritto all'oblio", il flag skipChangeCommits può essere impostato durante la lettura della tabella di streaming di origine per ignorare tali modifiche. Per altre informazioni su questo flag, vedere Ignorare gli aggiornamenti e le eliminazioni.
@dp.table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("A")
accedere in modo sicuro alle credenziali di archiviazione con chiavi segrete in una pipeline
È possibile usare i segreti di Azure Databricks per archiviare credenziali come chiavi di accesso o password. Per configurare il segreto nella pipeline, utilizzare una proprietà Spark nella configurazione del cluster della pipeline. Vedere Configurare il calcolo classico per le pipeline.
L'esempio seguente usa un segreto per archiviare una chiave di accesso necessaria per leggere i dati di input da un account di archiviazione di Azure Data Lake Storage (ADLS) usando il caricatore automatico. È possibile usare questo stesso metodo per configurare qualsiasi segreto richiesto dalla pipeline, ad esempio le chiavi AWS per accedere a S3 o la password a un metastore Apache Hive.
Per ulteriori informazioni sull'uso di Azure Data Lake Storage, vedere Connettersi ad Azure Data Lake Storage e Blob Storage.
Annotazioni
È necessario aggiungere il prefisso spark.hadoop. alla chiave di configurazione spark_conf che imposta il valore del segreto.
{
"id": "43246596-a63f-11ec-b909-0242ac120002",
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
}
],
"development": true,
"continuous": false,
"libraries": [
{
"notebook": {
"path": "/Users/user@databricks.com/:re[LDP] Notebooks/:re[LDP] quickstart"
}
}
],
"name": ":re[LDP] quickstart using ADLS2"
}
Sostituire
-
<storage-account-name>con il nome dell'account di archiviazione ADLS. -
<scope-name>con il nome dello scope segreto di Azure Databricks. -
<secret-name>con il nome della chiave contenente la chiave di accesso dell'account di archiviazione di Azure.
from pyspark import pipelines as dp
json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dp.create_table(
comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(json_path)
)
Sostituire
-
<container-name>con il nome del contenitore dell'account di archiviazione di Azure in cui sono archiviati i dati di input. -
<storage-account-name>con il nome dell'account di archiviazione ADLS. -
<path-to-input-dataset>con il percorso del set di dati di input.