Načtení dat v kanálech

Data z libovolného zdroje dat podporovaného Apache Sparkem v Azure Databricks můžete načíst pomocí kanálů. Datové sady – tabulky a zobrazení – můžete definovat v deklarativní pipeline Lakeflow Spark u jakéhokoli dotazu, který vrací datový rámec Spark, včetně streamovaných datových rámců a Pandas pro datové rámce Spark. Pro úlohy příjmu dat doporučuje Databricks používat streamované tabulky pro většinu případů použití. Streamovací tabulky jsou užitečné pro příjem dat z cloudového úložiště objektů pomocí Auto Loaderu nebo ze sběrnic zpráv, jako je Kafka.

Ne všechny zdroje dat mají podporu SQL pro příjem dat. Zdroje SQL a Python ale můžete kombinovat ve stejném kanálu, abyste mohli Python použít tam, kde je to potřeba. Podrobnosti o práci s knihovnami, které nejsou zabalené v deklarativních kanálech Sparku Lakeflow, najdete v tématu Správa závislostí Pythonu pro kanály. Obecné informace o příjmu dat v Azure Databricks najdete v tématu Standardní konektory v Lakeflow Connect.

Následující příklady ukazují některé běžné vzory načítání dat.

Načtení z existující tabulky

Načtěte data z jakékoli existující tabulky v Azure Databricks. Tabulku můžete načíst pro další zpracování v datovém kanálu nebo data transformovat pomocí dotazu.

Python

@dp.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    spark.read.table("baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
  )

SQL

CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC

Načtení souborů z cloudového úložiště objektů

Databricks doporučuje používat Auto Loader v datových tocích pro většinu zátěží příjmu dat z cloudového objektového úložiště nebo ze souborů ve svazku Unity Catalog. Automatické nástroje pro nahrávání a datové kanály jsou navrženy tak, aby postupně a idempotentně načítaly stále rostoucí množství dat, jakmile dorazí do cloudového úložiště. Podívejte se na Co je automatický zavaděč? a Načtěte data z úložiště objektů.

Následující příklad čte data z cloudového úložiště pomocí Auto Loader.

Python

@dp.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json")
  )

SQL

CREATE OR REFRESH STREAMING TABLE sales
  AS SELECT *
  FROM STREAM read_files(
    'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
    format => "json"
  );

Následující příklady používají Auto Loader k vytvoření datových sad ze souborů CSV na úložišti Unity Catalog.

Python

@dp.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/Volumes/my_catalog/retail_org/customers/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
  "/Volumes/my_catalog/retail_org/customers/",
  format => "csv"
)

Poznámka:

  • Pokud používáte Auto Loader s oznámeními o souborech a spustíte úplnou aktualizaci pipeline nebo streamovací tabulky, musíte prostředky ručně vyčistit. K vyčištění můžete použít CloudFilesResourceManager v poznámkovém bloku.
  • Pokud chcete načíst soubory s funkcí Auto Loader v kanálu s povoleným katalogem Unity, musíte použít externí umístění. Další informace o používání katalogu Unity s kanály najdete v tématu Použití katalogu Unity s kanály.

Ověřování v cloudovém úložišti

Auto Loader používá externí umístění katalogu Unity ke autentizaci pomocí cloudového úložiště. Musíte nakonfigurovat externí umístění pro cestu k úložišti, ze které chcete číst, a udělit READ FILES oprávnění uživateli, který úlohu spouští.

Pokud chcete integrovat z Azure Data Lake Storage, nakonfigurujte externí lokalitu, která je podložena přihlašovacími údaji úložiště a odkazuje na kontejner úložiště. Další informace najdete v tématu Připojení ke cloudovému úložišti objektů pomocí katalogu Unity.

Načtení dat ze sběrnice zpráv

Kanály můžete nakonfigurovat tak, aby ingestovali data z sběrnic zpráv. Databricks doporučuje používat tabulky streamování s průběžným spouštěním a vylepšeným automatickým škálováním, aby se zajistilo co nejefektivnější příjem dat při načítání z sběrnic zpráv s nízkou latencí. Další informace najdete v tématu Optimalizace využití clusteru deklarativních kanálů Sparku Lakeflow pomocí automatického škálování.

Následující kód například nakonfiguruje streamovací tabulku pro příjem dat ze systému Kafka pomocí funkce read_kafka .

Python

from pyspark import pipelines as dp

@dp.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka_server:9092")
      .option("subscribe", "topic1")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_raw AS
  SELECT *
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'topic1'
  );

Chcete-li přijímat data z jiných zdrojů sběrnice zpráv, přečtěte si:

Načíst data z Azure Event Hubs

Azure Event Hubs je služba streamování dat, která poskytuje rozhraní kompatibilní s Apache Kafka. K načtení zpráv ze služby Azure Event Hubs můžete použít konektor Kafka strukturovaného streamování, který je součástí modulu runtime deklarativních kanálů Sparku Lakeflow. Další informace o načítání a zpracování zpráv ze služby Azure Event Hubs najdete v tématu Použití služby Azure Event Hubs jako zdroje dat kanálu.

Načtení dat z externích systémů

Deklarativní kanály Sparku Lakeflow podporují načítání dat z libovolného zdroje dat podporovaného službou Azure Databricks. Viz Připojení ke zdrojům dat a externím službám. Externí data můžete načíst také s využitím federace Lakehouse pro podporované zdroje dat. Vzhledem k tomu, že federace Lakehouse vyžaduje Databricks Runtime 13.3 LTS nebo vyšší, pro použití Lakehouse Federation nakonfigurujte svou datovou pipeline, aby využívala kanál preview.

