Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Sie können Daten aus jeder beliebigen Datenquelle laden, die von Apache Spark auf Azure Databricks mithilfe von Pipelines unterstützt wird. Sie können Datasets – Tabellen und Ansichten – in Lakeflow Spark Declarative Pipelines für jede Abfrage definieren, die einen Spark DataFrame zurückgibt, einschließlich Streaming von DataFrames und Pandas für Spark DataFrames. Bei Datenaufnahmeaufgaben empfiehlt Databricks die Verwendung von Streamingtabellen für die meisten Anwendungsfälle. Streamingtabellen eignen sich zum Erfassen von Daten aus dem Cloud-Objektspeicher mithilfe von Auto Loader oder von Nachrichtenbussen wie Kafka.
Nicht alle Datenquellen verfügen über SQL-Unterstützung für die Aufnahme. Sie können jedoch SQL- und Python-Quellen in derselben Pipeline kombinieren, um bei Bedarf Python zu verwenden. Ausführliche Informationen zum Arbeiten mit Bibliotheken, die nicht standardmäßig in Lakeflow Spark Declarative Pipelines verpackt sind, finden Sie unter Verwalten von Python-Abhängigkeiten für Pipelines. Allgemeine Informationen zu Ingestion in Azure Databricks finden Sie unter Standard-Connectors in Lakeflow Connect.
Die folgenden Beispiele veranschaulichen einige allgemeine Datenlademuster.
Laden aus einer vorhandenen Tabelle
Laden Sie Daten aus einer beliebigen vorhandenen Tabelle in Azure Databricks. Sie können die Daten mithilfe einer Abfrage transformieren oder die Tabelle zur weiteren Verarbeitung in Ihrer Pipeline laden.
Python
@dp.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
)
SQL
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
Laden von Dateien aus dem Cloudobjektspeicher
Databricks empfiehlt die Verwendung des automatischen Ladens in Pipelines für die meisten Datenaufnahmeaufgaben aus dem Cloudobjektspeicher oder von Dateien in einem Unity-Katalogvolume. Auto Loader und Pipelines sind so konzipiert, dass sie inkrementell und idempotent immer größer werdende Daten laden, sobald sie im Cloud-Speicher eingehen. Siehe Auto Loader und Daten aus Objektspeicher laden.
Im folgenden Beispiel werden Daten aus dem Cloudspeicher mithilfe des automatischen Ladeprogramms gelesen.
Python
@dp.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json")
)
SQL
CREATE OR REFRESH STREAMING TABLE sales
AS SELECT *
FROM STREAM read_files(
'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
format => "json"
);
In den folgenden Beispielen wird das automatische Laden verwendet, um Datasets aus CSV-Dateien in einem Unity-Katalogvolume zu erstellen.
Python
@dp.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/Volumes/my_catalog/retail_org/customers/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
"/Volumes/my_catalog/retail_org/customers/",
format => "csv"
)
Hinweis
- Wenn Sie das automatische Ladeprogramm mit Dateibenachrichtigungen verwenden und eine vollständige Aktualisierung für Ihre Pipeline oder Streamingtabelle ausführen, müssen Sie Ihre Ressourcen manuell bereinigen. Sie können den CloudFilesResourceManager in einem Notizbuch verwenden, um die Bereinigung durchzuführen.
- Um Dateien mit Autoloader in einer Pipeline mit Unity Catalog-Aktivierung zu laden, müssen Sie externe Speicherorte verwenden. Weitere Informationen zur Verwendung des Unity-Katalogs mit Pipelines finden Sie unter Verwenden des Unity-Katalogs mit Pipelines.
Authentifizieren bei Cloudspeicher
Auto Loader verwendet externe Speicherorte des Unity Catalog, um die Authentifizierung bei Cloudspeicher durchzuführen. Sie müssen einen externen Speicherort für den Speicherpfad konfigurieren, aus dem Sie lesen möchten, und dem ausführenden Benutzer die READ FILES Berechtigung erteilen.
Um Daten aus Azure Data Lake Storage abzurufen, konfigurieren Sie einen externen Speicherort, der mittels Speicheranmeldeinformationen unterstützt wird und auf einen Speicher-Container verweist. Weitere Informationen finden Sie unter Herstellen einer Verbindung mit dem Cloudobjektspeicher mithilfe des Unity-Katalogs.
Laden von Daten aus einem Nachrichtenbus
Sie können Pipelines so konfigurieren, dass Daten aus Nachrichtenbussen aufgenommen werden. Databricks empfiehlt die Verwendung von Streamingtabellen mit fortlaufender Ausführung und verbesserter automatischer Skalierung, um die effizienteste Erfassung für das latenzarme Laden von Nachrichtenbussen zu erzielen. Weitere Informationen finden Sie unter Optimieren der Clusternutzung von Lakeflow Spark Declarative Pipelines mit automatischer Skalierung.
Der folgende Code konfiguriert beispielsweise eine Streamingtabelle zum Aufnehmen von Daten aus Kafka mithilfe der read_kafka-Funktion .
Python
from pyspark import pipelines as dp
@dp.table
def kafka_raw():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka_server:9092")
.option("subscribe", "topic1")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_raw AS
SELECT *
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'topic1'
);
Informationen zum Erfassen aus anderen Nachrichtenbusquellen finden Sie unter:
- Kinesis: read_kinesis
- Thema veröffentlichen/abonnieren: read_pubsub
- Pulsar: Befehl read_pulsar
Daten von Azure Event Hubs laden
Azure Event Hubs ist ein Datenstreamingdienst, der eine Apache Kafka-kompatible Schnittstelle bereitstellt. Sie können den Structured Streaming Kafka Connector verwenden, der in der Lakeflow Spark Declarative Pipelines Runtime enthalten ist, um Nachrichten aus Azure Event Hubs zu laden. Weitere Informationen zum Laden und Verarbeiten von Nachrichten aus Azure Event Hubs finden Sie unter Verwenden von Azure Event Hubs als Pipelinedatenquelle.
Laden von Daten aus externen Systemen
Lakeflow Spark Declarative Pipelines unterstützt das Laden von Daten aus jeder Datenquelle, die von Azure Databricks unterstützt wird. Siehe Herstellen einer Verbindung mit Datenquellen und externen Diensten. Sie können externe Daten auch mithilfe des Lakehouse-Verbunds für unterstützte Datenquellen laden. Da die Lakehouse Federation Databricks Runtime 13.3 LTS oder höher erfordert, konfigurieren Sie Ihre Pipeline, um den Vorschaukanal zu verwenden.
Einige Datenquellen verfügen nicht über entsprechende SQL-Unterstützung. Wenn Sie lakehouse Federation nicht mit einer dieser Datenquellen verwenden können, können Sie Python zum Aufnehmen von Daten aus der Quelle verwenden. Sie können Python- und SQL-Quelldateien derselben Pipeline hinzufügen. Im folgenden Beispiel wird eine materialisierte Ansicht deklariert, um auf den aktuellen Datenstatus in einer Remote-PostgreSQL-Tabelle zuzugreifen.
import dp
@dp.table
def postgres_raw():
return (
spark.read
.format("postgresql")
.option("dbtable", table_name)
.option("host", database_host_url)
.option("port", 5432)
.option("database", database_name)
.option("user", username)
.option("password", password)
.load()
)
Laden kleiner oder statischer Datensätze aus dem Cloud-Objekt-Speicher
Sie können kleine oder statische Datasets mithilfe der Apache Spark-Ladesyntax laden. Lakeflow Spark Declarative Pipelines unterstützt alle Dateiformate, die von Apache Spark auf Azure Databricks unterstützt werden. Eine vollständige Liste finden Sie unter "Datenformatoptionen".
Die folgenden Beispiele veranschaulichen das Laden von JSON zum Erstellen einer Tabelle.
Python
@dp.table
def clickstream_raw():
return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))
SQL
CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM read_files(
"/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
)
Hinweis
Die read_files SQL-Funktion ist für alle SQL-Umgebungen in Azure Databricks üblich. Es ist das empfohlene Muster für den direkten Dateizugriff mithilfe von SQL in Pipelines. Weitere Informationen finden Sie unter Optionen.
Laden von Daten aus einer benutzerdefinierten Python-Datenquelle
Mit benutzerdefinierten Python-Datenquellen können Sie Daten in benutzerdefinierten Formaten laden. Sie können Code schreiben, um aus einer bestimmten externen Datenquelle zu lesen und in eine bestimmte externe Datenquelle zu schreiben, oder Ihren vorhandenen Python Code verwenden, um Daten aus Ihren eigenen internen Systemen zu lesen. Weitere Details zum Entwickeln von Python-Datenquellen finden Sie unter PySpark benutzerdefinierte Datenquellen.
Im folgenden Beispiel wird eine benutzerdefinierte Datenquelle mit dem Formatnamen my_custom_datasource registriert und sowohl im Batch- als auch im Streamingmodus daraus gelesen.
from pyspark import pipelines as dp
# Assume `my_custom_datasource` is a custom Python custom data
# source that supports both batch and streaming reads, and has
# been registered using `spark.dataSource.register`.
# This creates a materialized view
@dp.table(name = "read_from_batch")
def read_from_batch():
return spark.read.format("my_custom_datasource").load()
# This creates a streaming table
@dp.table(name = "read_from_streaming")
def read_from_streaming():
return spark.readStream.format("my_custom_datasource").load()
Konfigurieren einer Streamingtabelle, sodass Änderungen in einer Quellstreamingtabelle ignoriert werden
Für Streamingtabellen sind standardmäßig reine Anfügequellen erforderlich. Wenn in Ihrer Quellstreamingtabelle Aktualisierungen oder Löschvorgänge anfallen, beispielsweise für die DSGVO-Verarbeitung des Rechts auf Vergessenwerden, verwenden Sie das skipChangeCommits-Flag, um diese Änderungen zu ignorieren. Dieses Flag funktioniert nur beim Einsatz von spark.readStream mit der option() Funktion und kann nicht verwendet werden, wenn die Quellstreamingtabelle das Ziel einer create_auto_cdc_flow() Funktion ist. Weitere Informationen finden Sie unter Behandeln von Änderungen an Delta-Quelltabellen.
@dp.table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("A")
Sicherer Zugriff auf Speicheranmeldeinformationen mit Geheimnissen in einer Pipeline
Sie können Azure Databricks-Schlüssel verwenden, um Anmeldeinformationen wie Zugriffstasten oder Kennwörter zu speichern. Verwenden Sie zum Konfigurieren des Geheimnisses in Ihrer Pipeline eine Spark-Eigenschaft in der Clusterkonfiguration der Pipelineeinstellungen. Siehe Konfigurieren der klassischen Berechnung für Pipelines.
Im folgenden Beispiel wird ein Secret verwendet, um einen Zugriffsschlüssel zu speichern, der zum Lesen von Eingabedaten aus einem Azure Data Lake Storage Speicher-Account mithilfe des Auto Loaders erforderlich ist. Sie können diese Methode verwenden, um jeden geheimen Schlüssel zu konfigurieren, der von Ihrer Pipeline benötigt wird, z. B. AWS-Schlüssel für den Zugriff auf S3 oder das Kennwort für einen Apache Hive-Metastore.
Weitere Informationen zum Arbeiten mit Azure Data Lake Storage finden Sie unter Herstellen einer Verbindung mit Azure Data Lake Storage und Blob Storage.
Hinweis
Sie müssen das spark.hadoop. Präfix dem spark_conf Konfigurationsschlüssel hinzufügen, der den geheimen Wert festlegt.
{
"id": "43246596-a63f-11ec-b909-0242ac120002",
"storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path>",
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
}
],
"development": true,
"continuous": false,
"libraries": [
{
"notebook": {
"path": "/Users/user@databricks.com/:re[LDP] Notebooks/:re[LDP] quickstart"
}
}
],
"name": ":re[LDP] quickstart using ADLS2"
}
Ersetzen Sie in diesem Codebeispiel die folgenden Werte.
| Placeholder | Ersetzen mit |
|---|---|
<container-name> |
Der Name des Azure Speicherkontocontainers. |
<storage-account-name> |
Der Name des ADLS-Speicherkontos. |
<path> |
Der Pfad für Pipelineausgabedaten und -metadaten. |
<scope-name> |
Der Name des geheimen Azure Databricks-Bereichs. |
<secret-name> |
Der Name des Schlüssels, der den Zugriffsschlüssel des Azure Speicherkontos enthält. |
from pyspark import pipelines as dp
json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dp.create_table(
comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(json_path)
)
Ersetzen Sie in diesem Codebeispiel die folgenden Werte.
| Placeholder | Ersetzen mit |
|---|---|
<container-name> |
Der Name des Azure Speicherkontocontainers, in dem die Eingabedaten gespeichert werden. |
<storage-account-name> |
Der Name des ADLS-Speicherkontos. |
<path-to-input-dataset> |
Der Pfad zum Eingabedatensatz. |