Freigeben über


Datenimport aus Salesforce

Auf dieser Seite wird beschrieben, wie Sie Daten aus Salesforce aufnehmen und in Azure Databricks mithilfe von Lakeflow Connect laden.

Bevor Sie anfangen

Um eine Erfassungspipeline erstellen zu können, müssen folgende Anforderungen erfüllt sein:

  • Ihr Arbeitsbereich muss für Unity Catalog aktiviert sein.

  • Serverlose Berechnung muss für Ihren Arbeitsbereich aktiviert sein. Siehe "Serverloses Berechnen aktivieren".

  • Wenn Sie eine neue Verbindung erstellen möchten: Sie müssen über Berechtigungen auf den Metastore CREATE CONNECTION verfügen.

    Wenn Ihr Connector die benutzeroberflächenbasierte Pipelineerstellung unterstützt, können Sie die Verbindung und die Pipeline gleichzeitig erstellen, indem Sie die Schritte auf dieser Seite ausführen. Wenn Sie jedoch apibasierte Pipelineerstellung verwenden, müssen Sie die Verbindung im Katalog-Explorer erstellen, bevor Sie die Schritte auf dieser Seite ausführen. Siehe Herstellen einer Verbindung mit verwalteten Aufnahmequellen.

  • Wenn Sie beabsichtigen, eine vorhandene Verbindung zu verwenden: Sie müssen über USE CONNECTION Privilegien oder ALL PRIVILEGES Berechtigungen für das Verbindungsobjekt verfügen.

  • Sie müssen über USE CATALOG Berechtigungen für den Zielkatalog verfügen.

  • Sie müssen über USE SCHEMA- und CREATE TABLE-Berechtigungen für ein vorhandenes Schema oder CREATE SCHEMA-Berechtigungen für den Zielkatalog verfügen.

Zum Importieren aus Salesforce wird Folgendes empfohlen:

  • Erstellen Sie einen Salesforce-Benutzer, den Databricks zum Abrufen von Daten verwenden kann. Stellen Sie sicher, dass der Benutzer über API-Zugriff und Zugriff auf alle Objekte verfügt, die Sie aufnehmen möchten.

Erstellen einer Aufnahmepipeline

Erforderliche Berechtigungen:USE CONNECTION oder ALL PRIVILEGES für eine Verbindung.

In diesem Schritt wird beschrieben, wie Sie die Erfassungspipeline erstellen. Jede aufgenommene Tabelle wird in eine Streamingtabelle mit demselben Namen geschrieben (jedoch in Kleinbuchstaben).

Databricks UI

  1. Klicken Sie in der Randleiste des Azure Databricks-Arbeitsbereichs auf "Datenaufnahme".

  2. Klicken Sie auf der Seite Daten hinzufügen unter Databricks-Connectors auf Salesforce.

    Der Salesforce-Erfassungs-Assistent wird geöffnet.

  3. Geben Sie auf der Pipelineseite des Assistenten einen eindeutigen Namen für die Aufnahmepipeline ein.

  4. Wählen Sie im Dropdownmenü "Zielkatalog " einen Katalog aus. Erfasste Daten und Ereignisprotokolle werden in diesen Katalog geschrieben.

  5. Wählen Sie die Unity-Katalogverbindung aus, die die anmeldeinformationen speichert, die für den Zugriff auf Salesforce-Daten erforderlich sind.

    Wenn keine Salesforce-Verbindungen vorhanden sind, klicken Sie auf "Verbindung erstellen". Sie müssen über die CREATE CONNECTION Berechtigung für den Metastore verfügen.

  6. Klicken Sie auf "Pipeline erstellen", und fahren Sie fort.

  7. Wählen Sie auf der Seite "Quelle " die zu aufnehmenden Tabellen aus, und klicken Sie dann auf "Weiter".

    Wenn Sie "Alle Tabellen" auswählen, schreibt der Salesforce-Aufnahmeconnector alle vorhandenen und zukünftigen Tabellen im Quellschema in das Zielschema. Pro Pipeline können maximal 250 Objekte verwendet werden.

  8. Wählen Sie auf der Seite "Ziel " den Unity-Katalog und das Schema aus, in das geschrieben werden soll.

    Wenn Sie kein vorhandenes Schema verwenden möchten, klicken Sie auf "Schema erstellen". Sie müssen über die Berechtigungen USE CATALOG und CREATE SCHEMA für den übergeordneten Katalog verfügen.

  9. Klicken Sie auf "Pipeline speichern", und fahren Sie fort.

  10. (Optional) Klicken Sie auf der Seite "Einstellungen" auf " Zeitplan erstellen". Legen Sie die Häufigkeit fest, mit der die Zieltabellen aktualisiert werden.

  11. (Optional) Festlegen von E-Mail-Benachrichtigungen für Erfolg oder Fehler des Pipelinevorgangs.

  12. Klicken Sie auf "Speichern", und führen Sie die Pipelineaus.

Databricks-Ressourcenpakete

Auf dieser Registerkarte wird beschrieben, wie Sie eine Datenerfassungspipeline mithilfe von Databricks Asset Bundles bereitstellen. Bundles können YAML-Definitionen von Aufträgen und Aufgaben enthalten, mithilfe der Databricks CLI verwaltet und in verschiedenen Zielarbeitsbereichen (z. B. Entwicklung, Staging und Produktion) freigegeben und ausgeführt werden. Weitere Informationen finden Sie unter Databricks Asset Bundles.

Sie können die folgenden Konfigurationseigenschaften der Tabelle in Ihrer Pipelinedefinition verwenden, um bestimmte Spalten auszuwählen oder abzuwählen, die eingelesen werden sollen:

  • include_columns: Geben Sie optional eine Liste von Spalten an, die für die Aufnahme eingeschlossen werden sollen. Wenn Sie diese Option verwenden, um Spalten explizit einzuschließen, schließt die Pipeline automatisch Spalten aus, die der Quelle in Zukunft hinzugefügt werden. Um die zukünftigen Spalten einzufügen, müssen Sie sie der Liste hinzufügen.
  • exclude_columns: Geben Sie optional eine Liste von Spalten an, die von der Aufnahme ausgeschlossen werden sollen. Wenn Sie diese Option verwenden, um Spalten explizit auszuschließen, enthält die Pipeline automatisch Spalten, die der Quelle in Zukunft hinzugefügt werden. Um die zukünftigen Spalten einzufügen, müssen Sie sie der Liste hinzufügen.
  1. Erstellen Eines neuen Bündels mithilfe der Databricks CLI:

    databricks bundle init
    
  2. Fügen Sie dem Bundle zwei neue Ressourcendateien hinzu:

    • Eine Pipelinedefinitionsdatei (resources/sfdc_pipeline.yml).
    • Eine Workflowdatei, die die Häufigkeit der Datenaufnahme steuert (resources/sfdc_job.yml).

    Im Folgenden finden Sie eine resources/sfdc_pipeline.yml-Beispieldatei:

    variables:
      dest_catalog:
        default: main
      dest_schema:
        default: ingest_destination_schema
    
    # The main pipeline for sfdc_dab
    resources:
      pipelines:
        pipeline_sfdc:
          name: salesforce_pipeline
          catalog: ${var.dest_catalog}
          schema: ${var.dest_schema}
          ingestion_definition:
            connection_name: <salesforce-connection>
            objects:
              # An array of objects to ingest from Salesforce. This example
              # ingests the AccountShare, AccountPartner, and ApexPage objects.
              - table:
                  source_schema: objects
                  source_table: AccountShare
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
                  table_configuration:
                    include_columns: # This can be exclude_columns instead
                      - <column_a>
                      - <column_b>
                      - <column_c>
              - table:
                  source_schema: objects
                  source_table: AccountPartner
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
              - table:
                  source_schema: objects
                  source_table: ApexPage
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
    

    Im Folgenden finden Sie eine resources/sfdc_job.yml-Beispieldatei:

    resources:
      jobs:
        sfdc_dab_job:
          name: sfdc_dab_job
    
          trigger:
            # Run this job every day, exactly one day from the last run
            # See https://docs.databricks.com/api/workspace/jobs/create#trigger
            periodic:
              interval: 1
              unit: DAYS
    
          email_notifications:
            on_failure:
              - <email-address>
    
          tasks:
            - task_key: refresh_pipeline
              pipeline_task:
                pipeline_id: ${resources.pipelines.pipeline_sfdc.id}
    
  3. Stellen Sie die Pipeline mithilfe der Databricks CLI bereit:

    databricks bundle deploy
    

Databricks-Befehlszeilenschnittstelle

Sie können die folgenden Konfigurationseigenschaften der Tabelle in Ihrer Pipelinedefinition verwenden, um bestimmte Spalten auszuwählen oder abzuwählen, die eingelesen werden sollen:

  • include_columns: Geben Sie optional eine Liste von Spalten an, die für die Aufnahme eingeschlossen werden sollen. Wenn Sie diese Option verwenden, um Spalten explizit einzuschließen, schließt die Pipeline automatisch Spalten aus, die der Quelle in Zukunft hinzugefügt werden. Um die zukünftigen Spalten einzufügen, müssen Sie sie der Liste hinzufügen.
  • exclude_columns: Geben Sie optional eine Liste von Spalten an, die von der Aufnahme ausgeschlossen werden sollen. Wenn Sie diese Option verwenden, um Spalten explizit auszuschließen, enthält die Pipeline automatisch Spalten, die der Quelle in Zukunft hinzugefügt werden. Um die zukünftigen Spalten einzufügen, müssen Sie sie der Liste hinzufügen.

So erstellen Sie die Pipeline:

