Condividi tramite


Importare dati da Salesforce

Questa pagina descrive come inserire dati da Salesforce e caricarli in Azure Databricks usando Lakeflow Connect.

Prima di iniziare

Per creare una pipeline di raccolta, è necessario soddisfare i seguenti requisiti.

  • L'area di lavoro deve essere abilitata per Unity Catalog.

  • L'ambiente di calcolo serverless deve essere abilitato per l'area di lavoro. Vedere Abilitare l'ambiente di calcolo serverless.

  • Se hai intenzione di creare una nuova connessione: devi avere CREATE CONNECTION i privilegi sul metastore.

    Se il connettore supporta la creazione di pipeline basate sull'interfaccia utente, è possibile creare la connessione e la pipeline contemporaneamente completando i passaggi in questa pagina. Tuttavia, se si usa la creazione di pipeline basate su API, è necessario creare la connessione in Esplora cataloghi prima di completare i passaggi in questa pagina. Vedere Connettersi alle origini di inserimento gestite.

  • Se prevedi di utilizzare una connessione esistente, devi avere privilegi USE CONNECTION sull'oggetto connessione o ALL PRIVILEGES.

  • È necessario disporre dei privilegi USE CATALOG sul catalogo di destinazione.

  • È necessario disporre di privilegi USE SCHEMA e CREATE TABLE su uno schema esistente o di privilegi CREATE SCHEMA sul catalogo di destinazione.

Per inserire da Salesforce, è consigliabile quanto segue:

  • Creare un utente salesforce che Databricks può usare per recuperare i dati. Assicurarsi che l'utente abbia accesso all'API e accesso a tutti gli oggetti che si prevede di inserire.

Creare un flusso di inserimento dati

Autorizzazioni necessarie:USE CONNECTION o ALL PRIVILEGES per una connessione.

Questo passaggio descrive come creare la pipeline di ingestione. Ogni tabella inserita viene registrata in una tabella di streaming con lo stesso nome (ma tutto in minuscolo).

Interfaccia utente di Databricks

  1. Nella barra laterale dell'area di lavoro di Azure Databricks fare clic su Inserimento dati.

  2. Nella pagina Aggiungi dati, sotto Connettori Databricks, fare clic su Salesforce.

    Verrà visualizzata la procedura guidata di inserimento di Salesforce.

  3. Nella pagina Pipeline del wizard, inserisci un nome univoco per la pipeline di ingestione.

  4. Nel menu a tendina del catalogo di destinazione, selezionare un catalogo. I dati acquisiti e i registri degli eventi verranno scritti in questo catalogo.

  5. Selezionare la connessione di Unity Catalog in cui sono archiviate le credenziali necessarie per accedere ai dati di Salesforce.

    Se non sono presenti connessioni Salesforce, fare clic su Crea connessione. È necessario avere il privilegio di CREATE CONNECTION nel metastore.

  6. Fare clic su Crea pipeline e continuare.

  7. Nella pagina Origine selezionare le tabelle da inserire e quindi fare clic su Avanti.

    Se si seleziona Tutte le tabelle, il connettore di inserimento Salesforce scrive tutte le tabelle esistenti e future nello schema di origine nello schema di destinazione. È previsto un massimo di 250 oggetti per ogni pipeline.

  8. Nella pagina Destinazione, selezionare il catalogo Unity Catalog e lo schema su cui scrivere.

    Se non si vuole usare uno schema esistente, fare clic su Crea schema. È necessario avere i privilegi di USE CATALOG e CREATE SCHEMA nel catalogo padre.

  9. Fare clic su Salva pipeline e continua.

  10. (Facoltativo) Nella pagina Impostazioni fare clic su Crea pianificazione. Impostare la frequenza per aggiornare le tabelle di destinazione.

  11. (Facoltativo) Impostare le notifiche di posta elettronica per l'esito positivo o negativo dell'operazione della pipeline.

  12. Fai clic su Salva ed esegui la pipeline.

Pacchetti di Asset di Databricks

