Freigeben über


Daten mit Lakeflow-Deklarativpipelines laden

Sie können Daten aus jeder beliebigen Datenquelle laden, die von Apache Spark in Azure Databricks unterstützt wird, indem Sie Lakeflow Declarative Pipelines verwenden. Sie können Datasets (Tabellen und Ansichten) in Lakeflow Declarative Pipelines für jede Abfrage definieren, die ein Spark DataFrame zurückgibt, einschließlich Streaming-DataFrames und Pandas für Spark DataFrames. Bei Datenaufnahmeaufgaben empfiehlt Databricks die Verwendung von Streamingtabellen für die meisten Anwendungsfälle. Streaming-Tabellen eignen sich gut zum Einlesen von Daten aus dem Cloud-Objektspeicher mit dem Auto Loader oder von Nachrichtenbussen wie Kafka.

Hinweis

  • Nicht alle Datenquellen verfügen über SQL-Unterstützung. Sie können SQL- und Python-Notizbücher in Lakeflow Declarative Pipelines kombinieren, um SQL für alle Vorgänge zu verwenden, die über die Aufnahme hinausgehen.
  • Ausführliche Informationen zum Arbeiten mit Bibliotheken, die nicht standardmäßig in Lakeflow Declarative Pipelines verpackt sind, finden Sie unter "Verwalten von Python-Abhängigkeiten für Lakeflow Declarative Pipelines".
  • Allgemeine Informationen zu Ingestion in Azure Databricks finden Sie unter Standard-Connectors in Lakeflow Connect.

Die folgenden Beispiele veranschaulichen einige gängige Muster.

Laden aus einer vorhandenen Tabelle

Laden Sie Daten aus einer beliebigen vorhandenen Tabelle in Azure Databricks. Sie können die Daten mithilfe einer Abfrage transformieren oder die Tabelle zur weiteren Verarbeitung in Ihrer Pipeline laden.

Im folgenden Beispiel werden Daten aus einer vorhandenen Tabelle gelesen:

Python

@dlt.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    spark.read.table("baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
  )

SQL

CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC

Laden von Dateien aus dem Cloudobjektspeicher

Databricks empfiehlt die Verwendung von Auto Loader mit Lakeflow Declarative Pipelines für die meisten Datenaufnahmeaufgaben aus dem Cloudobjektspeicher oder aus Dateien in einem Unity-Katalogvolume. Auto Loader und Lakeflow Declarative Pipelines sind so konzipiert, dass sie fortlaufend und idempotent ständig wachsende Daten laden, sobald diese im Cloudspeicher eintreffen.

Siehe Auto Loader und Daten aus Objektspeicher laden.

Im folgenden Beispiel werden Daten aus dem Cloudspeicher mithilfe des automatischen Ladeprogramms gelesen:

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json")
  )

SQL

CREATE OR REFRESH STREAMING TABLE sales
  AS SELECT *
  FROM STREAM read_files(
    'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
    format => "json"
  );

In den folgenden Beispielen wird das automatische Laden verwendet, um Datasets aus CSV-Dateien in einem Unity-Katalogvolume zu erstellen:

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/Volumes/my_catalog/retail_org/customers/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
  "/Volumes/my_catalog/retail_org/customers/",
  format => "csv"
)

Hinweis

  • Wenn Sie das automatische Ladeprogramm mit Dateibenachrichtigungen verwenden und eine vollständige Aktualisierung für Ihre Pipeline oder Streamingtabelle ausführen, müssen Sie Ihre Ressourcen manuell bereinigen. Sie können den CloudFilesResourceManager in einem Notizbuch verwenden, um die Bereinigung durchzuführen.
  • Um Dateien mit Autoloader in einer Pipeline mit Unity Catalog-Aktivierung zu laden, müssen Sie externe Speicherorte verwenden. Weitere Informationen zur Verwendung des Unity-Katalogs mit Lakeflow Declarative Pipelines finden Sie unter „Verwenden Sie Unity-Katalog mit Ihren Lakeflow-Deklarativen Pipelines“.

Laden von Daten aus einem Nachrichtenbus

Sie können Lakeflow Declarative Pipelines so konfigurieren, dass Daten aus Nachrichtenbussen aufgenommen werden. Databricks empfiehlt die Verwendung von Streamingtabellen mit fortlaufender Ausführung und verbesserter automatischer Skalierung, um die effizienteste Erfassung für das latenzarme Laden von Nachrichtenbussen zu erzielen. Weitere Informationen finden Sie unter Optimieren der Clusternutzung von deklarativen Lakeflow-Pipelines mit automatischer Skalierung.

Der folgende Code konfiguriert z. B. eine Streamingtabelle zum Aufnehmen von Daten aus Kafka mithilfe der funktion read_kafka :

Python

import dlt

@dlt.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka_server:9092")
      .option("subscribe", "topic1")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_raw AS
  SELECT *
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'topic1'
  );

Informationen zum Erfassen aus anderen Nachrichtenbusquellen finden Sie unter:

Laden von Daten aus Azure Event Hubs

Azure Event Hubs ist ein Datenstreamingdienst, der eine Apache Kafka-kompatible Schnittstelle bereitstellt. Sie können den Structured Streaming Kafka Connector verwenden, der in der Lakeflow Declarative Pipelines Runtime enthalten ist, um Nachrichten aus Azure Event Hubs zu laden. Weitere Informationen zum Laden und Verarbeiten von Nachrichten aus Azure Event Hubs finden Sie unter Verwenden von Azure Event Hubs als Lakeflow Declarative Pipelines-Datenquelle.

Laden von Daten aus externen Systemen

