Načtení dat pomocí tabulek Delta Live Tables
Pomocí tabulek Delta Live Tables můžete načíst data z libovolného zdroje dat podporovaného Apache Spark v Azure Databricks. Datové sady (tabulky a zobrazení) v tabulkách Delta Live Tables můžete definovat proti libovolnému dotazu, který vrací datový rámec Spark, včetně streamovaných datových rámců DataFrame a Pandas pro datové rámce Spark. Databricks doporučuje pro úlohy příjmu dat používat ve většině případů použití streamovaných tabulek. Streamované tabulky jsou vhodné pro příjem dat z cloudového úložiště objektů pomocí automatického zavaděče nebo ze sběrnic zpráv, jako je Kafka. Následující příklady ukazují některé běžné vzory.
Důležité
Ne všechny zdroje dat mají podporu SQL. Poznámkové bloky SQL a Python můžete kombinovat v kanálu Delta Live Tables, abyste používali SQL pro všechny operace nad rámec příjmu dat.
Podrobnosti o práci s knihovnami, které nejsou ve výchozím nastavení zabalené v Delta Live Tables, najdete v tématu Správa závislostí Pythonu pro kanály Delta Live Tables.
Načtení souborů z cloudového úložiště objektů
Databricks doporučuje používat automatický zavaděč s rozdílovými živými tabulkami pro většinu úloh příjmu dat z cloudového úložiště objektů. Automatické zavaděče a dynamické tabulky Delta jsou navržené tak, aby postupně a idempotentním způsobem načítá stále rostoucí data při příchodu do cloudového úložiště. Následující příklady používají automatický zavaděč k vytvoření datových sad ze souborů CSV a JSON:
Poznámka:
Pokud chcete načíst soubory s automatickým zavaděčem v kanálu s povoleným katalogem Unity, musíte použít externí umístění. Další informace o používání katalogu Unity s dynamickými tabulkami Delta najdete v tématu Použití katalogu Unity s kanály Delta Live Tables.
Python
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/databricks-datasets/retail-org/customers/")
)
@dlt.table
def sales_orders_raw():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders/", "json")
Podívejte se , co je automatický zavaděč? a syntaxe SQL automatického zavaděče.
Upozorňující
Pokud používáte automatický zavaděč s oznámeními o souborech a spustíte úplnou aktualizaci kanálu nebo tabulky streamování, musíte prostředky ručně vyčistit. K vyčištění můžete použít CloudFilesResourceManager v poznámkovém bloku.
Načtení dat ze sběrnice zpráv
Kanály Delta Live Tables můžete nakonfigurovat tak, aby ingestovali data z sběrnic zpráv se streamovanými tabulkami. Databricks doporučuje kombinovat streamované tabulky s průběžným spouštěním a vylepšeným automatickým škálováním, aby se zajistilo nejúčinnější příjem dat pro načítání z sběrnic zpráv s nízkou latencí. Viz Optimalizace využití clusteru kanálů Delta Live Tables s vylepšeným automatickým škálováním.
Následující kód například nakonfiguruje streamovací tabulku pro příjem dat ze systému Kafka:
import dlt
@dlt.table
def kafka_raw():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "topic1")
.option("startingOffsets", "latest")
.load()
)
Podřízené operace můžete napsat v čistém SQL, aby se na těchto datech prováděly transformace streamování, jako v následujícím příkladu:
CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
*
FROM
STREAM(LIVE.kafka_raw)
WHERE ...
Příklad práce se službou Event Hubs najdete v tématu Použití služby Azure Event Hubs jako zdroje dat Delta Live Tables.
Viz Konfigurace streamovaných zdrojů dat.
Načtení dat z externích systémů
Delta Live Tables podporuje načítání dat z libovolného zdroje dat podporovaného službou Azure Databricks. Viz Připojení ke zdrojům dat. Můžete také načíst externí data pomocí federace Lakehouse pro podporované zdroje dat. Vzhledem k tomu, že federace Lakehouse vyžaduje Databricks Runtime 13.3 LTS nebo vyšší, musí být pro použití kanálu Lakehouse Federation nakonfigurovaný kanál Preview.
Některé zdroje dat nemají v SQL ekvivalentní podporu. Pokud se službou Lakehouse Federation nemůžete použít některý z těchto zdrojů dat, můžete k ingestování dat ze zdroje použít poznámkový blok Pythonu. Zdrojový kód Pythonu a SQL můžete přidat do stejného kanálu Delta Live Tables. Následující příklad deklaruje materializované zobrazení pro přístup k aktuálnímu stavu dat ve vzdálené tabulce PostgreSQL:
import dlt
@dlt.table
def postgres_raw():
return (
spark.read
.format("postgresql")
.option("dbtable", table_name)
.option("host", database_host_url)
.option("port", 5432)
.option("database", database_name)
.option("user", username)
.option("password", password)
.load()
)
Načtení malých nebo statických datových sad z cloudového úložiště objektů
Malé nebo statické datové sady můžete načíst pomocí syntaxe načtení Apache Sparku. Delta Live Tables podporuje všechny formáty souborů podporované Apache Sparkem v Azure Databricks. Úplný seznam najdete v tématu Možnosti formátu dat.
Následující příklady ukazují načtení JSON pro vytvoření tabulek Delta Live Tables:
Python
@dlt.table
def clickstream_raw():
return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))
SQL
CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;
Poznámka:
Konstruktor SELECT * FROM format.`path`;
SQL je společný pro všechna prostředí SQL v Azure Databricks. Tento vzor se doporučuje pro přímý přístup k souborům pomocí SQL s Delta Live Tables.
Zabezpečený přístup k přihlašovacím údajům úložiště s tajnými kódy v kanálu
Tajné kódy Azure Databricks můžete použít k ukládání přihlašovacích údajů, jako jsou přístupové klíče nebo hesla. Ke konfiguraci tajného kódu v kanálu použijte vlastnost Spark v konfiguraci clusteru nastavení kanálu. Viz Konfigurace výpočetních prostředků pro kanál Delta Live Tables.
Následující příklad používá tajný klíč k uložení přístupového klíče potřebného ke čtení vstupních dat z účtu úložiště Azure Data Lake Storage Gen2 (ADLS Gen2) pomocí automatického zavaděče. Stejnou metodu můžete použít ke konfiguraci jakéhokoli tajného kódu vyžadovaného vaším kanálem, například klíčů AWS pro přístup k S3 nebo k heslu k metastoru Apache Hive.
Další informace o práci s Azure Data Lake Storage Gen2 najdete v tématu Připojení k Azure Data Lake Storage Gen2 a Blob Storage.
Poznámka:
Předponu spark.hadoop.
musíte přidat ke konfiguračnímu spark_conf
klíči, který nastaví hodnotu tajného klíče.
{
"id": "43246596-a63f-11ec-b909-0242ac120002",
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
}
],
"development": true,
"continuous": false,
"libraries": [
{
"notebook": {
"path": "/Users/user@databricks.com/DLT Notebooks/Delta Live Tables quickstart"
}
}
],
"name": "DLT quickstart using ADLS2"
}
Nahradit
<storage-account-name>
s názvem účtu úložiště ADLS Gen2.<scope-name>
s názvem oboru tajného kódu Azure Databricks.<secret-name>
s názvem klíče, který obsahuje přístupový klíč účtu úložiště Azure.
import dlt
json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(json_path)
)
Nahradit
<container-name>
s názvem kontejneru účtu úložiště Azure, který ukládá vstupní data.<storage-account-name>
s názvem účtu úložiště ADLS Gen2.<path-to-input-dataset>
s cestou ke vstupní datové sadě.
Načtení dat z Azure Event Hubs
Azure Event Hubs je služba streamování dat, která poskytuje rozhraní kompatibilní s Apache Kafka. K načtení zpráv ze služby Azure Event Hubs můžete použít konektor Kafka strukturovaného streamování, který je součástí modulu runtime Delta Live Tables. Další informace o načítání a zpracování zpráv ze služby Azure Event Hubs najdete v tématu Použití služby Azure Event Hubs jako zdroje dat Delta Live Tables.