Compartir a través de


Ingesta de datos desde Salesforce

En esta página se describe cómo ingerir datos de Salesforce y cargarlos en Azure Databricks mediante Lakeflow Connect.

Antes de empezar

Para crear una canalización de ingesta, debe cumplir los siguientes requisitos:

  • Su área de trabajo debe estar habilitada para Unity Catalog.

  • La computación sin servidor debe estar habilitada para tu espacio de trabajo. Consulte Habilitar cómputo sin servidor.

  • Si tiene previsto crear una nueva conexión: debe tener CREATE CONNECTION privilegios en el metastore.

    Si el conector admite la creación de canalizaciones basadas en la interfaz de usuario, puede crear la conexión y la canalización al mismo tiempo completando los pasos de esta página. Sin embargo, si usa la creación de canalizaciones basadas en API, debe crear la conexión en el Explorador de catálogos antes de completar los pasos de esta página. Consulte Conexión a orígenes de ingesta administrados.

  • Si tiene previsto usar una conexión existente: debe tener USE CONNECTION privilegios o ALL PRIVILEGES en el objeto de conexión.

  • Debe tener USE CATALOG privilegios en el catálogo de destino.

  • Debe tener privilegios USE SCHEMA y CREATE TABLE en un esquema existente o privilegios CREATE SCHEMA en el catálogo de destino.

Para ingerir desde Salesforce, se recomienda lo siguiente:

  • Cree un usuario de Salesforce que Databricks pueda usar para recuperar datos. Asegúrese de que el usuario tiene acceso a la API y a todos los objetos que planea ingerir.

Creación de una canalización de ingesta

Permisos necesarios:USE CONNECTION o ALL PRIVILEGES en una conexión.

En este paso se describe cómo crear la canalización de ingesta. Cada tabla ingerida se escribe en una tabla de transmisión con exactamente el mismo nombre (pero en minúsculas).

Interfaz de usuario de Databricks

  1. En la barra lateral del área de trabajo de Azure Databricks, haga clic en Ingesta de datos.

  2. En la página Agregar datos, en los Conectores de Databricks, haga clic en Salesforce.

    Se abre el Asistente para ingesta de Salesforce.

  3. En la página de Canalización del asistente, escriba un nombre único para la canalización de ingesta.

  4. En la lista desplegable Catálogo de destino, seleccione un catálogo. Los datos ingeridos y los registros de eventos se escribirán en este catálogo.

  5. Seleccione la conexión de Catálogo de Unity que almacena las credenciales necesarias para acceder a los datos de Salesforce.

    Si no hay conexiones de Salesforce, haga clic en Crear conexión. Debe tener el privilegio CREATE CONNECTION en el metastore.

  6. Haga clic en Crear canalización y continúe.

  7. En la página Origen, seleccione las tablas que se van a ingerir y, a continuación, haga clic en Siguiente.

    Si selecciona Todas las tablas, el conector de ingesta de Salesforce escribe todas las tablas existentes y futuras en el esquema de origen en el esquema de destino. Hay un máximo de 250 objetos por canalización.

  8. En la página Destino, seleccione el catálogo de Unity Catalog y el esquema donde escribir.

    Si no desea usar un esquema existente, haga clic en Crear esquema. Debe tener privilegios USE CATALOG y CREATE SCHEMA en el catálogo primario.

  9. Haga clic en Guardar canalización y continuar.

  10. (Opcional) En la página Configuración , haga clic en Crear programación. Establezca la frecuencia para actualizar las tablas de destino.

  11. (Opcional) Establezca las notificaciones por correo electrónico para que la operación de canalización se complete correctamente o no.

  12. Haga clic en Guardar y ejecutar canalización.

Conjuntos de recursos de Databricks

En esta pestaña se describe cómo implementar una canalización de ingesta mediante conjuntos de recursos de Databricks. Las agrupaciones pueden contener definiciones de YAML de trabajos y tareas, se administran mediante la CLI de Databricks y se pueden compartir y ejecutar en diferentes áreas de trabajo de destino (como desarrollo, almacenamiento provisional y producción). Para obtener más información, consulte Conjuntos de recursos de Databricks.

