Share via


Introducción a la captura de datos modificados en el almacén analítico de Azure Cosmos DB

SE APLICA A: NoSQL MongoDB

Use la captura de datos modificados (CDC) en el almacén analítico de Azure Cosmos DB como origen para Azure Data Factory o Azure Synapse Analytics para capturar cambios específicos en los datos.

Nota

Tenga en cuenta que la interfaz de servicio vinculado para la API de Azure Cosmos DB for MongoDB aún no está disponible en el flujo de datos. No obstante, podrá usar el punto de conexión de documentos de su cuenta con la interfaz de servicio vinculado "Azure Cosmos DB for NoSQL" como solución provisional hasta que se admita el servicio vinculado de Mongo. En un servicio vinculado NoSQL, seleccione "Introducir manualmente" para proporcionar la información de la cuenta de Cosmos DB y usar el punto de conexión del documento de la cuenta (p. ej.: https://[your-database-account-uri].documents.azure.com:443/) en lugar del punto de conexión de MongoDB (p. ej.: mongodb://[your-database-account-uri].mongo.cosmos.azure.com:10255/)

Requisitos previos

Habilitación del almacén analítico

En primer lugar, habilite Azure Synapse Link en el nivel de cuenta y luego habilite el almacén analítico para los contenedores adecuados para la carga de trabajo.

  1. Habilitación de Azure Synapse Link: Habilitación de Azure Synapse Link para una cuenta de Azure Cosmos DB

  2. Habilite el almacén analítico para loa contenedores:

    Opción Guía
    Habilitar para un contenedor nuevo específico Habilitación de Azure Synapse Link para contenedores nuevos
    Habilitar para un contenedor existente específico Habilitación de Azure Synapse Link para contenedores existentes

Creación de un recurso de Azure de destino mediante flujos de datos

La característica de captura de datos modificados del almacén analítico está disponible a través de la característica de flujo de datos de Azure Data Factory o Azure Synapse Analytics. Para esta guía, use Azure Data Factory.

Importante

También puede usar Azure Synapse Analytics. Primero, cree un área de trabajo de Azure Synapse si no tiene ninguna aún. En el área de trabajo recién creada, seleccione la pestaña Desarrollar, Agregar nuevo recurso, y luego seleccione Flujo de datos.

  1. Cree una instancia de Azure Data Factory si aún no tiene uno.

    Sugerencia

    Si es posible, cree la factoría de datos en la misma región donde reside la cuenta de Azure Cosmos DB.

  2. Inicie la factoría de datos recién creada.

  3. En la factoría de datos, seleccione la pestaña Flujos de datos, y luego seleccione Nuevo flujo de datos.

  4. Asigne un nombre único al flujo de datos recién creado. En este ejemplo, el flujo de datos se denomina cosmoscdc.

    Screnshot of a new data flow with the name cosmoscdc.

Configuración de los valores de origen para el contenedor del almacén analítico

Ahora cree y configure un origen para que fluya datos desde el almacén analítico de la cuenta de Azure Cosmos DB.

  1. Seleccione Agregar origen.

    Screenshot of the add source menu option.

  2. En el campo Nombre del flujo de salida, escriba cosmos.

    Screenshot of naming the newly created source cosmos.

  3. En la sección Tipo de origen, seleccione Insertado.

    Screenshot of selecting the inline source type.

  4. En el campo Conjunto de datos, seleccione Azure: Azure Cosmos DB for NoSQL.

    Screenshot of selecting Azure Cosmos DB for NoSQL as the dataset type.

  5. Cree un nuevo servicio vinculado para la cuenta que se denomine cosmoslinkedservice. Seleccione la cuenta existente de Azure Cosmos DB for NoSQL en el cuadro de diálogo emergente Nuevo servicio vinculado, y luego seleccione Aceptar. En este ejemplo, se selecciona una cuenta de Azure Cosmos DB for NoSQL existente denominada msdocs-cosmos-source y una base de datos denominada cosmicworks.

    Screenshot of the New linked service dialog with an Azure Cosmos DB account selected.

  6. Seleccione Analítico para el tipo de almacén.

    Screenshot of the analytical option selected for a linked service.

  7. Seleccione la pestaña Opciones de origen.

  8. En Opciones de origen, seleccione el contenedor de destino y habilite Depuración del flujo de datos. En este ejemplo, el contenedor se denomina products.

    Screenshot of a source container selected named products.

  9. Seleccione Depuración de flujo de datos. En el cuadro de diálogo emergente Activar depuración del flujo de datos, conserve las opciones predeterminadas y luego seleccione Aceptar.

    Screenshot of the toggle option to enable data flow debug.

  10. La pestaña Opciones de origen también contiene otras opciones que puede habilitar. Esta tabla describe esas opciones:

Opción Descripción
Capturar actualizaciones intermedias Habilite esta opción si desea capturar el historial de cambios en los elementos, incluidos los cambios intermedios entre las lecturas de captura de datos modificados.
Capturar eliminaciones Habilite esta opción para capturar registros eliminados por el usuario y aplicarlos en el receptor. Las eliminaciones no se pueden aplicar en los receptores de Azure Data Explorer ni Azure Cosmos DB.
Capturar los TTL del almacén transaccional Habilite esta opción para capturar los registros eliminados de los TTL (período de vida) del almacén transaccional de Azure Cosmos DB y aplicarlos en el receptor. Las eliminaciones de TTL no se pueden aplicar en los receptores de Azure Data Explorer ni Azure Cosmos DB.
Tamaño por lotes en bytes Esta configuración es de hecho gigabytes. Especifique el tamaño en gigabytes si desea realizar la captura de datos de cambios por lotes.
Configuraciones adicionales Configuraciones adicionales del almacén analítico de Azure Cosmos DB y sus valores. (por ejemplo: spark.cosmos.allowWhiteSpaceInFieldNames -> true)

Trabajar con opciones de origen

Al comprobar cualquiera de las opciones Capture intermediate updates, Capture Deltes y Capture Transactional store TTLs, el proceso CDC creará y rellenará el campo __usr_opType en el receptor con los valores siguientes:

Value Descripción Opción
1 UPDATE Capturar actualizaciones intermedias
2 INSERT No hay ninguna opción para las inserciones, está activada de forma predeterminada
3 USER_DELETE Capturar eliminaciones
4 TTL_DELETE Capturar los TTL del almacén transaccional

Si tiene que diferenciar los registros eliminados de TTL de los documentos eliminados por usuarios o aplicaciones, compruebe las opciones Capture intermediate updates y Capture Transactional store TTLs. A continuación, tiene que adaptar los procesos CDC, las aplicaciones o consultas para usar __usr_opType según lo necesario para sus necesidades empresariales.

Sugerencia

Si es necesario que los consumidores de bajada restauren el orden de las actualizaciones con la opción "capturar actualizaciones intermedias", el campo de marca de tiempo del sistema _ts se puede usar como campo de ordenación.

Creación y configuración de los valores de receptor para las operaciones de actualización y eliminación

En primer lugar, cree un receptor de Azure Blob Storage sencillo y luego configure el receptor para que filtre los datos a solo operaciones específicas.

  1. Cree una cuenta y un contenedor de Azure Blob Storage, si aún no los tiene. Para los ejemplos siguientes, usaremos una cuenta denominada msdocsblobstorage y un contenedor denominado output.

    Sugerencia

    Si es posible, cree la cuenta de almacenamiento en la misma región donde reside la cuenta de Azure Cosmos DB.

  2. De nuevo en Azure Data Factory, cree un nuevo receptor para la captura de datos modificados desde el origen cosmos.

    Screenshot of adding a new sink that's connected to the existing source.

  3. Asigne un nombre único al receptor. En este ejemplo, el receptor se denomina storage.

    Screenshot of naming the newly created sink storage.

  4. En la sección Tipo de receptor, seleccione Insertado. En el campo Conjunto de datos, seleccione Delta.

    Screenshot of selecting and Inline Delta dataset type for the sink.

  5. Cree un nuevo servicio vinculado para su cuenta mediante Azure Blob Storage denominado storagelinkedservice. Seleccione la cuenta existente de Azure Blob Storage en el cuadro de diálogo emergente Nuevo servicio vinculado, y luego seleccione Aceptar. En este ejemplo, se selecciona una cuenta de Azure Blob Storage existente denominada msdocsblobstorage.

    Screenshot of the service type options for a new Delta linked service.

    Screenshot of the New linked service dialog with an Azure Blob Storage account selected.

  6. Seleccione la pestaña Configuración.

  7. En Configuración, establezca la ruta de acceso de la carpeta según el nombre del contenedor de blobs. En este ejemplo, el nombre del contenedor es output.

    Screenshot of the blob container named output set as the sink target.

  8. Busque la sección Método de actualización y cambie las selecciones para permitir solo las operaciones de eliminación y actualización. Además, especifique las Columnas de clave como una Lista de columnas usando el campo {_rid} como identificador único.

    Screenshot of update methods and key columns being specified for the sink.

  9. Seleccione Validar para asegurarse de que no haya ningún error u omisión. A continuación, seleccione Publicar para publicar el flujo de datos.

    Screenshot of the option to validate and then publish the current data flow.

Programación de la ejecución de la captura de datos modificados

Una vez publicado un flujo de datos, puede agregar una nueva canalización para mover y transformar los datos.

  1. Crear una canalización. Asigne un nombre único a la canalización. En este ejemplo, la canalización se denomina cosmoscdcpipeline.

    Screenshot of the new pipeline option within the resources section.

  2. En la sección Actividades, expanda la opción Mover y transformar, y luego seleccione Flujo de datos.

    Screenshot of the data flow activity option within the activities section.

  3. Asigne un nombre único a la actividad de flujo de datos. En este ejemplo, la actividad se denomina cosmoscdcactivity.

  4. En la pestaña Configuración, seleccione el flujo de datos denominado cosmoscdc que ha creado anteriormente en esta guía. A continuación, seleccione un tamaño de proceso basado en el volumen de datos y la latencia necesaria para la carga de trabajo.

    Screenshot of the configuration settings for both the data flow and compute size for the activity.

    Sugerencia

    En el caso de los tamaños de datos incrementales mayores de 100 GB, se recomienda el tamaño personalizado con un recuento de núcleos de 32 (+16 núcleos de controladores).

  5. Seleccione Agregar desencadenador. Programe esta canalización para que se ejecute con una cadencia que sea adecuada para la carga de trabajo. En este ejemplo, la canalización está configurada para ejecutarse cada cinco minutos.

    Screenshot of the add trigger button for a new pipeline.

    Screenshot of a trigger configuration based on a schedule, starting in the year 2023, that runs every five minutes.

    Nota:

    La ventana de periodicidad mínima para las ejecuciones de captura de datos modificados es de un minuto.

  6. Seleccione Validar para asegurarse de que no haya ningún error u omisión. A continuación, seleccione Publicar para publicar la canalización.

  7. Observe los datos colocados en el contenedor de Azure Blob Storage como salida del flujo de datos mediante la captura de datos modificados del almacén analítico de Azure Cosmos DB.

    Screnshot of the output files from the pipeline in the Azure Blob Storage container.

    Nota:

    El tiempo de arranque inicial del clúster puede tardar hasta tres minutos. Para evitar el tiempo de inicio del clúster en las ejecuciones posteriores de captura de datos modificados, configure el valor Tiempo de vida del clúster de flujo de datos. Para más información sobre el entorno de ejecución de integración y el TTL, consulte Entorno de ejecución de integración en Azure Data Factory.

Trabajos simultáneos

El tamaño del lote en las opciones de origen, o las situaciones en las que el receptor tarda en ingerir el flujo de cambios, pueden provocar la ejecución de varios trabajos al mismo tiempo. Para evitar esta situación, establezca la opción Simultaneidad en 1 en la configuración de la canalización, para asegurarse de que no se desencadenan nuevas ejecuciones hasta que finalice la ejecución actual.

Pasos siguientes