Questa scheda descrive come distribuire una pipeline di inserimento usando i bundle di asset di Databricks. I bundle possono contenere definizioni YAML di processi e attività, vengono gestiti tramite l'interfaccia della riga di comando di Databricks e possono essere condivisi ed eseguiti in aree di lavoro di destinazione diverse, ad esempio sviluppo, gestione temporanea e produzione. Per altre informazioni, vedere Aggregazioni di asset di Databricks.

Puoi usare le seguenti proprietà di configurazione della tabella nella definizione della tua pipeline per selezionare o deselezionare specifiche colonne da ingerire.

  • include_columns: Specificare facoltativamente un elenco di colonne da includere per l'ingestione. Se utilizzi questa opzione per includere esplicitamente le colonne, la pipeline esclude automaticamente le colonne che verranno aggiunte alla fonte in futuro. Per inglobare le colonne future, dovrai aggiungerle all'elenco.
  • exclude_columns: specificare facoltativamente un elenco di colonne da escludere dall'inserimento. Se utilizzi questa opzione per escludere esplicitamente le colonne, la pipeline includerà automaticamente le colonne che verranno aggiunte alla fonte in futuro. Per inglobare le colonne future, dovrai aggiungerle all'elenco.
  1. Creare un nuovo bundle usando l'interfaccia della riga di comando di Databricks:

    databricks bundle init
    
  2. Aggiungere due nuovi file di risorse al bundle:

    • Un file di definizione della pipeline (resources/sfdc_pipeline.yml).
    • File del flusso di lavoro che controlla la frequenza di inserimento dati (resources/sfdc_job.yml).

    Di seguito è riportato un file resources/sfdc_pipeline.yml di esempio:

    variables:
      dest_catalog:
        default: main
      dest_schema:
        default: ingest_destination_schema
    
    # The main pipeline for sfdc_dab
    resources:
      pipelines:
        pipeline_sfdc:
          name: salesforce_pipeline
          catalog: ${var.dest_catalog}
          schema: ${var.dest_schema}
          ingestion_definition:
            connection_name: <salesforce-connection>
            objects:
              # An array of objects to ingest from Salesforce. This example
              # ingests the AccountShare, AccountPartner, and ApexPage objects.
              - table:
                  source_schema: objects
                  source_table: AccountShare
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
                  table_configuration:
                    include_columns: # This can be exclude_columns instead
                      - <column_a>
                      - <column_b>
                      - <column_c>
              - table:
                  source_schema: objects
                  source_table: AccountPartner
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
              - table:
                  source_schema: objects
                  source_table: ApexPage
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
    

    Di seguito è riportato un file resources/sfdc_job.yml di esempio:

    resources:
      jobs:
        sfdc_dab_job:
          name: sfdc_dab_job
    
          trigger:
            # Run this job every day, exactly one day from the last run
            # See https://docs.databricks.com/api/workspace/jobs/create#trigger
            periodic:
              interval: 1
              unit: DAYS
    
          email_notifications:
            on_failure:
              - <email-address>
    
          tasks:
            - task_key: refresh_pipeline
              pipeline_task:
                pipeline_id: ${resources.pipelines.pipeline_sfdc.id}
    
  3. Distribuire la pipeline usando la CLI di Databricks.

    databricks bundle deploy
    

Interfaccia a riga di comando di Databricks

Puoi usare le seguenti proprietà di configurazione della tabella nella definizione della tua pipeline per selezionare o deselezionare specifiche colonne da ingerire.

  • include_columns: Specificare facoltativamente un elenco di colonne da includere per l'ingestione. Se utilizzi questa opzione per includere esplicitamente le colonne, la pipeline esclude automaticamente le colonne che verranno aggiunte alla fonte in futuro. Per inglobare le colonne future, dovrai aggiungerle all'elenco.
  • exclude_columns: specificare facoltativamente un elenco di colonne da escludere dall'inserimento. Se utilizzi questa opzione per escludere esplicitamente le colonne, la pipeline includerà automaticamente le colonne che verranno aggiunte alla fonte in futuro. Per inglobare le colonne future, dovrai aggiungerle all'elenco.

Per creare la pipeline:

databricks pipelines create --json "<pipeline-definition | json-file-path>"

Per aggiornare la pipeline:

databricks pipelines update --json "<pipeline-definition | json-file-path>"

