在管線中載入資料

您可以使用管線,從 Azure Databricks 上 Apache Spark 支援的任何資料來源載入資料。 你可以在 Lakeflow Spark 宣告式管線中針對任何回傳 Spark DataFrame 的查詢(包括串流 DataFrame 和 Pandas for Spark DataFrames)定義資料集——資料表和檢視。 針對數據擷取工作,Databricks 建議針對大部分的使用案例使用串流數據表。 串流資料表對於從雲端物件儲存使用 Auto Loader 或從訊息系統(如 Kafka)擷取資料非常有用。

並非所有資料來源都支援擷取的 SQL 支援。 不過,你可以在同一條管線中混合使用 SQL 和 Python 原始碼,以便在需要時使用 Python。 如需瞭解使用未預設打包在 Lakeflow Spark 宣告式管線中的程式庫的詳細資訊,請參閱管理管線的 Python 依賴項。 如需瞭解有關 Azure Databricks 中資料匯入的相關資訊,請參閱 Lakeflow Connect 的標準連接器

以下範例展示了一些常見的資料載入模式。

從現有資料表載入

從 Azure Databricks 中的任何現有數據表載入數據。 你可以使用查詢轉換資料,或者載入資料表以便在工作流程中進一步處理。

Python

@dp.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    spark.read.table("baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
  )

SQL

CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC

從雲端物件記憶體載入檔案

Databricks 建議在管線中使用自動載入器,以從雲端物件儲存體或 Unity 目錄磁碟區中的檔案進行大部分的資料擷取工作。 自動載入器和管線設計為以增量和冪等方式載入不斷增加的數據,當資料到達雲端儲存時。 請參閱自動加載器是什麼?從物件存儲加載數據

以下範例是利用 Auto Loader 從雲端儲存讀取資料。

Python

@dp.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json")
  )

SQL

CREATE OR REFRESH STREAMING TABLE sales
  AS SELECT *
  FROM STREAM read_files(
    'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
    format => "json"
  );

以下範例使用 Auto Loader 從 Unity 目錄磁碟區的 CSV 檔案建立資料集。

Python

@dp.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/Volumes/my_catalog/retail_org/customers/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
  "/Volumes/my_catalog/retail_org/customers/",
  format => "csv"
)

備註

  • 如果您使用自動載入器搭配檔案通知,並針對管線或串流數據表執行完整重新整理,則必須手動清除您的資源。 您可以使用筆記本中的 CloudFilesResourceManager 來執行清除。
  • 若要在已啟用 Unity Catalog 的管線中使用 Auto Loader 載入檔案,您必須使用 外部位置。 若要了解關於使用 Unity 目錄搭配管線的更多資訊,請參閱 使用 Unity 目錄搭配管線

登入驗證至雲端儲存空間

Auto Loader 使用 Unity Catalog 的外部位置來驗證雲端儲存。 你必須為你想讀取的儲存路徑設定一個外部位置,並將權限授予 READ FILES 執行中的使用者。

要從 Azure Data Lake Storage 匯入資料,請設定一個外部位置,並由一個引用儲存容器的儲存憑證作為支持。 欲了解更多資訊,請參閱 使用 Unity 目錄連接雲端物件儲存

從訊息匯流排讀取資料

您可以設定管線以從訊息匯流排擷取資料。 Databricks 建議使用串流資料表搭配連續執行和增強的自動調整,以提供來自訊息匯流排的高效低延遲載入擷取。 欲了解更多資訊,請參閱 「利用自動調整來優化 Lakeflow Spark 宣告式管線的叢集利用率」

例如,以下程式碼將串流資料表配置為使用 read_kafka 函式從 Kafka 匯入資料。

Python

from pyspark import pipelines as dp

@dp.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka_server:9092")
      .option("subscribe", "topic1")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_raw AS
  SELECT *
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'topic1'
  );

若要從其他訊息總線來源匯入,請參閱:

從Azure 事件中樞載入資料

Azure 事件中樞是提供 Apache Kafka 相容介面的數據串流服務。 您可以使用 Lakeflow Spark 宣告式管線執行階段中包含的結構化串流 Kafka 連接器,從 Azure 事件中樞載入訊息。 若要深入瞭解如何從 Azure 事件中樞載入和處理訊息,請參閱 使用 Azure 事件中樞作為管線資料來源

載入資料來自外部系統

Lakeflow Spark 宣告式管線支援從 Azure Databricks 支援的任何資料來源載入資料。 請參閱 連線至數據源和外部服務。 您也可以使用 Lakehouse Federation 載入外部資料,適用於 支援的數據源。 因為 Lakehouse Federation 需要 Databricks Runtime 13.3 LTS 或以上的版本,若要使用 Lakehouse Federation,請設定你的管線使用 預覽通道

