Delen via


Gegevens opnemen uit Salesforce

Op deze pagina wordt beschreven hoe u gegevens uit Salesforce opneemt en laadt in Azure Databricks met behulp van Lakeflow Connect.

Voordat u begint

Als u een opnamepijplijn wilt maken, moet u voldoen aan de volgende vereisten:

  • Uw werkruimte moet zijn ingeschakeld voor Unity Catalog.

  • Voor uw werkruimte moet serverloze rekenkracht ingeschakeld zijn. Zie Serverloze berekening inschakelen.

  • Als u van plan bent een nieuwe verbinding te maken: u moet bevoegdheden hebben CREATE CONNECTION voor de metastore.

    Als uw connector het ontwerpen van pijplijnen op basis van de gebruikersinterface ondersteunt, kunt u de verbinding en de pijplijn tegelijkertijd maken door de stappen op deze pagina uit te voeren. Als u echter op API gebaseerde pijplijncreatie gebruikt, moet u de verbinding maken in Catalog Explorer voordat u de stappen op deze pagina voltooit. Zie Verbinding maken met beheerde opnamebronnen.

  • Als u van plan bent een bestaande verbinding te gebruiken: u moet over USE CONNECTION rechten of ALL PRIVILEGES beschikken voor het verbindingsobject.

  • Je moet USE CATALOG rechten op de doelcatalogus hebben.

  • U moet USE SCHEMA en CREATE TABLE bevoegdheden hebben voor een bestaand schema of CREATE SCHEMA bevoegdheden voor de doelcatalogus.

Als u wilt opnemen vanuit Salesforce, wordt het volgende aanbevolen:

  • Maak een Salesforce-gebruiker die Databricks kan gebruiken om gegevens op te halen. Zorg ervoor dat de gebruiker API-toegang heeft en toegang heeft tot alle objecten die u wilt opnemen.

Een opnamepijplijn maken

Vereiste machtigingen:USE CONNECTION of ALL PRIVILEGES voor een verbinding.

In deze stap wordt beschreven hoe u de opnamepijplijn maakt. Elke opgenomen tabel wordt geschreven naar een streamingtabel met dezelfde naam (maar allemaal kleine letters).

Databricks-gebruikersinterface

  1. Klik in de zijbalk van de Azure Databricks-werkruimte op Gegevensopname.

  2. Op de Gegevens toevoegen pagina, onder Databricks-connectors, klik op Salesforce.

    De Salesforce-opnamewizard wordt geopend.

  3. Voer op de pagina Pijplijn van de wizard een unieke naam in voor de opnamepijplijn.

  4. In de vervolgkeuzelijst Doelcatalogus selecteer je een catalogus. Opgenomen gegevens en gebeurtenislogboeken worden naar deze catalogus geschreven.

  5. Selecteer de Unity Catalog-verbinding waarin de referenties worden opgeslagen die zijn vereist voor toegang tot Salesforce-gegevens.

    Als er geen Salesforce-verbindingen zijn, klikt u op Verbinding maken. U moet over de CREATE CONNECTION bevoegdheid beschikken voor de metastore.

  6. Klik op Pijplijn maken en ga door.

  7. Op de Bron-pagina selecteer je de tabellen die je wilt inzamelen en klik vervolgens op Volgende.

    Als u Alle tabellen selecteert, schrijft de Salesforce-opnameconnector alle bestaande en toekomstige tabellen in het bronschema naar het doelschema. Per pijplijn zijn er maximaal 250 objecten.

  8. Selecteer op de doelpagina de Unity Catalog-catalogus en het schema waar u naar wilt schrijven.

    Als u geen bestaand schema wilt gebruiken, klikt u op Schema maken. U moet de USE CATALOG en CREATE SCHEMA rechten voor de bovenliggende catalogus hebben.

  9. Klik op Pijplijn opslaan en ga door.

  10. (Optioneel) Klik op de pagina Instellingen op Planning maken. Stel de frequentie in om de doeltabellen te vernieuwen.

  11. (Optioneel) E-mailmeldingen instellen voor geslaagde of mislukte pijplijnbewerkingen.

  12. Klik op Opslaan en pijplijn uitvoeren.

Databricks Asset Pakketten

