Share via


讀取使用 Delta Sharing 開啟共享共享的數據(適用於收件者)

本文說明如何使用 Delta Sharing 開啟共用 通訊協議來讀取與您共用的數據。 在開啟共用中,您會使用數據提供者與小組成員共用的認證檔案,以取得共享數據的安全讀取存取權。 只要認證有效,且提供者會繼續共享數據,存取就會保存。 提供者會管理認證到期和輪替。 數據 更新 幾乎即時可供您使用。 您可以讀取和製作共享數據的複本,但無法修改源數據。

注意

如果使用 Databricks-to-Databricks Delta Sharing 與您共用數據,則不需要認證檔案來存取數據,而且本文不適用於您。 如需指示,請參閱使用 Databricks-to-Databricks Delta Sharing 讀取共用的數據(適用於收件者)。

下列各節說明如何使用 Azure Databricks、Apache Spark、pandas 和 Power BI,以使用認證檔案來存取和讀取共享數據。 如需差異共用連接器的完整清單,以及如何使用連接器的詳細資訊,請參閱差異共用 開放原始碼 檔。 如果您無法存取共享數據,請連絡數據提供者。

注意

除非另有說明,否則合作夥伴整合是由第三方提供,而且您必須具有適當提供者的帳戶,才能使用其產品和服務。 雖然 Databricks 盡最大努力讓此內容保持在最新狀態,但我們不會就合作夥伴整合頁面上的內容或內容的精確度做出任何表示。 連絡適當的提供者,瞭解整合。

開始之前

小組成員必須下載數據提供者共用的認證檔案。 請參閱 在開放式共用模型中取得存取權。

他們應該使用安全通道與您共用該檔案或檔案位置。

Azure Databricks:使用開放式共用連接器讀取共享數據

本節說明如何使用開放式共享連接器,在 Azure Databricks 工作區中使用筆記本存取共享數據。 您或小組的另一個成員會將認證檔案儲存在 DBFS 中,然後使用它向數據提供者的 Azure Databricks 帳戶進行驗證,並讀取數據提供者與您共享的數據。

注意

如果數據提供者使用 Databricks 對 Databricks 共用,但未與您共用認證檔案,您必須使用 Unity 目錄存取數據。 如需指示,請參閱使用 Databricks-to-Databricks Delta Sharing 讀取共用的數據(適用於收件者)。

在此範例中,您會建立具有多個數據格的筆記本,以便獨立執行。 您可以改為將筆記本命令新增至相同的數據格,並依序執行。

步驟 1:將認證檔案儲存在 DBFS 中(Python 指示)

在此步驟中,您會在 Azure Databricks 中使用 Python Notebook 來儲存認證檔案,讓小組上的使用者可以存取共享的數據。

如果您或小組中的某人已經將認證檔案儲存在 DBFS 中,請跳至下一個步驟。

  1. 在文本編輯器中,開啟認證檔案。

  2. 在您的 Azure Databricks 工作區中,按兩下 [ 新增 > 筆記本]。

    • 輸入名稱。
    • 將筆記本的默認語言設定為 Python。
    • 選取要附加至筆記本的叢集。
    • 按一下 [建立]

    筆記本會在筆記本編輯器中開啟。

  3. 若要使用 Python 或 pandas 來存取共享的數據,請安裝 差異共用 Python 連接器。 在筆記本編輯器中,貼上下列命令:

    %sh pip install delta-sharing
    
  4. 執行資料格。

    如果尚未安裝 Python 連結 delta-sharing 庫,則會安裝在叢集中。

  5. 在新的數據格中,貼上下列命令,將認證檔案的內容上傳至 DBFS 中的資料夾。 取代變數,如下所示:

    • <dbfs-path>:您要儲存認證檔案的資料夾路徑

    • <credential-file-contents>:認證檔案的內容。 這不是檔案的路徑,而是檔案複製的內容。

      認證檔案包含 JSON,定義三個字段: shareCredentialsVersionendpointbearerToken

      %scala
      dbutils.fs.put("<dbfs-path>/config.share","""
      <credential-file-contents>
      """)
      
  6. 執行資料格。

    上傳認證檔案之後,您可以刪除此儲存格。 所有工作區使用者可以從 DBFS 讀取認證檔案,而且認證檔案可在您工作區中的所有叢集和 SQL 倉儲上的 DBFS 中使用。 若要刪除儲存格,請按兩下最右邊單元格動作選單中單元格動作的 x

