Delta Sharing オープン共有 (受信者用) を使用して共有されたデータを読み取る

この記事では、Delta Sharing "オープン共有" プロトコルを使用した共有データを読み取る方法について説明します。 オープン共有では、データ プロバイダーによってチームのメンバーと共有された資格情報ファイルを使用して、共有データへの安全な読み取りアクセスを取得します。 資格情報が有効であり、プロバイダーがデータを共有し続ける限り、アクセス権は保持されます。 プロバイダーは資格情報の有効期限とローテーションを管理します。 データの更新は、ほぼリアルタイムで利用できます。 共有データの読み取りとコピーはできますが、ソースデータを変更することはできません。

Note

データが Databricks 間 Delta Sharing を使用して共有されている場合、データにアクセスするために資格情報ファイルは必要なく、この記事は適用されません。 手順については、「Databricks 間 Delta Sharing (受信者用) を使用して共有されたデータの読み取り」を参照してください。

以下のセクションでは、Azure Databricks、Apache Spark、pandas、Power BI で資格情報ファイルを使用して、共有データにアクセスして読み取る方法について説明します。 Delta Sharing コネクタの完全なリストと使用方法に関する情報については、Delta Sharing オープンソースのドキュメントを参照してください。 共有データへのアクセスで問題が発生した場合は、データプロバイダーにお問い合わせください。

注意

別に記載がない限り、パートナー統合はサード パーティから提供されており、その製品およびサービスの利用には、適切なプロバイダーのアカウントを持っている必要があります。 Databricks ではこのコンテンツを最新の状態に保つように努めていますが、パートナー統合ページ上の統合やコンテンツの正確性については Microsoft が表明するものではありません。 統合に関しては適切なプロバイダーに連絡してください。

開始する前に

チームのメンバーは、データ プロバイダーによって共有されている資格情報ファイルをダウンロードする必要があります。 「オープン共有モデルでアクセスを取得する」を参照してください。

そのファイルまたはファイルの場所を共有するには、セキュリティで保護されたチャネルを使用する必要があります。

Azure Databricks: オープン共有コネクタを使用して共有データを読み取る

このセクションでは、オープン共有コネクタを使用して、Azure Databricks ワークスペースのノートブックを使用して共有データにアクセスする方法について説明します。 ユーザーまたはそのチームの別のメンバーが資格情報ファイルを DBFS に格納したら、それを使用してデータ プロバイダーの Azure Databricks アカウントに対する認証を行い、データ プロバイダーが共有しているデータを読み取ります。

Note

データ プロバイダーが Databricks 間共有を使用していて、資格情報ファイルを共有していない場合は、Unity Catalog を使用してデータにアクセスする必要があります。 手順については、「Databricks 間 Delta Sharing (受信者用) を使用して共有されたデータの読み取り」を参照してください。

この例では、独立して実行できる複数のセルを含むノートブックを作成します。 代わりに、同じセルにノートブック コマンドを追加して、順番に実行することもできます。

手順 1: 資格情報ファイルを DBFS に格納する (Python の手順)

この手順では、Azure Databricks の Python ノートブックを使用して資格情報ファイルを格納し、チームのユーザーが共有データにアクセスできるようにします。

自分またはチームの誰かが既に資格情報ファイルを DBFS に保存している場合は、次の手順に進みます。

  1. テキスト エディターで、資格情報ファイルを開きます。

  2. Azure Databricks ワークスペースで、[新規] > [ノートブック] をクリックします。

    • 名前を入力します。
    • ノートブックの既定の言語を Python に設定します。
    • ノートブックにアタッチするクラスターを選択します。
    • Create をクリックしてください。

    ノートブックエディターでノートブックが開きます。

  3. Python または Pandas を使用して共有データにアクセスするには、 Delta Sharing Python コネクタをインストールします。 ノートブックエディターで、次のコマンドを貼り付けます。

    %sh pip install delta-sharing
    
  4. セルを実行します。

    delta-sharingPython ライブラリがまだインストールされていない場合は、クラスターにインストールします。

  5. 新しいセルに次のコマンドを貼り付けます。このコマンドは、資格情報ファイルの内容を DBFS 内のフォルダーにアップロードします。 変数を次のように置き換えます。

    • <dbfs-path>: パスを資格情報ファイルを保存したいフォルダーに

    • <credential-file-contents>: 資格情報ファイルの内容 これは、ファイルへのパスではなく、ファイルのコピーされた内容です。

      資格情報ファイルには、shareCredentialsVersionendpointbearerTokenの 3 つのフィールドを定義する JSON が含まれています。

      %scala
      dbutils.fs.put("<dbfs-path>/config.share","""
      <credential-file-contents>
      """)
      
  6. セルを実行します。

    資格情報ファイルがアップロードされたら、このセルを削除できます。 すべてのワークスペース ユーザーは DBFS から資格情報ファイルを読み取ることができ、資格情報ファイルは、すべてのクラスター上の DBFS とワークスペース内の SQL ウェアハウスで使用できます。 セルを削除するには、右端のセル操作セル操作メニューで、xをクリックします。

