次の方法で共有


Workday レポートの取り込み

このページでは、Workday レポートを取り込み、Lakeflow Connect を使用して Azure Databricks に読み込む方法について説明します。

開始する前に

インジェスト パイプラインを作成するには、以下の要件を満たす必要があります。

  • ワークスペースは、Unity Catalog に対して有効にする必要があります。

  • ワークスペースに対してサーバーレス コンピューティングを有効にする必要があります。 「サーバーレス コンピューティングを有効にする」をご覧ください。

  • 新しい接続を作成する場合: メタストアに対する CREATE CONNECTION 特権が必要です。

    コネクタで UI ベースのパイプライン作成がサポートされている場合は、このページの手順を完了することで、接続とパイプラインを同時に作成できます。 ただし、API ベースのパイプライン作成を使用する場合は、このページの手順を完了する前に、カタログ エクスプローラーで接続を作成する必要があります。 「マネージド インジェスト ソースへの接続」を参照してください。

  • 既存の接続を使用する場合: 接続オブジェクトに対する USE CONNECTION 特権または ALL PRIVILEGES が必要です。

  • ターゲット カタログに対する USE CATALOG 特権が必要です。

  • 既存のスキーマに対する USE SCHEMA 権限と CREATE TABLE 権限、またはターゲット カタログに対する CREATE SCHEMA 権限が必要です。

Workday から取り込むには、 ソースのセットアップを完了する必要があります。

ネットワークの構成

サーバーレスエグレス制御が有効になっている場合は、レポート URL のホスト名を許可リストに登録します。 たとえば、レポート URL https://ww1.workday.com/service/ccx/<tenant>/<reportName>?format=json にはホスト名 https://ww1.workday.comがあります。 サーバーレスエグレス制御のネットワーク ポリシーの管理を参照してください。

