Ładowanie danych w potokach

Dane można ładować z dowolnego źródła danych obsługiwanego przez platformę Apache Spark w usłudze Azure Databricks przy użyciu potoków. Można definiować zestawy danych — tabele i widoki — w deklaratywnych potokach Lakeflow Spark przeciwko dowolnemu zapytaniu, które zwraca Spark DataFrame, w tym przesyłającemu strumieniowo DataFrames oraz Pandas dla Spark DataFrames. Dla procesów integracyjnych danych usługa Databricks zaleca używanie tabel przesyłania strumieniowego w większości przypadków użycia. Tabele przepływu strumieniowego są przydatne do pozyskiwania danych z magazynu obiektów w chmurze przy użyciu Auto Loader lub z kolejek wiadomości, takich jak Kafka.

Nie wszystkie źródła danych mają wsparcie SQL do przetwarzania danych wejściowych. Można jednak mieszać źródła SQL i Python w tym samym potoku, aby używać Python tam, gdzie jest to konieczne. Aby uzyskać szczegółowe informacje na temat pracy z bibliotekami, które nie są pakowane w potokach deklaratywnych platformy Spark w usłudze Lakeflow, zobacz Zarządzanie zależnościami języka Python dla potoków. Aby uzyskać ogólne informacje na temat pozyskiwania danych w usłudze Azure Databricks, zobacz Łączniki standardowe w programie Lakeflow Connect.

W poniższych przykładach przedstawiono niektóre typowe wzorce ładowania danych.

Ładowanie z istniejącej tabeli

Załaduj dane z dowolnej istniejącej tabeli w usłudze Azure Databricks. Dane można przekształcić przy użyciu zapytania lub załadować tabelę dla dalszego przetwarzania w ramach kanału przetwarzania.

Python

@dp.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    spark.read.table("baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
  )

SQL

CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC

ładowanie plików z magazynu obiektów w chmurze

Usługa Databricks zaleca używanie Auto Loader w potokach w przypadku większości zadań pozyskiwania danych z magazynu obiektów w chmurze lub z plików w woluminie Unity Catalog. Automatyczny ładowacz i potoki są przeznaczone do przyrostowego i idempotentnego ładowania stale rosnących danych w miarę ich napływu do magazynu w chmurze. Zobacz Co to jest moduł automatycznego ładowania? i Załaduj dane z magazynu obiektów.

Poniższy przykład odczytuje dane z magazynu w chmurze przy użyciu automatycznego modułu ładującego.

Python

@dp.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json")
  )

SQL

CREATE OR REFRESH STREAMING TABLE sales
  AS SELECT *
  FROM STREAM read_files(
    'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
    format => "json"
  );

W poniższych przykładach użyto Auto Loader do tworzenia zestawów danych z plików CSV w Unity Catalog.

Python

@dp.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/Volumes/my_catalog/retail_org/customers/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
  "/Volumes/my_catalog/retail_org/customers/",
  format => "csv"
)

Uwaga / Notatka

Zaloguj się do chmury

Auto Loader używa lokalizacji zewnętrznych Unity Catalog do uwierzytelniania w zasobach magazynu w chmurze. Musisz skonfigurować lokalizację zewnętrzną dla ścieżki magazynu, z której chcesz odczytać, i przyznać READ FILES użytkownikowi uprawnienia.

Aby pobierać dane z Azure Data Lake Storage, skonfiguruj lokalizację zewnętrzną wspieraną przez poświadczenia dostępu do magazynu, które odwołują się do kontenera danych. Aby uzyskać więcej informacji, zobacz Nawiązywanie połączenia z magazynem obiektów w chmurze z użyciem Unity Catalog.

Ładowanie danych z magistrali komunikatów