databricks pipelines create --json "<pipeline-definition | json-file-path>"

So aktualisieren Sie das Pipelineprofil:

databricks pipelines update --json "<pipeline-definition | json-file-path>"

Um die Pipelinedefinition abzurufen:

databricks pipelines get "<pipeline-id>"

So löschen Sie die Pipeline

databricks pipelines delete "<pipeline-id>"

Für weitere Informationen können Sie den Befehl ausführen:

databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help

Json-Pipelinedefinition (Beispiel)

"ingestion_definition": {

     "connection_name": "<connection-name>",

     "objects": [

       {

         "table": {

           "source_schema": "<source-schema>",

           "source_table": "<source-table>",

           "destination_catalog": "<destination-catalog>",

           "destination_schema": "<destination-schema>",

           "table_configuration": {

             "include_columns": ["<column-a>", "<column-b>", "<column-c>"]

           }

         }

       }

     ]

 }

Starten, Planen und Einrichten von Benachrichtigungen für Ihre Pipeline

Sie können einen Zeitplan für die Pipeline auf der Pipelinedetailseite erstellen.

  1. Nachdem die Pipeline erstellt wurde, überprüfen Sie den Azure Databricks-Arbeitsbereich, und klicken Sie dann auf "Pipelines".

    Die neue Pipeline wird in der Pipelineliste angezeigt.

  2. Um die Pipelinedetails anzuzeigen, klicken Sie auf den Pipelinenamen.

  3. Auf der Seite mit den Pipelinedetails können Sie die Pipeline planen, indem Sie auf "Zeitplan" klicken.

  4. Um Benachrichtigungen für die Pipeline festzulegen, klicken Sie auf "Einstellungen", und fügen Sie dann eine Benachrichtigung hinzu.

Für jeden Zeitplan, den Sie einer Pipeline hinzufügen, erstellt Lakeflow Connect automatisch einen Auftrag dafür. Die Aufnahmepipeline ist eine Aufgabe innerhalb des Auftrags. Sie können dem Auftrag optional weitere Aufgaben hinzufügen.

Hinweis

Wenn die Pipeline ausgeführt wird, werden möglicherweise zwei Quellsichten für eine Tabelle angezeigt. Eine Ansicht enthält die Momentaufnahmen für Formelfelder. Die andere Sicht enthält die inkrementellen Datenpullvorgänge für Felder, die keine Formeln enthalten. Diese Sichten werden in der Zieltabelle verknüpft.

Beispiel: Importieren von zwei Salesforce-Objekten in separate Schemata

In der Beispielpipelinedefinition in diesem Abschnitt werden zwei Salesforce-Objekte in separate Schemas aufgenommen. Die Unterstützung für Multi-Destination-Pipelines erfolgt ausschließlich über die API.

resources:
  pipelines:
    pipeline_sfdc:
      name: salesforce_pipeline
      catalog: my_catalog_1 # Location of the pipeline event log
      schema: my_schema_1 # Location of the pipeline event log
      ingestion_definition:
        connection_name: <salesforce-connection>
        objects:
          - table:
              source_schema: objects
              source_table: AccountShare
              destination_catalog: my_catalog_1 # Location of this table
              destination_schema: my_schema_1 # Location of this table
          - table:
              source_schema: objects
              source_table: AccountPartner
              destination_catalog: my_catalog_2 # Location of this table
              destination_schema: my_schema_2 # Location of this table

Beispiel: Dreimaliges Importieren eines Salesforce-Objekts

Die Beispielpipelinedefinition in diesem Abschnitt nimmt ein Salesforce-Objekt in drei verschiedene Zieltabellen ein. Die Unterstützung für Multi-Destination-Pipelines erfolgt ausschließlich über die API.

Sie können nach Belieben eine Tabelle umbenennen, die Sie importieren. Wenn Sie eine Tabelle in Ihrer Pipeline umbenennen, wird sie zu einer nur API-Pipeline, und Sie können die Pipeline nicht mehr in der Benutzeroberfläche bearbeiten.

resources:
  pipelines:
    pipeline_sfdc:
      name: salesforce_pipeline
      catalog: my_catalog_1	# Location of the pipeline event log
      schema: my_schema_1	# Location of the pipeline event log
      ingestion_definition:
        connection_name: <salesforce-connection>
        objects:
          - table:
              source_schema: objects
              source_table: Order
              destination_catalog: my_catalog_1	# Location of first copy
              destination_schema: my_schema_1	# Location of first copy
          - table:
              source_schema: objects
              source_table: Order
              destination_catalog: my_catalog_2	# Location of second copy
              destination_schema: my_schema_2	# Location of second copy
	        - table:
              source_schema: objects
              source_table: Order
              destination_catalog: my_catalog_2	# Location of third copy, renamed
              destination_schema: my_schema_2	# Location of third copy, renamed
              destination_table: order_duplicate # Table rename

Weitere Ressourcen