オプション 1: Azure Databricks UI

  1. Azure Databricks ワークスペースのサイドバーで、[ Data Ingestion をクリックします。

  2. データの追加ページのDatabricks コネクタWorkday レポートをクリックします。

    インジェスト ウィザードが開きます。

  3. ウィザードの [ インジェスト パイプライン ] ページで、パイプラインの一意の名前を入力します。

  4. [イベント ログの場所] で、パイプライン イベント ログを格納するカタログとスキーマを選択します。

  5. ソース データへのアクセスに必要な資格情報を格納する Unity カタログ接続を選択します。

    ソースへの既存の接続がない場合は、[接続の 作成 ] をクリックし、 ソースのセットアップから取得した認証の詳細を入力します。 メタストアに対する CREATE CONNECTION 特権が必要です。

  6. [パイプラインの作成] をクリックして続行します

  7. [ レポート ] ページで、[ レポートの追加 ] をクリックし、レポートの URL を入力します。 取り込むレポートごとに繰り返し、[ 次へ] をクリックします。

  8. [ 宛先 ] ページで、書き込む Unity カタログ カタログとスキーマを選択します。

    既存のスキーマを使用しない場合は、[ スキーマの作成] をクリックします。 親カタログに対する USE CATALOG および CREATE SCHEMA 特権が必要です。

  9. [パイプラインの保存] をクリックして続行

  10. (省略可能)[ 設定] ページで、[ スケジュールの作成] をクリックします。 変換先テーブルを更新する頻度を設定します。

  11. (省略可能)パイプライン操作の成功または失敗に関する電子メール通知を設定します。

  12. [ 保存してパイプラインを実行] をクリックします。

オプション 2: Databricks アセット バンドル

このセクションでは、Databricks アセット バンドルを使用してインジェスト パイプラインをデプロイする方法について説明します。 バンドルには、ジョブとタスクの YAML 定義を含め、Databricks CLI を使用して管理できます。また、さまざまなターゲット ワークスペース (開発、ステージング、運用など) で共有および実行できます。 詳しくは、「Databricks アセット バンドル」をご参照ください。

パイプライン定義で次のテーブル構成プロパティを使用して、取り込む特定の列を選択または選択解除できます。

  • include_columns: 必要に応じて、インジェストに含める列の一覧を指定します。 このオプションを使用して列を明示的に含める場合、パイプラインは将来ソースに追加される列を自動的に除外します。 今後の列を取り込むには、それらを一覧に追加する必要があります。
  • exclude_columns: 必要に応じて、インジェストから除外する列の一覧を指定します。 このオプションを使用して列を明示的に除外する場合、パイプラインには、将来ソースに追加される列が自動的に含まれます。 今後の列を取り込むには、それらを一覧に追加する必要があります。

レポート URL (source_url) にプロンプトを指定して、フィルター処理されたレポートを取り込むこともできます。

  1. Workday への Unity カタログ接続が存在することを確認します。 接続を作成する手順については、「 マネージド インジェスト ソースへの接続」を参照してください。

  2. Databricks CLI を使用して新しいバンドルを作成します:

    databricks bundle init
    
  3. バンドルに 2 つの新しいリソース ファイルを追加します:

    • パイプライン定義ファイル (resources/workday_pipeline.yml)。
    • データ インジェストの頻度を制御するワークフロー ファイル (resources/workday_job.yml)。

    resources/workday_pipeline.yml ファイルの例を以下に示します。

    variables:
      dest_catalog:
        default: main
      dest_schema:
        default: ingest_destination_schema
    
    # The main pipeline for workday_dab
    resources:
      pipelines:
        pipeline_workday:
          name: workday_pipeline
          catalog: ${var.dest_catalog}
          schema: ${var.dest_schema}
          ingestion_definition:
            connection_name: <workday-connection>
            objects:
              # An array of objects to ingest from Workday. This example
              # ingests a sample report about all active employees. The Employee_ID key is used as
              # the primary key for the report.
              - report:
                  source_url: https://wd2-impl-services1.workday.com/ccx/service/customreport2/All_Active_Employees_Data?format=json
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
                  destination_table: All_Active_Employees_Data
                  table_configuration:
                    primary_keys:
                      - Employee_ID
                    include_columns: # This can be exclude_columns instead
                      - <column_a>
                      - <column_b>
                      - <column_c>
    

    resources/workday_job.yml ファイルの例を以下に示します。

    resources:
      jobs:
        workday_dab_job:
          name: workday_dab_job
    
          trigger:
            # Run this job every day, exactly one day from the last run
            # See https://docs.databricks.com/api/workspace/jobs/create#trigger
            periodic:
              interval: 1
              unit: DAYS
    
          email_notifications:
            on_failure:
              - <email-address>
    
          tasks:
            - task_key: refresh_pipeline
              pipeline_task:
                pipeline_id: ${resources.pipelines.pipeline_workday.id}
    
  4. Databricks CLI を使用してパイプラインをデプロイします:

    databricks bundle deploy
    

オプション 3: Azure Databricks ノートブック

パイプライン定義で次のテーブル構成プロパティを使用して、取り込む特定の列を選択または選択解除できます。

  • include_columns: 必要に応じて、インジェストに含める列の一覧を指定します。 このオプションを使用して列を明示的に含める場合、パイプラインは将来ソースに追加される列を自動的に除外します。 今後の列を取り込むには、それらを一覧に追加する必要があります。
  • exclude_columns: 必要に応じて、インジェストから除外する列の一覧を指定します。 このオプションを使用して列を明示的に除外する場合、パイプラインには、将来ソースに追加される列が自動的に含まれます。 今後の列を取り込むには、それらを一覧に追加する必要があります。

レポート URL (source_url) にプロンプトを指定して、フィルター処理されたレポートを取り込むこともできます。

  1. Workday への Unity カタログ接続が存在することを確認します。 接続を作成する手順については、「 マネージド インジェスト ソースへの接続」を参照してください。

  2. 個人用アクセス トークンを生成します。

  3. <personal-access-token> 値を変更する Python ノートブック セルに次のコードを貼り付けます。

    # SHOULD MODIFY
    # This step sets up a PAT to make API calls to the Databricks service.
    api_token = "<personal-access-token>"
    
  4. 2 番目のノートブック セルに次のコードを貼り付けます。

    # DO NOT MODIFY
    # This step sets up a connection to make API calls to the Databricks service.
    import requests
    import json
    
    notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    workspace_url = notebook_context.apiUrl().get()
    api_url = f"{workspace_url}/api/2.0/pipelines"
    
    headers = {
       'Authorization': 'Bearer {}'.format(api_token),
       'Content-Type': 'application/json'
    }
    
    def check_response(response):
       if response.status_code == 200:
          print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False)))
       else:
          print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}")
    
    # DO NOT MODIFY
    # These are API definition to be used.
    def create_pipeline(pipeline_definition: str):
    response = requests.post(url=api_url, headers=headers, data=pipeline_definition)
    check_response(response)
    
    def edit_pipeline(id: str, pipeline_definition: str):
    response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition)
    check_response(response)
    
    def delete_pipeline(id: str):
    response = requests.delete(url=f"{api_url}/{id}", headers=headers)
    check_response(response)
    
    def get_pipeline(id: str):
    response = requests.get(url=f"{api_url}/{id}", headers=headers)
    check_response(response)
    
    def list_pipeline(filter: str = ""):
    body = "" if len(filter) == 0 else f"""{{"filter": "{filter} AND pipeline_type IN ('MANAGED_INGESTION')"}}"""
    response = requests.get(url=api_url, headers=headers, data=body)
    check_response(response)
    
  5. パイプラインの仕様を反映するように変更する 3 番目のノートブック セルに次のコードを貼り付けます。

    # SHOULD MODIFY
    # Update this notebook to configure your ingestion pipeline.
    
    pipeline_spec = """
    {
    "name": "<YOUR_PIPELINE_NAME>",
    "ingestion_definition": {
       "connection_name": "<YOUR_CONNECTON_NAME>",
       "objects": [
          {
             "report": {
             "source_url": "<YOUR_REPORT_URL>",
             "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
             "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
             "destination_table": "<YOUR_DATABRICKS_TABLE>",
             "table_configuration": {
                   "primary_keys": ["<PRIMARY_KEY>"]
                }
             }
          }, {
             "report": {
             "source_url": "<YOUR_SECOND_REPORT_URL>",
             "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
             "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
             "destination_table": "<YOUR_DATABRICKS_SECOND_TABLE>",
             "table_configuration": {
                   "primary_keys": ["<PRIMARY_KEY>"],
                   "scd_type": "SCD_TYPE_2",
                   "include_columns": ["<column_a>", "<column_b>", "<column_c>"]
                }
             }
          }
       ]
    }
    }
    """
    
    create_pipeline(pipeline_spec)
    
  6. 個人用アクセス トークンを使用して最初のノートブック セルを実行します。

  7. 2 番目のノートブック セルを実行します。

  8. パイプラインの詳細を 含む 3 番目のノートブック セルを実行します。 これによって create_pipeline が実行されます。

    • list_pipeline はパイプライン ID とその詳細を返します。
    • edit_pipeline でパイプライン定義を編集できます。
    • delete_pipeline でパイプラインが削除されます。

