次の方法で共有


Lakeflow Connect の標準コネクタ

Azure Databricks には、Delta Lake に裏打ちされたレイクハウスにデータを取り込むためのさまざまな方法が用意されています。 この記事では、サポートされているインジェスト ツールと、データ ソースや待機時間などの条件に基づいて使用する方法のガイダンスを示します。

インジェストの方法

Databricks にデータを取り込むには、次の方法を使用します。

  • "バッチ インジェスト": データ行セットに対する頻度の低い処理
  • "ストリーミング インジェスト": 個々のデータ行またはデータ行セットが届いた際のリアルタイム処理

取り込まれたデータは、ダウンストリーム データと AI ユース ケース全体で使用できる デルタ テーブルに読み込まれます。 Databricks の Lakehouse アーキテクチャにより、ユース ケース間でデータを複製する必要はありません。また、Unity カタログを利用して、すべてのデータに対する一元的なアクセス制御、監査、系列、データ検出を行うことができます。

バッチ インジェスト

バッチ インジェストでは、多くの場合、スケジュール (毎日など) に基づいて Databricks に行のセット (またはバッチ) としてデータを読み込んだり、手動でトリガーしたりできます。 これは、従来の抽出、変換、読み込み (ETL) ユース ケースの "抽出" 部分を表します。 バッチ インジェストを使用して、次のデータからデータを読み込むことができます。

  • CSV などのローカル ファイル
  • Amazon S3、Azure Data Lake Storage、Google Cloud Storage などのクラウド オブジェクト ストレージ
  • Salesforce などの SaaS アプリケーションと SQL Server などのデータベース

バッチ インジェストでは、CSV、TSV、JSON、XML、Avro、ORC、Parquet、テキスト ファイルなど、さまざまなファイル ソース形式がサポートされています。

Databricks では、従来のバッチ インジェストオプションと増分バッチ インジェスト オプションの両方がサポートされています。 従来のバッチ インジェストでは、実行されるたびにすべてのレコードが処理されますが、増分バッチ インジェストではデータ ソース内の新しいレコードが自動的に検出され、既に取り込まれているレコードは無視されます。 つまり、処理する必要があるデータが少なくなり、インジェスト ジョブの実行速度が速くなり、コンピューティング リソースをより効率的に使用できるようになります。

従来 (1 回限り) のバッチ インジェスト

データ追加 UI を使用して、ローカル データ ファイルをアップロードしたり、パブリック URL からファイルをダウンロードしたりできます。 ファイルをアップロードするを参照してください。

増分バッチインジェスト

このセクションでは、サポートされている増分バッチ インジェスト ツールについて説明します。

ストリーミング テーブル

CREATE STREAMING TABLE SQL コマンドを使用すると、クラウド オブジェクト ストレージからストリーミング テーブルにデータを増分的に読み込むことができます。 CREATE STREAMING TABLEを参照してください。

例: ストリーミング テーブルを使用した増分バッチ インジェスト

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
  "/databricks-datasets/retail-org/customers/",
  format => "csv"
)
クラウド オブジェクト ストレージ コネクタ

組み込みのクラウド オブジェクト ストレージ コネクタである自動ローダーを使用すると、Amazon S3 (S3)、Azure Data Lake Storage Gen 2 (ALDS2)、または Google Cloud Storage (GCS) に到着すると、新しいデータ ファイルを段階的かつ効率的に処理できます。 「自動ローダー」を参照してください。

例: 自動ローダーを使用した増分バッチインジェスト

df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("rescuedDataColumn", "_rescued_data")
  .schema("/databricks-datasets/retail-org/customers/schema")
  .load("/databricks-datasets/retail-org/customers/")
フルマネージド コネクタ

Lakeflow Connect には、SaaS アプリケーションとデータベースから取り込むフル マネージド コネクタが用意されています。 マネージド コネクタは、次を使用して使用できます。

  • Databricks ユーザーインターフェース
  • Databricks コマンドラインインターフェース (CLI)
  • Databricks API
  • Databricks SDK
  • Databricks アセット バンドル

Lakeflow Connect のマネージド コネクタに関する記事を参照してください。

ストリーミング インジェスト

ストリーミング インジェストでは、データ行またはデータ行のバッチを生成時に継続的に読み込み、ほぼリアルタイムで受信したデータ行に対してクエリを実行できます。 ストリーミング インジェストを使用して、Apache Kafka、Amazon Kinesis、Google Pub/Sub、Apache Pulsar などのソースからストリーミング データを読み込むことができます。

Databricks では、組み込みのコネクタを使用したストリーミング インジェストもサポートされています。 これらのコネクタを使用すると、ストリーミング ソースから届く新しいデータを段階的かつ効率的に処理できます。 「ストリーミング データ ソースの構成」を参照してください。