步驟 2:使用筆記本來列出和讀取共享數據表

在此步驟中,您會列出共用中的數據表,或一組共用數據表和分割區,然後查詢數據表。

  1. 使用 Python,列出共用中的數據表。

    在新的儲存格中,貼上下列命令。 將 取代<dbfs-path>為步驟 1:將認證檔案儲存在 DBFS 中的路徑(Python 指示)。

    當程式代碼執行時,Python 會從叢集上的 DBFS 讀取認證檔案。 存取儲存在 DBFS 中的路徑 /dbfs/數據。

    import delta_sharing
    
    client = delta_sharing.SharingClient(f"/dbfs/<dbfs-path>/config.share")
    
    client.list_all_tables()
    
  2. 執行資料格。

    結果是數據表的陣列,以及每個數據表的元數據。 下列輸出顯示兩個資料表:

    Out[10]: [Table(name='example_table', share='example_share_0', schema='default'), Table(name='other_example_table', share='example_share_0', schema='default')]
    

    如果輸出是空的,或不包含您預期的數據表,請連絡數據提供者。

  3. 查詢共享數據表。

    • 使用 Scala

      在新的儲存格中,貼上下列命令。 當程式代碼執行時,認證檔案會透過 JVM 從 DBFS 讀取。

      取代變數,如下所示:

      • <profile-path>:認證檔的 DBFS 路徑。 例如: /<dbfs-path>/config.share
      • <share-name>:數據表的 值 share=
      • <schema-name>:數據表的 值 schema=
      • <table-name>:數據表的 值 name=
      %scala
          spark.read.format("deltaSharing")
          .load("<profile-path>#<share-name>.<schema-name>.<table-name>").limit(10);
      

      執行資料格。 每次載入共享數據表時,都會看到來源的全新數據。

    • 使用 SQL

      若要使用 SQL 查詢數據,您可以從共享資料表在工作區中建立本機數據表,然後查詢本機數據表。 共用的數據不會儲存或快取在本機數據表中。 每次查詢本機數據表時,您會看到共享數據的目前狀態。

      在新的儲存格中,貼上下列命令。

      取代變數,如下所示:

      • <local-table-name>:本機數據表的名稱。
      • <profile-path>:認證檔案的位置。
      • <share-name>:數據表的 值 share=
      • <schema-name>:數據表的 值 schema=
      • <table-name>:數據表的 值 name=
      %sql
      DROP TABLE IF EXISTS table_name;
      
      CREATE TABLE <local-table-name> USING deltaSharing LOCATION "<profile-path>#<share-name>.<schema-name>.<table-name>";
      
      SELECT * FROM <local-table-name> LIMIT 10;
      

      當您執行 命令時,會直接查詢共享數據。 作為測試,會查詢數據表,並傳回前 10 個結果。

    如果輸出是空的,或不包含您預期的數據,請連絡數據提供者。

Apache Spark:讀取共享數據

請遵循下列步驟,使用Spark 3.x或更新版本存取共享數據。

這些指示假設您可以存取數據提供者所共用的認證檔案。 請參閱 在開放式共用模型中取得存取權。

安裝 Delta 共用 Python 和 Spark 連接器

若要存取與共享資料相關的元數據,例如與您共用的數據表清單,請執行下列動作。 此範例使用 Python。

  1. 安裝 delta-sharing Python 連接器

    pip install delta-sharing
    
  2. 安裝 Apache Spark 連接器

使用Spark列出共享數據表

列出共享中的數據表。 在下列範例中,將 取代 <profile-path> 為認證檔的位置。

import delta_sharing

client = delta_sharing.SharingClient(f"<profile-path>/config.share")

client.list_all_tables()