手順 2: ノートブックを使用して共有テーブルの一覧表示と読み取りを行う

この手順では、"共有" 内のテーブル、または共有テーブルとパーティションのセットを一覧表示し、テーブルに対してクエリを実行します。

  1. Python を使用して、共有内のテーブルを一覧表示します。

    新しいセルに次のコマンドを貼り付けます。 <dbfs-path> を、「手順 1: 資格情報ファイルを DBFS に格納する (Python の手順)」で作成したパスに置き換えます。

    コードを実行すると、Python はクラスターの DBFS から資格情報ファイルを読み取ります。 パス /dbfs/ で DBFS に格納されているデータにアクセスします。

    import delta_sharing
    
    client = delta_sharing.SharingClient(f"/dbfs/<dbfs-path>/config.share")
    
    client.list_all_tables()
    
  2. セルを実行します。

    結果は、テーブルの配列と各テーブルのメタデータとなります。 次の出力は、2つのテーブルを表示します。

    Out[10]: [Table(name='example_table', share='example_share_0', schema='default'), Table(name='other_example_table', share='example_share_0', schema='default')]
    

    出力が空か、期待されるテーブルが含まれていない場合は、データプロバイダーにお問い合わせください。

  3. 共有テーブルに対してクエリを実行します。

    • Scala の使用:

      新しいセルに次のコマンドを貼り付けます。 コードを実行すると、資格情報ファイルが JVM を介して DBFS から読み取られます。

      変数を次のように置き換えます。

      • <profile-path>: 資格情報ファイルの DBFS パス。 たとえば、「 /<dbfs-path>/config.share 」のように入力します。
      • <share-name>: テーブルの share= の値。
      • <schema-name>: テーブルの schema= の値。
      • <table-name>: テーブルの name= の値。
      %scala
          spark.read.format("deltaSharing")
          .load("<profile-path>#<share-name>.<schema-name>.<table-name>").limit(10);
      

      セルを実行します。 共有テーブルを読み込むたびに、ソースからの新しいデータが表示されます。

    • SQL の使用:

      SQL を使用して共有データのクエリを実行するには、共有テーブルからワークスペースにローカル テーブルを作成した後、そのローカル テーブルにクエリを実行します。 共有データは、ローカルテーブルに保存またはキャッシュされません。 ローカルテーブルに対してクエリを実行するたびに、共有データの現在の状態が表示されます。

      新しいセルに次のコマンドを貼り付けます。

      変数を次のように置き換えます。

      • <local-table-name>: ローカルテーブルの名前。
      • <profile-path>: 資格情報ファイルの場所。
      • <share-name>: テーブルの share= の値。
      • <schema-name>: テーブルの schema= の値。
      • <table-name>: テーブルの name= の値。
      %sql
      DROP TABLE IF EXISTS table_name;
      
      CREATE TABLE <local-table-name> USING deltaSharing LOCATION "<profile-path>#<share-name>.<schema-name>.<table-name>";
      
      SELECT * FROM <local-table-name> LIMIT 10;
      

      コマンドを実行すると、共有データに直接クエリが実行されます。 テストとして、テーブルに対してクエリが実行され、最初の 10 件の結果が返されます。

    出力が空か、期待されるデータが含まれていない場合は、データプロバイダーにお問い合わせください。

Apache Spark: 共有データを読み取る

Apache Spark 3.x 以上を使用して共有データにアクセスするには、次の手順に従います。

以下の手順では、データ プロバイダーによって共有された資格情報ファイルにアクセスできることを前提としています。 「オープン共有モデルでアクセスを取得する」を参照してください。

Delta Sharing Python コネクタと Spark コネクタをインストールする

共有されたテーブルの一覧など、共有データに関連するメタデータにアクセスするには、次の手順を行います。 この例では Python を使用します。

  1. Delta Sharing Python コネクタをインストールします。

    pip install delta-sharing
    
  2. Apache Spark コネクタをインストールします。

Spark を使用して共有テーブルを一覧表示する

共有内のテーブルを一覧表示します。 次の例では、 <profile-path>を資格情報ファイルの場所に置き換えます。

import delta_sharing

