Partager via


Importer des rapports Workday

Cette page explique comment ingérer des rapports Workday et les charger dans Azure Databricks à l’aide de Lakeflow Connect.

Avant de commencer

Pour créer un pipeline d’ingestion, vous devez répondre aux exigences suivantes :

  • Vous devez activer votre espace de travail pour Unity Catalog.

  • Le calcul serverless doit être activé pour votre espace de travail. Consultez Activer le calcul serverless.

  • Si vous envisagez de créer une connexion : vous devez disposer de CREATE CONNECTION privilèges sur le metastore.

    Si votre connecteur prend en charge la création de pipelines basés sur l’interface utilisateur, vous pouvez créer la connexion et le pipeline en même temps en effectuant les étapes de cette page. Toutefois, si vous utilisez la création de pipelines basée sur l’API, vous devez créer la connexion dans l’Explorateur de catalogues avant d’effectuer les étapes de cette page. Consultez Se connecter aux sources d’ingestion managées.

  • Si vous envisagez d’utiliser une connexion existante : vous devez disposer USE CONNECTION de privilèges ou ALL PRIVILEGES sur l’objet de connexion.

  • Vous devez disposer USE CATALOG de privilèges sur le catalogue cible.

  • Vous devez avoir les privilèges USE SCHEMA et CREATE TABLE sur un schéma existant ou les privilèges CREATE SCHEMA sur le catalogue cible.

Pour ingérer à partir de Workday, consultez Configurer des rapports Workday pour l’ingestion.

Configurer la mise en réseau

Si le contrôle de sortie serverless est activé, placez les noms d’hôtes de vos URL de rapport dans la liste d’autorisation. Par exemple, l’URL du rapport https://ww1.workday.com/service/ccx/<tenant>/<reportName>?format=json a pour nom d'hôte https://ww1.workday.com. Consultez Gérer les stratégies réseau pour le contrôle de sortie serverless.

Créer une connexion Workday

Autorisations requises :CREATE CONNECTION sur le metastore.

Pour créer une connexion Workday, procédez comme suit :

  1. Dans votre espace de travail Azure Databricks, cliquez sur Catalogue > Emplacements externes > Connexions > Créer une connexion.
  2. Pour Nom de la connexion, entrez un nom unique pour la connexion Workday.
  3. Pour Type de connexion, sélectionnez Workday Reports.
  4. Pour le Type d’authentification, sélectionnez Jeton d’actualisation OAuth, puis entrez l’ID client, la Clé secrète client et le Jeton d’actualisation que vous avez générés lors de l’installation de la source.
  5. Dans la page Créer une connexion, cliquez sur Créer.

Créer un pipeline d’ingestion

Cette étape explique comment configurer le pipeline d’ingestion. Chaque table ingérée obtient une table de streaming correspondante portant le même nom (mais en minuscules) dans la destination, sauf si vous l'avez explicitement renommée.

Packs de ressources Databricks

Cet onglet explique comment déployer un pipeline d’ingestion à l’aide de Bundles de ressources Databricks. Les bundles peuvent contenir des définitions YAML de travaux et de tâches, sont gérés à l’aide de l’interface CLI Databricks et peuvent être partagés et exécutés dans différents espaces de travail cibles (tels que le développement, la préproduction et la production). Pour plus d’informations, consultez Les offres groupées de ressources Databricks.

Vous pouvez utiliser les propriétés de configuration de tableau suivantes dans votre définition de pipeline pour sélectionner ou désélectionner des colonnes spécifiques à ingérer :

  • include_columns: spécifiez éventuellement une liste de colonnes à inclure pour l’ingestion. Si vous utilisez cette option pour inclure explicitement des colonnes, le pipeline exclut automatiquement les colonnes ajoutées à la source à l’avenir. Pour ingérer les futures colonnes, vous devez les ajouter à la liste.
  • exclude_columns : vous pouvez éventuellement spécifier une liste de colonnes à exclure de l’ingestion. Si vous utilisez cette option pour exclure explicitement les colonnes, le pipeline inclut automatiquement les colonnes ajoutées à la source à l’avenir. Pour ingérer les futures colonnes, vous devez les ajouter à la liste.

Vous pouvez aussi spécifier des invites dans l’URL de rapport (source_url), ce qui vous permet d’ingérer des rapports filtrés.

  1. Créez un bundle à l’aide de l’interface CLI Databricks :

    databricks bundle init
    
  2. Ajoutez deux nouveaux fichiers de ressources à l’offre groupée :

    • Fichier de définition de pipeline (resources/workday_pipeline.yml).
    • Fichier de flux de travail qui contrôle la fréquence d’ingestion des données (resources/workday_job.yml).

    Voici un exemple de fichier resources/workday_pipeline.yml :

    variables:
      dest_catalog:
        default: main
      dest_schema:
        default: ingest_destination_schema
    
    # The main pipeline for workday_dab
    resources:
      pipelines:
        pipeline_workday:
          name: workday_pipeline
          catalog: ${var.dest_catalog}
          schema: ${var.dest_schema}
          ingestion_definition:
            connection_name: <workday-connection>
            objects:
              # An array of objects to ingest from Workday. This example
              # ingests a sample report about all active employees. The Employee_ID key is used as
              # the primary key for the report.
              - report:
                  source_url: https://wd2-impl-services1.workday.com/ccx/service/customreport2/All_Active_Employees_Data?format=json
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
                  destination_table: All_Active_Employees_Data
                  table_configuration:
                    primary_keys:
                      - Employee_ID
                    include_columns: # This can be exclude_columns instead
                      - <column_a>
                      - <column_b>
                      - <column_c>
    

    Voici un exemple de fichier resources/workday_job.yml :

    resources:
      jobs:
        workday_dab_job:
          name: workday_dab_job
    
          trigger:
            # Run this job every day, exactly one day from the last run
            # See https://docs.databricks.com/api/workspace/jobs/create#trigger
            periodic:
              interval: 1
              unit: DAYS
    
          email_notifications:
            on_failure:
              - <email-address>
    
          tasks:
            - task_key: refresh_pipeline
              pipeline_task:
                pipeline_id: ${resources.pipelines.pipeline_workday.id}
    
  3. Déployez le pipeline à l’aide de l’interface CLI Databricks :

    databricks bundle deploy
    

Carnet de notes

Vous pouvez utiliser les propriétés de configuration de tableau suivantes dans votre définition de pipeline pour sélectionner ou désélectionner des colonnes spécifiques à ingérer :

  • include_columns: spécifiez éventuellement une liste de colonnes à inclure pour l’ingestion. Si vous utilisez cette option pour inclure explicitement des colonnes, le pipeline exclut automatiquement les colonnes ajoutées à la source à l’avenir. Pour ingérer les futures colonnes, vous devez les ajouter à la liste.
  • exclude_columns : vous pouvez éventuellement spécifier une liste de colonnes à exclure de l’ingestion. Si vous utilisez cette option pour exclure explicitement les colonnes, le pipeline inclut automatiquement les colonnes ajoutées à la source à l’avenir. Pour ingérer les futures colonnes, vous devez les ajouter à la liste.

Vous pouvez aussi spécifier des invites dans l’URL de rapport (source_url), ce qui vous permet d’ingérer des rapports filtrés.

  1. Générez un jeton d’accès personnel.

  2. Collez le code suivant dans une cellule du notebook Python en modifiant la valeur <personal-access-token> :

    # SHOULD MODIFY
    # This step sets up a PAT to make API calls to the Databricks service.
    api_token = "<personal-access-token>"
    
  3. Collez le code suivant dans une deuxième cellule du notebook :

    # DO NOT MODIFY
    # This step sets up a connection to make API calls to the Databricks service.
    import requests
    import json
    
    notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    workspace_url = notebook_context.apiUrl().get()
    api_url = f"{workspace_url}/api/2.0/pipelines"
    
    headers = {
       'Authorization': 'Bearer {}'.format(api_token),
       'Content-Type': 'application/json'
    }
    
    def check_response(response):
       if response.status_code == 200:
          print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False)))
       else:
          print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}")
    
    # DO NOT MODIFY
    # These are API definition to be used.
    def create_pipeline(pipeline_definition: str):
    response = requests.post(url=api_url, headers=headers, data=pipeline_definition)
    check_response(response)
    
    def edit_pipeline(id: str, pipeline_definition: str):
    response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition)
    check_response(response)
    
    def delete_pipeline(id: str):
    response = requests.delete(url=f"{api_url}/{id}", headers=headers)
    check_response(response)
    
    def get_pipeline(id: str):
    response = requests.get(url=f"{api_url}/{id}", headers=headers)
    check_response(response)
    
    def list_pipeline(filter: str = ""):
    body = "" if len(filter) == 0 else f"""{{"filter": "{filter} AND pipeline_type IN ('MANAGED_INGESTION')"}}"""
    response = requests.get(url=api_url, headers=headers, data=body)
    check_response(response)
    
  4. Collez le code suivant dans une troisième cellule du notebook, et modifiez-le afin de refléter les spécifications de votre pipeline.

    # SHOULD MODIFY
    # Update this notebook to configure your ingestion pipeline.
    
    pipeline_spec = """
    {
    "name": "<YOUR_PIPELINE_NAME>",
    "ingestion_definition": {
       "connection_name": "<YOUR_CONNECTON_NAME>",
       "objects": [
          {
             "report": {
             "source_url": "<YOUR_REPORT_URL>",
             "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
             "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
             "destination_table": "<YOUR_DATABRICKS_TABLE>",
             "table_configuration": {
                   "primary_keys": ["<PRIMARY_KEY>"]
                }
             }
          }, {
             "report": {
             "source_url": "<YOUR_SECOND_REPORT_URL>",
             "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
             "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
             "destination_table": "<YOUR_DATABRICKS_SECOND_TABLE>",
             "table_configuration": {
                   "primary_keys": ["<PRIMARY_KEY>"],
                   "scd_type": "SCD_TYPE_2",
                   "include_columns": ["<column_a>", "<column_b>", "<column_c>"]
                }
             }
          }
       ]
    }
    }
    """
    
    create_pipeline(pipeline_spec)
    
  5. Exécutez la première cellule du notebook avec votre jeton d’accès personnel.

  6. Exécutez la deuxième cellule du notebook.

  7. Exécutez la troisième cellule du notebook avec les informations de votre pipeline. Ceci exécute create_pipeline.

    • list_pipeline retourne l’ID du pipeline et ses informations.
    • edit_pipeline vous permet de modifier la définition du pipeline.
    • delete_pipeline supprime le pipeline.

Interface de ligne de commande

Vous pouvez utiliser les propriétés de configuration de tableau suivantes dans votre définition de pipeline pour sélectionner ou désélectionner des colonnes spécifiques à ingérer :

  • include_columns: spécifiez éventuellement une liste de colonnes à inclure pour l’ingestion. Si vous utilisez cette option pour inclure explicitement des colonnes, le pipeline exclut automatiquement les colonnes ajoutées à la source à l’avenir. Pour ingérer les futures colonnes, vous devez les ajouter à la liste.
  • exclude_columns : vous pouvez éventuellement spécifier une liste de colonnes à exclure de l’ingestion. Si vous utilisez cette option pour exclure explicitement les colonnes, le pipeline inclut automatiquement les colonnes ajoutées à la source à l’avenir. Pour ingérer les futures colonnes, vous devez les ajouter à la liste.

Vous pouvez aussi spécifier des invites dans l’URL de rapport (source_url), ce qui vous permet d’ingérer des rapports filtrés.

Pour créer le pipeline :

databricks pipelines create --json "<pipeline_definition OR json file path>"

Pour modifier le pipeline :

databricks pipelines update --json "<<pipeline_definition OR json file path>"

Pour obtenir la définition du pipeline :

databricks pipelines get "<your_pipeline_id>"

Pour supprimer le pipeline :

databricks pipelines delete "<your_pipeline_id>"

Pour plus d’informations, vous pouvez toujours exécuter :

databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help

Exemple de définition de pipeline JSON :

"ingestion_definition": {

     "connection_name": "<connection-name>",

     "objects": [

       {

         "report": {

           "source_url": "<report-url>",

           "destination_catalog": "<destination-catalog>",

           "destination_schema": "<destination-schema>",

           "table_configuration": {

              "primary_keys": ["<primary-key>"],

              "scd_type": "SCD_TYPE_2",

              "include_columns": ["<column-a>", "<column-b>", "<column-c>"]

           }

         }

       }

     ]

 }

Démarrer, planifier et définir des alertes sur votre pipeline

Vous pouvez créer un calendrier pour le pipeline sur la page de détails du pipeline.

  1. Une fois le pipeline créé, revisitez l’espace de travail Azure Databricks, puis cliquez sur Pipelines.

    Le nouveau pipeline apparaît dans la liste de pipelines.

  2. Pour afficher les informations sur le pipeline, cliquez sur le nom du pipeline.

  3. Sur la page de détails du pipeline, vous pouvez planifier le pipeline en cliquant sur Schedule.

  4. Pour définir des notifications sur le pipeline, cliquez sur Paramètres, puis ajoutez une notification.

Pour chaque planification que vous ajoutez à un pipeline, Lakeflow Connect crée automatiquement une tâche pour celle-ci. Le pipeline d’ingestion est une tâche au sein du travail. Vous pouvez éventuellement ajouter d’autres tâches au travail.

Exemple : Ingérer deux rapports Workday dans des schémas distincts

L’exemple de définition de pipeline de cette section ingère deux rapports Workday dans des schémas distincts. Le support du pipeline multi-destination est disponible uniquement via l'API.

resources:
  pipelines:
    pipeline_workday:
      name: workday_pipeline
      catalog: my_catalog_1 # Location of the pipeline event log
      schema: my_schema_1 # Location of the pipeline event log
      ingestion_definition:
        connection_name: <workday-connection>
        objects:
          - report:
              source_url: <report-url-1>
              destination_catalog: my_catalog_1
              destination_schema: my_schema_1
              destination_table: my_table_1
              table_configuration:
                primary_keys:
                  - <primary_key_column>
          - report:
              source_url: <report-url-2>
              destination_catalog: my_catalog_2
              destination_schema: my_schema_2
              destination_table: my_table_2
              table_configuration:
                primary_keys:
                  - <primary_key_column>

Ressources supplémentaires