Compartir vía


Actividad de Flujo de Datos en Azure Data Factory y Azure Synapse Analytics

SE APLICA A: Azure Data Factory Azure Synapse Analytics

Sugerencia

Data Factory en Microsoft Fabric es la próxima generación de Azure Data Factory, con una arquitectura más sencilla, inteligencia artificial integrada y nuevas características. Si no está familiarizado con la integración de datos, comience con Fabric Data Factory. Las cargas de trabajo de ADF existentes pueden actualizarse a Fabric para acceder a nuevas funcionalidades en ciencia de datos, análisis en tiempo real e informes.

Usa la actividad Data Flow para transformar y mover datos mediante flujos de datos de mapeo. Si no está familiarizado con los flujos de datos, consulte introducción a Mapping Data Flow

Creación de una actividad de Data Flow con la interfaz de usuario

Para usar una actividad de Data Flow en una canalización, complete los pasos siguientes:

  1. Busque Data Flow en el panel Actividades de canalización y arrastre una actividad de Data Flow al lienzo de la canalización.

  2. Seleccione la nueva actividad de Data Flow en el lienzo si aún no está seleccionada y su pestaña Settings para editar sus detalles.

    Muestra la interfaz de usuario para una actividad de flujo de datos.

  3. La clave de punto de control se utiliza para establecer el punto de control cuando el flujo de datos se emplea para la captura de datos modificados. Esta puede sobrescribirse. Las actividades del flujo de datos usan un valor GUID como clave de punto de control, en lugar de una combinación de los nombres de la canalización y la actividad, para que esta pueda hacer un seguimiento del estado de la captura de datos modificados del cliente a pesar de cualquier acción de cambio de nombre. Toda la actividad de flujo de datos ya existente usa la clave del patrón anterior para mantener la compatibilidad con versiones anteriores. A continuación se muestra la opción "clave de punto de control" después de publicar una nueva actividad de flujo de datos que se ha obtenido a partir de un recurso de flujo de datos para el cual se ha habilitado la función de captura de datos modificados.

    Muestra la interfaz de usuario para una actividad de flujo de datos con clave de punto de control.

  4. Seleccione un flujo de datos existente o cree uno nuevo con el botón Nuevo. Seleccione otras opciones según sea necesario para completar la configuración.

Sintaxis