client = delta_sharing.SharingClient(f"<profile-path>/config.share")

client.list_all_tables()

結果は、テーブルの配列と各テーブルのメタデータとなります。 次の出力は、2つのテーブルを表示します。

Out[10]: [Table(name='example_table', share='example_share_0', schema='default'), Table(name='other_example_table', share='example_share_0', schema='default')]

出力が空か、期待されるテーブルが含まれていない場合は、データプロバイダーにお問い合わせください。

Spark を使用して共有データにアクセスする

次の変数を置き換えて、次のコマンドを実行します。

  • <profile-path>: 資格情報ファイルの場所。
  • <share-name>: テーブルの share= の値。
  • <schema-name>: テーブルの schema= の値。
  • <table-name>: テーブルの name= の値。
  • <version-as-of>: 省略可能。 データを読み込むテーブルのバージョン。 データ プロバイダーがテーブルの履歴を共有している場合にのみ機能します。 delta-sharing-spark 0.5.0 以上が必要です。
  • <timestamp-as-of>: 省略可能。 特定のタイムスタンプまたはそれより前のバージョンでデータを読み込みます。 データ プロバイダーがテーブルの履歴を共有している場合にのみ機能します。 delta-sharing-spark 0.6.0 以上が必要です。

Python

delta_sharing.load_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>", version=<version-as-of>)

