Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Auf dieser Seite wird beschrieben, wie Sie Daten aus Salesforce aufnehmen und in Azure Databricks mithilfe von Lakeflow Connect laden.
Bevor Sie anfangen
Um eine Erfassungspipeline erstellen zu können, müssen folgende Anforderungen erfüllt sein:
Ihr Arbeitsbereich muss für Unity Catalog aktiviert sein.
Serverlose Berechnung muss für Ihren Arbeitsbereich aktiviert sein. Siehe "Serverloses Berechnen aktivieren".
Wenn Sie eine neue Verbindung erstellen möchten: Sie müssen über Berechtigungen auf den Metastore
CREATE CONNECTION
verfügen.Wenn Ihr Connector die benutzeroberflächenbasierte Pipelineerstellung unterstützt, können Sie die Verbindung und die Pipeline gleichzeitig erstellen, indem Sie die Schritte auf dieser Seite ausführen. Wenn Sie jedoch apibasierte Pipelineerstellung verwenden, müssen Sie die Verbindung im Katalog-Explorer erstellen, bevor Sie die Schritte auf dieser Seite ausführen. Siehe Herstellen einer Verbindung mit verwalteten Aufnahmequellen.
Wenn Sie beabsichtigen, eine vorhandene Verbindung zu verwenden: Sie müssen über
USE CONNECTION
Privilegien oderALL PRIVILEGES
Berechtigungen für das Verbindungsobjekt verfügen.Sie müssen über
USE CATALOG
Berechtigungen für den Zielkatalog verfügen.Sie müssen über
USE SCHEMA
- undCREATE TABLE
-Berechtigungen für ein vorhandenes Schema oderCREATE SCHEMA
-Berechtigungen für den Zielkatalog verfügen.
Zum Importieren aus Salesforce wird Folgendes empfohlen:
- Erstellen Sie einen Salesforce-Benutzer, den Databricks zum Abrufen von Daten verwenden kann. Stellen Sie sicher, dass der Benutzer über API-Zugriff und Zugriff auf alle Objekte verfügt, die Sie aufnehmen möchten.
Erstellen einer Aufnahmepipeline
Erforderliche Berechtigungen:USE CONNECTION
oder ALL PRIVILEGES
für eine Verbindung.
In diesem Schritt wird beschrieben, wie Sie die Erfassungspipeline erstellen. Jede aufgenommene Tabelle wird in eine Streamingtabelle mit demselben Namen geschrieben (jedoch in Kleinbuchstaben).
Databricks UI
Klicken Sie in der Randleiste des Azure Databricks-Arbeitsbereichs auf "Datenaufnahme".
Klicken Sie auf der Seite Daten hinzufügen unter Databricks-Connectors auf Salesforce.
Der Salesforce-Erfassungs-Assistent wird geöffnet.
Geben Sie auf der Pipelineseite des Assistenten einen eindeutigen Namen für die Aufnahmepipeline ein.
Wählen Sie im Dropdownmenü "Zielkatalog " einen Katalog aus. Erfasste Daten und Ereignisprotokolle werden in diesen Katalog geschrieben.
Wählen Sie die Unity-Katalogverbindung aus, die die anmeldeinformationen speichert, die für den Zugriff auf Salesforce-Daten erforderlich sind.
Wenn keine Salesforce-Verbindungen vorhanden sind, klicken Sie auf "Verbindung erstellen". Sie müssen über die
CREATE CONNECTION
Berechtigung für den Metastore verfügen.Klicken Sie auf "Pipeline erstellen", und fahren Sie fort.
Wählen Sie auf der Seite "Quelle " die zu aufnehmenden Tabellen aus, und klicken Sie dann auf "Weiter".
Wenn Sie "Alle Tabellen" auswählen, schreibt der Salesforce-Aufnahmeconnector alle vorhandenen und zukünftigen Tabellen im Quellschema in das Zielschema. Pro Pipeline können maximal 250 Objekte verwendet werden.
Wählen Sie auf der Seite "Ziel " den Unity-Katalog und das Schema aus, in das geschrieben werden soll.
Wenn Sie kein vorhandenes Schema verwenden möchten, klicken Sie auf "Schema erstellen". Sie müssen über die Berechtigungen
USE CATALOG
undCREATE SCHEMA
für den übergeordneten Katalog verfügen.Klicken Sie auf "Pipeline speichern", und fahren Sie fort.
(Optional) Klicken Sie auf der Seite "Einstellungen" auf " Zeitplan erstellen". Legen Sie die Häufigkeit fest, mit der die Zieltabellen aktualisiert werden.
(Optional) Festlegen von E-Mail-Benachrichtigungen für Erfolg oder Fehler des Pipelinevorgangs.
Klicken Sie auf "Speichern", und führen Sie die Pipelineaus.
Databricks-Ressourcenpakete
Auf dieser Registerkarte wird beschrieben, wie Sie eine Datenerfassungspipeline mithilfe von Databricks Asset Bundles bereitstellen. Bundles können YAML-Definitionen von Aufträgen und Aufgaben enthalten, mithilfe der Databricks CLI verwaltet und in verschiedenen Zielarbeitsbereichen (z. B. Entwicklung, Staging und Produktion) freigegeben und ausgeführt werden. Weitere Informationen finden Sie unter Databricks Asset Bundles.
Sie können die folgenden Konfigurationseigenschaften der Tabelle in Ihrer Pipelinedefinition verwenden, um bestimmte Spalten auszuwählen oder abzuwählen, die eingelesen werden sollen:
-
include_columns
: Geben Sie optional eine Liste von Spalten an, die für die Aufnahme eingeschlossen werden sollen. Wenn Sie diese Option verwenden, um Spalten explizit einzuschließen, schließt die Pipeline automatisch Spalten aus, die der Quelle in Zukunft hinzugefügt werden. Um die zukünftigen Spalten einzufügen, müssen Sie sie der Liste hinzufügen. -
exclude_columns
: Geben Sie optional eine Liste von Spalten an, die von der Aufnahme ausgeschlossen werden sollen. Wenn Sie diese Option verwenden, um Spalten explizit auszuschließen, enthält die Pipeline automatisch Spalten, die der Quelle in Zukunft hinzugefügt werden. Um die zukünftigen Spalten einzufügen, müssen Sie sie der Liste hinzufügen.
Erstellen Eines neuen Bündels mithilfe der Databricks CLI:
databricks bundle init
Fügen Sie dem Bundle zwei neue Ressourcendateien hinzu:
- Eine Pipelinedefinitionsdatei (
resources/sfdc_pipeline.yml
). - Eine Workflowdatei, die die Häufigkeit der Datenaufnahme steuert (
resources/sfdc_job.yml
).
Im Folgenden finden Sie eine
resources/sfdc_pipeline.yml
-Beispieldatei:variables: dest_catalog: default: main dest_schema: default: ingest_destination_schema # The main pipeline for sfdc_dab resources: pipelines: pipeline_sfdc: name: salesforce_pipeline catalog: ${var.dest_catalog} schema: ${var.dest_schema} ingestion_definition: connection_name: <salesforce-connection> objects: # An array of objects to ingest from Salesforce. This example # ingests the AccountShare, AccountPartner, and ApexPage objects. - table: source_schema: objects source_table: AccountShare destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema} table_configuration: include_columns: # This can be exclude_columns instead - <column_a> - <column_b> - <column_c> - table: source_schema: objects source_table: AccountPartner destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema} - table: source_schema: objects source_table: ApexPage destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema}
Im Folgenden finden Sie eine
resources/sfdc_job.yml
-Beispieldatei:resources: jobs: sfdc_dab_job: name: sfdc_dab_job trigger: # Run this job every day, exactly one day from the last run # See https://docs.databricks.com/api/workspace/jobs/create#trigger periodic: interval: 1 unit: DAYS email_notifications: on_failure: - <email-address> tasks: - task_key: refresh_pipeline pipeline_task: pipeline_id: ${resources.pipelines.pipeline_sfdc.id}
- Eine Pipelinedefinitionsdatei (
Stellen Sie die Pipeline mithilfe der Databricks CLI bereit:
databricks bundle deploy
Databricks-Befehlszeilenschnittstelle
Sie können die folgenden Konfigurationseigenschaften der Tabelle in Ihrer Pipelinedefinition verwenden, um bestimmte Spalten auszuwählen oder abzuwählen, die eingelesen werden sollen:
-
include_columns
: Geben Sie optional eine Liste von Spalten an, die für die Aufnahme eingeschlossen werden sollen. Wenn Sie diese Option verwenden, um Spalten explizit einzuschließen, schließt die Pipeline automatisch Spalten aus, die der Quelle in Zukunft hinzugefügt werden. Um die zukünftigen Spalten einzufügen, müssen Sie sie der Liste hinzufügen. -
exclude_columns
: Geben Sie optional eine Liste von Spalten an, die von der Aufnahme ausgeschlossen werden sollen. Wenn Sie diese Option verwenden, um Spalten explizit auszuschließen, enthält die Pipeline automatisch Spalten, die der Quelle in Zukunft hinzugefügt werden. Um die zukünftigen Spalten einzufügen, müssen Sie sie der Liste hinzufügen.
So erstellen Sie die Pipeline:
databricks pipelines create --json "<pipeline-definition | json-file-path>"
So aktualisieren Sie das Pipelineprofil:
databricks pipelines update --json "<pipeline-definition | json-file-path>"
Um die Pipelinedefinition abzurufen:
databricks pipelines get "<pipeline-id>"
So löschen Sie die Pipeline
databricks pipelines delete "<pipeline-id>"
Für weitere Informationen können Sie den Befehl ausführen:
databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help
Json-Pipelinedefinition (Beispiel)
"ingestion_definition": {
"connection_name": "<connection-name>",
"objects": [
{
"table": {
"source_schema": "<source-schema>",
"source_table": "<source-table>",
"destination_catalog": "<destination-catalog>",
"destination_schema": "<destination-schema>",
"table_configuration": {
"include_columns": ["<column-a>", "<column-b>", "<column-c>"]
}
}
}
]
}
Starten, Planen und Einrichten von Benachrichtigungen für Ihre Pipeline
Sie können einen Zeitplan für die Pipeline auf der Pipelinedetailseite erstellen.
Nachdem die Pipeline erstellt wurde, überprüfen Sie den Azure Databricks-Arbeitsbereich, und klicken Sie dann auf "Pipelines".
Die neue Pipeline wird in der Pipelineliste angezeigt.
Um die Pipelinedetails anzuzeigen, klicken Sie auf den Pipelinenamen.
Auf der Seite mit den Pipelinedetails können Sie die Pipeline planen, indem Sie auf "Zeitplan" klicken.
Um Benachrichtigungen für die Pipeline festzulegen, klicken Sie auf "Einstellungen", und fügen Sie dann eine Benachrichtigung hinzu.
Für jeden Zeitplan, den Sie einer Pipeline hinzufügen, erstellt Lakeflow Connect automatisch einen Auftrag dafür. Die Aufnahmepipeline ist eine Aufgabe innerhalb des Auftrags. Sie können dem Auftrag optional weitere Aufgaben hinzufügen.
Hinweis
Wenn die Pipeline ausgeführt wird, werden möglicherweise zwei Quellsichten für eine Tabelle angezeigt. Eine Ansicht enthält die Momentaufnahmen für Formelfelder. Die andere Sicht enthält die inkrementellen Datenpullvorgänge für Felder, die keine Formeln enthalten. Diese Sichten werden in der Zieltabelle verknüpft.
Beispiel: Importieren von zwei Salesforce-Objekten in separate Schemata
In der Beispielpipelinedefinition in diesem Abschnitt werden zwei Salesforce-Objekte in separate Schemas aufgenommen. Die Unterstützung für Multi-Destination-Pipelines erfolgt ausschließlich über die API.
resources:
pipelines:
pipeline_sfdc:
name: salesforce_pipeline
catalog: my_catalog_1 # Location of the pipeline event log
schema: my_schema_1 # Location of the pipeline event log
ingestion_definition:
connection_name: <salesforce-connection>
objects:
- table:
source_schema: objects
source_table: AccountShare
destination_catalog: my_catalog_1 # Location of this table
destination_schema: my_schema_1 # Location of this table
- table:
source_schema: objects
source_table: AccountPartner
destination_catalog: my_catalog_2 # Location of this table
destination_schema: my_schema_2 # Location of this table
Beispiel: Dreimaliges Importieren eines Salesforce-Objekts
Die Beispielpipelinedefinition in diesem Abschnitt nimmt ein Salesforce-Objekt in drei verschiedene Zieltabellen ein. Die Unterstützung für Multi-Destination-Pipelines erfolgt ausschließlich über die API.
Sie können nach Belieben eine Tabelle umbenennen, die Sie importieren. Wenn Sie eine Tabelle in Ihrer Pipeline umbenennen, wird sie zu einer nur API-Pipeline, und Sie können die Pipeline nicht mehr in der Benutzeroberfläche bearbeiten.
resources:
pipelines:
pipeline_sfdc:
name: salesforce_pipeline
catalog: my_catalog_1 # Location of the pipeline event log
schema: my_schema_1 # Location of the pipeline event log
ingestion_definition:
connection_name: <salesforce-connection>
objects:
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_1 # Location of first copy
destination_schema: my_schema_1 # Location of first copy
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_2 # Location of second copy
destination_schema: my_schema_2 # Location of second copy
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_2 # Location of third copy, renamed
destination_schema: my_schema_2 # Location of third copy, renamed
destination_table: order_duplicate # Table rename