このページでは、Salesforce からデータを取り込み、Lakeflow Connect を使用して Azure Databricks に読み込む方法について説明します。
開始する前に
インジェスト パイプラインを作成するには、以下の要件を満たす必要があります。
ワークスペースは、Unity Catalog に対して有効にする必要があります。
ワークスペースに対してサーバーレス コンピューティングを有効にする必要があります。 「サーバーレス コンピューティングを有効にする」をご覧ください。
新しい接続を作成する場合: メタストアに対する
CREATE CONNECTION
特権が必要です。コネクタで UI ベースのパイプライン作成がサポートされている場合は、このページの手順を完了することで、接続とパイプラインを同時に作成できます。 ただし、API ベースのパイプライン作成を使用する場合は、このページの手順を完了する前に、カタログ エクスプローラーで接続を作成する必要があります。 「マネージド インジェスト ソースへの接続」を参照してください。
既存の接続を使用する場合: 接続オブジェクトに対する
USE CONNECTION
特権またはALL PRIVILEGES
が必要です。ターゲット カタログに対する
USE CATALOG
特権が必要です。既存のスキーマに対する
USE SCHEMA
権限とCREATE TABLE
権限、またはターゲット カタログに対するCREATE SCHEMA
権限が必要です。
Salesforce から取り込むには、次の方法をお勧めします。
- Databricks がデータの取得に使用できる Salesforce ユーザーを作成します。 そのユーザーには、API アクセス権と、取り込む予定のすべてのオブジェクトに対するアクセス権を持たせます。
インジェスト パイプラインを作成する
接続に対して必要なアクセス許可:USE CONNECTION
または ALL PRIVILEGES
。
このステップでは、インジェスト パイプラインを作成する方法について説明します。 取り込まれた各テーブルは、同じ名前 (すべて小文字) でストリーミング テーブルに書き込まれます。
Databricks ユーザーインターフェース
Azure Databricks ワークスペースのサイドバーで、[ Data Ingestion をクリックします。
[ データの追加 ] ページの [ Databricks コネクタで、[ Salesforce] をクリックします。
Salesforce インジェスト ウィザードが開きます。
ウィザードの Pipeline ページで、インジェスト パイプラインの一意の名前を入力します。
[ 宛先カタログ ] ドロップダウンで、カタログを選択します。 取り込まれたデータとイベント ログがこのカタログに書き込まれます。
Salesforce データへのアクセスに必要な資格情報を格納する Unity カタログ接続を選択します。
Salesforce 接続がない場合は、[接続の 作成] をクリックします。 メタストアに対する
CREATE CONNECTION
特権が必要です。[パイプラインの作成] をクリックして続行します。
[ ソース ] ページで、取り込むテーブルを選択し、[ 次へ] をクリックします。
[すべてのテーブル] を選択すると、Salesforce インジェスト コネクタによって、ソース スキーマ内のすべての既存および将来のテーブルが宛先スキーマに書き込まれます。 パイプラインごとに最大 250 個のオブジェクトがあります。
[ 宛先 ] ページで、書き込む Unity カタログ カタログとスキーマを選択します。
既存のスキーマを使用しない場合は、[ スキーマの作成] をクリックします。 親カタログに対する
USE CATALOG
およびCREATE SCHEMA
特権が必要です。[パイプラインの保存] をクリックして続行。
(省略可能)[ 設定] ページで、[ スケジュールの作成] をクリックします。 変換先テーブルを更新する頻度を設定します。
(省略可能)パイプライン操作の成功または失敗に関する電子メール通知を設定します。
[ 保存してパイプラインを実行] をクリックします。
Databricks アセット バンドル
このタブでは、Databricks アセット バンドルを使用してインジェスト パイプラインをデプロイする方法について説明します。 バンドルには、ジョブとタスクの YAML 定義を含め、Databricks CLI を使用して管理できます。また、さまざまなターゲット ワークスペース (開発、ステージング、運用など) で共有および実行できます。 詳しくは、「Databricks アセット バンドル」をご参照ください。
パイプライン定義で次のテーブル構成プロパティを使用して、取り込む特定の列を選択または選択解除できます。
include_columns
: 必要に応じて、インジェストに含める列の一覧を指定します。 このオプションを使用して列を明示的に含める場合、パイプラインは将来ソースに追加される列を自動的に除外します。 今後の列を取り込むには、それらを一覧に追加する必要があります。exclude_columns
: 必要に応じて、インジェストから除外する列の一覧を指定します。 このオプションを使用して列を明示的に除外する場合、パイプラインには、将来ソースに追加される列が自動的に含まれます。 今後の列を取り込むには、それらを一覧に追加する必要があります。
Databricks CLI を使用して新しいバンドルを作成します:
databricks bundle init
バンドルに 2 つの新しいリソース ファイルを追加します:
- パイプライン定義ファイル (
resources/sfdc_pipeline.yml
)。 - データ インジェストの頻度を制御するワークフロー ファイル (
resources/sfdc_job.yml
)。
resources/sfdc_pipeline.yml
ファイルの例を以下に示します。variables: dest_catalog: default: main dest_schema: default: ingest_destination_schema # The main pipeline for sfdc_dab resources: pipelines: pipeline_sfdc: name: salesforce_pipeline catalog: ${var.dest_catalog} schema: ${var.dest_schema} ingestion_definition: connection_name: <salesforce-connection> objects: # An array of objects to ingest from Salesforce. This example # ingests the AccountShare, AccountPartner, and ApexPage objects. - table: source_schema: objects source_table: AccountShare destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema} table_configuration: include_columns: # This can be exclude_columns instead - <column_a> - <column_b> - <column_c> - table: source_schema: objects source_table: AccountPartner destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema} - table: source_schema: objects source_table: ApexPage destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema}
resources/sfdc_job.yml
ファイルの例を以下に示します。resources: jobs: sfdc_dab_job: name: sfdc_dab_job trigger: # Run this job every day, exactly one day from the last run # See https://docs.databricks.com/api/workspace/jobs/create#trigger periodic: interval: 1 unit: DAYS email_notifications: on_failure: - <email-address> tasks: - task_key: refresh_pipeline pipeline_task: pipeline_id: ${resources.pipelines.pipeline_sfdc.id}
- パイプライン定義ファイル (
Databricks CLI を使用してパイプラインをデプロイします:
databricks bundle deploy
Databricks コマンドラインインターフェース (CLI)
パイプライン定義で次のテーブル構成プロパティを使用して、取り込む特定の列を選択または選択解除できます。
include_columns
: 必要に応じて、インジェストに含める列の一覧を指定します。 このオプションを使用して列を明示的に含める場合、パイプラインは将来ソースに追加される列を自動的に除外します。 今後の列を取り込むには、それらを一覧に追加する必要があります。exclude_columns
: 必要に応じて、インジェストから除外する列の一覧を指定します。 このオプションを使用して列を明示的に除外する場合、パイプラインには、将来ソースに追加される列が自動的に含まれます。 今後の列を取り込むには、それらを一覧に追加する必要があります。
パイプラインを作成するために
databricks pipelines create --json "<pipeline-definition | json-file-path>"
パイプラインを更新するには:
databricks pipelines update --json "<pipeline-definition | json-file-path>"
パイプライン定義を取得するには:
databricks pipelines get "<pipeline-id>"
パイプラインを削除するには:
databricks pipelines delete "<pipeline-id>"
詳細が必要な場合は、以下を実行できます。
databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help
JSON パイプライン定義の例
"ingestion_definition": {
"connection_name": "<connection-name>",
"objects": [
{
"table": {
"source_schema": "<source-schema>",
"source_table": "<source-table>",
"destination_catalog": "<destination-catalog>",
"destination_schema": "<destination-schema>",
"table_configuration": {
"include_columns": ["<column-a>", "<column-b>", "<column-c>"]
}
}
}
]
}
パイプラインを開始し、スケジュールを設定し、アラートを設定する
パイプラインの詳細ページでパイプラインのスケジュールを作成できます。
パイプラインが作成されたら、Azure Databricks ワークスペースに再びアクセスし、[パイプラインの ] をクリックします。
新しいパイプラインがパイプラインの一覧に表示されます。
パイプラインの詳細を表示するには、そのパイプライン名をクリックします。
パイプラインの詳細ページで、スケジュールをクリックしてパイプラインをスケジュールできます。
パイプラインに通知を設定するには、[設定] クリックし、通知を追加します。
パイプラインに追加するスケジュールごとに、Lakeflow Connect によって自動的にジョブが作成されます。 インジェスト パイプラインは、ジョブ内のタスクです。 必要に応じて、ジョブにさらにタスクを追加できます。
注
パイプラインを実行すると、特定のテーブルに対して 2 つのソース ビューが表示されることがあります。 1 つのビューには、数式フィールドのスナップショットが含まれています。 もう 1 つのビューには、数式以外のフィールドの増分データ プルが含まれています。 これらのビューは、ターゲットテーブルで結合されます。
例: 2 つの Salesforce オブジェクトを個別のスキーマに取り込む
このセクションのパイプライン定義の例では、2 つの Salesforce オブジェクトを個別のスキーマに取り込みます。 複数宛先パイプラインのサポートは API 専用です。
resources:
pipelines:
pipeline_sfdc:
name: salesforce_pipeline
catalog: my_catalog_1 # Location of the pipeline event log
schema: my_schema_1 # Location of the pipeline event log
ingestion_definition:
connection_name: <salesforce-connection>
objects:
- table:
source_schema: objects
source_table: AccountShare
destination_catalog: my_catalog_1 # Location of this table
destination_schema: my_schema_1 # Location of this table
- table:
source_schema: objects
source_table: AccountPartner
destination_catalog: my_catalog_2 # Location of this table
destination_schema: my_schema_2 # Location of this table
例: 1 つの Salesforce オブジェクトを 3 回取り込む
このセクションのパイプライン定義例では、Salesforce オブジェクトを 3 つの異なる変換先テーブルに取り込みます。 複数宛先パイプラインのサポートは API 専用です。
必要に応じて、取り込むテーブルの名前を変更できます。 パイプライン内のテーブルの名前を変更すると、そのテーブルは API 専用パイプラインになり、UI でパイプラインを編集できなくなります。
resources:
pipelines:
pipeline_sfdc:
name: salesforce_pipeline
catalog: my_catalog_1 # Location of the pipeline event log
schema: my_schema_1 # Location of the pipeline event log
ingestion_definition:
connection_name: <salesforce-connection>
objects:
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_1 # Location of first copy
destination_schema: my_schema_1 # Location of first copy
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_2 # Location of second copy
destination_schema: my_schema_2 # Location of second copy
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_2 # Location of third copy, renamed
destination_schema: my_schema_2 # Location of third copy, renamed
destination_table: order_duplicate # Table rename