In dit tabblad wordt beschreven hoe u een opnamepijplijn implementeert met behulp van Databricks Asset Bundles. Bundels kunnen YAML-definities van taken en taken bevatten, worden beheerd met behulp van de Databricks CLI en kunnen worden gedeeld en uitgevoerd in verschillende doelwerkruimten (zoals ontwikkeling, fasering en productie). Zie Databricks Asset Bundles voor meer informatie.

U kunt de volgende tabelconfiguratie-eigenschappen binnen uw pijplijndefinitie gebruiken om specifieke kolommen te selecteren of te deselecteren voor verwerking.

  • include_columns: Geef optioneel een lijst op van kolommen om op te nemen voor invoer. Als u deze optie gebruikt om kolommen expliciet op te nemen, sluit de pijplijn automatisch kolommen uit die in de toekomst aan de bron worden toegevoegd. Om de toekomstige kolommen te verwerken, moet u ze aan de lijst toevoegen.
  • exclude_columns: Desgewenst specificeer een lijst van kolommen om uit te sluiten van invoer. Als u deze optie gebruikt om kolommen expliciet uit te sluiten, includeert de pijplijn automatisch kolommen die in de toekomst aan de bron worden toegevoegd. Om de toekomstige kolommen te verwerken, moet u ze aan de lijst toevoegen.
  1. Maak een nieuwe bundel met behulp van de Databricks CLI:

    databricks bundle init
    
  2. Voeg twee nieuwe resourcebestanden toe aan de bundel:

    • Een pijplijndefinitiebestand (resources/sfdc_pipeline.yml).
    • Een werkstroombestand waarmee de frequentie van gegevensopname (resources/sfdc_job.yml) wordt gecontroleerd.

    Hier volgt een voorbeeldbestand resources/sfdc_pipeline.yml :

    variables:
      dest_catalog:
        default: main
      dest_schema:
        default: ingest_destination_schema
    
    # The main pipeline for sfdc_dab
    resources:
      pipelines:
        pipeline_sfdc:
          name: salesforce_pipeline
          catalog: ${var.dest_catalog}
          schema: ${var.dest_schema}
          ingestion_definition:
            connection_name: <salesforce-connection>
            objects:
              # An array of objects to ingest from Salesforce. This example
              # ingests the AccountShare, AccountPartner, and ApexPage objects.
              - table:
                  source_schema: objects
                  source_table: AccountShare
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
                  table_configuration:
                    include_columns: # This can be exclude_columns instead
                      - <column_a>
                      - <column_b>
                      - <column_c>
              - table:
                  source_schema: objects
                  source_table: AccountPartner
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
              - table:
                  source_schema: objects
                  source_table: ApexPage
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
    

    Hier volgt een voorbeeldbestand resources/sfdc_job.yml :

    resources:
      jobs:
        sfdc_dab_job:
          name: sfdc_dab_job
    
          trigger:
            # Run this job every day, exactly one day from the last run
            # See https://docs.databricks.com/api/workspace/jobs/create#trigger
            periodic:
              interval: 1
              unit: DAYS
    
          email_notifications:
            on_failure:
              - <email-address>
    
          tasks:
            - task_key: refresh_pipeline
              pipeline_task:
                pipeline_id: ${resources.pipelines.pipeline_sfdc.id}
    
  3. Implementeer de pijplijn met behulp van de Databricks CLI:

    databricks bundle deploy
    

Databricks-CLI

U kunt de volgende tabelconfiguratie-eigenschappen binnen uw pijplijndefinitie gebruiken om specifieke kolommen te selecteren of te deselecteren voor verwerking.

  • include_columns: Geef optioneel een lijst op van kolommen om op te nemen voor invoer. Als u deze optie gebruikt om kolommen expliciet op te nemen, sluit de pijplijn automatisch kolommen uit die in de toekomst aan de bron worden toegevoegd. Om de toekomstige kolommen te verwerken, moet u ze aan de lijst toevoegen.
  • exclude_columns: Desgewenst specificeer een lijst van kolommen om uit te sluiten van invoer. Als u deze optie gebruikt om kolommen expliciet uit te sluiten, includeert de pijplijn automatisch kolommen die in de toekomst aan de bron worden toegevoegd. Om de toekomstige kolommen te verwerken, moet u ze aan de lijst toevoegen.

De pijplijn maken:

databricks pipelines create --json "<pipeline-definition | json-file-path>"

De pijplijn bijwerken:

databricks pipelines update --json "<pipeline-definition | json-file-path>"

