Sdílet prostřednictvím


Načtení dat pomocí tabulek Delta Live Tables

Pomocí tabulek Delta Live Tables můžete načíst data z libovolného zdroje dat podporovaného Apache Spark v Azure Databricks. Datové sady (tabulky a zobrazení) v tabulkách Delta Live Tables můžete definovat proti libovolnému dotazu, který vrací datový rámec Spark, včetně streamovaných datových rámců DataFrame a Pandas pro datové rámce Spark. Databricks doporučuje pro úlohy příjmu dat používat ve většině případů použití streamovaných tabulek. Streamované tabulky jsou vhodné pro příjem dat z cloudového úložiště objektů pomocí automatického zavaděče nebo ze sběrnic zpráv, jako je Kafka. Následující příklady ukazují některé běžné vzory.

Důležité

Ne všechny zdroje dat mají podporu SQL. Poznámkové bloky SQL a Python můžete kombinovat v kanálu Delta Live Tables, abyste používali SQL pro všechny operace nad rámec příjmu dat.

Podrobnosti o práci s knihovnami, které nejsou ve výchozím nastavení zabalené v Delta Live Tables, najdete v tématu Správa závislostí Pythonu pro kanály Delta Live Tables.

Načtení souborů z cloudového úložiště objektů

Databricks doporučuje používat automatický zavaděč s rozdílovými živými tabulkami pro většinu úloh příjmu dat z cloudového úložiště objektů. Automatické zavaděče a dynamické tabulky Delta jsou navržené tak, aby postupně a idempotentním způsobem načítá stále rostoucí data při příchodu do cloudového úložiště. Následující příklady používají automatický zavaděč k vytvoření datových sad ze souborů CSV a JSON:

Poznámka:

Pokud chcete načíst soubory s automatickým zavaděčem v kanálu s povoleným katalogem Unity, musíte použít externí umístění. Další informace o používání katalogu Unity s dynamickými tabulkami Delta najdete v tématu Použití katalogu Unity s kanály Delta Live Tables.

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders/", "json")

Podívejte se , co je automatický zavaděč? a syntaxe SQL automatického zavaděče.

Upozorňující

Pokud používáte automatický zavaděč s oznámeními o souborech a spustíte úplnou aktualizaci kanálu nebo tabulky streamování, musíte prostředky ručně vyčistit. K vyčištění můžete použít CloudFilesResourceManager v poznámkovém bloku.

Načtení dat ze sběrnice zpráv

Kanály Delta Live Tables můžete nakonfigurovat tak, aby ingestovali data z sběrnic zpráv se streamovanými tabulkami. Databricks doporučuje kombinovat streamované tabulky s průběžným spouštěním a vylepšeným automatickým škálováním, aby se zajistilo nejúčinnější příjem dat pro načítání z sběrnic zpráv s nízkou latencí. Viz Optimalizace využití clusteru kanálů Delta Live Tables s vylepšeným automatickým škálováním.

Následující kód například nakonfiguruje streamovací tabulku pro příjem dat ze systému Kafka:

import dlt

@dlt.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "<server:ip>")
      .option("subscribe", "topic1")
      .option("startingOffsets", "latest")
      .load()
  )

Podřízené operace můžete napsat v čistém SQL, aby se na těchto datech prováděly transformace streamování, jako v následujícím příkladu:

CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
  *
FROM
  STREAM(LIVE.kafka_raw)
WHERE ...

Příklad práce se službou Event Hubs najdete v tématu Použití služby Azure Event Hubs jako zdroje dat Delta Live Tables.

Viz Konfigurace streamovaných zdrojů dat.

Načtení dat z externích systémů

Delta Live Tables podporuje načítání dat z libovolného zdroje dat podporovaného službou Azure Databricks. Viz Připojení ke zdrojům dat. Můžete také načíst externí data pomocí federace Lakehouse pro podporované zdroje dat. Vzhledem k tomu, že federace Lakehouse vyžaduje Databricks Runtime 13.3 LTS nebo vyšší, musí být pro použití kanálu Lakehouse Federation nakonfigurovaný kanál Preview.

