Nota
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
En esta página se describe cómo ingerir datos de Salesforce y cargarlos en Azure Databricks mediante Lakeflow Connect.
Antes de empezar
Para crear una canalización de ingesta, debe cumplir los siguientes requisitos:
Su área de trabajo debe estar habilitada para Unity Catalog.
La computación sin servidor debe estar habilitada para tu espacio de trabajo. Consulte Habilitar cómputo sin servidor.
Si tiene previsto crear una nueva conexión: debe tener
CREATE CONNECTION
privilegios en el metastore.Si el conector admite la creación de canalizaciones basadas en la interfaz de usuario, puede crear la conexión y la canalización al mismo tiempo completando los pasos de esta página. Sin embargo, si usa la creación de canalizaciones basadas en API, debe crear la conexión en el Explorador de catálogos antes de completar los pasos de esta página. Consulte Conexión a orígenes de ingesta administrados.
Si tiene previsto usar una conexión existente: debe tener
USE CONNECTION
privilegios oALL PRIVILEGES
en el objeto de conexión.Debe tener
USE CATALOG
privilegios en el catálogo de destino.Debe tener privilegios
USE SCHEMA
yCREATE TABLE
en un esquema existente o privilegiosCREATE SCHEMA
en el catálogo de destino.
Para ingerir desde Salesforce, se recomienda lo siguiente:
- Cree un usuario de Salesforce que Databricks pueda usar para recuperar datos. Asegúrese de que el usuario tiene acceso a la API y a todos los objetos que planea ingerir.
Creación de una canalización de ingesta
Permisos necesarios:USE CONNECTION
o ALL PRIVILEGES
en una conexión.
En este paso se describe cómo crear la canalización de ingesta. Cada tabla ingerida se escribe en una tabla de transmisión con exactamente el mismo nombre (pero en minúsculas).
Interfaz de usuario de Databricks
En la barra lateral del área de trabajo de Azure Databricks, haga clic en Ingesta de datos.
En la página Agregar datos, en los Conectores de Databricks, haga clic en Salesforce.
Se abre el Asistente para ingesta de Salesforce.
En la página de Canalización del asistente, escriba un nombre único para la canalización de ingesta.
En la lista desplegable Catálogo de destino, seleccione un catálogo. Los datos ingeridos y los registros de eventos se escribirán en este catálogo.
Seleccione la conexión de Catálogo de Unity que almacena las credenciales necesarias para acceder a los datos de Salesforce.
Si no hay conexiones de Salesforce, haga clic en Crear conexión. Debe tener el privilegio
CREATE CONNECTION
en el metastore.Haga clic en Crear canalización y continúe.
En la página Origen, seleccione las tablas que se van a ingerir y, a continuación, haga clic en Siguiente.
Si selecciona Todas las tablas, el conector de ingesta de Salesforce escribe todas las tablas existentes y futuras en el esquema de origen en el esquema de destino. Hay un máximo de 250 objetos por canalización.
En la página Destino, seleccione el catálogo de Unity Catalog y el esquema donde escribir.
Si no desea usar un esquema existente, haga clic en Crear esquema. Debe tener privilegios
USE CATALOG
yCREATE SCHEMA
en el catálogo primario.Haga clic en Guardar canalización y continuar.
(Opcional) En la página Configuración , haga clic en Crear programación. Establezca la frecuencia para actualizar las tablas de destino.
(Opcional) Establezca las notificaciones por correo electrónico para que la operación de canalización se complete correctamente o no.
Haga clic en Guardar y ejecutar canalización.
Conjuntos de recursos de Databricks
En esta pestaña se describe cómo implementar una canalización de ingesta mediante conjuntos de recursos de Databricks. Las agrupaciones pueden contener definiciones de YAML de trabajos y tareas, se administran mediante la CLI de Databricks y se pueden compartir y ejecutar en diferentes áreas de trabajo de destino (como desarrollo, almacenamiento provisional y producción). Para obtener más información, consulte Conjuntos de recursos de Databricks.
Puede usar las siguientes propiedades de configuración de tabla en la definición de canalización para seleccionar o anular la selección de columnas específicas para ingerir:
-
include_columns
: opcionalmente, especifique una lista de columnas que se van a incluir para la ingesta. Si usa esta opción para incluir explícitamente columnas, la canalización excluye automáticamente las columnas que se agregan al origen en el futuro. Para ingerir las columnas futuras, tendrá que agregarlas a la lista. -
exclude_columns
: opcionalmente, especifique una lista de columnas que se excluirán de la ingesta. Si usa esta opción para excluir explícitamente columnas, la canalización incluye automáticamente las columnas que se agregan al origen en el futuro. Para ingerir las columnas futuras, tendrá que agregarlas a la lista.
Cree una nueva agrupación mediante la CLI de Databricks:
databricks bundle init
Agregue dos nuevos archivos de recursos al lote:
- Un archivo de definición de canalización (
resources/sfdc_pipeline.yml
). - Un archivo de flujo de trabajo que controla la frecuencia de ingesta de datos (
resources/sfdc_job.yml
).
A continuación se muestra un archivo
resources/sfdc_pipeline.yml
de ejemplo:variables: dest_catalog: default: main dest_schema: default: ingest_destination_schema # The main pipeline for sfdc_dab resources: pipelines: pipeline_sfdc: name: salesforce_pipeline catalog: ${var.dest_catalog} schema: ${var.dest_schema} ingestion_definition: connection_name: <salesforce-connection> objects: # An array of objects to ingest from Salesforce. This example # ingests the AccountShare, AccountPartner, and ApexPage objects. - table: source_schema: objects source_table: AccountShare destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema} table_configuration: include_columns: # This can be exclude_columns instead - <column_a> - <column_b> - <column_c> - table: source_schema: objects source_table: AccountPartner destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema} - table: source_schema: objects source_table: ApexPage destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema}
A continuación se muestra un archivo
resources/sfdc_job.yml
de ejemplo:resources: jobs: sfdc_dab_job: name: sfdc_dab_job trigger: # Run this job every day, exactly one day from the last run # See https://docs.databricks.com/api/workspace/jobs/create#trigger periodic: interval: 1 unit: DAYS email_notifications: on_failure: - <email-address> tasks: - task_key: refresh_pipeline pipeline_task: pipeline_id: ${resources.pipelines.pipeline_sfdc.id}
- Un archivo de definición de canalización (
Implemente la canalización mediante la CLI de Databricks:
databricks bundle deploy
CLI de Databricks
Puede usar las siguientes propiedades de configuración de tabla en la definición de canalización para seleccionar o anular la selección de columnas específicas para ingerir:
-
include_columns
: opcionalmente, especifique una lista de columnas que se van a incluir para la ingesta. Si usa esta opción para incluir explícitamente columnas, la canalización excluye automáticamente las columnas que se agregan al origen en el futuro. Para ingerir las columnas futuras, tendrá que agregarlas a la lista. -
exclude_columns
: opcionalmente, especifique una lista de columnas que se excluirán de la ingesta. Si usa esta opción para excluir explícitamente columnas, la canalización incluye automáticamente las columnas que se agregan al origen en el futuro. Para ingerir las columnas futuras, tendrá que agregarlas a la lista.
Para crear la canalización:
databricks pipelines create --json "<pipeline-definition | json-file-path>"
Para actualiza la canalización:
databricks pipelines update --json "<pipeline-definition | json-file-path>"
Para obtener la definición de la canalización:
databricks pipelines get "<pipeline-id>"
Para eliminar la canalización:
databricks pipelines delete "<pipeline-id>"
Para más información, puede ejecutar lo siguiente:
databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help
Definición de canalización JSON de ejemplo
"ingestion_definition": {
"connection_name": "<connection-name>",
"objects": [
{
"table": {
"source_schema": "<source-schema>",
"source_table": "<source-table>",
"destination_catalog": "<destination-catalog>",
"destination_schema": "<destination-schema>",
"table_configuration": {
"include_columns": ["<column-a>", "<column-b>", "<column-c>"]
}
}
}
]
}
Inicio, programación y establecimiento de alertas en la canalización
Puede crear una programación para la canalización en la página de detalles de la canalización.
Una vez creada la canalización, vuelva a visitar el área de trabajo de Azure Databricks y, a continuación, haga clic en Canalizaciones.
La nueva canalización aparece en la lista de canalizaciones.
Para ver los detalles de la canalización, haga clic en el nombre de la canalización.
En la página de detalles de la canalización, puede programar la canalización al hacer clic en Programar.
Para establecer notificaciones en la canalización, haga clic en Configuración, y luego agregue una notificación.
Para cada programación que agregue a una canalización, Lakeflow Connect crea automáticamente un trabajo para ella. La canalización de ingesta es una tarea dentro del trabajo. Opcionalmente, puede agregar más tareas al trabajo.
Nota:
Cuando se ejecuta la canalización, es posible que vea dos vistas de origen para una tabla determinada. Una vista contiene las instantáneas de los campos de fórmula. La otra vista contiene las extracciones de datos incrementales para los campos que no son de fórmula. Estas vistas se combinan en la tabla de destino.
Ejemplo: Ingerir dos objetos de Salesforce en esquemas independientes
La definición de canalización de ejemplo de esta sección ingiere dos objetos de Salesforce en esquemas independientes. La compatibilidad con la canalización de varios destinos es solo API.
resources:
pipelines:
pipeline_sfdc:
name: salesforce_pipeline
catalog: my_catalog_1 # Location of the pipeline event log
schema: my_schema_1 # Location of the pipeline event log
ingestion_definition:
connection_name: <salesforce-connection>
objects:
- table:
source_schema: objects
source_table: AccountShare
destination_catalog: my_catalog_1 # Location of this table
destination_schema: my_schema_1 # Location of this table
- table:
source_schema: objects
source_table: AccountPartner
destination_catalog: my_catalog_2 # Location of this table
destination_schema: my_schema_2 # Location of this table
Ejemplo: Ingerir un objeto de Salesforce tres veces
La definición de canalización de ejemplo de esta sección ingiere un objeto de Salesforce en tres tablas de destino diferentes. La compatibilidad con la canalización de varios destinos es solo API.
Opcionalmente, puede cambiar el nombre de una tabla que ingiere. Si cambia el nombre de una tabla de la canalización, se convierte en una canalización solo de API y ya no puede editar la canalización en la interfaz de usuario.
resources:
pipelines:
pipeline_sfdc:
name: salesforce_pipeline
catalog: my_catalog_1 # Location of the pipeline event log
schema: my_schema_1 # Location of the pipeline event log
ingestion_definition:
connection_name: <salesforce-connection>
objects:
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_1 # Location of first copy
destination_schema: my_schema_1 # Location of first copy
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_2 # Location of second copy
destination_schema: my_schema_2 # Location of second copy
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_2 # Location of third copy, renamed
destination_schema: my_schema_2 # Location of third copy, renamed
destination_table: order_duplicate # Table rename