Lakeflow Declarative Pipelines unterstützt das Laden von Daten aus jeder Datenquelle, die von Azure Databricks unterstützt wird. Siehe Herstellen einer Verbindung mit Datenquellen und externen Diensten. Sie können externe Daten auch mithilfe des Lakehouse-Verbunds für unterstützte Datenquellen laden. Da der Lakehouse-Verbund Databricks Runtime 13.3 LTS oder höher erfordert, muss Ihre Pipeline zur Verwendung des Lakehouse-Verbunds für die Verwendung des Vorschaukanals konfiguriert werden.

Einige Datenquellen haben keine gleichwertige Unterstützung in SQL. Wenn Sie lakehouse Federation nicht mit einer dieser Datenquellen verwenden können, können Sie ein Python-Notizbuch verwenden, um Daten aus der Quelle aufzunehmen. Sie können Python- und SQL-Quellcode zur gleichen Pipeline hinzufügen. Im folgenden Beispiel wird eine materialisierte Ansicht deklariert, um auf den aktuellen Zustand von Daten in einer Remote-PostgreSQL-Tabelle zuzugreifen:

import dlt

@dlt.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

Laden kleiner oder statischer Datensätze aus Cloud-Objektspeicher

Sie können kleine oder statische Datasets mithilfe der Apache Spark-Ladesyntax laden. Lakeflow Declarative Pipelines unterstützt alle Dateiformate, die von Apache Spark auf Azure Databricks unterstützt werden. Eine vollständige Liste finden Sie unter "Datenformatoptionen".

Die folgenden Beispiele veranschaulichen das Laden von JSON zum Erstellen von Lakeflow Declarative Pipelines-Tabellen:

Python

@dlt.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM read_files(
  "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
)

Hinweis

Die read_files SQL-Funktion ist für alle SQL-Umgebungen in Azure Databricks üblich. Es ist das empfohlene Muster für den direkten Dateizugriff mithilfe von SQL mit Lakeflow Declarative Pipelines. Weitere Informationen finden Sie unter Optionen.

Konfigurieren einer Streamingtabelle, sodass Änderungen in einer Quellstreamingtabelle ignoriert werden

Hinweis

  • Das Flag skipChangeCommits funktioniert nur, wenn spark.readStream die Funktion option() verwendet. Das Flag kann nicht in einer dlt.read_stream()-Funktion verwendet werden.
  • Sie können das skipChangeCommits Flag nicht verwenden, wenn die Quellstreamingtabelle als Ziel einer create_auto_cdc_flow()- Funktion definiert ist.

Für Streamingtabellen sind standardmäßig reine Anfügequellen erforderlich. Wenn eine Streamingtabelle eine andere als Quelle verwendet und die Quellstreamingtabelle Updates oder Löschungen erfordert, etwa im Rahmen des Rechts auf Vergessenwerden gemäß DSGVO, kann beim Lesen der Quellstreamingtabelle das Flag skipChangeCommits gesetzt werden, um diese Änderungen zu ignorieren. Weitere Informationen zu diesem Flag finden Sie unter Ignorieren von Updates und Löschungen.

@table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("A")

Sicherer Zugriff auf Speicheranmeldeinformationen mit Geheimnissen in einer Pipeline

Sie können Azure Databricks-Schlüssel verwenden, um Anmeldeinformationen wie Zugriffstasten oder Kennwörter zu speichern. Verwenden Sie zum Konfigurieren des Geheimnisses in Ihrer Pipeline eine Spark-Eigenschaft in der Clusterkonfiguration der Pipelineeinstellungen. Siehe auch Konfigurieren der Rechenleistung für deklarative Lakeflow-Pipelines.

Im folgenden Beispiel wird ein geheimer Schlüssel zum Speichern eines Zugriffsschlüssels verwendet, der zum Lesen von Eingabedaten aus einem AdLS-Speicherkonto (Azure Data Lake Storage) mithilfe des automatischen Ladens erforderlich ist. Sie können diese Methode verwenden, um jeden geheimen Schlüssel zu konfigurieren, der von Ihrer Pipeline benötigt wird, z. B. AWS-Schlüssel für den Zugriff auf S3 oder das Kennwort für einen Apache Hive-Metastore.

Weitere Informationen zum Arbeiten mit Azure Data Lake Storage finden Sie unter Herstellen einer Verbindung mit Azure Data Lake Storage und Blob Storage.

Hinweis

Sie müssen das spark.hadoop. Präfix dem spark_conf Konfigurationsschlüssel hinzufügen, der den geheimen Wert festlegt.

{
  "id": "43246596-a63f-11ec-b909-0242ac120002",
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "autoscale": {
        "min_workers": 1,
        "max_workers": 5,
        "mode": "ENHANCED"
      }
    }
  ],
  "development": true,
  "continuous": false,
  "libraries": [
    {
      "notebook": {
        "path": "/Users/user@databricks.com/:re[LDP] Notebooks/:re[LDP] quickstart"
      }
    }
  ],
  "name": ":re[LDP] quickstart using ADLS2"
}

Ersetzen:

  • <storage-account-name> durch den Namen des ADLS-Speicherkontos.
  • <scope-name> durch den Namen des Azure Databricks-Geheimnisbereichs.
  • <secret-name> mit dem Namen des Schlüssels, der den Zugriffsschlüssel des Azure-Speicherkontos enthält.
import dlt

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

Ersetzen:

  • <container-name> mit dem Namen des Azure-Speicherkontocontainers, in dem die Eingabedaten gespeichert werden.
  • <storage-account-name> durch den Namen des ADLS-Speicherkontos.
  • <path-to-input-dataset> durch den Pfad zum Eingabedatensatz.