このページでは、Workday レポートを取り込み、Lakeflow Connect を使用して Azure Databricks に読み込む方法について説明します。
開始する前に
インジェスト パイプラインを作成するには、以下の要件を満たす必要があります。
ワークスペースは、Unity Catalog に対して有効にする必要があります。
ワークスペースに対してサーバーレス コンピューティングを有効にする必要があります。 「サーバーレス コンピューティングを有効にする」をご覧ください。
新しい接続を作成する場合: メタストアに対する
CREATE CONNECTION
特権が必要です。コネクタで UI ベースのパイプライン作成がサポートされている場合は、このページの手順を完了することで、接続とパイプラインを同時に作成できます。 ただし、API ベースのパイプライン作成を使用する場合は、このページの手順を完了する前に、カタログ エクスプローラーで接続を作成する必要があります。 「マネージド インジェスト ソースへの接続」を参照してください。
既存の接続を使用する場合: 接続オブジェクトに対する
USE CONNECTION
特権またはALL PRIVILEGES
が必要です。ターゲット カタログに対する
USE CATALOG
特権が必要です。既存のスキーマに対する
USE SCHEMA
権限とCREATE TABLE
権限、またはターゲット カタログに対するCREATE SCHEMA
権限が必要です。
Workday から取り込むには、 ソースのセットアップを完了する必要があります。
ネットワークの構成
サーバーレスエグレス制御が有効になっている場合は、レポート URL のホスト名を許可リストに登録します。 たとえば、レポート URL https://ww1.workday.com/service/ccx/<tenant>/<reportName>?format=json
にはホスト名 https://ww1.workday.com
があります。 サーバーレスエグレス制御のネットワーク ポリシーの管理を参照してください。
オプション 1: Azure Databricks UI
Azure Databricks ワークスペースのサイドバーで、[ Data Ingestion をクリックします。
データの追加ページのDatabricks コネクタのWorkday レポートをクリックします。
インジェスト ウィザードが開きます。
ウィザードの [ インジェスト パイプライン ] ページで、パイプラインの一意の名前を入力します。
[イベント ログの場所] で、パイプライン イベント ログを格納するカタログとスキーマを選択します。
ソース データへのアクセスに必要な資格情報を格納する Unity カタログ接続を選択します。
ソースへの既存の接続がない場合は、[接続の 作成 ] をクリックし、 ソースのセットアップから取得した認証の詳細を入力します。 メタストアに対する
CREATE CONNECTION
特権が必要です。[パイプラインの作成] をクリックして続行します。
[ レポート ] ページで、[ レポートの追加 ] をクリックし、レポートの URL を入力します。 取り込むレポートごとに繰り返し、[ 次へ] をクリックします。
[ 宛先 ] ページで、書き込む Unity カタログ カタログとスキーマを選択します。
既存のスキーマを使用しない場合は、[ スキーマの作成] をクリックします。 親カタログに対する
USE CATALOG
およびCREATE SCHEMA
特権が必要です。[パイプラインの保存] をクリックして続行。
(省略可能)[ 設定] ページで、[ スケジュールの作成] をクリックします。 変換先テーブルを更新する頻度を設定します。
(省略可能)パイプライン操作の成功または失敗に関する電子メール通知を設定します。
[ 保存してパイプラインを実行] をクリックします。
オプション 2: Databricks アセット バンドル
このセクションでは、Databricks アセット バンドルを使用してインジェスト パイプラインをデプロイする方法について説明します。 バンドルには、ジョブとタスクの YAML 定義を含め、Databricks CLI を使用して管理できます。また、さまざまなターゲット ワークスペース (開発、ステージング、運用など) で共有および実行できます。 詳しくは、「Databricks アセット バンドル」をご参照ください。
パイプライン定義で次のテーブル構成プロパティを使用して、取り込む特定の列を選択または選択解除できます。
include_columns
: 必要に応じて、インジェストに含める列の一覧を指定します。 このオプションを使用して列を明示的に含める場合、パイプラインは将来ソースに追加される列を自動的に除外します。 今後の列を取り込むには、それらを一覧に追加する必要があります。exclude_columns
: 必要に応じて、インジェストから除外する列の一覧を指定します。 このオプションを使用して列を明示的に除外する場合、パイプラインには、将来ソースに追加される列が自動的に含まれます。 今後の列を取り込むには、それらを一覧に追加する必要があります。
レポート URL (source_url
) にプロンプトを指定して、フィルター処理されたレポートを取り込むこともできます。
Workday への Unity カタログ接続が存在することを確認します。 接続を作成する手順については、「 マネージド インジェスト ソースへの接続」を参照してください。
Databricks CLI を使用して新しいバンドルを作成します:
databricks bundle init
バンドルに 2 つの新しいリソース ファイルを追加します:
- パイプライン定義ファイル (
resources/workday_pipeline.yml
)。 - データ インジェストの頻度を制御するワークフロー ファイル (
resources/workday_job.yml
)。
resources/workday_pipeline.yml
ファイルの例を以下に示します。variables: dest_catalog: default: main dest_schema: default: ingest_destination_schema # The main pipeline for workday_dab resources: pipelines: pipeline_workday: name: workday_pipeline catalog: ${var.dest_catalog} schema: ${var.dest_schema} ingestion_definition: connection_name: <workday-connection> objects: # An array of objects to ingest from Workday. This example # ingests a sample report about all active employees. The Employee_ID key is used as # the primary key for the report. - report: source_url: https://wd2-impl-services1.workday.com/ccx/service/customreport2/All_Active_Employees_Data?format=json destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema} destination_table: All_Active_Employees_Data table_configuration: primary_keys: - Employee_ID include_columns: # This can be exclude_columns instead - <column_a> - <column_b> - <column_c>
resources/workday_job.yml
ファイルの例を以下に示します。resources: jobs: workday_dab_job: name: workday_dab_job trigger: # Run this job every day, exactly one day from the last run # See https://docs.databricks.com/api/workspace/jobs/create#trigger periodic: interval: 1 unit: DAYS email_notifications: on_failure: - <email-address> tasks: - task_key: refresh_pipeline pipeline_task: pipeline_id: ${resources.pipelines.pipeline_workday.id}
- パイプライン定義ファイル (
Databricks CLI を使用してパイプラインをデプロイします:
databricks bundle deploy
オプション 3: Azure Databricks ノートブック
パイプライン定義で次のテーブル構成プロパティを使用して、取り込む特定の列を選択または選択解除できます。
include_columns
: 必要に応じて、インジェストに含める列の一覧を指定します。 このオプションを使用して列を明示的に含める場合、パイプラインは将来ソースに追加される列を自動的に除外します。 今後の列を取り込むには、それらを一覧に追加する必要があります。exclude_columns
: 必要に応じて、インジェストから除外する列の一覧を指定します。 このオプションを使用して列を明示的に除外する場合、パイプラインには、将来ソースに追加される列が自動的に含まれます。 今後の列を取り込むには、それらを一覧に追加する必要があります。
レポート URL (source_url
) にプロンプトを指定して、フィルター処理されたレポートを取り込むこともできます。
Workday への Unity カタログ接続が存在することを確認します。 接続を作成する手順については、「 マネージド インジェスト ソースへの接続」を参照してください。
個人用アクセス トークンを生成します。
<personal-access-token>
値を変更する Python ノートブック セルに次のコードを貼り付けます。# SHOULD MODIFY # This step sets up a PAT to make API calls to the Databricks service. api_token = "<personal-access-token>"
2 番目のノートブック セルに次のコードを貼り付けます。
# DO NOT MODIFY # This step sets up a connection to make API calls to the Databricks service. import requests import json notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext() workspace_url = notebook_context.apiUrl().get() api_url = f"{workspace_url}/api/2.0/pipelines" headers = { 'Authorization': 'Bearer {}'.format(api_token), 'Content-Type': 'application/json' } def check_response(response): if response.status_code == 200: print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False))) else: print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}") # DO NOT MODIFY # These are API definition to be used. def create_pipeline(pipeline_definition: str): response = requests.post(url=api_url, headers=headers, data=pipeline_definition) check_response(response) def edit_pipeline(id: str, pipeline_definition: str): response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition) check_response(response) def delete_pipeline(id: str): response = requests.delete(url=f"{api_url}/{id}", headers=headers) check_response(response) def get_pipeline(id: str): response = requests.get(url=f"{api_url}/{id}", headers=headers) check_response(response) def list_pipeline(filter: str = ""): body = "" if len(filter) == 0 else f"""{{"filter": "{filter} AND pipeline_type IN ('MANAGED_INGESTION')"}}""" response = requests.get(url=api_url, headers=headers, data=body) check_response(response)
パイプラインの仕様を反映するように変更する 3 番目のノートブック セルに次のコードを貼り付けます。
# SHOULD MODIFY # Update this notebook to configure your ingestion pipeline. pipeline_spec = """ { "name": "<YOUR_PIPELINE_NAME>", "ingestion_definition": { "connection_name": "<YOUR_CONNECTON_NAME>", "objects": [ { "report": { "source_url": "<YOUR_REPORT_URL>", "destination_catalog": "<YOUR_DATABRICKS_CATALOG>", "destination_schema": "<YOUR_DATABRICKS_SCHEMA>", "destination_table": "<YOUR_DATABRICKS_TABLE>", "table_configuration": { "primary_keys": ["<PRIMARY_KEY>"] } } }, { "report": { "source_url": "<YOUR_SECOND_REPORT_URL>", "destination_catalog": "<YOUR_DATABRICKS_CATALOG>", "destination_schema": "<YOUR_DATABRICKS_SCHEMA>", "destination_table": "<YOUR_DATABRICKS_SECOND_TABLE>", "table_configuration": { "primary_keys": ["<PRIMARY_KEY>"], "scd_type": "SCD_TYPE_2", "include_columns": ["<column_a>", "<column_b>", "<column_c>"] } } } ] } } """ create_pipeline(pipeline_spec)
個人用アクセス トークンを使用して最初のノートブック セルを実行します。
2 番目のノートブック セルを実行します。
パイプラインの詳細を 含む 3 番目のノートブック セルを実行します。 これによって
create_pipeline
が実行されます。list_pipeline
はパイプライン ID とその詳細を返します。edit_pipeline
でパイプライン定義を編集できます。delete_pipeline
でパイプラインが削除されます。
オプション 4: Databricks CLI
パイプライン定義で次のテーブル構成プロパティを使用して、取り込む特定の列を選択または選択解除できます。
include_columns
: 必要に応じて、インジェストに含める列の一覧を指定します。 このオプションを使用して列を明示的に含める場合、パイプラインは将来ソースに追加される列を自動的に除外します。 今後の列を取り込むには、それらを一覧に追加する必要があります。exclude_columns
: 必要に応じて、インジェストから除外する列の一覧を指定します。 このオプションを使用して列を明示的に除外する場合、パイプラインには、将来ソースに追加される列が自動的に含まれます。 今後の列を取り込むには、それらを一覧に追加する必要があります。
レポート URL (source_url
) にプロンプトを指定して、フィルター処理されたレポートを取り込むこともできます。
- Workday への Unity カタログ接続が存在することを確認します。 接続を作成する手順については、「 マネージド インジェスト ソースへの接続」を参照してください。
- 次のコマンドを実行してパイプラインを作成します。
databricks pipelines create --json "<pipeline-definition OR json-file-path>"
パイプライン定義テンプレート
JSON パイプライン定義テンプレートを次に示します。
"ingestion_definition": {
"connection_name": "<connection-name>",
"objects": [
{
"report": {
"source_url": "<report-url>",
"destination_catalog": "<destination-catalog>",
"destination_schema": "<destination-schema>",
"table_configuration": {
"primary_keys": ["<primary-key>"],
"scd_type": "SCD_TYPE_2",
"include_columns": ["<column-a>", "<column-b>", "<column-c>"]
}
}
}
]
}
パイプラインを開始し、スケジュールを設定し、アラートを設定する
パイプラインの詳細ページでパイプラインのスケジュールを作成できます。
パイプラインが作成されたら、Azure Databricks ワークスペースに再びアクセスし、[パイプラインの ] をクリックします。
新しいパイプラインがパイプラインの一覧に表示されます。
パイプラインの詳細を表示するには、そのパイプライン名をクリックします。
パイプラインの詳細ページで、スケジュールをクリックしてパイプラインをスケジュールできます。
パイプラインに通知を設定するには、[設定] クリックし、通知を追加します。
パイプラインに追加するスケジュールごとに、Lakeflow Connect によって自動的にジョブが作成されます。 インジェスト パイプラインは、ジョブ内のタスクです。 必要に応じて、ジョブにさらにタスクを追加できます。
例: 2 つの Workday レポートを個別のスキーマに取り込む
このセクションのパイプライン定義例では、2 つの Workday レポートを個別のスキーマに取り込みます。 複数宛先パイプラインのサポートは API 専用です。
resources:
pipelines:
pipeline_workday:
name: workday_pipeline
catalog: my_catalog_1 # Location of the pipeline event log
schema: my_schema_1 # Location of the pipeline event log
ingestion_definition:
connection_name: <workday-connection>
objects:
- report:
source_url: <report-url-1>
destination_catalog: my_catalog_1
destination_schema: my_schema_1
destination_table: my_table_1
table_configuration:
primary_keys:
- <primary_key_column>
- report:
source_url: <report-url-2>
destination_catalog: my_catalog_2
destination_schema: my_schema_2
destination_table: my_table_2
table_configuration:
primary_keys:
- <primary_key_column>