Partager via


Ingérer des données à partir de Salesforce

Cette page explique comment ingérer des données à partir de Salesforce et les charger dans Azure Databricks à l’aide de Lakeflow Connect.

Avant de commencer

Pour créer un pipeline d’ingestion, vous devez répondre aux exigences suivantes :

  • Votre espace de travail est activé pour Unity Catalog.
  • Le calcul serverless est activé pour votre espace de travail. Consultez Activer le calcul serverless.
  • Si vous envisagez de créer une connexion : vous avez les privilèges CREATE CONNECTION sur le metastore.
  • Si vous envisagez d’utiliser une connexion existante : vous disposez USE CONNECTION de privilèges ou ALL PRIVILEGES sur l’objet de connexion.
  • Vous disposez de USE CATALOG privilèges sur le catalogue cible.
  • Vous disposez des privilèges USE SCHEMA et CREATE TABLE sur un schéma existant ou des privilèges CREATE SCHEMA sur le catalogue cible.

Pour importer les données à partir de Salesforce, voici ce qui est recommandé :

  • Créez un utilisateur Salesforce que Databricks peut utiliser pour récupérer des données. Vérifiez que l’utilisateur a accès à l’API et à tous les objets que vous prévoyez d’ingérer.

Créer un pipeline d’ingestion

Autorisations requises :USE CONNECTION ou ALL PRIVILEGES sur une connexion.

Cette étape explique comment créer le pipeline d’ingestion. Chaque table ingérée est écrite dans une table de diffusion en continu portant le même nom (mais en minuscules).

Interface utilisateur Databricks

  1. Dans la barre latérale de l’espace de travail Azure Databricks, cliquez sur Ingestion de données.

  2. Dans la page Ajouter des données , sous Connecteurs Databricks, cliquez sur Salesforce.

    L’Assistant Ingestion Salesforce s’ouvre.

  3. Dans la page Pipeline de l’Assistant, entrez un nom unique pour le pipeline d’ingestion.

  4. Dans la liste déroulante Catalogue de destination, sélectionnez un catalogue. Les données ingérées et les journaux des événements seront écrits dans ce catalogue.

  5. Sélectionnez la connexion du catalogue Unity qui stocke les informations d’identification requises pour accéder aux données Salesforce.

    S’il n’existe aucune connexion Salesforce, cliquez sur Créer une connexion. Vous devez disposer du privilège CREATE CONNECTION sur le metastore.

  6. Cliquez sur Créer un pipeline et continuez.

  7. Dans la page Source , sélectionnez les tables à ingérer, puis cliquez sur Suivant.

    Si vous sélectionnez Toutes les tables, le connecteur d’ingestion Salesforce écrit toutes les tables existantes et futures dans le schéma source dans le schéma de destination. Il existe une limite maximale de 250 objets par pipeline.

  8. Dans la page Destination, sélectionnez le catalogue Unity et le schéma dans lequel écrire.

    Si vous ne souhaitez pas utiliser un schéma existant, cliquez sur Créer un schéma. Vous devez disposer des privilèges USE CATALOG et CREATE SCHEMA sur le catalogue parent.

  9. Cliquez sur Enregistrer le pipeline et continuez.

  10. (Facultatif) Dans la page Paramètres , cliquez sur Créer une planification. Définissez la fréquence pour actualiser les tables de destination.

  11. (Facultatif) Définissez les notifications par e-mail pour la réussite ou l’échec de l’opération de pipeline.

  12. Cliquez sur Enregistrer et exécuter le pipeline.

Packs de ressources Databricks

Cet onglet explique comment déployer un pipeline d’ingestion à l’aide de Bundles de ressources Databricks. Les bundles peuvent contenir des définitions YAML de travaux et de tâches, sont gérés à l’aide de l’interface CLI Databricks et peuvent être partagés et exécutés dans différents espaces de travail cibles (tels que le développement, la préproduction et la production). Pour plus d’informations, consultez Les offres groupées de ressources Databricks.

  1. Créez un bundle à l’aide de l’interface CLI Databricks :

    databricks bundle init
    
  2. Ajoutez deux nouveaux fichiers de ressources à l’offre groupée :

    • Fichier de définition de pipeline (resources/sfdc_pipeline.yml).
    • Fichier de flux de travail qui contrôle la fréquence d’ingestion des données (resources/sfdc_job.yml).

    Voici un exemple de fichier resources/sfdc_pipeline.yml :

    variables:
      dest_catalog:
        default: main
      dest_schema:
        default: ingest_destination_schema
    
    # The main pipeline for sfdc_dab
    resources:
      pipelines:
        pipeline_sfdc:
          name: salesforce_pipeline
          catalog: ${var.dest_catalog}
          target: ${var.dest_schema}
          ingestion_definition:
            connection_name: <salesforce-connection>
            objects:
              # An array of objects to ingest from Salesforce. This example
              # ingests the AccountShare, AccountPartner, and ApexPage objects.
              - table:
                  source_schema: objects
                  source_table: AccountShare
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
              - table:
                  source_schema: objects
                  source_table: AccountPartner
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
              - table:
                  source_schema: objects
                  source_table: ApexPage
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
    

    Voici un exemple de fichier resources/sfdc_job.yml :

    resources:
      jobs:
        sfdc_dab_job:
          name: sfdc_dab_job
    
          trigger:
            # Run this job every day, exactly one day from the last run
            # See https://docs.databricks.com/api/workspace/jobs/create#trigger
            periodic:
              interval: 1
              unit: DAYS
    
          email_notifications:
            on_failure:
              - <email-address>
    
          tasks:
            - task_key: refresh_pipeline
              pipeline_task:
                pipeline_id: ${resources.pipelines.pipeline_sfdc.id}
    
  3. Déployez le pipeline à l’aide de l’interface CLI Databricks :

    databricks bundle deploy
    

