共用方式為


在管線中載入資料

您可以使用管線,從 Azure Databricks 上 Apache Spark 支援的任何資料來源載入資料。 您可以在 Lakeflow 的 Spark 宣告式管線中,針對任何傳回 Spark DataFrame 的查詢定義資料集(資料表和檢視),包括串流 DataFrames 和用於 Spark DataFrames 的 Pandas。 針對數據擷取工作,Databricks 建議針對大部分的使用案例使用串流數據表。 串流數據表適用於使用自動載入器從雲端物件記憶體擷取數據,或從 Kafka 之類的訊息總線擷取數據。

備註

  • 並非所有資料來源都支援擷取的 SQL 支援。 您可以在管線中混合使用 SQL 和 Python 來源,以便在需要時使用 Python,並將 SQL 用於相同管線中的其他作業。
  • 如需瞭解使用未預設打包在 Lakeflow Spark 宣告式管線中的程式庫的詳細資訊,請參閱管理管線的 Python 依賴項
  • 如需瞭解有關 Azure Databricks 中資料匯入的相關資訊,請參閱 Lakeflow Connect 的標準連接器

下列範例示範一些常見的模式。

從現有的數據表載入

從 Azure Databricks 中的任何現有數據表載入數據。 您可以使用查詢來轉換資料,或載入資料表以在管線中進一步處理。

下列範例會從現有的數據表讀取資料:

Python

@dp.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    spark.read.table("baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
  )

SQL

CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC

從雲端物件記憶體載入檔案

Databricks 建議在管線中使用自動載入器,以從雲端物件儲存體或 Unity 目錄磁碟區中的檔案進行大部分的資料擷取工作。 自動載入器和管線設計為以增量和冪等方式載入不斷增加的數據,當資料到達雲端儲存時。

請參閱自動加載器是什麼?從物件存儲加載數據

下列範例會使用自動載入器從雲端記憶體讀取資料:

Python

@dp.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json")
  )

SQL

CREATE OR REFRESH STREAMING TABLE sales
  AS SELECT *
  FROM STREAM read_files(
    'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
    format => "json"
  );

下列範例使用自動載入器從 Unity 目錄磁碟區中的 CSV 檔案建立資料集:

Python

@dp.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/Volumes/my_catalog/retail_org/customers/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
  "/Volumes/my_catalog/retail_org/customers/",
  format => "csv"
)

備註

  • 如果您使用自動載入器搭配檔案通知,並針對管線或串流數據表執行完整重新整理,則必須手動清除您的資源。 您可以使用筆記本中的 CloudFilesResourceManager 來執行清除。
  • 若要在已啟用 Unity Catalog 的管線中使用 Auto Loader 載入檔案,您必須使用 外部位置。 若要了解關於使用 Unity 目錄搭配管線的更多資訊,請參閱 使用 Unity 目錄搭配管線

從訊息總線載入數據

您可以設定管線以從訊息匯流排擷取資料。 Databricks 建議使用串流資料表搭配連續執行和增強的自動調整,以提供來自訊息匯流排的高效低延遲載入擷取。 請參閱 透過自動調整最佳化 Lakeflow Spark 陳述式管線的叢集使用率

例如,下列程式代碼會使用 read_kafka 函式,將串流數據表設定為從 Kafka 擷取資料:

Python

from pyspark import pipelines as dp

@dp.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka_server:9092")
      .option("subscribe", "topic1")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_raw AS
  SELECT *
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'topic1'
  );

若要從其他訊息總線來源匯入,請參閱:

從 Azure 事件中樞載入數據

Azure 事件中樞是提供 Apache Kafka 相容介面的數據串流服務。 您可以使用 Lakeflow Spark 宣告式管線執行階段中包含的結構化串流 Kafka 連接器,從 Azure 事件中樞載入訊息。 若要深入瞭解如何從 Azure 事件中樞載入和處理訊息,請參閱 使用 Azure 事件中樞作為管線資料來源

從外部系統載入數據

Lakeflow Spark 宣告式管線支援從 Azure Databricks 支援的任何資料來源載入資料。 請參閱 連線至數據源和外部服務。 您也可以使用 Lakehouse Federation 載入外部資料,適用於 支援的數據源。 由於要使用 Lakehouse 聯盟需要 Databricks Runtime 13.3 LTS 或更新版本,因此要使用 Lakehouse 聯盟,您必須將管線設定為使用 預覽通道