有些資料來源沒有同等的 SQL 支援。 如果您無法將 Lakehouse 同盟與其中一個資料來源搭配使用,您可以使用 Python 從來源擷取資料。 您可以將 Python 和 SQL 原始檔新增至相同的管線。 以下範例宣告一個物質化檢視,以存取遠端 PostgreSQL 資料表中當前的資料狀態。

import dp

@dp.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

從雲端物件儲存載入小型或靜態資料集

您可以使用 Apache Spark 載入語法來載入小型或靜態數據集。 Lakeflow Spark 宣告式管線支援 Azure Databricks 上 Apache Spark 支援的所有檔案格式。 如需完整清單,請參閱 數據格式選項。

以下範例示範載入 JSON 以建立資料表。

Python

@dp.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM read_files(
  "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
)

備註

read_files SQL 函式是 Azure Databricks 上所有 SQL 環境通用的。 這是在管線中使用 SQL 直接存取檔案的建議模式。 如需詳細資訊,請參閱 選項

從 Python 自訂資料來源載入資料

Python 自訂資料來源可讓您以自訂格式載入資料。 你可以寫程式碼來讀取和寫入特定的外部資料來源,或使用現有的 Python 程式碼從你自己的內部系統讀取資料。 如需開發 Python 資料來源的詳細資訊,請參閱 PySpark 自訂資料來源

以下範例註冊一個格式名稱 my_custom_datasource 的自訂資料來源,並以批次與串流模式讀取資料。

from pyspark import pipelines as dp

# Assume `my_custom_datasource` is a custom Python custom data
# source that supports both batch and streaming reads, and has
# been registered using `spark.dataSource.register`.

# This creates a materialized view
@dp.table(name = "read_from_batch")
def read_from_batch():
    return spark.read.format("my_custom_datasource").load()

# This creates a streaming table
@dp.table(name = "read_from_streaming")
def read_from_streaming():
    return spark.readStream.format("my_custom_datasource").load()

設定串流資料表以忽略來源串流資料表中的變更

根據默認設定,串流資料表需要只接受附加的來源。 如果你的來源串流表需要更新或刪除,請利用skipChangeCommits 此旗標忽略這些變更,例如在GDPR「被遺忘權」的處理過程中。 此旗標僅在 spark.readStream 使用 option() 該函式時生效,且當來源串流表是 create_auto_cdc_flow() 函式的目標時無法使用。 欲了解更多資訊,請參閱 處理來源 Delta 資料表的變更

@dp.table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("A")

利用管線中的秘密安全地存取儲存憑證

您可以使用 Azure Databricks 秘密 來儲存認證,例如存取密鑰或密碼。 若要在管線中設定秘密,請在管線設定叢集組態中使用Spark屬性。 請參閱 設定管線的傳統計算

以下範例使用機密來儲存存取金鑰,此金鑰是利用自動載入器讀取 Azure Data Lake Storage 儲存帳戶中輸入資料所需的。 您可以使用這個相同的方法來設定管線所需的任何秘密,例如 AWS 密鑰來存取 S3 或 Apache Hive 中繼存放區的密碼。

若要深入瞭解如何使用 Azure Data Lake Storage,請參閱 連線至 Azure Data Lake Storage 和 Blob 儲存體

備註

您必須將 spark.hadoop. 前綴新增至設定秘密值的 spark_conf 配置密鑰。

{
  "id": "43246596-a63f-11ec-b909-0242ac120002",
  "storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path>",
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "autoscale": {
        "min_workers": 1,
        "max_workers": 5,
        "mode": "ENHANCED"
      }
    }
  ],
  "development": true,
  "continuous": false,
  "libraries": [
    {
      "notebook": {
        "path": "/Users/user@databricks.com/:re[LDP] Notebooks/:re[LDP] quickstart"
      }
    }
  ],
  "name": ":re[LDP] quickstart using ADLS2"
}

在此程式碼範例中,替換以下數值。

預留位置 替換為
<container-name> Azure 儲存帳號容器的名稱。
<storage-account-name> ADLS儲存帳號名稱。
<path> 管線輸出資料與元資料的路徑。
<scope-name> Azure Databricks 密碼範圍名稱。
<secret-name> 包含 Azure 儲存帳號存取金鑰的金鑰名稱。
from pyspark import pipelines as dp

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dp.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

在此程式碼範例中,替換以下數值。

預留位置 替換為
<container-name> 儲存輸入資料的 Azure 儲存帳號容器名稱。
<storage-account-name> ADLS儲存帳號名稱。
<path-to-input-dataset> 就是通往輸入資料集的路徑。