例: Kafka からのストリーミング インジェスト

spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("subscribe", "topic1")
    .option("startingOffsets", "latest")
    .load()

Lakeflow 宣言型パイプラインを使用したバッチインジェストとストリーミング インジェスト

Databricks では 、Lakeflow 宣言型パイプライン を使用して、信頼性の高いスケーラブルなデータ処理パイプラインを構築することをお勧めします。 Lakeflow 宣言型パイプラインでは、バッチ インジェストとストリーミング インジェストの両方がサポートされており、自動ローダーでサポートされている任意のデータ ソースからデータを取り込むことができます。

例: Lakeflow の宣言型パイプラインを使った増分バッチインジェスト

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

例: Lakeflow 宣言パイプラインを使用した Kafka からのストリーミング インジェスト

@dlt.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "<server:ip>")
      .option("subscribe", "topic1")
      .option("startingOffsets", "latest")
      .load()
  )

インジェスト スケジュール

データは、1 回限り、定期的なスケジュールで、または継続的に取り込むことができます。

  • ほぼリアルタイムのストリーミングのユース ケースでは、連続モードを使用します。
  • バッチ インジェストのユース ケースでは、1 回だけ取り込むか、定期的なスケジュールを設定します。

トリガーされたパイプライン モードと継続的パイプライン モードを参照してください。

インジェスト パートナー

多くのサード パーティ製ツールでは、Databricks へのバッチインジェストまたはストリーミング インジェストがサポートされています。 Databricks ではさまざまなサード パーティの統合が検証されますが、ソース システムへのアクセスを構成し、データを取り込む手順はツールによって異なります。 検証済みツール 一覧については、インジェスト パートナーの を参照してください。 一部のテクノロジ パートナーは、Databricks Partner Connectでも紹介されています。これにより、サードパーティ製ツールを Lakehouse データに簡単に接続できる UI が提供されます。

DIY インジェスト

Databricks には、一般的なコンピューティング プラットフォームが用意されています。 その結果、Python や Java など、Databricks でサポートされている任意のプログラミング言語を使用して、独自のインジェスト コネクタを作成できます。 データ読み込みツール、Airbyte、Debezium などの一般的なオープン ソース コネクタ ライブラリをインポートして利用することもできます。

摂取の代替手段

Databricks では、大量のデータ、待機時間の短いクエリ、サードパーティ製の API の制限に対応するようにスケーリングされるため、ほとんどのユース ケースでインジェストをお勧めします。 インジェストでは、ソース システムから Azure Databricks にデータがコピーされるため、重複するデータが時間の経過と同時に古くなる可能性があります。 データをコピーしない場合は、次のツールを使用できます。

  • Lakehouse Federation を使用すると、データを移動せずに外部データ ソースに対してクエリを実行できます。
  • Delta Sharing を使用すると、プラットフォーム、クラウド、リージョン間でデータを安全に共有できます。

ただし、データをコピーしない場合は、Lakehouse Federation または Delta Sharing を使用します。

Delta Sharingを使用するタイミング

次のシナリオでは、Delta Sharing を選択します。

  • データ重複の制限
  • 可能な限り新しいデータのクエリを実行する

Lakehouse Federation を使用する場合

次のシナリオでは、Lakehouse Federation を選択します。

  • ETL パイプラインでのアドホック レポートまたは概念実証作業

インジェスト方法を選択するときの考慮事項

考慮事項 指導
データ ソース データ ソースに Lakeflow Connect ネイティブ コネクタが存在する場合、これはデータを取り込む最も簡単な方法です。 Lakeflow Connect でサポートされていないデータ ソースの場合は、ソースからデータを抽出し、自動ローダーを使用して Databricks にデータを取り込みます。 ローカル ファイルの場合は、Databricks UI を使用してデータをアップロードします。
遅延 ほぼリアルタイムでデータを分析する場合は、ストリーミングを使用して増分処理を利用します。 ストリーミングでは、各レコードが到着するとすぐにデータのクエリを実行できます。 それ以外の場合は、バッチ インジェストを使用します。
データの移動 ソース システムから Databricks にデータをコピーできない場合は、Lakehouse Federation または Delta Sharing を使用します。

Delta Lake にデータを移行する

既存のデータを Delta Lake に移行する方法については、「データを Delta Lake に移行する」を参照してください。

COPY INTO (レガシ)

CREATE STREAMING TABLE SQL コマンドは、クラウド オブジェクト ストレージからの増分インジェスト用の従来の COPY INTO SQL コマンドの代わりに推奨されます。 COPY INTOを参照してください。 よりスケーラブルで堅牢なファイル インジェスト エクスペリエンスを実現するために、Databricks では、SQL ユーザーが COPY INTOではなくストリーミング テーブルを利用することをお勧めします。