Některé zdroje dat nemají ekvivalentní podporu SQL. Pokud se službou Lakehouse Federation nemůžete použít některý z těchto zdrojů dat, můžete pomocí Pythonu ingestovat data ze zdroje. Zdrojové soubory Pythonu a SQL můžete přidat do stejného kanálu. Následující příklad deklaruje materializované zobrazení pro přístup k aktuálnímu stavu dat ve vzdálené tabulce PostgreSQL.

import dp

@dp.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

Načtení malých nebo statických datových sad z cloudového úložiště objektů

Malé nebo statické datové sady můžete načíst pomocí syntaxe načtení Apache Sparku. Deklarativní kanály Sparku Lakeflow podporují všechny formáty souborů podporované Apache Sparkem v Azure Databricks. Úplný seznam najdete v tématu Možnosti formátu dat.

Následující příklady ukazují načtení JSON pro vytvoření tabulky.

Python

@dp.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM read_files(
  "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
)

Poznámka:

Funkce read_files SQL je společná pro všechna prostředí SQL v Azure Databricks. Tento vzor se doporučuje pro přímý přístup k souborům pomocí SQL v kanálech. Další informace naleznete v tématu Možnosti.

Načtení dat z vlastního zdroje dat Pythonu

Vlastní zdroje dat Pythonu umožňují načíst data ve vlastních formátech. Můžete napsat kód pro čtení a zápis do konkrétního externího zdroje dat nebo použít existující Python kód ke čtení dat z vlastních interních systémů. Další podrobnosti o vývoji zdrojů dat Pythonu najdete v tématu Vlastní zdroje dat PySpark.

Následující příklad zaregistruje vlastní zdroj dat s názvem formátu my_custom_datasource a čte z něj v dávkovém i streamovaném režimu.

from pyspark import pipelines as dp

# Assume `my_custom_datasource` is a custom Python custom data
# source that supports both batch and streaming reads, and has
# been registered using `spark.dataSource.register`.

# This creates a materialized view
@dp.table(name = "read_from_batch")
def read_from_batch():
    return spark.read.format("my_custom_datasource").load()

# This creates a streaming table
@dp.table(name = "read_from_streaming")
def read_from_streaming():
    return spark.readStream.format("my_custom_datasource").load()

Konfigurace streamované tabulky tak, aby ignorovala změny ve zdrojové streamovací tabulce

Ve výchozím nastavení streamované tabulky vyžadují zdroje pouze pro přidávání. Pokud zdrojová streamovací tabulka vyžaduje aktualizace nebo odstranění ( například pro zpracování práva na zapomenutí GDPR), použijte skipChangeCommits příznak k ignorování těchto změn. Tento příznak funguje jen s spark.readStream při použití funkce option() a nelze ho použít, pokud je streamovací tabulka zdrojem pro funkci create_auto_cdc_flow(). Další informace najdete v tématu Zpracování změn zdrojových tabulek Delta.

@dp.table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("A")

Bezpečný přístup k přihlašovacím údajům úložiště s tajemstvími v rámci pipeline

Můžete použít Azure Databricks tajemství k ukládání přihlašovacích údajů, jako jsou přístupové klíče nebo hesla. K nastavení tajemství ve vašem přenosovém potrubí použijte vlastnost prostředí Spark v konfiguraci clusteru nastavení potrubí. Viz Konfigurace klasického výpočtu pro pipeliny.

Následující příklad používá tajný údaj k uložení přístupového klíče, který je nezbytný ke čtení vstupních dat z účtu úložiště Azure Data Lake Storage pomocí Auto Loaderu. Stejnou metodu můžete použít ke konfiguraci jakéhokoli tajného kódu vyžadovaného vaším kanálem, například klíčů AWS pro přístup k S3 nebo k heslu k metastoru Apache Hive.

Další informace o práci se službou Azure Data Lake Storage najdete v tématu Připojení ke službě Azure Data Lake Storage ablob Storage.

Poznámka:

Do konfiguračního klíče spark.hadoop., který nastaví hodnotu tajného klíče, musíte přidat předponu spark_conf.

{
  "id": "43246596-a63f-11ec-b909-0242ac120002",
  "storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path>",
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "autoscale": {
        "min_workers": 1,
        "max_workers": 5,
        "mode": "ENHANCED"
      }
    }
  ],
  "development": true,
  "continuous": false,
  "libraries": [
    {
      "notebook": {
        "path": "/Users/user@databricks.com/:re[LDP] Notebooks/:re[LDP] quickstart"
      }
    }
  ],
  "name": ":re[LDP] quickstart using ADLS2"
}

V této ukázce kódu nahraďte následující hodnoty.

Zástupný symbol Nahradit za
<container-name> Název kontejneru účtu úložiště Azure.
<storage-account-name> Název účtu úložiště ADLS.
<path> Cesta pro výstupní data a metadata kanálu.
<scope-name> Název oboru tajného kódu Azure Databricks.
<secret-name> Název klíče obsahujícího přístupový klíč účtu úložiště Azure.
from pyspark import pipelines as dp

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dp.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

V této ukázce kódu nahraďte následující hodnoty.

Zástupný symbol Nahradit za
<container-name> Název kontejneru účtu úložiště Azure, který ukládá vstupní data.
<storage-account-name> Název účtu úložiště ADLS.
<path-to-input-dataset> Cesta ke vstupní datové sadě.