Anmerkung
Der Zugriff auf diese Seite erfordert eine Genehmigung. Du kannst versuchen, dich anzumelden oder die Verzeichnisse zu wechseln.
Der Zugriff auf diese Seite erfordert eine Genehmigung. Du kannst versuchen , die Verzeichnisse zu wechseln.
Sie können Daten aus jeder beliebigen Datenquelle laden, die von Apache Spark auf Azure Databricks mithilfe von Pipelines unterstützt wird. Sie können Datasets (Tabellen und Ansichten) in Lakeflow Spark Declarative Pipelines für jede Abfrage definieren, die einen Spark DataFrame zurückgibt, einschließlich Streaming von DataFrames und Pandas für Spark DataFrames. Bei Datenaufnahmeaufgaben empfiehlt Databricks die Verwendung von Streamingtabellen für die meisten Anwendungsfälle. Streaming-Tabellen eignen sich gut zum Einlesen von Daten aus dem Cloud-Objektspeicher mit dem Auto Loader oder von Nachrichtenbussen wie Kafka.
Hinweis
- Nicht alle Datenquellen verfügen über SQL-Unterstützung für die Aufnahme. Sie können SQL- und Python-Quellen in Pipelines kombinieren, um Python zu verwenden, wo sie benötigt wird, und SQL für andere Vorgänge in derselben Pipeline.
- Ausführliche Informationen zum Arbeiten mit Bibliotheken, die nicht standardmäßig in Lakeflow Spark Declarative Pipelines verpackt sind, finden Sie unter Verwalten von Python-Abhängigkeiten für Pipelines.
- Allgemeine Informationen zu Ingestion in Azure Databricks finden Sie unter Standard-Connectors in Lakeflow Connect.
Die folgenden Beispiele veranschaulichen einige gängige Muster.
Laden aus einer vorhandenen Tabelle
Laden Sie Daten aus einer beliebigen vorhandenen Tabelle in Azure Databricks. Sie können die Daten mithilfe einer Abfrage transformieren oder die Tabelle zur weiteren Verarbeitung in Ihrer Pipeline laden.
Im folgenden Beispiel werden Daten aus einer vorhandenen Tabelle gelesen:
Python
@dp.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
)
SQL
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
Laden von Dateien aus dem Cloudobjektspeicher
Databricks empfiehlt die Verwendung des automatischen Ladens in Pipelines für die meisten Datenaufnahmeaufgaben aus dem Cloudobjektspeicher oder von Dateien in einem Unity-Katalogvolume. Auto Loader und Pipelines sind so konzipiert, dass sie inkrementell und idempotent immer größer werdende Daten laden, sobald sie im Cloud-Speicher eingehen.
Siehe Auto Loader und Daten aus Objektspeicher laden.
Im folgenden Beispiel werden Daten aus dem Cloudspeicher mithilfe des automatischen Ladeprogramms gelesen:
Python
@dp.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json")
)
SQL
CREATE OR REFRESH STREAMING TABLE sales
AS SELECT *
FROM STREAM read_files(
'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
format => "json"
);
In den folgenden Beispielen wird das automatische Laden verwendet, um Datasets aus CSV-Dateien in einem Unity-Katalogvolume zu erstellen:
Python
@dp.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/Volumes/my_catalog/retail_org/customers/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
"/Volumes/my_catalog/retail_org/customers/",
format => "csv"
)
Hinweis
- Wenn Sie das automatische Ladeprogramm mit Dateibenachrichtigungen verwenden und eine vollständige Aktualisierung für Ihre Pipeline oder Streamingtabelle ausführen, müssen Sie Ihre Ressourcen manuell bereinigen. Sie können den CloudFilesResourceManager in einem Notizbuch verwenden, um die Bereinigung durchzuführen.
- Um Dateien mit Autoloader in einer Pipeline mit Unity Catalog-Aktivierung zu laden, müssen Sie externe Speicherorte verwenden. Weitere Informationen zur Verwendung des Unity-Katalogs mit Pipelines finden Sie unter Verwenden des Unity-Katalogs mit Pipelines.
Laden von Daten aus einem Nachrichtenbus
Sie können Pipelines so konfigurieren, dass Daten aus Nachrichtenbussen aufgenommen werden. Databricks empfiehlt die Verwendung von Streamingtabellen mit fortlaufender Ausführung und verbesserter automatischer Skalierung, um die effizienteste Erfassung für das latenzarme Laden von Nachrichtenbussen zu erzielen. Siehe Optimieren der Clusternutzung von Lakeflow Spark Declarative Pipelines mit automatischer Skalierung.
Der folgende Code konfiguriert z. B. eine Streamingtabelle zum Aufnehmen von Daten aus Kafka mithilfe der funktion read_kafka :
Python
from pyspark import pipelines as dp
@dp.table
def kafka_raw():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka_server:9092")
.option("subscribe", "topic1")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_raw AS
SELECT *
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'topic1'
);
Informationen zum Erfassen aus anderen Nachrichtenbusquellen finden Sie unter:
- Kinesis: read_kinesis
- Thema veröffentlichen/abonnieren: read_pubsub
- Pulsar: Befehl read_pulsar
Laden von Daten aus Azure Event Hubs
Azure Event Hubs ist ein Datenstreamingdienst, der eine Apache Kafka-kompatible Schnittstelle bereitstellt. Sie können den Structured Streaming Kafka Connector verwenden, der in der Lakeflow Spark Declarative Pipelines Runtime enthalten ist, um Nachrichten aus Azure Event Hubs zu laden. Weitere Informationen zum Laden und Verarbeiten von Nachrichten aus Azure Event Hubs finden Sie unter Verwenden von Azure Event Hubs als Pipelinedatenquelle.
Laden von Daten aus externen Systemen
Lakeflow Spark Declarative Pipelines unterstützt das Laden von Daten aus jeder Datenquelle, die von Azure Databricks unterstützt wird. Siehe Herstellen einer Verbindung mit Datenquellen und externen Diensten. Sie können externe Daten auch mithilfe des Lakehouse-Verbunds für unterstützte Datenquellen laden. Da der Lakehouse-Verbund Databricks Runtime 13.3 LTS oder höher erfordert, muss Ihre Pipeline zur Verwendung des Lakehouse-Verbunds für die Verwendung des Vorschaukanals konfiguriert werden.
Einige Datenquellen haben keine gleichwertige Unterstützung in SQL. Wenn Sie lakehouse Federation nicht mit einer dieser Datenquellen verwenden können, können Sie Python zum Aufnehmen von Daten aus der Quelle verwenden. Sie können Python- und SQL-Quelldateien derselben Pipeline hinzufügen. Im folgenden Beispiel wird eine materialisierte Ansicht deklariert, um auf den aktuellen Zustand von Daten in einer Remote-PostgreSQL-Tabelle zuzugreifen:
import dp
@dp.table
def postgres_raw():
return (
spark.read
.format("postgresql")
.option("dbtable", table_name)
.option("host", database_host_url)
.option("port", 5432)
.option("database", database_name)
.option("user", username)
.option("password", password)
.load()
)
Laden kleiner oder statischer Datensätze aus Cloud-Objektspeicher
Sie können kleine oder statische Datasets mithilfe der Apache Spark-Ladesyntax laden. Lakeflow Spark Declarative Pipelines unterstützt alle Dateiformate, die von Apache Spark auf Azure Databricks unterstützt werden. Eine vollständige Liste finden Sie unter "Datenformatoptionen".
Die folgenden Beispiele veranschaulichen das Laden von JSON zum Erstellen einer Tabelle:
Python
@dp.table
def clickstream_raw():
return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))
SQL
CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM read_files(
"/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
)
Hinweis
Die read_files SQL-Funktion ist für alle SQL-Umgebungen in Azure Databricks üblich. Es ist das empfohlene Muster für den direkten Dateizugriff mithilfe von SQL in Pipelines. Weitere Informationen finden Sie unter Optionen.
Laden von Daten aus einer benutzerdefinierten Python-Datenquelle
Mit benutzerdefinierten Python-Datenquellen können Sie Daten in benutzerdefinierten Formaten laden. Sie können Code schreiben, um aus einer bestimmten externen Datenquelle zu lesen und in eine bestimmte externe Datenquelle zu schreiben, oder vorhandenen Python-Code in Ihren vorhandenen Systemen nutzen, um Daten aus Ihren eigenen internen Systemen zu lesen. Weitere Details zum Entwickeln von Python-Datenquellen finden Sie unter PySpark benutzerdefinierte Datenquellen.
Um eine benutzerdefinierte Python-Datenquelle zum Laden von Daten in eine Pipeline zu verwenden, registrieren Sie sie mit einem Formatnamen, wie z. B. my_custom_datasource, und lesen Sie daraus:
from pyspark import pipelines as dp
# Assume `my_custom_datasource` is a custom Python custom data
# source that supports both batch and streaming reads, and has
# been registered using `spark.dataSource.register`.
# This creates a materialized view
@dp.table(name = "read_from_batch")
def read_from_batch():
return spark.read.format("my_custom_datasource").load()
# This creates a streaming table
@dp.table(name = "read_from_streaming")
def read_from_streaming():
return spark.readStream.format("my_custom_datasource").load()
Konfigurieren einer Streamingtabelle, sodass Änderungen in einer Quellstreamingtabelle ignoriert werden
Hinweis
- Das Flag
skipChangeCommitsfunktioniert nur, wennspark.readStreamdie Funktionoption()verwendet. Das Flag kann nicht in einerdp.read_stream()-Funktion verwendet werden. - Sie können das
skipChangeCommitsFlag nicht verwenden, wenn die Quellstreamingtabelle als Ziel einer create_auto_cdc_flow()- Funktion definiert ist.
Für Streamingtabellen sind standardmäßig reine Anfügequellen erforderlich. Wenn eine Streamingtabelle eine andere als Quelle verwendet und die Quellstreamingtabelle Updates oder Löschungen erfordert, etwa im Rahmen des Rechts auf Vergessenwerden gemäß DSGVO, kann beim Lesen der Quellstreamingtabelle das Flag skipChangeCommits gesetzt werden, um diese Änderungen zu ignorieren. Weitere Informationen zu diesem Flag finden Sie unter Ignorieren von Updates und Löschungen.
@dp.table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("A")
Sicherer Zugriff auf Speicheranmeldeinformationen mit Geheimnissen in einer Pipeline
Sie können Azure Databricks-Schlüssel verwenden, um Anmeldeinformationen wie Zugriffstasten oder Kennwörter zu speichern. Verwenden Sie zum Konfigurieren des Geheimnisses in Ihrer Pipeline eine Spark-Eigenschaft in der Clusterkonfiguration der Pipelineeinstellungen. Siehe Konfigurieren der klassischen Berechnung für Pipelines.
Im folgenden Beispiel wird ein geheimer Schlüssel zum Speichern eines Zugriffsschlüssels verwendet, der zum Lesen von Eingabedaten aus einem AdLS-Speicherkonto (Azure Data Lake Storage) mithilfe des automatischen Ladens erforderlich ist. Sie können diese Methode verwenden, um jeden geheimen Schlüssel zu konfigurieren, der von Ihrer Pipeline benötigt wird, z. B. AWS-Schlüssel für den Zugriff auf S3 oder das Kennwort für einen Apache Hive-Metastore.
Weitere Informationen zum Arbeiten mit Azure Data Lake Storage finden Sie unter Herstellen einer Verbindung mit Azure Data Lake Storage und Blob Storage.
Hinweis
Sie müssen das spark.hadoop. Präfix dem spark_conf Konfigurationsschlüssel hinzufügen, der den geheimen Wert festlegt.
{
"id": "43246596-a63f-11ec-b909-0242ac120002",
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
}
],
"development": true,
"continuous": false,
"libraries": [
{
"notebook": {
"path": "/Users/user@databricks.com/:re[LDP] Notebooks/:re[LDP] quickstart"
}
}
],
"name": ":re[LDP] quickstart using ADLS2"
}
Replace
-
<storage-account-name>durch den Namen des ADLS-Speicherkontos. -
<scope-name>durch den Namen des Azure Databricks-Geheimnisbereichs. -
<secret-name>mit dem Namen des Schlüssels, der den Zugriffsschlüssel des Azure-Speicherkontos enthält.
from pyspark import pipelines as dp
json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dp.create_table(
comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(json_path)
)
Replace
-
<container-name>mit dem Namen des Azure-Speicherkontocontainers, in dem die Eingabedaten gespeichert werden. -
<storage-account-name>durch den Namen des ADLS-Speicherkontos. -
<path-to-input-dataset>durch den Pfad zum Eingabedatensatz.