{
    "name": "MyDataFlowActivity",
    "type": "ExecuteDataFlow",
    "typeProperties": {
      "dataflow": {
         "referenceName": "MyDataFlow",
         "type": "DataFlowReference"
      },
      "compute": {
         "coreCount": 8,
         "computeType": "General"
      },
      "traceLevel": "Fine",
      "runConcurrently": true,
      "continueOnError": true,      
      "staging": {
          "linkedService": {
              "referenceName": "MyStagingLinkedService",
              "type": "LinkedServiceReference"
          },
          "folderPath": "my-container/my-folder"
      },
      "integrationRuntime": {
          "referenceName": "MyDataFlowIntegrationRuntime",
          "type": "IntegrationRuntimeReference"
      }
}

Propiedades de tipo

Propiedad Descripción Valores permitidos Obligatorio
flujo de datos Referencia al flujo de datos que se está ejecutando DataFlowReference
integrationRuntime Entorno de proceso en el que se ejecuta el flujo de datos. Si no se especifica, se usa el entorno de ejecución de integración de Azure de resolución automática. IntegrationRuntimeReference No
compute.coreCount Número de núcleos utilizados en el clúster de Spark. Solo se puede especificar si se usa la resolución automática del entorno de ejecución de integración de Azure 8, 16, 32, 48, 80, 144, 272 No
compute.computeType Tipo de cómputo utilizado en el clúster de Spark. Solo se puede especificar si se usa el Azure Integration Runtime de autoresolve "General" No
staging.linkedService Si usa un origen o receptor de Azure Synapse Analytics, especifique la cuenta de almacenamiento que se usa para el almacenamiento provisional de PolyBase.

Si Azure Storage está configurado con el punto de conexión de servicio de red virtual, tiene que utilizar la autenticación de identidad administrada con la opción para permitir el servicio de Microsoft de confianza habilitada en la cuenta de almacenamiento; consulte Efectos del uso de puntos de conexión de servicio de la red virtual con Azure Storage. Aprenda también las configuraciones necesarias para Azure Blob y Azure Data Lake Storage Gen2 respectivamente.
LinkedServiceReference Solo si el flujo de datos lee o escribe en una instancia de Azure Synapse Analytics.
staging.folderPath Si usa un origen o un receptor de Azure Synapse Analytics, es la ruta de la carpeta de la cuenta de almacenamiento de blobs que se utiliza como almacenamiento provisional de PolyBase. Cadena Solo si el flujo de datos realiza lecturas o escrituras en Azure Synapse Analytics
traceLevel Establecimiento del nivel de registro de la ejecución de actividades de flujo de datos Fino, Grueso, Ninguno No

Execute Data Flow

Dinamice el tamaño del cálculo del flujo de datos en tiempo de ejecución

Las propiedades Recuento de núcleos y Tipo de proceso se pueden configurar dinámicamente para ajustarse al tamaño de los datos de origen entrantes en tiempo de ejecución. Use actividades de canalización como Búsqueda u Obtener metadatos para averiguar el tamaño de los datos del conjunto de datos de origen. A continuación, use Agregar contenido dinámico en las propiedades de la actividad Data Flow. Puede elegir entre tamaños de computación pequeños, medianos o grandes. Opcionalmente, elija "Personalizado" y configure manualmente los tipos de proceso y el número de núcleos.

Dynamic Data Flow

Este es un breve tutorial en formato de vídeo en el que se explica esta técnica

Integración de flujo de datos en tiempo de ejecución

Elija qué Integration Runtime usar para la ejecución de la actividad de Data Flow. De forma predeterminada, el servicio usa el entorno de ejecución de integración de Azure con resolución automática y cuatro núcleos de trabajo. Este IR tiene un tipo de computación de propósito general y se ejecuta en la misma región que la instancia de servicio. En el caso de las canalizaciones operativas, se recomienda encarecidamente crear sus propios entornos de ejecución de integración de Azure que definan regiones específicas, tipo de proceso, recuentos de núcleos y TTL para la ejecución de la actividad del flujo de datos.

La recomendación mínima aconsejada para la mayoría de las cargas de trabajo de producción es un tipo de computación mínimo de propósito general con una configuración de 8+8 (16 núcleos virtuales en total) y un Tiempo de vida (TTL) de 10 minutos. Al establecer un valor de TTL pequeño, Azure IR puede mantener un clúster semiactivo que no empleará los minutos de tiempo de inicio que un clúster en frío necesita. Para obtener más información, consulte Azure integration Runtime.

Azure Integration Runtime

Importante

La selección del entorno de ejecución de integración en la actividad de Data Flow solo se aplica a las ejecuciones activadas de su canalización. La canalización con flujos de datos se depurará en el clúster que se haya especificado en la sesión de depuración.

PolyBase

Si usa un Azure Synapse Analytics como receptor o origen, debe elegir una ubicación de almacenamiento provisional para la carga por lotes de PolyBase. PolyBase utiliza la carga por lotes en lugar de cargar los datos fila a fila. PolyBase reduce drásticamente el tiempo de carga en Azure Synapse Analytics.

Clave de punto de control

Si se usa la opción de captura de cambios para orígenes de flujo de datos, ADF mantiene y administra el punto de control automáticamente. La clave de punto de control predeterminada es un hash de los nombres del flujo de datos y la canalización. Si usa un patrón dinámico para las tablas o carpetas de origen, puede invalidar este hash y establecer aquí su propio valor de clave de punto de control.

Nivel de registro

Si no es necesario que cada ejecución de canalización de las actividades de flujo de datos anote completamente todos los registros de telemetría detallados, tiene la opción de establecer el nivel de registro en "Básico" o "Ninguno". Al ejecutar los flujos de datos en "modo Verbose" (por defecto), está solicitando al servicio que registre completamente la actividad a nivel de cada partición durante la transformación de los datos. Esta puede ser una operación costosa; por tanto, habilitar solo el modo detallado al solucionar problemas puede mejorar el flujo de datos y el rendimiento de la canalización en general. El modo "Básico" solo registra las duraciones de las transformaciones, mientras que "Ninguno" solo proporciona un resumen de las duraciones.

Nivel de registro

Propiedades del sumidero

La característica de agrupación de los flujos de datos permite establecer el orden de ejecución de los receptores y agruparlos con el mismo número de grupo. Para facilitar la administración de los grupos, puede pedir al servicio que ejecute los receptores, en el mismo grupo, en paralelo. También puede establecer que el grupo de receptores continúe incluso después de que uno de los receptores encuentre un error.

El comportamiento predeterminado de los receptores de flujo de datos es ejecutar cada receptor de forma secuencial y hacer que el flujo de datos falle cuando se encuentra un error en el receptor. Además, todos los receptores se establecen de forma predeterminada en el mismo grupo, a menos que vaya a las propiedades del flujo de datos y establezca otras prioridades para los receptores.

Propiedades del sumidero

Solo la primera fila

Esta opción solo está disponible para los flujos de datos que tienen los receptores de caché habilitados para la "salida a la actividad". La salida del flujo de datos que se inyecta directamente en la canalización está limitada a 2 MB. Establecer "solo la primera fila" le ayuda a limitar la salida de datos del flujo de datos al insertar la salida de la actividad de flujo de datos directamente en la canalización.

Flujos de datos con parámetros

Conjuntos de datos con parámetros

Si el flujo de datos utiliza conjuntos de datos con parámetros, establezca los valores de los parámetros en la pestaña Configuración.

Ejecutar Parámetros de Flujo de Datos

Flujos de datos con parámetros

Si el flujo de datos tiene parámetros, establezca los valores dinámicos de los parámetros de flujo de datos en la pestaña Parámetros. Puede usar el lenguaje de expresiones de canalización o el lenguaje de expresiones de flujo de datos para asignar valores de parámetros dinámicos o literales. Para obtener más información, vea Data Flow Parameters.

Propiedades computacionales parametrizadas.

Puede parametrizar el número de núcleos o el tipo de proceso si usa autoresolve Azure Integration Runtime y especifica valores para compute.coreCount y compute.computeType.

Ejecutar el Ejemplo de Parámetro de Flujo de Datos

Depuración de la canalización de actividades de Data Flow

Para ejecutar una canalización de depuración con una actividad de Data Flow, debe activar el modo de depuración del flujo de datos a través del control deslizante Data Flow Debug (Depuración de flujo de datos) en la barra superior. El modo de depuración permite ejecutar el flujo de datos en un clúster de Spark activo. Para más información, consulte Modo de depuración.

Captura de pantalla que muestra dónde está el botón Depurar

La canalización de depuración se ejecuta en el clúster de depuración activo, no en el entorno de ejecución de integración especificado en la configuración de la actividad de Data Flow. Puede elegir el entorno de proceso de depuración al iniciar el modo de depuración.

Supervisión de la actividad de Data Flow

La supervisión de la actividad de Data Flow se realiza de forma especial, ya que permite ver información sobre las particiones, el tiempo de cada fase y el linaje de datos. Abra el panel supervisión utilizando el icono con forma de gafas que encontrará en Acciones. Para más información, consulte Supervisión de flujos de datos.

Usar los resultados de la actividad de flujo de datos en una actividad subsiguiente

La actividad de flujo de datos genera métricas con respecto al número de filas escritas en cada receptor y las filas leídas de cada origen. Estos resultados se devuelven en la sección output del resultado de ejecución de la actividad. Las métricas devueltas están en el formato del siguiente código JSON.

{
    "runStatus": {
        "metrics": {
            "<your sink name1>": {
                "rowsWritten": <number of rows written>,
                "sinkProcessingTime": <sink processing time in ms>,
                "sources": {
                    "<your source name1>": {
                        "rowsRead": <number of rows read>
                    },
                    "<your source name2>": {
                        "rowsRead": <number of rows read>
                    },
                    ...
                }
            },
            "<your sink name2>": {
                ...
            },
            ...
        }
    }
}

Por ejemplo, para obtener el número de filas escritas en un receptor denominado "sink1" en una actividad denominada "dataflowActivity", use @activity('dataflowActivity').output.runStatus.metrics.sink1.rowsWritten.

Para obtener el número de filas leídas de un origen denominado "source1" que se usó en ese receptor, use @activity('dataflowActivity').output.runStatus.metrics.sink1.sources.source1.rowsRead.

Nota

Si un receptor tiene cero filas escritas, no se mostrará en las métricas. Se puede comprobar la existencia con la función contains. Por ejemplo, contains(activity('dataflowActivity').output.runStatus.metrics, 'sink1') comprueba si se escribieron filas en sink1.

Consulte actividades de control de flujo admitidas: