Databricks SQL でストリーミング テーブルを使用してデータを読み込む

重要

この機能はパブリック プレビュー段階にあります。 アクセスにサインアップするには、このフォームに入力します

Databricks では、Databricks SQL を使用してデータを取り込むために、ストリーミング テーブルを使用することをお勧めします。 ストリーミング テーブルは、ストリーミングまたは増分データ処理を追加でサポートする Unity Catalog マネージド テーブルです。 DLT パイプラインは、ストリーミング テーブルごとに自動的に作成されます。 ストリーミング テーブルを使用して、Kafka とクラウド オブジェクト ストレージから増分データ読み込みを行うことができます。

この記事では、Unity Catalog ボリューム (推奨) または外部の場所として構成されたクラウド オブジェクト ストレージからデータを読み込むために、ストリーミング テーブルを使用することについて説明します。

Note

Delta Lake テーブルをストリーミング ソースおよびシンクとして使用する方法については、「Delta テーブルのストリーミング読み取りと書き込み」を参照してください。

始める前に

作業を開始する前に、次の準備ができていることを確認します。

  • サーバーレスが有効になっている Azure Databricks アカウント。 詳細については、「サーバーレス SQL ウェアハウスを有効にする」を参照してください。

  • Unity Catalog が有効になっているワークスペース。 詳細については、「Unity Catalog を設定および管理する」を参照してください。

  • Current チャネルを使用する SQL ウェアハウス。

  • Delta Live Tables パイプラインによって作成されるストリーミング テーブルに対してクエリを実行するには、Databricks Runtime 13.3 LTS 以降または SQL ウェアハウスを使って共有コンピューティングを使用する必要があります。 Unity Catalog 対応パイプラインで作成されたストリーミング テーブルに対しては、割り当てられたクラスターおよび分離なしクラスターからクエリを実行できません。

  • Unity Catalog の外部の場所での READ FILES 特権。 情報については、「クラウド ストレージを Azure Databricks に接続するための外部の場所を作成する」を参照してください。

  • ストリーミング テーブルを作成するカタログでの USE CATALOG 特権。

  • ストリーミング テーブルを作成するスキーマでの USE SCHEMA 特権。

  • ストリーミング テーブルを作成するスキーマでの CREATE TABLE 特権。

  • ソース データへのパス。

    ボリューム パスの例: /Volumes/<catalog>/<schema>/<volume>/<path>/<file-name>

    外部の場所のパスの例: abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis

    Note

    この記事では、読み込むデータが、Unity Catalog ボリュームまたはアクセスできる外部の場所に対応するクラウド ストレージの場所にあることを前提としています。

ソース データの検出とプレビュー

  1. ワークスペースのサイドバーで、[クエリ] をクリックし、[クエリの作成] をクリックします。

  2. クエリ エディターで、ドロップダウン リストから Current チャネルを使用する SQL ウェアハウスを選択します。

  3. 次のコマンドをエディターに貼り付け、ソース データを識別する情報を山かっこ (<>) に置き換え、[実行] をクリックします。

    Note

    関数の既定値でデータを解析できない場合は、read_files テーブル値関数の実行時にスキーマ推論エラーが発生する可能性があります。 たとえば、複数行の CSV ファイルまたは JSON ファイルに対して複数行モードを構成する必要がある場合があります。 パーサー オプションの一覧については、「read_files テーブル値関数」を参照してください。

    /* Discover your data in a volume */
    LIST "/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>"
    
    /* Preview your data in a volume */
    SELECT * FROM read_files("/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>") LIMIT 10
    
    /* Discover your data in an external location */
    LIST "abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>"
    
    /* Preview your data */
    SELECT * FROM read_files("abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>") LIMIT 10
    

ストリーミング テーブルにデータを読み込む

クラウド オブジェクト ストレージのデータからストリーミング テーブルを作成するには、クエリ エディターに次のコマンドを貼り付けて、[実行] をクリックします。

