Sdílet prostřednictvím


Načtení dat v kanálech

Data z libovolného zdroje dat podporovaného Apache Sparkem v Azure Databricks můžete načíst pomocí kanálů. Datové sady (tabulky a zobrazení) můžete definovat v deklarativních kanálech Spark Lakeflow pro jakýkoliv dotaz, který vrací datový rámec Spark, včetně streamovaných datových rámců a použití knihovny Pandas pro datové rámce Spark. Pro úlohy příjmu dat doporučuje Databricks používat streamované tabulky pro většinu případů použití. Streamované tabulky jsou vhodné pro příjem dat z cloudového úložiště objektů pomocí Auto Loaderu nebo ze sběrnic zpráv, jako Kafka.

Poznámka:

  • Ne všechny zdroje dat mají podporu SQL pro příjem dat. Zdroje SQL a Pythonu můžete kombinovat v kanálech, abyste mohli používat Python tam, kde je potřeba, a SQL pro jiné operace ve stejném kanálu.
  • Podrobnosti o práci s knihovnami, které nejsou zabalené v deklarativních kanálech Sparku Lakeflow, najdete v tématu Správa závislostí Pythonu pro kanály.
  • Obecné informace o příjmu dat v Azure Databricks najdete v tématu Standardní konektory v Lakeflow Connect.

Níže uvedené příklady ukazují některé běžné vzory.

Načtení z existující tabulky

Načtěte data z jakékoli existující tabulky v Azure Databricks. Data můžete transformovat pomocí dotazu nebo načíst tabulku pro další zpracování ve zpracovatelském řetězci.

Následující příklad načte data z existující tabulky:

Python

@dp.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    spark.read.table("baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
  )

SQL

CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC

Načtení souborů z cloudového úložiště objektů

Databricks doporučuje používat Auto Loader v datových tocích pro většinu zátěží příjmu dat z cloudového objektového úložiště nebo ze souborů ve svazku Unity Catalog. Automatické nástroje pro nahrávání a datové kanály jsou navrženy tak, aby postupně a idempotentně načítaly stále rostoucí množství dat, jakmile dorazí do cloudového úložiště.

Podívejte se na Co je automatický zavaděč? a Načtěte data z úložiště objektů.

Následující příklad načítá data z cloudového úložiště pomocí Auto Loader:

Python

@dp.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json")
  )

SQL

CREATE OR REFRESH STREAMING TABLE sales
  AS SELECT *
  FROM STREAM read_files(
    'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
    format => "json"
  );

Následující příklady používají Auto Loader k vytváření datových sad ze souborů CSV v oddílu katalogu Unity.

Python

@dp.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/Volumes/my_catalog/retail_org/customers/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
  "/Volumes/my_catalog/retail_org/customers/",
  format => "csv"
)

Poznámka:

  • Pokud používáte Auto Loader s oznámeními o souborech a spustíte úplnou aktualizaci pipeline nebo streamovací tabulky, musíte prostředky ručně vyčistit. K vyčištění můžete použít CloudFilesResourceManager v poznámkovém bloku.
  • Pokud chcete načíst soubory s funkcí Auto Loader v kanálu s povoleným katalogem Unity, musíte použít externí umístění. Další informace o používání katalogu Unity s kanály najdete v tématu Použití katalogu Unity s kanály.

Načtení dat ze sběrnice zpráv

Kanály můžete nakonfigurovat tak, aby ingestovali data z sběrnic zpráv. Databricks doporučuje používat tabulky streamování s průběžným spouštěním a vylepšeným automatickým škálováním, aby se zajistilo co nejefektivnější příjem dat při načítání z sběrnic zpráv s nízkou latencí. Viz Optimalizace využití clusteru deklarativních kanálů Sparku Lakeflow pomocí automatického škálování.

Následující kód například pomocí funkce read_kafka nakonfiguruje streamovací tabulku pro příjem dat ze systému Kafka:

Python

from pyspark import pipelines as dp

@dp.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka_server:9092")
      .option("subscribe", "topic1")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_raw AS
  SELECT *
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'topic1'
  );

Chcete-li přijímat data z jiných zdrojů sběrnice zpráv, přečtěte si:

Načtení dat z Azure Event Hubs

Azure Event Hubs je služba streamování dat, která poskytuje rozhraní kompatibilní s Apache Kafka. K načtení zpráv ze služby Azure Event Hubs můžete použít konektor Kafka strukturovaného streamování, který je součástí modulu runtime deklarativních kanálů Sparku Lakeflow. Další informace o načítání a zpracování zpráv ze služby Azure Event Hubs najdete v tématu Použití služby Azure Event Hubs jako zdroje dat kanálu.

Načtení dat z externích systémů

Deklarativní kanály Sparku Lakeflow podporují načítání dat z libovolného zdroje dat podporovaného službou Azure Databricks. Viz Připojení ke zdrojům dat a externím službám. Externí data můžete načíst také s využitím federace Lakehouse pro podporované zdroje dat. Protože Lakehouse Federation vyžaduje Databricks Runtime 13.3 LTS nebo vyšší, vaše pipeline musí být pro použití Lakehouse Federation konfigurována tak, aby používala předběžný kanál.