結果是數據表的陣列,以及每個數據表的元數據。 下列輸出顯示兩個資料表:

Out[10]: [Table(name='example_table', share='example_share_0', schema='default'), Table(name='other_example_table', share='example_share_0', schema='default')]

如果輸出是空的,或不包含您預期的數據表,請連絡數據提供者。

使用Spark存取共享數據

執行下列命令,並取代這些變數:

  • <profile-path>:認證檔案的位置。
  • <share-name>:數據表的 值 share=
  • <schema-name>:數據表的 值 schema=
  • <table-name>:數據表的 值 name=
  • <version-as-of>:選。 要載入數據的數據表版本。 只有當數據提供者共享數據表的歷程記錄時,才能運作。 delta-sharing-spark需要 0.5.0 或更新版本。
  • <timestamp-as-of>:選。 在指定時間戳之前或指定時間戳的版本載入數據。 只有當數據提供者共享數據表的歷程記錄時,才能運作。 delta-sharing-spark需要 0.6.0 或更新版本。

Python

delta_sharing.load_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>", version=<version-as-of>)

spark.read.format("deltaSharing")\
.option("versionAsOf", <version-as-of>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")\
.limit(10))

delta_sharing.load_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>", timestamp=<timestamp-as-of>)

spark.read.format("deltaSharing")\
.option("timestampAsOf", <timestamp-as-of>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")\
.limit(10))

Scala

執行下列命令,並取代這些變數:

  • <profile-path>:認證檔案的位置。
  • <share-name>:數據表的 值 share=
  • <schema-name>:數據表的 值 schema=
  • <table-name>:數據表的 值 name=
  • <version-as-of>:選。 要載入數據的數據表版本。 只有當數據提供者共享數據表的歷程記錄時,才能運作。 delta-sharing-spark需要 0.5.0 或更新版本。
  • <timestamp-as-of>:選。 在指定時間戳之前或指定時間戳的版本載入數據。 只有當數據提供者共享數據表的歷程記錄時,才能運作。 delta-sharing-spark需要 0.6.0 或更新版本。
spark.read.format("deltaSharing")
.option("versionAsOf", <version-as-of>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
.limit(10)

spark.read.format("deltaSharing")
.option("timestampAsOf", <version-as-of>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
.limit(10)

使用Spark存取共用變更數據摘要

如果數據表歷程記錄已與您共用,而且源數據表上已啟用異動數據摘要(CDF),您可以執行下列命令來存取變更數據摘要,並取代這些變數。 delta-sharing-spark需要 0.5.0 或更新版本。

必須提供一個和一個 start 參數。

  • <profile-path>:認證檔案的位置。
  • <share-name>:數據表的 值 share=
  • <schema-name>:數據表的 值 schema=
  • <table-name>:數據表的 值 name=
  • <starting-version>:選。 包含的查詢起始版本。 指定為 Long。
  • <ending-version>:選。 包含的查詢結束版本。 如果未提供結束版本,API 會使用最新的數據表版本。
  • <starting-timestamp>:選。 查詢的起始時間戳,這會轉換成建立大於或等於此時間戳的版本。 以格式 yyyy-mm-dd hh:mm:ss[.fffffffff]指定為字串。
  • <ending-timestamp>:選。 查詢的結束時間戳,這會轉換成稍早建立或等於此時間戳的版本。 以格式指定為字串 yyyy-mm-dd hh:mm:ss[.fffffffff]

Python

delta_sharing.load_table_changes_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>",
  starting_version=<starting-version>,
  ending_version=<ending-version>)

delta_sharing.load_table_changes_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>",
  starting_timestamp=<starting-timestamp>,
  ending_timestamp=<ending-timestamp>)

spark.read.format("deltaSharing").option("readChangeFeed", "true")\
.option("statingVersion", <starting-version>)\
.option("endingVersion", <ending-version>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

spark.read.format("deltaSharing").option("readChangeFeed", "true")\
.option("startingTimestamp", <starting-timestamp>)\
.option("endingTimestamp", <ending-timestamp>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

Scala

spark.read.format("deltaSharing").option("readChangeFeed", "true")
.option("statingVersion", <starting-version>)
.option("endingVersion", <ending-version>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

spark.read.format("deltaSharing").option("readChangeFeed", "true")
.option("startingTimestamp", <starting-timestamp>)
.option("endingTimestamp", <ending-timestamp>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

如果輸出是空的,或不包含您預期的數據,請連絡數據提供者。

使用Spark結構化串流存取共享數據表

如果數據表歷程記錄與您共用,您可以串流讀取共享數據。 delta-sharing-spark需要 0.6.0 或更新版本。

支援的選項:

  • ignoreDeletes:忽略刪除資料的交易。
  • ignoreChanges:如果源數據表中的檔案因數據變更作業而重寫,例如 UPDATE、、 MERGE INTODELETE (在分割區內),或 OVERWRITE,請重新處理更新。 仍可以發出未變更的數據列。 因此,下游取用者應該能夠處理重複專案。 刪除項目不會傳播至下游。 ignoreChanges 包含 ignoreDeletes。 因此,如果您使用 ignoreChanges,您的數據流將不會因刪除或更新源數據表而中斷。
  • startingVersion:要從開始的共享數據表版本。 從這個版本開始的所有數據表變更都會由串流來源讀取。
  • startingTimestamp:要從開始的時間戳。 串流來源將會讀取時間戳或之後認可的所有數據表變更。 範例:"2023-01-01 00:00:00.0"
  • maxFilesPerTrigger:每個微批次中要考慮的新檔案數目。
  • maxBytesPerTrigger:每個微批次中處理的數據量。 此選項會設定「軟最大值」,這表示批次會處理大約這個數據量,而且可能會處理超過限制,以便在最小輸入單位大於此限制的情況下向前移動串流查詢。
  • readChangeFeed:數據流讀取共用數據表的變更數據摘要。

不支援的選項:

  • Trigger.availableNow

範例結構化串流查詢

Scala
spark.readStream.format("deltaSharing")
.option("startingVersion", 0)
.option("ignoreChanges", true)
.option("maxFilesPerTrigger", 10)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
Python
spark.readStream.format("deltaSharing")\
.option("startingVersion", 0)\
.option("ignoreDeletes", true)\
.option("maxBytesPerTrigger", 10000)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

請參閱 Azure Databricks 上的串流。

啟用刪除向量或數據行對應的讀取數據表

重要

這項功能處於公開預覽狀態

刪除向量是提供者可以在共用差異數據表上啟用的記憶體優化功能。 請參閱 什麼是刪除向量?

Azure Databricks 也支援 Delta 數據表的數據行對應。 請參閱 使用 Delta Lake 數據行對應來重新命名和卸除數據行。

如果您的提供者共享資料表並啟用刪除向量或數據行對應,您可以使用執行 delta-sharing-spark 3.1 或更新版本的計算來讀取資料表。 如果您使用 Databricks 叢集,您可以使用執行 Databricks Runtime 14.1 或更新版本之叢集來執行批次讀取。 CDF 和串流查詢需要 Databricks Runtime 14.2 或更新版本。

您可以依目前方式執行批次查詢,因為它們可以根據共用數據表的數據表功能自動解析 responseFormat

若要讀取變更資料摘要 (CDF) 或針對已啟用刪除向量或資料行對應的共享資料表執行串流查詢,您必須設定其他選項 responseFormat=delta

下列範例顯示批次、CDF 和串流查詢:

import org.apache.spark.sql.SparkSession

val spark = SparkSession
        .builder()
        .appName("...")
        .master("...")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .getOrCreate()

val tablePath = "<profile-file-path>#<share-name>.<schema-name>.<table-name>"

// Batch query
spark.read.format("deltaSharing").load(tablePath)

// CDF query
spark.read.format("deltaSharing")
  .option("readChangeFeed", "true")
  .option("responseFormat", "delta")
  .option("startingVersion", 1)
  .load(tablePath)

// Streaming query
spark.readStream.format("deltaSharing").option("responseFormat", "delta").load(tablePath)

Pandas:讀取共享數據

請遵循下列步驟來存取 pandas 0.25.3 或更新版本中的共享數據。

這些指示假設您可以存取數據提供者所共用的認證檔案。 請參閱 在開放式共用模型中取得存取權。

安裝 Delta Sharing Python 連接器

若要存取與共享資料相關的元數據,例如與您共用的數據表清單,您必須安裝 差異共用 Python 連接器

pip install delta-sharing

使用 pandas 列出共享數據表

若要列出共用中的數據表,請執行下列命令,並將 取代 <profile-path>/config.share 為認證檔案的位置。

import delta_sharing

client = delta_sharing.SharingClient(f"<profile-path>/config.share")

client.list_all_tables()

如果輸出是空的,或不包含您預期的數據表,請連絡數據提供者。

使用 pandas 存取共享數據

若要使用 Python 存取 pandas 中的共享數據,請執行下列命令,並取代變數,如下所示:

  • <profile-path>:認證檔案的位置。
  • <share-name>:數據表的 值 share=
  • <schema-name>:數據表的 值 schema=
  • <table-name>:數據表的 值 name=
import delta_sharing
delta_sharing.load_as_pandas(f"<profile-path>#<share-name>.<schema-name>.<table-name>")

使用 pandas 存取共用變更數據摘要

若要使用 Python 存取 pandas 中共用數據表的變更數據摘要,請執行下列命令,並取代變數,如下所示。 變更數據摘要可能無法使用,視數據提供者是否共享數據表的變更數據摘要而定。

  • <starting-version>:選。 包含的查詢起始版本。
  • <ending-version>:選。 包含的查詢結束版本。
  • <starting-timestamp>:選。 查詢的起始時間戳。 這會轉換成建立大於或等於此時間戳的版本。
  • <ending-timestamp>:選。 查詢的結束時間戳。 這會轉換成稍早建立或等於此時間戳的版本。
import delta_sharing
delta_sharing.load_table_changes_as_pandas(
  f"<profile-path>#<share-name>.<schema-name>.<table-name>",
  starting_version=<starting-version>,
  ending_version=<starting-version>)

delta_sharing.load_table_changes_as_pandas(
  f"<profile-path>#<share-name>.<schema-name>.<table-name>",
  starting_timestamp=<starting-timestamp>,
  ending_timestamp=<ending-timestamp>)

如果輸出是空的,或不包含您預期的數據,請連絡數據提供者。

Power BI:讀取共享數據

Power BI Delta Sharing 連接器可讓您透過 Delta Sharing 開啟通訊協定探索、分析及可視化與您共用的數據集。

需求

  • Power BI Desktop 2.99.621.0 或更新版本。
  • 存取數據提供者所共享的認證檔案。 請參閱 在開放式共用模型中取得存取權。

連線 至 Databricks

若要使用 Delta Sharing 連接器連線到 Azure Databricks,請執行下列動作:

  1. 使用文本編輯器開啟共用認證檔案,以擷取端點 URL 和令牌。
  2. 開啟 Power BI Desktop。
  3. 在 [ 取得數據] 功能表上,搜尋 差異共用
  4. 選取連接器,然後按兩下 [連線]。
  5. 輸入您從認證檔案複製到 [ 差異共用伺服器 URL] 字段的端點 URL
  6. 或者,在 [ 進階選項] 索引標籤中,為您可以下載的數據列數目上限設定數據 列限制 。 根據預設,這會設定為1百萬個數據列。
  7. 按一下 [確定]
  8. 針對 [驗證],將您從認證檔案 擷取的令牌複製到持有人令牌
  9. 按一下 連線

Power BI Delta 共用連接器的限制

Power BI Delta Sharing 連線 or 有下列限制:

  • 連接器載入的數據必須符合您電腦的記憶體。 若要確保這一點,連接器會將匯入的數據列數目限制為您 在 Power BI Desktop 的 [進階選項] 索引卷標底下設定的數據列限制

要求新的認證

如果您的認證啟用 URL 或下載的認證遺失、損毀或遭入侵,或您的認證到期,而您的提供者不會傳送新的認證,請連絡您的提供者以要求新的認證。