某些數據源在 SQL 中沒有對等的支援。 如果您無法將 Lakehouse 同盟與其中一個資料來源搭配使用,您可以使用 Python 從來源擷取資料。 您可以將 Python 和 SQL 原始檔新增至相同的管線。 下列範例會宣告具體化檢視,以存取遠端 PostgreSQL 數據表中的數據目前狀態:

import dp

@dp.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

從雲端物件記憶體載入小型或靜態數據集

您可以使用 Apache Spark 載入語法來載入小型或靜態數據集。 Lakeflow Spark 宣告式管線支援 Azure Databricks 上 Apache Spark 支援的所有檔案格式。 如需完整清單,請參閱 數據格式選項。

下列範例示範載入 JSON 以建立資料表:

Python

@dp.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM read_files(
  "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
)

備註

read_files SQL 函式是 Azure Databricks 上所有 SQL 環境通用的。 這是在管線中使用 SQL 直接存取檔案的建議模式。 如需詳細資訊,請參閱 選項

從 Python 自訂資料來源載入資料

Python 自訂資料來源可讓您以自訂格式載入資料。 您可以編寫程式碼來讀取和寫入特定的外部資料來源,或利用現有系統中的現有 Python 程式碼從您自己的內部系統讀取資料。 如需開發 Python 資料來源的詳細資訊,請參閱 PySpark 自訂資料來源

若要使用 Python 自訂資料來源將資料載入管線,請使用格式名稱 my_custom_datasource註冊它,例如 ,然後從中讀取:

from pyspark import pipelines as dp

# Assume `my_custom_datasource` is a custom Python custom data
# source that supports both batch and streaming reads, and has
# been registered using `spark.dataSource.register`.

# This creates a materialized view
@dp.table(name = "read_from_batch")
def read_from_batch():
    return spark.read.format("my_custom_datasource").load()

# This creates a streaming table
@dp.table(name = "read_from_streaming")
def read_from_streaming():
    return spark.readStream.format("my_custom_datasource").load()

設定串流資料表以忽略來源串流資料表中的變更

備註

  • skipChangeCommits 旗標僅在與 spark.readStream 一起使用 option() 函式時才有效。 您無法在 dp.read_stream() 函式中使用此旗標。
  • 當來源串流數據表定義為 skipChangeCommits 函式的目標時,您無法使用 旗標。

根據默認設定,串流資料表需要只接受附加的來源。 當一個串流資料表使用另一個串流資料表作為來源,而需要在來源串流資料表上進行更新或刪除,例如在處理 GDPR 的「被遺忘的權利」時,可以在讀取來源串流資料表時設定 skipChangeCommits 旗標,以忽略這些變更。 如需此旗標的詳細資訊,請參閱 忽略更新和刪除

@dp.table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("A")

利用管線中的秘密安全地存取儲存憑證

您可以使用 Azure Databricks 秘密 來儲存認證,例如存取密鑰或密碼。 若要在管線中設定秘密,請在管線設定叢集組態中使用Spark屬性。 請參閱 設定管線的傳統計算

下列範例會使用秘密來儲存從 Azure Data Lake Storage (ADLS) 儲存體帳戶讀取輸入資料所需的存取金鑰,使用 自動載入器。 您可以使用這個相同的方法來設定管線所需的任何秘密,例如 AWS 密鑰來存取 S3 或 Apache Hive 中繼存放區的密碼。

若要深入瞭解如何使用 Azure Data Lake Storage,請參閱 連線至 Azure Data Lake Storage 和 Blob 儲存體

備註

您必須將 spark.hadoop. 前綴新增至設定秘密值的 spark_conf 配置密鑰。

{
  "id": "43246596-a63f-11ec-b909-0242ac120002",
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "autoscale": {
        "min_workers": 1,
        "max_workers": 5,
        "mode": "ENHANCED"
      }
    }
  ],
  "development": true,
  "continuous": false,
  "libraries": [
    {
      "notebook": {
        "path": "/Users/user@databricks.com/:re[LDP] Notebooks/:re[LDP] quickstart"
      }
    }
  ],
  "name": ":re[LDP] quickstart using ADLS2"
}

Replace

  • <storage-account-name> 具有ADLS儲存帳戶名稱。
  • <scope-name> 與 Azure Databricks 秘密範圍名稱關聯。
  • <secret-name> 為包含 Azure 儲存帳戶存取金鑰的金鑰名稱。
from pyspark import pipelines as dp

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dp.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

Replace

  • <container-name>,其中包含儲存輸入數據的 Azure 記憶體帳戶容器名稱。
  • <storage-account-name> 具有ADLS儲存帳戶名稱。
  • <path-to-input-dataset> 附有輸入數據集的路徑。