Některé zdroje dat nemají v SQL ekvivalentní podporu. Pokud se službou Lakehouse Federation nemůžete použít některý z těchto zdrojů dat, můžete k ingestování dat ze zdroje použít poznámkový blok Pythonu. Zdrojový kód Pythonu a SQL můžete přidat do stejného kanálu Delta Live Tables. Následující příklad deklaruje materializované zobrazení pro přístup k aktuálnímu stavu dat ve vzdálené tabulce PostgreSQL:

import dlt

@dlt.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

Načtení malých nebo statických datových sad z cloudového úložiště objektů

Malé nebo statické datové sady můžete načíst pomocí syntaxe načtení Apache Sparku. Delta Live Tables podporuje všechny formáty souborů podporované Apache Sparkem v Azure Databricks. Úplný seznam najdete v tématu Možnosti formátu dat.

Následující příklady ukazují načtení JSON pro vytvoření tabulek Delta Live Tables:

Python

@dlt.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;

Poznámka:

Konstruktor SELECT * FROM format.`path`; SQL je společný pro všechna prostředí SQL v Azure Databricks. Tento vzor se doporučuje pro přímý přístup k souborům pomocí SQL s Delta Live Tables.

Zabezpečený přístup k přihlašovacím údajům úložiště s tajnými kódy v kanálu

Tajné kódy Azure Databricks můžete použít k ukládání přihlašovacích údajů, jako jsou přístupové klíče nebo hesla. Ke konfiguraci tajného kódu v kanálu použijte vlastnost Spark v konfiguraci clusteru nastavení kanálu. Viz Konfigurace výpočetních prostředků pro kanál Delta Live Tables.

Následující příklad používá tajný klíč k uložení přístupového klíče potřebného ke čtení vstupních dat z účtu úložiště Azure Data Lake Storage Gen2 (ADLS Gen2) pomocí automatického zavaděče. Stejnou metodu můžete použít ke konfiguraci jakéhokoli tajného kódu vyžadovaného vaším kanálem, například klíčů AWS pro přístup k S3 nebo k heslu k metastoru Apache Hive.

Další informace o práci s Azure Data Lake Storage Gen2 najdete v tématu Připojení k Azure Data Lake Storage Gen2 a Blob Storage.

Poznámka:

Předponu spark.hadoop. musíte přidat ke konfiguračnímu spark_conf klíči, který nastaví hodnotu tajného klíče.

{
    "id": "43246596-a63f-11ec-b909-0242ac120002",
    "clusters": [
      {
        "spark_conf": {
          "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
        },
        "autoscale": {
          "min_workers": 1,
          "max_workers": 5,
          "mode": "ENHANCED"
        }
      }
    ],
    "development": true,
    "continuous": false,
    "libraries": [
      {
        "notebook": {
          "path": "/Users/user@databricks.com/DLT Notebooks/Delta Live Tables quickstart"
        }
      }
    ],
    "name": "DLT quickstart using ADLS2"
}

Nahradit

  • <storage-account-name> s názvem účtu úložiště ADLS Gen2.
  • <scope-name> s názvem oboru tajného kódu Azure Databricks.
  • <secret-name> s názvem klíče, který obsahuje přístupový klíč účtu úložiště Azure.
import dlt

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

Nahradit

  • <container-name> s názvem kontejneru účtu úložiště Azure, který ukládá vstupní data.
  • <storage-account-name> s názvem účtu úložiště ADLS Gen2.
  • <path-to-input-dataset> s cestou ke vstupní datové sadě.

Načtení dat z Azure Event Hubs

Azure Event Hubs je služba streamování dat, která poskytuje rozhraní kompatibilní s Apache Kafka. K načtení zpráv ze služby Azure Event Hubs můžete použít konektor Kafka strukturovaného streamování, který je součástí modulu runtime Delta Live Tables. Další informace o načítání a zpracování zpráv ze služby Azure Event Hubs najdete v tématu Použití služby Azure Event Hubs jako zdroje dat Delta Live Tables.