Om de pijplijndefinitie te verkrijgen:

databricks pipelines get "<pipeline-id>"

De pijplijn verwijderen:

databricks pipelines delete "<pipeline-id>"

Voor meer informatie kunt u het volgende uitvoeren:

databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help

Voorbeeld van JSON-pijplijndefinitie

"ingestion_definition": {

     "connection_name": "<connection-name>",

     "objects": [

       {

         "table": {

           "source_schema": "<source-schema>",

           "source_table": "<source-table>",

           "destination_catalog": "<destination-catalog>",

           "destination_schema": "<destination-schema>",

           "table_configuration": {

             "include_columns": ["<column-a>", "<column-b>", "<column-c>"]

           }

         }

       }

     ]

 }

Start, plan en stel meldingen in voor uw pijplijn.

U kunt een planning voor de pijplijn maken op de pagina met pijplijndetails.

  1. Nadat de pijplijn is gemaakt, gaat u opnieuw naar de Azure Databricks-werkruimte en klikt u vervolgens op Pipelines.

    De nieuwe pijplijn wordt weergegeven in de lijst met pijplijnen.

  2. Klik op de naam van de pijplijn om de details van de pijplijn weer te geven.

  3. Op de pagina met pijplijndetails kunt u de pijplijn plannen door op Planningte klikken.

  4. Als u meldingen in de pijplijn wilt instellen, klikt u op Instellingenen voegt u vervolgens een melding toe.

Voor elke planning die u aan een pijplijn toevoegt, maakt Lakeflow Connect automatisch een taak ervoor. De opnamepijplijn is een taak binnen de opdracht. U kunt desgewenst meer taken toevoegen aan de taak.

Opmerking

Wanneer de pijplijn draait, kunt u mogelijk twee bronweergaven voor een bepaalde tabel zien. Eén weergave bevat de momentopnamen voor formulevelden. De andere weergave bevat de incrementele gegevens die worden opgehaald voor niet-formulevelden. Deze weergaven worden samengevoegd in de bestemmingstabel.

Voorbeeld: Twee Salesforce-objecten opnemen in afzonderlijke schema's

In de voorbeeldpijplijndefinitie in deze sectie worden twee Salesforce-objecten opgenomen in afzonderlijke schema's. Ondersteuning voor multi-destinatiepijpleidingen is alleen beschikbaar via de API.

resources:
  pipelines:
    pipeline_sfdc:
      name: salesforce_pipeline
      catalog: my_catalog_1 # Location of the pipeline event log
      schema: my_schema_1 # Location of the pipeline event log
      ingestion_definition:
        connection_name: <salesforce-connection>
        objects:
          - table:
              source_schema: objects
              source_table: AccountShare
              destination_catalog: my_catalog_1 # Location of this table
              destination_schema: my_schema_1 # Location of this table
          - table:
              source_schema: objects
              source_table: AccountPartner
              destination_catalog: my_catalog_2 # Location of this table
              destination_schema: my_schema_2 # Location of this table

Voorbeeld: Één Salesforce-object drie keer opnemen

De voorbeeldpijplijndefinitie in deze sectie neemt een Salesforce-object op in drie verschillende doeltabellen. Ondersteuning voor multi-destinatiepijpleidingen is alleen beschikbaar via de API.

Je kunt optioneel een tabel die je binnenhaalt een andere naam geven. Als je een tabel in je pipeline hernoemt, wordt het een alleen-API-pipeline en kun je de pipeline niet langer in de UI bewerken.

resources:
  pipelines:
    pipeline_sfdc:
      name: salesforce_pipeline
      catalog: my_catalog_1	# Location of the pipeline event log
      schema: my_schema_1	# Location of the pipeline event log
      ingestion_definition:
        connection_name: <salesforce-connection>
        objects:
          - table:
              source_schema: objects
              source_table: Order
              destination_catalog: my_catalog_1	# Location of first copy
              destination_schema: my_schema_1	# Location of first copy
          - table:
              source_schema: objects
              source_table: Order
              destination_catalog: my_catalog_2	# Location of second copy
              destination_schema: my_schema_2	# Location of second copy
	        - table:
              source_schema: objects
              source_table: Order
              destination_catalog: my_catalog_2	# Location of third copy, renamed
              destination_schema: my_schema_2	# Location of third copy, renamed
              destination_table: order_duplicate # Table rename

Aanvullende bronnen