Important
This feature is in Beta. Workspace admins can control access to this feature from the Previews page. See Manage Azure Databricks previews.
This page shows how to create a managed Salesforce Marketing Cloud ingestion pipeline using Lakeflow Connect.
Requirements
To create an ingestion pipeline, you must first meet the following requirements:
- Your workspace must be enabled for Unity Catalog.
- Serverless compute must be enabled for your workspace. See Serverless compute requirements.
- If you plan to create a new connection: You must have CREATE CONNECTION privileges on the metastore. See Manage privileges in Unity Catalog.
  If the connector supports UI-based pipeline authoring, an admin can create the connection and the pipeline at the same time by completing the steps on this page. However, if the users who create pipelines use API-based pipeline authoring or are non-admin users, an admin must first create the connection in Catalog Explorer. See Connect to managed ingestion sources.
- If you plan to use an existing connection: You must have USE CONNECTION privileges or ALL PRIVILEGES on the connection object.
- You must have USE CATALOG privileges on the target catalog.
- You must have USE SCHEMA and CREATE TABLE privileges on an existing schema or CREATE SCHEMA privileges on the target catalog.
To ingest from Salesforce Marketing Cloud, you must complete Configure OAuth for Salesforce Marketing Cloud ingestion into Azure Databricks and create a connection. See Create a Salesforce Marketing Cloud connection.
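As a rough illustration of the privileges listed above for the existing-connection path, the following sketch builds the corresponding Unity Catalog GRANT statements. The principal, catalog, schema, and connection names are hypothetical placeholders, and the helper itself is not part of any Databricks library; adjust the statements to match your own governance model.

```python
def grant_statements(principal, catalog, schema, connection):
    """Build GRANT statements for the existing-connection requirements.

    All arguments are placeholders supplied by the caller; nothing here
    talks to a workspace.
    """
    quoted = f"`{principal}`"  # backticks allow names with special characters
    return [
        f"GRANT USE CONNECTION ON CONNECTION {connection} TO {quoted}",
        f"GRANT USE CATALOG ON CATALOG {catalog} TO {quoted}",
        f"GRANT USE SCHEMA, CREATE TABLE ON SCHEMA {catalog}.{schema} TO {quoted}",
    ]


# Hypothetical names, for illustration only.
for statement in grant_statements(
    "data-engineers", "main", "ingest_destination_schema", "sfmc_connection"
):
    print(statement)
```

In a notebook, each generated statement could be run by a user who is allowed to grant these privileges, for example by passing it to spark.sql.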
Create an ingestion pipeline
Databricks UI
- In the sidebar of the Azure Databricks workspace, click Data Ingestion.
- On the Add data page, under Databricks connectors, click Salesforce Marketing Cloud.
- On the Connection page of the ingestion wizard, select the connection that stores your Salesforce Marketing Cloud access credentials. If you have the CREATE CONNECTION privilege on the metastore, you can click Create connection to create a new connection with the authentication details in Create a Salesforce Marketing Cloud connection.
- Click Next.
- On the Ingestion setup page, enter a unique name for the pipeline.
- Select a catalog and a schema to write event logs to. If you have USE CATALOG and CREATE SCHEMA privileges on the catalog, you can click Create schema in the drop-down menu to create a new schema.
- Click Create pipeline and continue.
- On the Source page, select the tables to ingest.
- Click Save and continue.
- On the Destination page, select a catalog and a schema to load data into. If you have USE CATALOG and CREATE SCHEMA privileges on the catalog, you can click Create schema in the drop-down menu to create a new schema.
- Click Save and continue.
- (Optional) On the Schedules and notifications page, click Create schedule. Set the frequency to refresh the destination tables.
- (Optional) Click Add notification to set email notifications for pipeline operation success or failure, then click Save and run pipeline.
Declarative Automation Bundles
Use Declarative Automation Bundles to manage Salesforce Marketing Cloud pipelines as code. Bundles can contain YAML definitions of jobs and tasks, are managed using the Databricks CLI, and can be shared and run in different target workspaces (such as development, staging, and production). For more information, see What are Declarative Automation Bundles?.
Create a bundle using the Databricks CLI:
databricks bundle init
Add two new resource files to the bundle:
- A pipeline definition file (for example, resources/sfmc_pipeline.yml). See pipeline.ingestion_definition and Examples.
- A job definition file that controls the frequency of data ingestion (for example, resources/sfmc_job.yml).
Deploy the pipeline using the Databricks CLI:
databricks bundle deploy
Databricks notebook
Import the following notebook into your Azure Databricks workspace:
Leave cell one as-is.
Modify cell two or three with your pipeline configuration details. See Examples.
Click Run all.
Examples
Use these examples to configure your pipeline.
Ingest all tables from the default schema
Declarative Automation Bundles
The following pipeline definition file ingests all current and future tables from the default schema:
variables:
  dest_catalog:
    default: main
  dest_schema:
    default: ingest_destination_schema
resources:
  pipelines:
    pipeline_sfmc:
      name: sfmc_pipeline
      catalog: ${var.dest_catalog}
      schema: ${var.dest_schema}
      ingestion_definition:
        connection_name: <sfmc-connection>
        objects:
          # Ingest all tables from the default schema
          - schema:
              source_schema: default
              destination_catalog: ${var.dest_catalog}
              destination_schema: ${var.dest_schema}
Databricks notebook
The following pipeline specification ingests all current and future tables from the default schema:
pipeline_name = "sfmc_pipeline"
connection_name = "<sfmc-connection>"
pipeline_spec = {
    "name": pipeline_name,
    "ingestion_definition": {
        "connection_name": connection_name,
        "objects": [
            {
                "schema": {
                    "source_schema": "default",
                    "destination_catalog": "main",
                    "destination_schema": "ingest_destination_schema"
                }
            }
        ]
    }
}
json_payload = json.dumps(pipeline_spec, indent=2)
create_pipeline(json_payload)
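Before submitting a specification, it can be useful to catch obvious mistakes locally. The following is a minimal sketch of such a check. The validate_spec helper is hypothetical, and its list of required fields simply mirrors the fields used in the examples on this page; it covers only the schema and table object kinds shown here and is not the validation that the service itself performs.

```python
def validate_spec(spec):
    """Return a list of problems found in a pipeline spec (empty if none)."""
    problems = []
    if not spec.get("name"):
        problems.append("missing pipeline name")

    definition = spec.get("ingestion_definition", {})
    if not definition.get("connection_name"):
        problems.append("missing connection_name")

    objects = definition.get("objects", [])
    if not objects:
        problems.append("objects list is empty")

    for i, obj in enumerate(objects):
        # Only the two object kinds used on this page are recognized.
        kinds = [k for k in ("schema", "table") if k in obj]
        if len(kinds) != 1:
            problems.append(
                f"objects[{i}] must have exactly one of 'schema' or 'table'"
            )
            continue
        kind = kinds[0]
        required = ["source_schema", "destination_catalog", "destination_schema"]
        if kind == "table":
            required.append("source_table")
        for key in required:
            if not obj[kind].get(key):
                problems.append(f"objects[{i}].{kind} is missing {key}")
    return problems


# A table entry without source_table is reported as a problem.
example_spec = {
    "name": "sfmc_pipeline",
    "ingestion_definition": {
        "connection_name": "<sfmc-connection>",
        "objects": [
            {
                "table": {
                    "source_schema": "default",
                    "destination_catalog": "main",
                    "destination_schema": "ingest_destination_schema",
                }
            }
        ],
    },
}
print(validate_spec(example_spec))
```

A check like this could run in the notebook just before the specification is serialized, so that a missing field surfaces as a readable message rather than an API error.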
Ingest a single source table
Declarative Automation Bundles
The following pipeline definition file ingests a single table from the default schema:
variables:
  dest_catalog:
    default: main
  dest_schema:
    default: ingest_destination_schema
resources:
  pipelines:
    pipeline_sfmc:
      name: sfmc_pipeline
      catalog: ${var.dest_catalog}
      schema: ${var.dest_schema}
      ingestion_definition:
        connection_name: <sfmc-connection>
        objects:
          # Ingest the campaigns table from the default schema
          - table:
              source_schema: default
              source_table: campaigns
              destination_catalog: ${var.dest_catalog}
              destination_schema: ${var.dest_schema}
Databricks notebook
The following pipeline specification ingests a single table from the default schema:
pipeline_name = "sfmc_pipeline"
connection_name = "<sfmc-connection>"
pipeline_spec = {
    "name": pipeline_name,
    "ingestion_definition": {
        "connection_name": connection_name,
        "objects": [
            {
                "table": {
                    "source_schema": "default",
                    "source_table": "campaigns",
                    "destination_catalog": "main",
                    "destination_schema": "ingest_destination_schema"
                }
            }
        ]
    }
}
json_payload = json.dumps(pipeline_spec, indent=2)
create_pipeline(json_payload)
Ingest data extensions
To ingest data from a data extension, use the External Key as the source_table value. See Salesforce Marketing Cloud connector concepts for details on source schema types.
Declarative Automation Bundles
The following pipeline definition file ingests a data extension:
variables:
  dest_catalog:
    default: main
  dest_schema:
    default: ingest_destination_schema
resources:
  pipelines:
    pipeline_sfmc:
      name: sfmc_pipeline
      catalog: ${var.dest_catalog}
      schema: ${var.dest_schema}
      ingestion_definition:
        connection_name: <sfmc-connection>
        objects:
          # Ingest a data extension using its External Key (Customer Key)
          - table:
              source_schema: dataextensions
              source_table: <external-key>
              destination_catalog: ${var.dest_catalog}
              destination_schema: ${var.dest_schema}
Databricks notebook
The following pipeline specification ingests a data extension:
pipeline_name = "sfmc_pipeline"
connection_name = "<sfmc-connection>"
pipeline_spec = {
    "name": pipeline_name,
    "ingestion_definition": {
        "connection_name": connection_name,
        "objects": [
            {
                "table": {
                    "source_schema": "dataextensions",
                    "source_table": "<external-key>",
                    "destination_catalog": "main",
                    "destination_schema": "ingest_destination_schema"
                }
            }
        ]
    }
}
json_payload = json.dumps(pipeline_spec, indent=2)
create_pipeline(json_payload)
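When several data extensions need to be ingested, the table entries can be generated from a list of External Keys instead of being written by hand. The sketch below assumes that the objects list accepts more than one table entry in a single pipeline; the data_extension_objects helper and the placeholder keys are hypothetical and only illustrate the shape of the specification.

```python
import json


def data_extension_objects(external_keys, dest_catalog, dest_schema):
    """Build one 'table' entry per data extension External Key."""
    return [
        {
            "table": {
                "source_schema": "dataextensions",
                "source_table": key,  # the External Key of the data extension
                "destination_catalog": dest_catalog,
                "destination_schema": dest_schema,
            }
        }
        for key in external_keys
    ]


pipeline_spec = {
    "name": "sfmc_pipeline",
    "ingestion_definition": {
        "connection_name": "<sfmc-connection>",
        # Placeholder keys; replace them with real External Keys.
        "objects": data_extension_objects(
            ["<external-key-1>", "<external-key-2>"],
            "main",
            "ingest_destination_schema",
        ),
    },
}

print(json.dumps(pipeline_spec, indent=2))
```

The resulting dictionary has the same structure as the single-extension specification, so it could be serialized and submitted the same way.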
Declarative Automation Bundles job definition file
Declarative Automation Bundles
The following is an example job definition file to use with Declarative Automation Bundles. The job runs every day, exactly one day from the last run.
resources:
  jobs:
    sfmc_dab_job:
      name: sfmc_dab_job
      trigger:
        periodic:
          interval: 1
          unit: DAYS
      email_notifications:
        on_failure:
          - <email-address>
      tasks:
        - task_key: refresh_pipeline
          pipeline_task:
            pipeline_id: ${resources.pipelines.pipeline_sfmc.id}
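To make the periodic trigger concrete, the following sketch computes when the next run would be due if each run is scheduled a fixed interval after the previous one, as described above. It is plain date arithmetic and not the scheduler's actual implementation; the next_run helper is hypothetical, the unit names other than DAYS are assumed, and which timestamp counts as "the last run" is not specified on this page.

```python
from datetime import datetime, timedelta

# Assumed unit names; only DAYS appears in the job definition above.
UNIT_TO_DELTA = {
    "HOURS": timedelta(hours=1),
    "DAYS": timedelta(days=1),
    "WEEKS": timedelta(weeks=1),
}


def next_run(last_run, interval, unit):
    """Return the time one periodic interval after the last run."""
    return last_run + interval * UNIT_TO_DELTA[unit]


last = datetime(2024, 1, 1, 9, 30)
print(next_run(last, 1, "DAYS"))
```

Unlike a fixed time-of-day schedule, an interval measured from the previous run can drift over time if a run starts late.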
Common patterns
For advanced pipeline configurations, see Common patterns for managed ingestion pipelines.
Next steps
Start, schedule, and set alerts on your pipeline. See Common pipeline maintenance tasks.