次の方法で共有


Apache Spark DataFrames を使用した Delta Sharing 共有テーブルの読み取り

この記事では、Apache Spark を使用して、Delta Sharing を使用して共有されたデータに対してクエリを実行する構文の例を示します。 DataFrame 操作のフォーマット オプションとして、deltasharing キーワードを使用します。

共有データに対してクエリを実行するためのその他のオプション

以下の例のように、メタストアに登録されている Delta Sharing カタログ内の共有テーブル名を使用するクエリを作成することもできます。

SQL

SELECT * FROM shared_table_name

Python

spark.read.table("shared_table_name")

Azure Databricks での Delta Sharing の構成と共有テーブル名を使用したデータのクエリ実行の詳細については、「Databricks 間の Delta Sharing を使用して共有されたデータの読み取り (受信者向け)」を参照してください。

構造化ストリーミングを使用すると、共有テーブルのレコードを段階的に処理できます。 構造化ストリーミングを使用するには、テーブルの履歴共有を有効にする必要があります。 「ALTER SHARE」を参照してください。 履歴共有には、Databricks Runtime 12.2 LTS 以降が必要です。

共有テーブルでソース Delta テーブルの変更データ フィードが有効で、共有の履歴が有効になっている場合は、構造化ストリーミング操作またはバッチ操作で Delta 共有を読み取る際に変更データ フィードを使用できます。 「Azure Databricks で Delta Lake 変更データ フィードを使用する」を参照してください。

Delta Sharing フォーマット キーワードを使用して読み取る

次の例に示すように、deltasharing キーワードは Apache Spark の DataFrame 読み取り操作でサポートされています。

df = (spark.read
  .format("deltasharing")
  .load("<profile-path>#<share-name>.<schema-name>.<table-name>")
)

Delta Sharing 共有されたテーブルの変更データ フィードを読み取る

履歴が共有され変更データ フィードが有効になっているテーブルの場合は、Apache Spark DataFrame を使用して変更データ フィード レコードを読み取ることができます。 履歴共有には、Databricks Runtime 12.2 LTS 以降が必要です。

df = (spark.read
  .format("deltasharing")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .load("<profile-path>#<share-name>.<schema-name>.<table-name>")
)

構造化ストリーミングを使用して Delta Sharing 共有されたテーブルを読み取る

履歴が共有されているテーブルの場合は、共有テーブルを構造化ストリーミングのソースとして使用できます。 履歴共有には、Databricks Runtime 12.2 LTS 以降が必要です。

streaming_df = (spark.readStream
  .format("deltasharing")
  .load("<profile-path>#<share-name>.<schema-name>.<table-name>")
)

# If CDF is enabled on the source table
streaming_cdf_df = (spark.readStream
  .format("deltasharing")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .load("<profile-path>#<share-name>.<schema-name>.<table-name>")
)