Pipeline'y można skonfigurować do pozyskiwania danych z magistrali komunikatów. Usługa Databricks zaleca używanie tabel strumieniowych z ciągłym wykonywaniem i ulepszonym skalowaniem automatycznym w celu zapewnienia najbardziej wydajnego pozyskiwania na potrzeby ładowania niskich opóźnień z szyn komunikatów. Aby uzyskać więcej informacji, zobacz artykuł Optymalizowanie wykorzystania klastra Potoków Deklaratywnych Spark w usłudze Lakeflow dzięki automatycznemu skalowaniu.

Na przykład poniższy kod konfiguruje tabelę przesyłania strumieniowego w celu pozyskiwania danych z Kafki za pomocą funkcji read_kafka.

Python

from pyspark import pipelines as dp

@dp.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka_server:9092")
      .option("subscribe", "topic1")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_raw AS
  SELECT *
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'topic1'
  );

Aby pobierać dane z innych źródeł magistrali komunikatów, zobacz:

Ładowanie danych z Azure Event Hubs

Azure Event Hubs to usługa przesyłania strumieniowego danych, która zapewnia interfejs zgodny z platformą Apache Kafka. Aby załadować komunikaty z usługi Azure Event Hubs, możesz użyć łącznika Structured Streaming dla Kafki, zawartego w środowisku uruchomieniowym Lakeflow Spark Declarative Pipelines. Aby dowiedzieć się więcej na temat ładowania i przetwarzania komunikatów z usługi Azure Event Hubs, zobacz Używanie usługi Azure Event Hubs jako źródła danych potoku.

Ładowanie danych z systemów zewnętrznych

Potoki deklaratywne Lakeflow Spark obsługują ładowanie danych z dowolnego źródła danych obsługiwanego przez Azure Databricks. Zobacz Łączenie ze źródłami danych i usługami zewnętrznymi. Możesz również załadować dane zewnętrzne za pomocą usługi Lakehouse Federation dla obsługiwanych źródeł danych . Ponieważ środowisko Lakehouse Federation wymaga Databricks Runtime w wersji 13.3 LTS lub nowszej, aby z niego korzystać, skonfiguruj potok do używania kanału w wersji zapoznawczej.

Niektóre źródła danych nie mają równoważnej obsługi języka SQL. Jeśli nie możesz użyć Lakehouse Federation z jednym z tych źródeł danych, możesz użyć języka Python do przetwarzania danych ze źródła. Pliki źródłowe języka Python i SQL można dodawać do tego samego potoku. Poniższy przykład deklaruje zmaterializowany widok, aby uzyskać dostęp do bieżącego stanu danych w zdalnej tabeli PostgreSQL.

import dp

@dp.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

Ładowanie małych lub statycznych zestawów danych z magazynu obiektów w chmurze

Małe lub statyczne zestawy danych można załadować przy użyciu składni ładowania platformy Apache Spark. Potoki deklaratywne platformy Spark lakeflow obsługują wszystkie formaty plików obsługiwane przez platformę Apache Spark w usłudze Azure Databricks. Aby uzyskać pełną listę, zobacz Opcje formatowania danych.

W poniższych przykładach pokazano ładowanie kodu JSON w celu utworzenia tabeli.

Python

@dp.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM read_files(
  "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
)

Uwaga / Notatka

Funkcja read_files SQL jest wspólna dla wszystkich środowisk SQL w usłudze Azure Databricks. Jest to zalecany wzorzec bezpośredniego dostępu do plików przy użyciu języka SQL w potokach. Aby uzyskać więcej informacji, zobacz opcje .

Ładowanie danych z niestandardowego źródła danych języka Python

Niestandardowe źródła danych języka Python umożliwiają ładowanie danych w formatach niestandardowych. Możesz napisać kod do odczytu i zapisu w określonym zewnętrznym źródle danych lub użyć istniejącego kodu Python do odczytywania danych z własnych systemów wewnętrznych. Aby uzyskać więcej informacji na temat tworzenia źródeł danych języka Python, zobacz Niestandardowe źródła danych PySpark.

Poniższy przykład rejestruje niestandardowe źródło danych z nazwą formatu my_custom_datasource i odczytuje z niego w trybie wsadowym i strumieniowym.