Některé zdroje dat nemají v SQL ekvivalentní podporu. Pokud se službou Lakehouse Federation nemůžete použít některý z těchto zdrojů dat, můžete pomocí Pythonu ingestovat data ze zdroje. Zdrojové soubory Pythonu a SQL můžete přidat do stejného kanálu. Následující příklad deklaruje materializované zobrazení pro přístup k aktuálnímu stavu dat ve vzdálené tabulce PostgreSQL:

import dp

@dp.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

Načtení malých nebo statických datových sad z cloudového úložiště objektů

Malé nebo statické datové sady můžete načíst pomocí syntaxe načtení Apache Sparku. Deklarativní kanály Sparku Lakeflow podporují všechny formáty souborů podporované Apache Sparkem v Azure Databricks. Úplný seznam najdete v tématu Možnosti formátu dat.

Následující příklady ukazují načtení JSON pro vytvoření tabulky:

Python

@dp.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM read_files(
  "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
)

Poznámka:

Funkce read_files SQL je společná pro všechna prostředí SQL v Azure Databricks. Tento vzor se doporučuje pro přímý přístup k souborům pomocí SQL v kanálech. Další informace naleznete v tématu Možnosti.

Načtení dat z vlastního zdroje dat Pythonu

Vlastní zdroje dat Pythonu umožňují načíst data ve vlastních formátech. Můžete napsat kód pro čtení a zápis do konkrétního externího zdroje dat nebo využít existující kód Pythonu ve stávajících systémech ke čtení dat z vlastních interních systémů. Další podrobnosti o vývoji zdrojů dat Pythonu najdete v tématu Vlastní zdroje dat PySpark.

Pokud chcete k načtení dat do kanálu použít vlastní zdroj dat Pythonu, zaregistrujte ho s názvem formátu, například my_custom_datasourcea pak ho načtěte:

from pyspark import pipelines as dp

# Assume `my_custom_datasource` is a custom Python custom data
# source that supports both batch and streaming reads, and has
# been registered using `spark.dataSource.register`.

# This creates a materialized view
@dp.table(name = "read_from_batch")
def read_from_batch():
    return spark.read.format("my_custom_datasource").load()

# This creates a streaming table
@dp.table(name = "read_from_streaming")
def read_from_streaming():
    return spark.readStream.format("my_custom_datasource").load()

Konfigurace streamované tabulky tak, aby ignorovala změny ve zdrojové streamovací tabulce

Poznámka:

  • Příznak skipChangeCommits funguje jenom s spark.readStream pomocí funkce option(). Tento příznak nelze použít ve dp.read_stream() funkci.
  • Příznak skipChangeCommits nelze použít, když je streamovací tabulka zdroje definována jako cíl funkce create_auto_cdc_flow().

Ve výchozím nastavení streamované tabulky vyžadují zdroje pouze pro přidávání. Pokud streamovací tabulka používá jako zdroj jinou streamovací tabulku a zdrojová streamovací tabulka vyžaduje aktualizace nebo odstranění, například zpracování gdpr "právo na zapomenutí", můžete při čtení zdrojové tabulky streamování nastavit příznak skipChangeCommits, aby se tyto změny ignorovaly. Další informace o tomto příznaku naleznete viz Ignorovat aktualizace a odstraňování.

@dp.table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("A")

Bezpečný přístup k přihlašovacím údajům úložiště s tajemstvími v rámci pipeline

Můžete použít Azure Databricks tajemství k ukládání přihlašovacích údajů, jako jsou přístupové klíče nebo hesla. K nastavení tajemství ve vašem přenosovém potrubí použijte vlastnost prostředí Spark v konfiguraci clusteru nastavení potrubí. Viz Konfigurace klasického výpočtu pro pipeliny.

Následující příklad používá tajný klíč k uložení přístupového klíče potřebného ke čtení vstupních dat z účtu úložiště Azure Data Lake Storage (ADLS) pomocí automatického zavaděče. Stejnou metodu můžete použít ke konfiguraci jakéhokoli tajného kódu vyžadovaného vaším kanálem, například klíčů AWS pro přístup k S3 nebo k heslu k metastoru Apache Hive.

Další informace o práci se službou Azure Data Lake Storage najdete v tématu Připojení ke službě Azure Data Lake Storage ablob Storage.

Poznámka:

Do konfiguračního klíče spark.hadoop., který nastaví hodnotu tajného klíče, musíte přidat předponu spark_conf.

{
  "id": "43246596-a63f-11ec-b909-0242ac120002",
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "autoscale": {
        "min_workers": 1,
        "max_workers": 5,
        "mode": "ENHANCED"
      }
    }
  ],
  "development": true,
  "continuous": false,
  "libraries": [
    {
      "notebook": {
        "path": "/Users/user@databricks.com/:re[LDP] Notebooks/:re[LDP] quickstart"
      }
    }
  ],
  "name": ":re[LDP] quickstart using ADLS2"
}

Nahradit

  • <storage-account-name> s názvem účtu úložiště ADLS.
  • <scope-name> s názvem tajné oblasti Azure Databricks.
  • <secret-name> s názvem klíče obsahujícího přístupový klíč účtu úložiště Azure.
from pyspark import pipelines as dp

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dp.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

Nahradit

  • <container-name> s názvem kontejneru účtu úložiště Azure, který ukládá vstupní data.
  • <storage-account-name> s názvem účtu úložiště ADLS.
  • <path-to-input-dataset> s cestou ke vstupní datové sadě.