Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Questa pagina descrive come inserire dati da Salesforce e caricarli in Azure Databricks usando Lakeflow Connect.
Prima di iniziare
Per creare una pipeline di raccolta, è necessario soddisfare i seguenti requisiti.
L'area di lavoro deve essere abilitata per Unity Catalog.
L'ambiente di calcolo serverless deve essere abilitato per l'area di lavoro. Vedere Abilitare l'ambiente di calcolo serverless.
Se hai intenzione di creare una nuova connessione: devi avere
CREATE CONNECTION
i privilegi sul metastore.Se il connettore supporta la creazione di pipeline basate sull'interfaccia utente, è possibile creare la connessione e la pipeline contemporaneamente completando i passaggi in questa pagina. Tuttavia, se si usa la creazione di pipeline basate su API, è necessario creare la connessione in Esplora cataloghi prima di completare i passaggi in questa pagina. Vedere Connettersi alle origini di inserimento gestite.
Se prevedi di utilizzare una connessione esistente, devi avere privilegi
USE CONNECTION
sull'oggetto connessione oALL PRIVILEGES
.È necessario disporre dei privilegi
USE CATALOG
sul catalogo di destinazione.È necessario disporre di privilegi
USE SCHEMA
eCREATE TABLE
su uno schema esistente o di privilegiCREATE SCHEMA
sul catalogo di destinazione.
Per inserire da Salesforce, è consigliabile quanto segue:
- Creare un utente salesforce che Databricks può usare per recuperare i dati. Assicurarsi che l'utente abbia accesso all'API e accesso a tutti gli oggetti che si prevede di inserire.
Creare un flusso di inserimento dati
Autorizzazioni necessarie:USE CONNECTION
o ALL PRIVILEGES
per una connessione.
Questo passaggio descrive come creare la pipeline di ingestione. Ogni tabella inserita viene registrata in una tabella di streaming con lo stesso nome (ma tutto in minuscolo).
Interfaccia utente di Databricks
Nella barra laterale dell'area di lavoro di Azure Databricks fare clic su Inserimento dati.
Nella pagina Aggiungi dati, sotto Connettori Databricks, fare clic su Salesforce.
Verrà visualizzata la procedura guidata di inserimento di Salesforce.
Nella pagina Pipeline del wizard, inserisci un nome univoco per la pipeline di ingestione.
Nel menu a tendina del catalogo di destinazione, selezionare un catalogo. I dati acquisiti e i registri degli eventi verranno scritti in questo catalogo.
Selezionare la connessione di Unity Catalog in cui sono archiviate le credenziali necessarie per accedere ai dati di Salesforce.
Se non sono presenti connessioni Salesforce, fare clic su Crea connessione. È necessario avere il privilegio di
CREATE CONNECTION
nel metastore.Fare clic su Crea pipeline e continuare.
Nella pagina Origine selezionare le tabelle da inserire e quindi fare clic su Avanti.
Se si seleziona Tutte le tabelle, il connettore di inserimento Salesforce scrive tutte le tabelle esistenti e future nello schema di origine nello schema di destinazione. È previsto un massimo di 250 oggetti per ogni pipeline.
Nella pagina Destinazione, selezionare il catalogo Unity Catalog e lo schema su cui scrivere.
Se non si vuole usare uno schema esistente, fare clic su Crea schema. È necessario avere i privilegi di
USE CATALOG
eCREATE SCHEMA
nel catalogo padre.Fare clic su Salva pipeline e continua.
(Facoltativo) Nella pagina Impostazioni fare clic su Crea pianificazione. Impostare la frequenza per aggiornare le tabelle di destinazione.
(Facoltativo) Impostare le notifiche di posta elettronica per l'esito positivo o negativo dell'operazione della pipeline.
Fai clic su Salva ed esegui la pipeline.
Pacchetti di Asset di Databricks
Questa scheda descrive come distribuire una pipeline di inserimento usando i bundle di asset di Databricks. I bundle possono contenere definizioni YAML di processi e attività, vengono gestiti tramite l'interfaccia della riga di comando di Databricks e possono essere condivisi ed eseguiti in aree di lavoro di destinazione diverse, ad esempio sviluppo, gestione temporanea e produzione. Per altre informazioni, vedere Aggregazioni di asset di Databricks.
Puoi usare le seguenti proprietà di configurazione della tabella nella definizione della tua pipeline per selezionare o deselezionare specifiche colonne da ingerire.
-
include_columns
: Specificare facoltativamente un elenco di colonne da includere per l'ingestione. Se utilizzi questa opzione per includere esplicitamente le colonne, la pipeline esclude automaticamente le colonne che verranno aggiunte alla fonte in futuro. Per inglobare le colonne future, dovrai aggiungerle all'elenco. -
exclude_columns
: specificare facoltativamente un elenco di colonne da escludere dall'inserimento. Se utilizzi questa opzione per escludere esplicitamente le colonne, la pipeline includerà automaticamente le colonne che verranno aggiunte alla fonte in futuro. Per inglobare le colonne future, dovrai aggiungerle all'elenco.
Creare un nuovo bundle usando l'interfaccia della riga di comando di Databricks:
databricks bundle init
Aggiungere due nuovi file di risorse al bundle:
- Un file di definizione della pipeline (
resources/sfdc_pipeline.yml
). - File del flusso di lavoro che controlla la frequenza di inserimento dati (
resources/sfdc_job.yml
).
Di seguito è riportato un file
resources/sfdc_pipeline.yml
di esempio:variables: dest_catalog: default: main dest_schema: default: ingest_destination_schema # The main pipeline for sfdc_dab resources: pipelines: pipeline_sfdc: name: salesforce_pipeline catalog: ${var.dest_catalog} schema: ${var.dest_schema} ingestion_definition: connection_name: <salesforce-connection> objects: # An array of objects to ingest from Salesforce. This example # ingests the AccountShare, AccountPartner, and ApexPage objects. - table: source_schema: objects source_table: AccountShare destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema} table_configuration: include_columns: # This can be exclude_columns instead - <column_a> - <column_b> - <column_c> - table: source_schema: objects source_table: AccountPartner destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema} - table: source_schema: objects source_table: ApexPage destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema}
Di seguito è riportato un file
resources/sfdc_job.yml
di esempio:resources: jobs: sfdc_dab_job: name: sfdc_dab_job trigger: # Run this job every day, exactly one day from the last run # See https://docs.databricks.com/api/workspace/jobs/create#trigger periodic: interval: 1 unit: DAYS email_notifications: on_failure: - <email-address> tasks: - task_key: refresh_pipeline pipeline_task: pipeline_id: ${resources.pipelines.pipeline_sfdc.id}
- Un file di definizione della pipeline (
Distribuire la pipeline usando la CLI di Databricks.
databricks bundle deploy
Interfaccia a riga di comando di Databricks
Puoi usare le seguenti proprietà di configurazione della tabella nella definizione della tua pipeline per selezionare o deselezionare specifiche colonne da ingerire.
-
include_columns
: Specificare facoltativamente un elenco di colonne da includere per l'ingestione. Se utilizzi questa opzione per includere esplicitamente le colonne, la pipeline esclude automaticamente le colonne che verranno aggiunte alla fonte in futuro. Per inglobare le colonne future, dovrai aggiungerle all'elenco. -
exclude_columns
: specificare facoltativamente un elenco di colonne da escludere dall'inserimento. Se utilizzi questa opzione per escludere esplicitamente le colonne, la pipeline includerà automaticamente le colonne che verranno aggiunte alla fonte in futuro. Per inglobare le colonne future, dovrai aggiungerle all'elenco.
Per creare la pipeline:
databricks pipelines create --json "<pipeline-definition | json-file-path>"
Per aggiornare la pipeline:
databricks pipelines update --json "<pipeline-definition | json-file-path>"
Per ottenere la definizione della pipeline:
databricks pipelines get "<pipeline-id>"
Per eliminare la pipeline:
databricks pipelines delete "<pipeline-id>"
Per ulteriori informazioni, puoi eseguire:
databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help
Definizione di pipeline JSON di esempio
"ingestion_definition": {
"connection_name": "<connection-name>",
"objects": [
{
"table": {
"source_schema": "<source-schema>",
"source_table": "<source-table>",
"destination_catalog": "<destination-catalog>",
"destination_schema": "<destination-schema>",
"table_configuration": {
"include_columns": ["<column-a>", "<column-b>", "<column-c>"]
}
}
}
]
}
Avvia, pianifica e imposta notifiche sulla tua pipeline
È possibile creare una pianificazione per la pipeline nella pagina dei dettagli della pipeline.
Dopo aver creato la pipeline, torna all'area di lavoro di Azure Databricks e quindi fai clic su Pipelines.
La nuova pipeline viene visualizzata nell’elenco delle pipeline.
Per visualizzare i dettagli della pipeline, fare clic sul nome della pipeline.
Nella pagina dei dettagli della pipeline è possibile pianificare la pipeline facendo clic su Schedule.
Per impostare le notifiche nella pipeline, fare clic su Impostazioni e quindi aggiungere una notifica.
Per ogni pianificazione aggiunta a una pipeline, Lakeflow Connect crea automaticamente un'attività. La pipeline di inserimento è un'attività all'interno del processo. Facoltativamente, è possibile aggiungere altre attività al processo.
Annotazioni
Quando la pipeline viene eseguita, è possibile che vengano visualizzate due viste di origine per una determinata tabella. Una visualizzazione contiene gli snapshot per i campi della formula. L'altra vista include l'acquisizione incrementale dei dati per i campi che non utilizzano formule. Queste viste vengono unite nella tabella di destinazione.
Esempio: inserire due oggetti Salesforce in schemi separati
La definizione della pipeline di esempio in questa sezione inserisce due oggetti Salesforce in schemi separati. Il supporto per pipeline a più destinazioni è solo tramite API.
resources:
pipelines:
pipeline_sfdc:
name: salesforce_pipeline
catalog: my_catalog_1 # Location of the pipeline event log
schema: my_schema_1 # Location of the pipeline event log
ingestion_definition:
connection_name: <salesforce-connection>
objects:
- table:
source_schema: objects
source_table: AccountShare
destination_catalog: my_catalog_1 # Location of this table
destination_schema: my_schema_1 # Location of this table
- table:
source_schema: objects
source_table: AccountPartner
destination_catalog: my_catalog_2 # Location of this table
destination_schema: my_schema_2 # Location of this table
Esempio: Inserire un oggetto Salesforce tre volte
La definizione della pipeline di esempio in questa sezione inserisce un oggetto Salesforce in tre diverse tabelle di destinazione. Il supporto per pipeline a più destinazioni è solo tramite API.
Puoi opzionalmente rinominare una tabella che ingerisci. Se rinomini una tabella nel tuo pipeline, diventa un pipeline esclusivamente API e non puoi più modificare il pipeline nell'interfaccia utente.
resources:
pipelines:
pipeline_sfdc:
name: salesforce_pipeline
catalog: my_catalog_1 # Location of the pipeline event log
schema: my_schema_1 # Location of the pipeline event log
ingestion_definition:
connection_name: <salesforce-connection>
objects:
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_1 # Location of first copy
destination_schema: my_schema_1 # Location of first copy
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_2 # Location of second copy
destination_schema: my_schema_2 # Location of second copy
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_2 # Location of third copy, renamed
destination_schema: my_schema_2 # Location of third copy, renamed
destination_table: order_duplicate # Table rename