/* Load data from a volume */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>')

/* Load data from an external location */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>')

DLT パイプラインを使用してストリーミング テーブルを更新する

このセクションでは、クエリで定義されているソースで利用できる最新データを使用してストリーミング テーブルを更新するパターンについて説明します。

ストリーミング テーブルの CREATE 操作では、ストリーミング テーブルへのデータの初期の作成と読み込みに Databricks SQL ウェアハウスが使用されます。 ストリーミング テーブルの REFRESH 操作では、Delta Live Tables (DLT) が使用されます。 DLT パイプラインは、ストリーミング テーブルごとに自動的に作成されます。 ストリーミング テーブルが更新されると、更新を処理するために DLT パイプラインの更新が開始されます。

REFRESH コマンドを実行すると、DLT パイプライン リンクが返されます。 DLT パイプライン リンクを使用して、更新の状態を確認できます。

Note

ストリーミング テーブルを更新して最新データを取得できるのは、テーブル所有者だけです。 テーブルを作成するユーザーが所有者であり、所有者は変更できません。

Delta Live Tables とは」を参照してください。

新しいデータのみを取り込む

既定では、read_files 関数はテーブルの作成時にソース ディレクトリのすべての既存のデータを読み取り、更新ごとに新しく生成されたレコードを処理します。

テーブル作成時にソース ディレクトリに既に存在するデータが取り込まれるのを回避するには、includeExistingFiles オプションを false に設定します。 つまり、テーブルの作成後にディレクトリに生成されたデータのみが処理されます。 次に例を示します。

CREATE OR REFRESH STREAMING TABLE my_bronze_table
AS SELECT *
FROM STREAM read_files(
  'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
  includeExistingFiles => false)

ストリーミング テーブルを完全に更新する

完全更新では、最新の定義を使用して、ソースで使用可能なすべてのデータが再処理されます。 完全更新では既存のデータが切り詰められるため、データの履歴全体を保持しないソースや、Kafka など、保持期間が短いソースの場合、完全更新の呼び出しは推奨されません。 ソースでデータが使用できなくなった場合、古いデータを回復できないことがあります。

次に例を示します。

REFRESH STREAMING TABLE my_bronze_table FULL

自動更新するストリーミング テーブルをスケジュールする

定義されたスケジュールに基づいて自動的に更新されるようにストリーミング テーブルを構成するには、クエリ エディターに次のコマンドを貼り付けて、[実行] をクリックします。

ALTER STREAMING TABLE
[[<catalog>.]<database>.]<name>
ADD [SCHEDULE [REFRESH]
        CRON '<cron-string>'
                [ AT TIME ZONE '<timezone-id>' ]];

更新スケジュール クエリの例については、「代替ストリーミング テーブル」を参照してください。

更新の状態を追跡する

Delta Live Tables UI でストリーミング テーブルを管理するパイプラインを表示するか、ストリーミング テーブルの DESCRIBE EXTENDED コマンドによって返される更新情報を表示することで、ストリーミング テーブルの更新の状態を表示できます。

DESCRIBE EXTENDED <table-name>

Kafka からのストリーミング インジェスト

Kafka からのストリーミング インジェストの例については、「read_kafka」を参照してください。

ストリーミング テーブルへのアクセス権をユーザーに付与する

ストリーミング テーブルに対する SELECT 権限をユーザーに付与してクエリを実行できるようにするには、次のコマンドをクエリ エディターに貼り付けて、[実行] をクリックします。

GRANT SELECT ON TABLE <catalog>.<schema>.<table> TO <user-or-group>

Unity Catalog のセキュリティ保護可能なオブジェクトの付与に関する詳細については、「Unity カタログの特権とセキュリティ保護可能なオブジェクト」を参照してください。

制限事項

  • Databricks SQL ストリーミング テーブルは、米国中南部と米国西部 2 のリージョンではサポートされていません。

その他のリソース