Puede usar las siguientes propiedades de configuración de tabla en la definición de canalización para seleccionar o anular la selección de columnas específicas para ingerir:

  • include_columns: opcionalmente, especifique una lista de columnas que se van a incluir para la ingesta. Si usa esta opción para incluir explícitamente columnas, la canalización excluye automáticamente las columnas que se agregan al origen en el futuro. Para ingerir las columnas futuras, tendrá que agregarlas a la lista.
  • exclude_columns: opcionalmente, especifique una lista de columnas que se excluirán de la ingesta. Si usa esta opción para excluir explícitamente columnas, la canalización incluye automáticamente las columnas que se agregan al origen en el futuro. Para ingerir las columnas futuras, tendrá que agregarlas a la lista.
  1. Cree una nueva agrupación mediante la CLI de Databricks:

    databricks bundle init
    
  2. Agregue dos nuevos archivos de recursos al lote:

    • Un archivo de definición de canalización (resources/sfdc_pipeline.yml).
    • Un archivo de flujo de trabajo que controla la frecuencia de ingesta de datos (resources/sfdc_job.yml).

    A continuación se muestra un archivo resources/sfdc_pipeline.yml de ejemplo:

    variables:
      dest_catalog:
        default: main
      dest_schema:
        default: ingest_destination_schema
    
    # The main pipeline for sfdc_dab
    resources:
      pipelines:
        pipeline_sfdc:
          name: salesforce_pipeline
          catalog: ${var.dest_catalog}
          schema: ${var.dest_schema}
          ingestion_definition:
            connection_name: <salesforce-connection>
            objects:
              # An array of objects to ingest from Salesforce. This example
              # ingests the AccountShare, AccountPartner, and ApexPage objects.
              - table:
                  source_schema: objects
                  source_table: AccountShare
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
                  table_configuration:
                    include_columns: # This can be exclude_columns instead
                      - <column_a>
                      - <column_b>
                      - <column_c>
              - table:
                  source_schema: objects
                  source_table: AccountPartner
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
              - table:
                  source_schema: objects
                  source_table: ApexPage
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
    

    A continuación se muestra un archivo resources/sfdc_job.yml de ejemplo:

    resources:
      jobs:
        sfdc_dab_job:
          name: sfdc_dab_job
    
          trigger:
            # Run this job every day, exactly one day from the last run
            # See https://docs.databricks.com/api/workspace/jobs/create#trigger
            periodic:
              interval: 1
              unit: DAYS
    
          email_notifications:
            on_failure:
              - <email-address>
    
          tasks:
            - task_key: refresh_pipeline
              pipeline_task:
                pipeline_id: ${resources.pipelines.pipeline_sfdc.id}
    
  3. Implemente la canalización mediante la CLI de Databricks:

    databricks bundle deploy
    

CLI de Databricks

Puede usar las siguientes propiedades de configuración de tabla en la definición de canalización para seleccionar o anular la selección de columnas específicas para ingerir:

  • include_columns: opcionalmente, especifique una lista de columnas que se van a incluir para la ingesta. Si usa esta opción para incluir explícitamente columnas, la canalización excluye automáticamente las columnas que se agregan al origen en el futuro. Para ingerir las columnas futuras, tendrá que agregarlas a la lista.
  • exclude_columns: opcionalmente, especifique una lista de columnas que se excluirán de la ingesta. Si usa esta opción para excluir explícitamente columnas, la canalización incluye automáticamente las columnas que se agregan al origen en el futuro. Para ingerir las columnas futuras, tendrá que agregarlas a la lista.

Para crear la canalización:

databricks pipelines create --json "<pipeline-definition | json-file-path>"

Para actualiza la canalización:

databricks pipelines update --json "<pipeline-definition | json-file-path>"

Para obtener la definición de la canalización:

databricks pipelines get "<pipeline-id>"