spark.read.format("deltaSharing")\
.option("versionAsOf", <version-as-of>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")\
.limit(10))

delta_sharing.load_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>", timestamp=<timestamp-as-of>)

spark.read.format("deltaSharing")\
.option("timestampAsOf", <timestamp-as-of>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")\
.limit(10))

Scala

次の変数を置き換えて、次のコマンドを実行します。

  • <profile-path>: 資格情報ファイルの場所。
  • <share-name>: テーブルの share= の値。
  • <schema-name>: テーブルの schema= の値。
  • <table-name>: テーブルの name= の値。
  • <version-as-of>: 省略可能。 データを読み込むテーブルのバージョン。 データ プロバイダーがテーブルの履歴を共有している場合にのみ機能します。 delta-sharing-spark 0.5.0 以上が必要です。
  • <timestamp-as-of>: 省略可能。 特定のタイムスタンプまたはそれより前のバージョンでデータを読み込みます。 データ プロバイダーがテーブルの履歴を共有している場合にのみ機能します。 delta-sharing-spark 0.6.0 以上が必要です。
spark.read.format("deltaSharing")
.option("versionAsOf", <version-as-of>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
.limit(10)

spark.read.format("deltaSharing")
.option("timestampAsOf", <version-as-of>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
.limit(10)

Spark を使用して共有変更データ フィードにアクセスする

テーブル履歴が自分と共有されていて、ソース テーブルで変更データ フィード (CDF) が有効になっている場合は、次を実行して変更データ フィードにアクセスできます (これらの変数を置き換えます)。 delta-sharing-spark 0.5.0 以上が必要です。

開始パラメーターは 1 つだけ指定する必要があります。

  • <profile-path>: 資格情報ファイルの場所。
  • <share-name>: テーブルの share= の値。
  • <schema-name>: テーブルの schema= の値。
  • <table-name>: テーブルの name= の値。
  • <starting-version>: 省略可能。 クエリの開始バージョン (これを含む)。 Long として指定します。
  • <ending-version>: 省略可能。 クエリの終了バージョン (これを含む)。 終了バージョンが指定されていない場合、API では最新のテーブル バージョンが使用されます。
  • <starting-timestamp>: 省略可能。 クエリの開始タイムスタンプ。このタイムスタンプ以降に作成されたバージョンに変換されます。 yyyy-mm-dd hh:mm:ss[.fffffffff] 形式の文字列として指定します。
  • <ending-timestamp>: 省略可能。 クエリの終了タイムスタンプ。このタイムスタンプ以前に作成されたバージョンに変換されます。 yyyy-mm-dd hh:mm:ss[.fffffffff] 形式の文字列として指定します

Python

delta_sharing.load_table_changes_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>",
  starting_version=<starting-version>,
  ending_version=<ending-version>)

delta_sharing.load_table_changes_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>",
  starting_timestamp=<starting-timestamp>,
  ending_timestamp=<ending-timestamp>)

spark.read.format("deltaSharing").option("readChangeFeed", "true")\
.option("statingVersion", <starting-version>)\
.option("endingVersion", <ending-version>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

spark.read.format("deltaSharing").option("readChangeFeed", "true")\
.option("startingTimestamp", <starting-timestamp>)\
.option("endingTimestamp", <ending-timestamp>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

Scala

spark.read.format("deltaSharing").option("readChangeFeed", "true")
.option("statingVersion", <starting-version>)
.option("endingVersion", <ending-version>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

spark.read.format("deltaSharing").option("readChangeFeed", "true")
.option("startingTimestamp", <starting-timestamp>)
.option("endingTimestamp", <ending-timestamp>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

出力が空か、期待されるデータが含まれていない場合は、データプロバイダーにお問い合わせください。

Spark 構造化ストリームを使用して共有テーブルにアクセスする

テーブル履歴が共有されている場合は、共有データの読み取りをストリーミングできます。 delta-sharing-spark 0.6.0 以上が必要です。

サポートされているオプション:

  • ignoreDeletes: データを削除するトランザクションを無視します。
  • ignoreChanges: UPDATEMERGE INTODELETE (パーティション内)、OVERWRITE などのデータ変更操作のためにファイルがソース テーブルで書き換えられた場合は、更新を再処理します。 変更されていない行は、引き続き出力できます。 そのため、ダウンストリームのコンシューマーが重複を処理できる必要があります。 削除はダウンストリームには反映されません。 ignoreChangesignoreDeletes を含みます。 そのため、ignoreChanges を使用するろ、ソース テーブルに対する削除または更新によってストリームが中断されることはありません。
  • startingVersion: 対象となる最初の共有テーブルのバージョン。 このバージョン (を含む) 以降のすべてのテーブル変更は、ストリーミング ソースによって読み取りされます。
  • startingTimestamp: 対象となる最初のタイムスタンプ。 このタイムスタンプ (を含む) 以降にコミットされたすべてのテーブル変更は、ストリーミング ソースによって読み取りされます。 例: "2023-01-01 00:00:00.0".
  • maxFilesPerTrigger: すべてのマイクロバッチで考慮される新しいファイルの数。
  • maxBytesPerTrigger: 各マイクロバッチで処理されるデータの量。 このオプションにより "ソフト最大値" が設定されます。これは、最小の入力単位がこの制限を超える場合にストリーミング クエリを進めるために、バッチでほぼこの量のデータが処理され、制限を超える処理が行われる可能性があることを意味します。
  • readChangeFeed: 共有テーブルの変更データ フィードをストリームで読み取ります。

サポートされていないオプション:

  • Trigger.availableNow

サンプルの構造化ストリーミング クエリ

Scala
spark.readStream.format("deltaSharing")
.option("startingVersion", 0)
.option("ignoreChanges", true)
.option("maxFilesPerTrigger", 10)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
Python
spark.readStream.format("deltaSharing")\
.option("startingVersion", 0)\
.option("ignoreDeletes", true)\
.option("maxBytesPerTrigger", 10000)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

Azure Databricks でのストリーミング」も参照してください。

削除ベクターまたは列マッピングが有効になっているテーブルの読み取り

重要

この機能はパブリック プレビュー段階にあります。

削除ベクトルは、プロバイダーが共有 Delta テーブルで有効にできるストレージ最適化機能です。 「削除ベクトルとは」を参照してください。

Azure Databricks では、Delta テーブルの列マッピングもサポートされています。 「Delta Lake の列マッピングを使用して列の名前変更と削除を行う」をご覧ください。

削除ベクトルまたは列マッピングが有効になっているテーブルをプロバイダーが共有している場合は、delta-sharing-spark 3.1 以降を実行しているコンピューティングを使用してそのテーブルを読み取ることができます。 Databricks クラスターを使用している場合は、Databricks Runtime 14.1 以降を実行しているクラスターを使用してバッチ読み取りを実行できます。 CDF とストリーミングの各クエリには、Databricks Runtime 14.2 以降が必要です。

バッチ クエリは、共有テーブルのテーブル機能に基づいて responseFormat を自動的に解決できるため、そのまま実行できます。

変更データ フィード (CDF) を読み取ったり、削除ベクトルまたは列マッピングが有効になっている共有テーブルに対してストリーミング クエリを実行したりするには、追加のオプション responseFormat=delta を設定する必要があります。

次の例は、バッチ、CDF、ストリーミングの各クエリを示しています。

import org.apache.spark.sql.SparkSession

val spark = SparkSession
        .builder()
        .appName("...")
        .master("...")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .getOrCreate()

val tablePath = "<profile-file-path>#<share-name>.<schema-name>.<table-name>"

// Batch query
spark.read.format("deltaSharing").load(tablePath)

// CDF query
spark.read.format("deltaSharing")
  .option("readChangeFeed", "true")
  .option("responseFormat", "delta")
  .option("startingVersion", 1)
  .load(tablePath)

// Streaming query
spark.readStream.format("deltaSharing").option("responseFormat", "delta").load(tablePath)

Pandas: 共有データを読み取る

Pandas 0.25.3 以上で共有データにアクセスするには、次の手順に従います。

以下の手順では、データ プロバイダーによって共有された資格情報ファイルにアクセスできることを前提としています。 「オープン共有モデルでアクセスを取得する」を参照してください。

Delta Sharing Python コネクタをインストールする

共有されたテーブルの一覧など、共有データに関連するメタデータにアクセスするには、Delta Sharing Python コネクタをインストールします。

pip install delta-sharing

pandas を使用して共有テーブルを一覧表示する

共有内のテーブルを一覧表示するには、<profile-path>/config.share を資格情報ファイルの場所に置き換えて、次を実行します。

import delta_sharing

client = delta_sharing.SharingClient(f"<profile-path>/config.share")

client.list_all_tables()

出力が空か、期待されるテーブルが含まれていない場合は、データプロバイダーにお問い合わせください。

pandas を使用して共有データにアクセスする

Python を使用して pandas で共有データにアクセスするには、変数を次のように置き換えて、次を実行します。

  • <profile-path>: 資格情報ファイルの場所。
  • <share-name>: テーブルの share= の値。
  • <schema-name>: テーブルの schema= の値。
  • <table-name>: テーブルの name= の値。
import delta_sharing
delta_sharing.load_as_pandas(f"<profile-path>#<share-name>.<schema-name>.<table-name>")

pandas を使用して共有変更データ フィードにアクセスする

Python を使用して pandas で共有テーブルの変更データ フィードにアクセスするには、変数を次のように置き換えて、次を実行します。 データ プロバイダーがテーブルの変更データ フィードを共有しているかどうかによって、変更データ フィードを使用できない場合があります。

  • <starting-version>: 省略可能。 クエリの開始バージョン (これを含む)。
  • <ending-version>: 省略可能。 クエリの終了バージョン (これを含む)。
  • <starting-timestamp>: 省略可能。 クエリの開始タイムスタンプ。 このタイムスタンプ以降に作成されたバージョンに変換されます。
  • <ending-timestamp>: 省略可能。 クエリの終了タイムスタンプ。 このタイムスタンプ以前に作成されたバージョンに変換されます。
import delta_sharing
delta_sharing.load_table_changes_as_pandas(
  f"<profile-path>#<share-name>.<schema-name>.<table-name>",
  starting_version=<starting-version>,
  ending_version=<starting-version>)

delta_sharing.load_table_changes_as_pandas(
  f"<profile-path>#<share-name>.<schema-name>.<table-name>",
  starting_timestamp=<starting-timestamp>,
  ending_timestamp=<ending-timestamp>)

出力が空か、期待されるデータが含まれていない場合は、データプロバイダーにお問い合わせください。

Power BI: 共有データを読み取る

Power BI Delta Sharing コネクタを使用すると、Delta Sharing オープン プロトコルを使用して、共有されているデータセットを検出、分析、視覚化できます。

必要条件

Databricks に接続する

Delta Sharing コネクタを使用して Azure Databricks に接続するには、次の手順を行います。

  1. テキスト エディターで資格情報ファイルを開き、エンドポイント URL とトークンを取得します。
  2. Power BI Desktop を開きます。
  3. [データを取得する] メニューで、[差分共有] を検索します。
  4. コネクタを選び、[接続] をクリックします。
  5. [差分共有サーバーの URL] フィールドに、資格情報ファイルから取得したエンドポイント URL を入力します。
  6. 必要に応じて、[詳細設定オプション] タブで、ダウンロードできる行の最大数に対して [行数の制限] を設定できます。 既定では、これは 100 万行に設定されています。
  7. [OK] をクリックします。
  8. [認証] では、資格情報ファイルから取得したトークンをベアラー トークンにコピーします。
  9. [接続] をクリックします。

Power BI Delta Sharing コネクタの制限事項

Power BI Delta Sharing コネクタには、次の制限があります。

  • コネクタによって読み込まれるデータは、マシンのメモリに収まる必要があります。 これを保証するために、コネクタでは、インポートされる行数を、Power BI Desktop の [詳細設定オプション] タブで設定した [行の制限] に制限します。

新しい資格情報を要求する

資格情報のアクティブ化 URL やダウンロードされた資格情報が失われるか、破損するか、侵害された場合、または資格情報の有効期限が切れてプロバイダーから新しい資格情報が送信されていない場合は、プロバイダーに連絡して新しい資格情報を要求してください。