Per ottenere la definizione della pipeline:

databricks pipelines get "<pipeline-id>"

Per eliminare la pipeline:

databricks pipelines delete "<pipeline-id>"

Per ulteriori informazioni, puoi eseguire:

databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help

Definizione di pipeline JSON di esempio

"ingestion_definition": {

     "connection_name": "<connection-name>",

     "objects": [

       {

         "table": {

           "source_schema": "<source-schema>",

           "source_table": "<source-table>",

           "destination_catalog": "<destination-catalog>",

           "destination_schema": "<destination-schema>",

           "table_configuration": {

             "include_columns": ["<column-a>", "<column-b>", "<column-c>"]

           }

         }

       }

     ]

 }

Avvia, pianifica e imposta notifiche sulla tua pipeline

È possibile creare una pianificazione per la pipeline nella pagina dei dettagli della pipeline.

  1. Dopo aver creato la pipeline, torna all'area di lavoro di Azure Databricks e quindi fai clic su Pipelines.

    La nuova pipeline viene visualizzata nell’elenco delle pipeline.

  2. Per visualizzare i dettagli della pipeline, fare clic sul nome della pipeline.

  3. Nella pagina dei dettagli della pipeline è possibile pianificare la pipeline facendo clic su Schedule.

  4. Per impostare le notifiche nella pipeline, fare clic su Impostazioni e quindi aggiungere una notifica.

Per ogni pianificazione aggiunta a una pipeline, Lakeflow Connect crea automaticamente un'attività. La pipeline di inserimento è un'attività all'interno del processo. Facoltativamente, è possibile aggiungere altre attività al processo.

Annotazioni

Quando la pipeline viene eseguita, è possibile che vengano visualizzate due viste di origine per una determinata tabella. Una visualizzazione contiene gli snapshot per i campi della formula. L'altra vista include l'acquisizione incrementale dei dati per i campi che non utilizzano formule. Queste viste vengono unite nella tabella di destinazione.

Esempio: inserire due oggetti Salesforce in schemi separati

La definizione della pipeline di esempio in questa sezione inserisce due oggetti Salesforce in schemi separati. Il supporto per pipeline a più destinazioni è solo tramite API.

resources:
  pipelines:
    pipeline_sfdc:
      name: salesforce_pipeline
      catalog: my_catalog_1 # Location of the pipeline event log
      schema: my_schema_1 # Location of the pipeline event log
      ingestion_definition:
        connection_name: <salesforce-connection>
        objects:
          - table:
              source_schema: objects
              source_table: AccountShare
              destination_catalog: my_catalog_1 # Location of this table
              destination_schema: my_schema_1 # Location of this table
          - table:
              source_schema: objects
              source_table: AccountPartner
              destination_catalog: my_catalog_2 # Location of this table
              destination_schema: my_schema_2 # Location of this table

Esempio: Inserire un oggetto Salesforce tre volte

La definizione della pipeline di esempio in questa sezione inserisce un oggetto Salesforce in tre diverse tabelle di destinazione. Il supporto per pipeline a più destinazioni è solo tramite API.

Puoi opzionalmente rinominare una tabella che ingerisci. Se rinomini una tabella nel tuo pipeline, diventa un pipeline esclusivamente API e non puoi più modificare il pipeline nell'interfaccia utente.

resources:
  pipelines:
    pipeline_sfdc:
      name: salesforce_pipeline
      catalog: my_catalog_1	# Location of the pipeline event log
      schema: my_schema_1	# Location of the pipeline event log
      ingestion_definition:
        connection_name: <salesforce-connection>
        objects:
          - table:
              source_schema: objects
              source_table: Order
              destination_catalog: my_catalog_1	# Location of first copy
              destination_schema: my_schema_1	# Location of first copy
          - table:
              source_schema: objects
              source_table: Order
              destination_catalog: my_catalog_2	# Location of second copy
              destination_schema: my_schema_2	# Location of second copy
	        - table:
              source_schema: objects
              source_table: Order
              destination_catalog: my_catalog_2	# Location of third copy, renamed
              destination_schema: my_schema_2	# Location of third copy, renamed
              destination_table: order_duplicate # Table rename

Risorse aggiuntive