Para eliminar la canalización:

databricks pipelines delete "<pipeline-id>"

Para más información, puede ejecutar lo siguiente:

databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help

Definición de canalización JSON de ejemplo

"ingestion_definition": {

     "connection_name": "<connection-name>",

     "objects": [

       {

         "table": {

           "source_schema": "<source-schema>",

           "source_table": "<source-table>",

           "destination_catalog": "<destination-catalog>",

           "destination_schema": "<destination-schema>",

           "table_configuration": {

             "include_columns": ["<column-a>", "<column-b>", "<column-c>"]

           }

         }

       }

     ]

 }

Inicio, programación y establecimiento de alertas en la canalización

Puede crear una programación para la canalización en la página de detalles de la canalización.

  1. Una vez creada la canalización, vuelva a visitar el área de trabajo de Azure Databricks y, a continuación, haga clic en Canalizaciones.

    La nueva canalización aparece en la lista de canalizaciones.

  2. Para ver los detalles de la canalización, haga clic en el nombre de la canalización.

  3. En la página de detalles de la canalización, puede programar la canalización al hacer clic en Programar.

  4. Para establecer notificaciones en la canalización, haga clic en Configuración, y luego agregue una notificación.

Para cada programación que agregue a una canalización, Lakeflow Connect crea automáticamente un trabajo para ella. La canalización de ingesta es una tarea dentro del trabajo. Opcionalmente, puede agregar más tareas al trabajo.

Nota:

Cuando se ejecuta la canalización, es posible que vea dos vistas de origen para una tabla determinada. Una vista contiene las instantáneas de los campos de fórmula. La otra vista contiene las extracciones de datos incrementales para los campos que no son de fórmula. Estas vistas se combinan en la tabla de destino.

Ejemplo: Ingerir dos objetos de Salesforce en esquemas independientes

La definición de canalización de ejemplo de esta sección ingiere dos objetos de Salesforce en esquemas independientes. La compatibilidad con la canalización de varios destinos es solo API.

resources:
  pipelines:
    pipeline_sfdc:
      name: salesforce_pipeline
      catalog: my_catalog_1 # Location of the pipeline event log
      schema: my_schema_1 # Location of the pipeline event log
      ingestion_definition:
        connection_name: <salesforce-connection>
        objects:
          - table:
              source_schema: objects
              source_table: AccountShare
              destination_catalog: my_catalog_1 # Location of this table
              destination_schema: my_schema_1 # Location of this table
          - table:
              source_schema: objects
              source_table: AccountPartner
              destination_catalog: my_catalog_2 # Location of this table
              destination_schema: my_schema_2 # Location of this table

Ejemplo: Ingerir un objeto de Salesforce tres veces

La definición de canalización de ejemplo de esta sección ingiere un objeto de Salesforce en tres tablas de destino diferentes. La compatibilidad con la canalización de varios destinos es solo API.

Opcionalmente, puede cambiar el nombre de una tabla que ingiere. Si cambia el nombre de una tabla de la canalización, se convierte en una canalización solo de API y ya no puede editar la canalización en la interfaz de usuario.

resources:
  pipelines:
    pipeline_sfdc:
      name: salesforce_pipeline
      catalog: my_catalog_1	# Location of the pipeline event log
      schema: my_schema_1	# Location of the pipeline event log
      ingestion_definition:
        connection_name: <salesforce-connection>
        objects:
          - table:
              source_schema: objects
              source_table: Order
              destination_catalog: my_catalog_1	# Location of first copy
              destination_schema: my_schema_1	# Location of first copy
          - table:
              source_schema: objects
              source_table: Order
              destination_catalog: my_catalog_2	# Location of second copy
              destination_schema: my_schema_2	# Location of second copy
	        - table:
              source_schema: objects
              source_table: Order
              destination_catalog: my_catalog_2	# Location of third copy, renamed
              destination_schema: my_schema_2	# Location of third copy, renamed
              destination_table: order_duplicate # Table rename

Recursos adicionales