Interface CLI de Databricks

Vous pouvez utiliser les propriétés de configuration de tableau suivantes dans votre définition de pipeline pour sélectionner ou désélectionner des colonnes spécifiques à ingérer :

  • include_columns: spécifiez éventuellement une liste de colonnes à inclure pour l’ingestion. Si vous utilisez cette option pour inclure explicitement des colonnes, le pipeline exclut automatiquement les colonnes ajoutées à la source à l’avenir. Pour ingérer les futures colonnes, vous devez les ajouter à la liste.
  • exclude_columns : vous pouvez éventuellement spécifier une liste de colonnes à exclure de l’ingestion. Si vous utilisez cette option pour exclure explicitement les colonnes, le pipeline inclut automatiquement les colonnes ajoutées à la source à l’avenir. Pour ingérer les futures colonnes, vous devez les ajouter à la liste.

Pour créer le pipeline :

databricks pipelines create --json "<pipeline-definition | json-file-path>"

Pour mettre à jour le pipeline :

databricks pipelines update --json "<pipeline-definition | json-file-path>"

Pour obtenir la définition du pipeline :

databricks pipelines get "<pipeline-id>"

Pour supprimer le pipeline :

databricks pipelines delete "<pipeline-id>"

Pour plus d’informations, vous pouvez exécuter :

databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help

Exemple de définition de pipeline JSON

"ingestion_definition": {

     "connection_name": "<connection-name>",

     "objects": [

       {

         "table": {

           "source_schema": "<source-schema>",

           "source_table": "<source-table>",

           "destination_catalog": "<destination-catalog>",

           "destination_schema": "<destination-schema>",

           "table_configuration": {

             "include_columns": ["<column-a>", "<column-b>", "<column-c>"]

           }

         }

       }

     ]

 }

Démarrer, planifier et définir des alertes sur votre pipeline

Vous pouvez créer un calendrier pour le pipeline sur la page de détails du pipeline.

  1. Une fois le pipeline créé, revisitez l’espace de travail Azure Databricks, puis cliquez sur Pipelines.

    Le nouveau pipeline apparaît dans la liste de pipelines.

  2. Pour afficher les informations sur le pipeline, cliquez sur le nom du pipeline.

  3. Sur la page de détails du pipeline, vous pouvez planifier le pipeline en cliquant sur Schedule.

  4. Pour définir des notifications sur le pipeline, cliquez sur Paramètres, puis ajoutez une notification.

Pour chaque planification que vous ajoutez à un pipeline, Lakeflow Connect crée automatiquement une tâche pour celle-ci. Le pipeline d’ingestion est une tâche au sein du travail. Vous pouvez éventuellement ajouter d’autres tâches au travail.

Remarque

Quand le pipeline s’exécute, vous pouvez voir deux vues sources pour une table donnée. Une des vues contient les instantanés des champs de formule. L’autre vue contient les extractions de données incrémentielles pour les champs sans formule. Ces vues sont jointes dans la table de destination.

Exemple : Ingérer deux objets Salesforce dans des schémas distincts

L’exemple de définition de pipeline de cette section ingestionne deux objets Salesforce dans des schémas distincts. La prise en charge du pipeline multi-destination est uniquement disponible via l’API.

resources:
  pipelines:
    pipeline_sfdc:
      name: salesforce_pipeline
      catalog: my_catalog_1 # Location of the pipeline event log
      schema: my_schema_1 # Location of the pipeline event log
      ingestion_definition:
        connection_name: <salesforce-connection>
        objects:
          - table:
              source_schema: objects
              source_table: AccountShare
              destination_catalog: my_catalog_1 # Location of this table
              destination_schema: my_schema_1 # Location of this table
          - table:
              source_schema: objects
              source_table: AccountPartner
              destination_catalog: my_catalog_2 # Location of this table
              destination_schema: my_schema_2 # Location of this table

Exemple : Ingérer un objet Salesforce trois fois

L’exemple de définition de pipeline de cette section ingestionne un objet Salesforce dans trois tables de destination différentes. La prise en charge du pipeline multi-destination est uniquement disponible via l’API.

Vous pouvez éventuellement renommer une table que vous ingérez. Si vous renommez une table dans votre pipeline, elle devient un pipeline API uniquement et vous ne pouvez plus modifier le pipeline dans l’interface utilisateur.

resources:
  pipelines:
    pipeline_sfdc:
      name: salesforce_pipeline
      catalog: my_catalog_1	# Location of the pipeline event log
      schema: my_schema_1	# Location of the pipeline event log
      ingestion_definition:
        connection_name: <salesforce-connection>
        objects:
          - table:
              source_schema: objects
              source_table: Order
              destination_catalog: my_catalog_1	# Location of first copy
              destination_schema: my_schema_1	# Location of first copy
          - table:
              source_schema: objects
              source_table: Order
              destination_catalog: my_catalog_2	# Location of second copy
              destination_schema: my_schema_2	# Location of second copy
	        - table:
              source_schema: objects
              source_table: Order
              destination_catalog: my_catalog_2	# Location of third copy, renamed
              destination_schema: my_schema_2	# Location of third copy, renamed
              destination_table: order_duplicate # Table rename

Ressources supplémentaires