オプション 4: Databricks CLI

パイプライン定義で次のテーブル構成プロパティを使用して、取り込む特定の列を選択または選択解除できます。

  • include_columns: 必要に応じて、インジェストに含める列の一覧を指定します。 このオプションを使用して列を明示的に含める場合、パイプラインは将来ソースに追加される列を自動的に除外します。 今後の列を取り込むには、それらを一覧に追加する必要があります。
  • exclude_columns: 必要に応じて、インジェストから除外する列の一覧を指定します。 このオプションを使用して列を明示的に除外する場合、パイプラインには、将来ソースに追加される列が自動的に含まれます。 今後の列を取り込むには、それらを一覧に追加する必要があります。

レポート URL (source_url) にプロンプトを指定して、フィルター処理されたレポートを取り込むこともできます。

  1. Workday への Unity カタログ接続が存在することを確認します。 接続を作成する手順については、「 マネージド インジェスト ソースへの接続」を参照してください。
  2. 次のコマンドを実行してパイプラインを作成します。
databricks pipelines create --json "<pipeline-definition OR json-file-path>"

パイプライン定義テンプレート

JSON パイプライン定義テンプレートを次に示します。

"ingestion_definition": {

     "connection_name": "<connection-name>",

     "objects": [

       {

         "report": {

           "source_url": "<report-url>",

           "destination_catalog": "<destination-catalog>",

           "destination_schema": "<destination-schema>",

           "table_configuration": {

              "primary_keys": ["<primary-key>"],

              "scd_type": "SCD_TYPE_2",

              "include_columns": ["<column-a>", "<column-b>", "<column-c>"]

           }

         }

       }

     ]

 }

パイプラインを開始し、スケジュールを設定し、アラートを設定する

パイプラインの詳細ページでパイプラインのスケジュールを作成できます。

  1. パイプラインが作成されたら、Azure Databricks ワークスペースに再びアクセスし、[パイプラインの ] をクリックします。

    新しいパイプラインがパイプラインの一覧に表示されます。

  2. パイプラインの詳細を表示するには、そのパイプライン名をクリックします。

  3. パイプラインの詳細ページで、スケジュールをクリックしてパイプラインをスケジュールできます。

  4. パイプラインに通知を設定するには、[設定] クリックし、通知を追加します。

パイプラインに追加するスケジュールごとに、Lakeflow Connect によって自動的にジョブが作成されます。 インジェスト パイプラインは、ジョブ内のタスクです。 必要に応じて、ジョブにさらにタスクを追加できます。

例: 2 つの Workday レポートを個別のスキーマに取り込む

このセクションのパイプライン定義例では、2 つの Workday レポートを個別のスキーマに取り込みます。 複数宛先パイプラインのサポートは API 専用です。

resources:
  pipelines:
    pipeline_workday:
      name: workday_pipeline
      catalog: my_catalog_1 # Location of the pipeline event log
      schema: my_schema_1 # Location of the pipeline event log
      ingestion_definition:
        connection_name: <workday-connection>
        objects:
          - report:
              source_url: <report-url-1>
              destination_catalog: my_catalog_1
              destination_schema: my_schema_1
              destination_table: my_table_1
              table_configuration:
                primary_keys:
                  - <primary_key_column>
          - report:
              source_url: <report-url-2>
              destination_catalog: my_catalog_2
              destination_schema: my_schema_2
              destination_table: my_table_2
              table_configuration:
                primary_keys:
                  - <primary_key_column>

その他のリソース