from pyspark import pipelines as dp

# Assume `my_custom_datasource` is a custom Python custom data
# source that supports both batch and streaming reads, and has
# been registered using `spark.dataSource.register`.

# This creates a materialized view
@dp.table(name = "read_from_batch")
def read_from_batch():
    return spark.read.format("my_custom_datasource").load()

# This creates a streaming table
@dp.table(name = "read_from_streaming")
def read_from_streaming():
    return spark.readStream.format("my_custom_datasource").load()

Skonfiguruj tabelę streamingową, aby ignorować zmiany w źródłowej tabeli streamingowej

Domyślnie tabele przesyłania strumieniowego wymagają źródeł przeznaczonych wyłącznie do dołączania. Jeśli źródłowa tabela strumieniowa wymaga aktualizacji lub usunięcia, np. w przypadku przetwarzania zgodnie z RODO, aby zrealizować prawo do bycia zapomnianym, użyj flagi skipChangeCommits, aby zignorować te zmiany. Ta flaga działa tylko z funkcją spark.readStreamoption() i nie może być używana, gdy tabela strumieniowania źródłowego jest celem funkcji create_auto_cdc_flow(). Aby uzyskać więcej informacji, zobacz Handle changes to source Delta tables (Obsługa zmian w źródłowych tabelach delty).

@dp.table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("A")

Bezpiecznie uzyskaj dostęp do poświadczeń magazynu za pomocą tajnych wartości w potoku danych

Możesz użyć Azure Databricks do przechowywania tajnych danych, takich jak klucze dostępu lub hasła. Aby skonfigurować tajne dane w potoku, użyj właściwości Spark w konfiguracji klastra w ustawieniach potoku. Zobacz Konfigurowanie klasycznych obliczeń dla potoków.

W poniższym przykładzie użyto tajnego kodu do przechowywania klucza dostępu wymaganego do odczytu danych wejściowych z konta magazynu Azure Data Lake Storage używając Auto Loader. Możesz użyć tej samej metody do skonfigurowania dowolnego tajnego elementu wymaganego przez potok, na przykład kluczy AWS do uzyskania dostępu do S3 lub hasła do magazynu metadanych Apache Hive.

Aby dowiedzieć się więcej na temat pracy z usługą Azure Data Lake Storage, zobacz Connect to Azure Data Lake Storage and Blob Storage.

Uwaga / Notatka

Należy dodać prefiks spark.hadoop. do klucza konfiguracji spark_conf, który ustawia tajną wartość.

{
  "id": "43246596-a63f-11ec-b909-0242ac120002",
  "storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path>",
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "autoscale": {
        "min_workers": 1,
        "max_workers": 5,
        "mode": "ENHANCED"
      }
    }
  ],
  "development": true,
  "continuous": false,
  "libraries": [
    {
      "notebook": {
        "path": "/Users/user@databricks.com/:re[LDP] Notebooks/:re[LDP] quickstart"
      }
    }
  ],
  "name": ":re[LDP] quickstart using ADLS2"
}

W tym przykładzie kodu zastąp następujące wartości.

Placeholder Zamień na
<container-name> Nazwa kontenera konta magazynu Azure.
<storage-account-name> Nazwa konta magazynu usługi ADLS.
<path> Ścieżka dla danych wyjściowych i metadanych z potoku danych.
<scope-name> Nazwa zakresu tajnych danych Azure Databricks.
<secret-name> Nazwa klucza zawierającego klucz dostępu do konta magazynu Azure.
from pyspark import pipelines as dp

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dp.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

W tym przykładzie kodu zastąp następujące wartości.

Placeholder Zamień na
<container-name> Nazwa kontenera konta magazynu Azure, który przechowuje dane wejściowe.
<storage-account-name> Nazwa konta magazynu Azure Data Lake Storage (ADLS).
<path-to-input-dataset> Ścieżka do wejściowego zestawu danych.