Compartir a través de


Introducción a la captura de datos modificados en el almacén analítico de Azure Cosmos DB

SE APLICA A: NoSQL MongoDB

Use la captura de datos modificados (CDC) en el almacén analítico de Azure Cosmos DB como origen para Azure Data Factory o Azure Synapse Analytics para capturar cambios específicos en los datos.

Nota

Tenga en cuenta que la interfaz de servicio vinculado para la API de Azure Cosmos DB for MongoDB aún no está disponible en el flujo de datos. No obstante, podrá usar el punto de conexión de documentos de su cuenta con la interfaz de servicio vinculado "Azure Cosmos DB for NoSQL" como solución provisional hasta que se admita el servicio vinculado de Mongo. En un servicio vinculado NoSQL, seleccione "Introducir manualmente" para proporcionar la información de la cuenta de Cosmos DB y usar el punto de conexión del documento de la cuenta (p. ej.: https://[your-database-account-uri].documents.azure.com:443/) en lugar del punto de conexión de MongoDB (p. ej.: mongodb://[your-database-account-uri].mongo.cosmos.azure.com:10255/)

Requisitos previos

Habilitación del almacén analítico

En primer lugar, habilite Azure Synapse Link en el nivel de cuenta y luego habilite el almacén analítico para los contenedores adecuados para la carga de trabajo.

  1. Habilitación de Azure Synapse Link: Habilitación de Azure Synapse Link para una cuenta de Azure Cosmos DB

  2. Habilite el almacén analítico para loa contenedores:

    Opción Guía
    Habilitar para un contenedor nuevo específico Habilitación de Azure Synapse Link para contenedores nuevos
    Habilitar para un contenedor existente específico Habilitación de Azure Synapse Link para contenedores existentes

Creación de un recurso de Azure de destino mediante flujos de datos

La característica de captura de datos modificados del almacén analítico está disponible a través de la característica de flujo de datos de Azure Data Factory o Azure Synapse Analytics. Para esta guía, use Azure Data Factory.

Importante

También puede usar Azure Synapse Analytics. Primero, cree un área de trabajo de Azure Synapse si no tiene ninguna aún. En el área de trabajo recién creada, seleccione la pestaña Desarrollar, Agregar nuevo recurso, y luego seleccione Flujo de datos.

  1. Cree una instancia de Azure Data Factory si aún no tiene uno.

    Sugerencia

    Si es posible, cree la factoría de datos en la misma región donde reside la cuenta de Azure Cosmos DB.

  2. Inicie la factoría de datos recién creada.

  3. En la factoría de datos, seleccione la pestaña Flujos de datos, y luego seleccione Nuevo flujo de datos.

  4. Asigne un nombre único al flujo de datos recién creado. En este ejemplo, el flujo de datos se denomina cosmoscdc.

    Captura de pantalla de un nuevo flujo de datos con el nombre cosmoscdc.

Configuración de los valores de origen para el contenedor del almacén analítico

Ahora cree y configure un origen para que fluya datos desde el almacén analítico de la cuenta de Azure Cosmos DB.

  1. Seleccione Agregar origen.

    Captura de pantalla de la opción del menú agregar origen.

  2. En el campo Nombre del flujo de salida, escriba cosmos.

    Captura de pantalla de la nomenclatura del cosmos de origen recién creado.

  3. En la sección Tipo de origen, seleccione Insertado.

    Captura de pantalla de la selección del tipo de origen insertado.

  4. En el campo Conjunto de datos, seleccione Azure: Azure Cosmos DB for NoSQL.

    Captura de pantalla de la selección de Azure Cosmos DB for NoSQL como tipo de conjunto de datos.

  5. Cree un nuevo servicio vinculado para la cuenta que se denomine cosmoslinkedservice. Seleccione la cuenta existente de Azure Cosmos DB for NoSQL en el cuadro de diálogo emergente Nuevo servicio vinculado, y luego seleccione Aceptar. En este ejemplo, se selecciona una cuenta de Azure Cosmos DB for NoSQL existente denominada msdocs-cosmos-source y una base de datos denominada cosmicworks.

    Captura de pantalla del cuadro de diálogo Nuevo servicio vinculado con una cuenta de Azure Cosmos DB seleccionada.

  6. Seleccione Analítico para el tipo de almacén.

    Captura de pantalla de la opción analítico seleccionada para un servicio vinculado.

  7. Seleccione la pestaña Opciones de origen.

  8. En Opciones de origen, seleccione el contenedor de destino y habilite Depuración del flujo de datos. En este ejemplo, el contenedor se denomina products.

    Captura de pantalla de un contenedor de origen seleccionado denominado productos.

  9. Seleccione Depuración de flujo de datos. En el cuadro de diálogo emergente Activar depuración del flujo de datos, conserve las opciones predeterminadas y luego seleccione Aceptar.

    Captura de pantalla de la opción de alternancia para habilitar la depuración del flujo de datos.

  10. La pestaña Opciones de origen también contiene otras opciones que puede habilitar. Esta tabla describe esas opciones:

Opción Descripción
Capturar actualizaciones intermedias Habilite esta opción si desea capturar el historial de cambios en los elementos, incluidos los cambios intermedios entre las lecturas de captura de datos modificados.
Capturar eliminaciones Habilite esta opción para capturar registros eliminados por el usuario y aplicarlos en el receptor. Las eliminaciones no se pueden aplicar en los receptores de Azure Data Explorer ni Azure Cosmos DB.
Capturar los TTL del almacén transaccional Habilite esta opción para capturar los registros eliminados de los TTL (período de vida) del almacén transaccional de Azure Cosmos DB y aplicarlos en el receptor. Las eliminaciones de TTL no se pueden aplicar en los receptores de Azure Data Explorer ni Azure Cosmos DB.
Tamaño por lotes en bytes Esta configuración es de hecho gigabytes. Especifique el tamaño en gigabytes si desea realizar la captura de datos de cambios por lotes.
Configuraciones adicionales Configuraciones adicionales del almacén analítico de Azure Cosmos DB y sus valores. (por ejemplo: spark.cosmos.allowWhiteSpaceInFieldNames -> true)

Trabajar con opciones de origen

Al comprobar cualquiera de las opciones Capture intermediate updates, Capture Deltes y Capture Transactional store TTLs, el proceso CDC creará y rellenará el campo __usr_opType en el receptor con los valores siguientes:

Value Descripción Opción
1 UPDATE Capturar actualizaciones intermedias
2 INSERT No hay ninguna opción para las inserciones, está activada de forma predeterminada
3 USER_DELETE Capturar eliminaciones
4 TTL_DELETE Capturar los TTL del almacén transaccional

Si tiene que diferenciar los registros eliminados de TTL de los documentos eliminados por usuarios o aplicaciones, compruebe las opciones Capture intermediate updates y Capture Transactional store TTLs. A continuación, tiene que adaptar los procesos CDC, las aplicaciones o consultas para usar __usr_opType según lo necesario para sus necesidades empresariales.

Sugerencia

Si es necesario que los consumidores de bajada restauren el orden de las actualizaciones con la opción "capturar actualizaciones intermedias", el campo de marca de tiempo del sistema _ts se puede usar como campo de ordenación.

Creación y configuración de los valores de receptor para las operaciones de actualización y eliminación

En primer lugar, cree un receptor de Azure Blob Storage sencillo y luego configure el receptor para que filtre los datos a solo operaciones específicas.

  1. Cree una cuenta y un contenedor de Azure Blob Storage, si aún no los tiene. Para los ejemplos siguientes, usaremos una cuenta denominada msdocsblobstorage y un contenedor denominado output.

    Sugerencia

    Si es posible, cree la cuenta de almacenamiento en la misma región donde reside la cuenta de Azure Cosmos DB.

  2. De nuevo en Azure Data Factory, cree un nuevo receptor para la captura de datos modificados desde el origen cosmos.

    Captura de pantalla de la adición de un nuevo receptor conectado al origen existente.

  3. Asigne un nombre único al receptor. En este ejemplo, el receptor se denomina storage.

    Captura de pantalla de la nomenclatura del almacenamiento receptor recién creado.

  4. En la sección Tipo de receptor, seleccione Insertado. En el campo Conjunto de datos, seleccione Delta.

    Captura de pantalla de la selección y el tipo de conjunto de datos Delta insertado para el receptor.

  5. Cree un nuevo servicio vinculado para su cuenta mediante Azure Blob Storage denominado storagelinkedservice. Seleccione la cuenta existente de Azure Blob Storage en el cuadro de diálogo emergente Nuevo servicio vinculado, y luego seleccione Aceptar. En este ejemplo, se selecciona una cuenta de Azure Blob Storage existente denominada msdocsblobstorage.

    Captura de pantalla de las opciones de tipo de servicio para un nuevo servicio vinculado delta.

    Captura de pantalla del cuadro de diálogo Nuevo servicio vinculado con una cuenta de Azure Blob Storage seleccionada.

  6. Seleccione la pestaña Configuración.

  7. En Configuración, establezca la ruta de acceso de la carpeta según el nombre del contenedor de blobs. En este ejemplo, el nombre del contenedor es output.

    Captura de pantalla del contenedor de blobs denominado conjunto de resultados establecido como destino receptor.

  8. Busque la sección Método de actualización y cambie las selecciones para permitir solo las operaciones de eliminación y actualización. Además, especifique las Columnas de clave como una Lista de columnas usando el campo {_rid} como identificador único.

    Captura de pantalla de los métodos de actualización y las columnas de clave que se especifican para el receptor.

  9. Seleccione Validar para asegurarse de que no haya ningún error u omisión. A continuación, seleccione Publicar para publicar el flujo de datos.

    Captura de pantalla de la opción para validar y luego publicar el flujo de datos actual.

Programación de la ejecución de la captura de datos modificados

Una vez publicado un flujo de datos, puede agregar una nueva canalización para mover y transformar los datos.

  1. Crear una canalización. Asigne un nombre único a la canalización. En este ejemplo, la canalización se denomina cosmoscdcpipeline.

    Captura de pantalla de la nueva opción de canalización en la sección de recursos.

  2. En la sección Actividades, expanda la opción Mover y transformar, y luego seleccione Flujo de datos.

    Captura de pantalla de la opción de actividad de flujo de datos dentro de la sección de actividades.

  3. Asigne un nombre único a la actividad de flujo de datos. En este ejemplo, la actividad se denomina cosmoscdcactivity.

  4. En la pestaña Configuración, seleccione el flujo de datos denominado cosmoscdc que ha creado anteriormente en esta guía. A continuación, seleccione un tamaño de proceso basado en el volumen de datos y la latencia necesaria para la carga de trabajo.

    Captura de pantalla de las opciones de configuración para el flujo de datos y el tamaño de proceso de la actividad.

    Sugerencia

    En el caso de los tamaños de datos incrementales mayores de 100 GB, se recomienda el tamaño personalizado con un recuento de núcleos de 32 (+16 núcleos de controladores).

  5. Seleccione Agregar desencadenador. Programe esta canalización para que se ejecute con una cadencia que sea adecuada para la carga de trabajo. En este ejemplo, la canalización está configurada para ejecutarse cada cinco minutos.

    Captura de pantalla del botón Agregar desencadenador para una nueva canalización.

    Captura de pantalla de una configuración de desencadenador basada en una programación, a partir del año 2023, que se ejecuta cada cinco minutos.

    Nota:

    La ventana de periodicidad mínima para las ejecuciones de captura de datos modificados es de un minuto.

  6. Seleccione Validar para asegurarse de que no haya ningún error u omisión. A continuación, seleccione Publicar para publicar la canalización.

  7. Observe los datos colocados en el contenedor de Azure Blob Storage como salida del flujo de datos mediante la captura de datos modificados del almacén analítico de Azure Cosmos DB.

    Captura de pantalla de los archivos de salida de la canalización en el contenedor de Azure Blob Storage.

    Nota:

    El tiempo de arranque inicial del clúster puede tardar hasta tres minutos. Para evitar el tiempo de inicio del clúster en las ejecuciones posteriores de captura de datos modificados, configure el valor Tiempo de vida del clúster de flujo de datos. Para más información sobre el entorno de ejecución de integración y el TTL, consulte Entorno de ejecución de integración en Azure Data Factory.

Trabajos simultáneos

El tamaño del lote en las opciones de origen, o las situaciones en las que el receptor tarda en ingerir el flujo de cambios, pueden provocar la ejecución de varios trabajos al mismo tiempo. Para evitar esta situación, establezca la opción Simultaneidad en 1 en la configuración de la canalización, para asegurarse de que no se desencadenan nuevas ejecuciones hasta que finalice la ejecución actual.

Pasos siguientes