Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Op deze pagina wordt beschreven hoe u gegevens uit Salesforce opneemt en laadt in Azure Databricks met behulp van Lakeflow Connect.
Voordat u begint
Als u een opnamepijplijn wilt maken, moet u voldoen aan de volgende vereisten:
Uw werkruimte moet zijn ingeschakeld voor Unity Catalog.
Voor uw werkruimte moet serverloze rekenkracht ingeschakeld zijn. Zie Serverloze berekening inschakelen.
Als u van plan bent een nieuwe verbinding te maken: u moet bevoegdheden hebben
CREATE CONNECTION
voor de metastore.Als uw connector het ontwerpen van pijplijnen op basis van de gebruikersinterface ondersteunt, kunt u de verbinding en de pijplijn tegelijkertijd maken door de stappen op deze pagina uit te voeren. Als u echter op API gebaseerde pijplijncreatie gebruikt, moet u de verbinding maken in Catalog Explorer voordat u de stappen op deze pagina voltooit. Zie Verbinding maken met beheerde opnamebronnen.
Als u van plan bent een bestaande verbinding te gebruiken: u moet over
USE CONNECTION
rechten ofALL PRIVILEGES
beschikken voor het verbindingsobject.Je moet
USE CATALOG
rechten op de doelcatalogus hebben.U moet
USE SCHEMA
enCREATE TABLE
bevoegdheden hebben voor een bestaand schema ofCREATE SCHEMA
bevoegdheden voor de doelcatalogus.
Als u wilt opnemen vanuit Salesforce, wordt het volgende aanbevolen:
- Maak een Salesforce-gebruiker die Databricks kan gebruiken om gegevens op te halen. Zorg ervoor dat de gebruiker API-toegang heeft en toegang heeft tot alle objecten die u wilt opnemen.
Een opnamepijplijn maken
Vereiste machtigingen:USE CONNECTION
of ALL PRIVILEGES
voor een verbinding.
In deze stap wordt beschreven hoe u de opnamepijplijn maakt. Elke opgenomen tabel wordt geschreven naar een streamingtabel met dezelfde naam (maar allemaal kleine letters).
Databricks-gebruikersinterface
Klik in de zijbalk van de Azure Databricks-werkruimte op Gegevensopname.
Op de Gegevens toevoegen pagina, onder Databricks-connectors, klik op Salesforce.
De Salesforce-opnamewizard wordt geopend.
Voer op de pagina Pijplijn van de wizard een unieke naam in voor de opnamepijplijn.
In de vervolgkeuzelijst Doelcatalogus selecteer je een catalogus. Opgenomen gegevens en gebeurtenislogboeken worden naar deze catalogus geschreven.
Selecteer de Unity Catalog-verbinding waarin de referenties worden opgeslagen die zijn vereist voor toegang tot Salesforce-gegevens.
Als er geen Salesforce-verbindingen zijn, klikt u op Verbinding maken. U moet over de
CREATE CONNECTION
bevoegdheid beschikken voor de metastore.Klik op Pijplijn maken en ga door.
Op de Bron-pagina selecteer je de tabellen die je wilt inzamelen en klik vervolgens op Volgende.
Als u Alle tabellen selecteert, schrijft de Salesforce-opnameconnector alle bestaande en toekomstige tabellen in het bronschema naar het doelschema. Per pijplijn zijn er maximaal 250 objecten.
Selecteer op de doelpagina de Unity Catalog-catalogus en het schema waar u naar wilt schrijven.
Als u geen bestaand schema wilt gebruiken, klikt u op Schema maken. U moet de
USE CATALOG
enCREATE SCHEMA
rechten voor de bovenliggende catalogus hebben.Klik op Pijplijn opslaan en ga door.
(Optioneel) Klik op de pagina Instellingen op Planning maken. Stel de frequentie in om de doeltabellen te vernieuwen.
(Optioneel) E-mailmeldingen instellen voor geslaagde of mislukte pijplijnbewerkingen.
Klik op Opslaan en pijplijn uitvoeren.
Databricks Asset Pakketten
In dit tabblad wordt beschreven hoe u een opnamepijplijn implementeert met behulp van Databricks Asset Bundles. Bundels kunnen YAML-definities van taken en taken bevatten, worden beheerd met behulp van de Databricks CLI en kunnen worden gedeeld en uitgevoerd in verschillende doelwerkruimten (zoals ontwikkeling, fasering en productie). Zie Databricks Asset Bundles voor meer informatie.
U kunt de volgende tabelconfiguratie-eigenschappen binnen uw pijplijndefinitie gebruiken om specifieke kolommen te selecteren of te deselecteren voor verwerking.
-
include_columns
: Geef optioneel een lijst op van kolommen om op te nemen voor invoer. Als u deze optie gebruikt om kolommen expliciet op te nemen, sluit de pijplijn automatisch kolommen uit die in de toekomst aan de bron worden toegevoegd. Om de toekomstige kolommen te verwerken, moet u ze aan de lijst toevoegen. -
exclude_columns
: Desgewenst specificeer een lijst van kolommen om uit te sluiten van invoer. Als u deze optie gebruikt om kolommen expliciet uit te sluiten, includeert de pijplijn automatisch kolommen die in de toekomst aan de bron worden toegevoegd. Om de toekomstige kolommen te verwerken, moet u ze aan de lijst toevoegen.
Maak een nieuwe bundel met behulp van de Databricks CLI:
databricks bundle init
Voeg twee nieuwe resourcebestanden toe aan de bundel:
- Een pijplijndefinitiebestand (
resources/sfdc_pipeline.yml
). - Een werkstroombestand waarmee de frequentie van gegevensopname (
resources/sfdc_job.yml
) wordt gecontroleerd.
Hier volgt een voorbeeldbestand
resources/sfdc_pipeline.yml
:variables: dest_catalog: default: main dest_schema: default: ingest_destination_schema # The main pipeline for sfdc_dab resources: pipelines: pipeline_sfdc: name: salesforce_pipeline catalog: ${var.dest_catalog} schema: ${var.dest_schema} ingestion_definition: connection_name: <salesforce-connection> objects: # An array of objects to ingest from Salesforce. This example # ingests the AccountShare, AccountPartner, and ApexPage objects. - table: source_schema: objects source_table: AccountShare destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema} table_configuration: include_columns: # This can be exclude_columns instead - <column_a> - <column_b> - <column_c> - table: source_schema: objects source_table: AccountPartner destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema} - table: source_schema: objects source_table: ApexPage destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema}
Hier volgt een voorbeeldbestand
resources/sfdc_job.yml
:resources: jobs: sfdc_dab_job: name: sfdc_dab_job trigger: # Run this job every day, exactly one day from the last run # See https://docs.databricks.com/api/workspace/jobs/create#trigger periodic: interval: 1 unit: DAYS email_notifications: on_failure: - <email-address> tasks: - task_key: refresh_pipeline pipeline_task: pipeline_id: ${resources.pipelines.pipeline_sfdc.id}
- Een pijplijndefinitiebestand (
Implementeer de pijplijn met behulp van de Databricks CLI:
databricks bundle deploy
Databricks-CLI
U kunt de volgende tabelconfiguratie-eigenschappen binnen uw pijplijndefinitie gebruiken om specifieke kolommen te selecteren of te deselecteren voor verwerking.
-
include_columns
: Geef optioneel een lijst op van kolommen om op te nemen voor invoer. Als u deze optie gebruikt om kolommen expliciet op te nemen, sluit de pijplijn automatisch kolommen uit die in de toekomst aan de bron worden toegevoegd. Om de toekomstige kolommen te verwerken, moet u ze aan de lijst toevoegen. -
exclude_columns
: Desgewenst specificeer een lijst van kolommen om uit te sluiten van invoer. Als u deze optie gebruikt om kolommen expliciet uit te sluiten, includeert de pijplijn automatisch kolommen die in de toekomst aan de bron worden toegevoegd. Om de toekomstige kolommen te verwerken, moet u ze aan de lijst toevoegen.
De pijplijn maken:
databricks pipelines create --json "<pipeline-definition | json-file-path>"
De pijplijn bijwerken:
databricks pipelines update --json "<pipeline-definition | json-file-path>"
Om de pijplijndefinitie te verkrijgen:
databricks pipelines get "<pipeline-id>"
De pijplijn verwijderen:
databricks pipelines delete "<pipeline-id>"
Voor meer informatie kunt u het volgende uitvoeren:
databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help
Voorbeeld van JSON-pijplijndefinitie
"ingestion_definition": {
"connection_name": "<connection-name>",
"objects": [
{
"table": {
"source_schema": "<source-schema>",
"source_table": "<source-table>",
"destination_catalog": "<destination-catalog>",
"destination_schema": "<destination-schema>",
"table_configuration": {
"include_columns": ["<column-a>", "<column-b>", "<column-c>"]
}
}
}
]
}
Start, plan en stel meldingen in voor uw pijplijn.
U kunt een planning voor de pijplijn maken op de pagina met pijplijndetails.
Nadat de pijplijn is gemaakt, gaat u opnieuw naar de Azure Databricks-werkruimte en klikt u vervolgens op Pipelines.
De nieuwe pijplijn wordt weergegeven in de lijst met pijplijnen.
Klik op de naam van de pijplijn om de details van de pijplijn weer te geven.
Op de pagina met pijplijndetails kunt u de pijplijn plannen door op Planningte klikken.
Als u meldingen in de pijplijn wilt instellen, klikt u op Instellingenen voegt u vervolgens een melding toe.
Voor elke planning die u aan een pijplijn toevoegt, maakt Lakeflow Connect automatisch een taak ervoor. De opnamepijplijn is een taak binnen de opdracht. U kunt desgewenst meer taken toevoegen aan de taak.
Opmerking
Wanneer de pijplijn draait, kunt u mogelijk twee bronweergaven voor een bepaalde tabel zien. Eén weergave bevat de momentopnamen voor formulevelden. De andere weergave bevat de incrementele gegevens die worden opgehaald voor niet-formulevelden. Deze weergaven worden samengevoegd in de bestemmingstabel.
Voorbeeld: Twee Salesforce-objecten opnemen in afzonderlijke schema's
In de voorbeeldpijplijndefinitie in deze sectie worden twee Salesforce-objecten opgenomen in afzonderlijke schema's. Ondersteuning voor multi-destinatiepijpleidingen is alleen beschikbaar via de API.
resources:
pipelines:
pipeline_sfdc:
name: salesforce_pipeline
catalog: my_catalog_1 # Location of the pipeline event log
schema: my_schema_1 # Location of the pipeline event log
ingestion_definition:
connection_name: <salesforce-connection>
objects:
- table:
source_schema: objects
source_table: AccountShare
destination_catalog: my_catalog_1 # Location of this table
destination_schema: my_schema_1 # Location of this table
- table:
source_schema: objects
source_table: AccountPartner
destination_catalog: my_catalog_2 # Location of this table
destination_schema: my_schema_2 # Location of this table
Voorbeeld: Één Salesforce-object drie keer opnemen
De voorbeeldpijplijndefinitie in deze sectie neemt een Salesforce-object op in drie verschillende doeltabellen. Ondersteuning voor multi-destinatiepijpleidingen is alleen beschikbaar via de API.
Je kunt optioneel een tabel die je binnenhaalt een andere naam geven. Als je een tabel in je pipeline hernoemt, wordt het een alleen-API-pipeline en kun je de pipeline niet langer in de UI bewerken.
resources:
pipelines:
pipeline_sfdc:
name: salesforce_pipeline
catalog: my_catalog_1 # Location of the pipeline event log
schema: my_schema_1 # Location of the pipeline event log
ingestion_definition:
connection_name: <salesforce-connection>
objects:
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_1 # Location of first copy
destination_schema: my_schema_1 # Location of first copy
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_2 # Location of second copy
destination_schema: my_schema_2 # Location of second copy
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_2 # Location of third copy, renamed
destination_schema: my_schema_2 # Location of third copy, renamed
destination_table: order_duplicate # Table rename