Uso de datos de referencia para las búsquedas en Stream Analytics

Los datos de referencia son un conjunto de datos finito que son estáticos o que cambian lentamente por naturaleza. Se usa para realizar una búsqueda o para aumentar los flujos de datos. Los datos de referencia también se conocen como tabla de búsqueda.

Tome un escenario de IoT como ejemplo. Puede almacenar los metadatos sobre los sensores, que no cambian con frecuencia, en los datos de referencia. Después, podría combinarlos con los flujos de datos de IoT en tiempo real.

Azure Stream Analytics carga los datos de referencia en la memoria para lograr un procesamiento de flujos de baja latencia. Para usar los datos de referencia en el trabajo de Stream Analytics, por lo general usará una cláusula JOIN de datos de referencia en la consulta.

Ejemplo

Puede generar un flujo de eventos en tiempo real cuando los automóviles pasen por una cabina de peaje. La cabina de peaje puede capturar las matrículas en tiempo real. Estos datos se pueden combinar con un conjunto de datos estático que tiene detalles de registro para identificar las matrículas que han expirado.

SELECT I1.EntryTime, I1.LicensePlate, I1.TollId, R.RegistrationId  
FROM Input1 I1 TIMESTAMP BY EntryTime  
JOIN Registration R  
ON I1.LicensePlate = R.LicensePlate  
WHERE R.Expired = '1'

Stream Analytics admite Azure Blob Storage, Azure Data Lake Storage Gen2 y Azure SQL Database como capa de almacenamiento de los datos de referencia. Si tiene los datos de referencia en otros almacenes de datos, intente usar Azure Data Factory para extraer, transformar y cargar los datos en uno de los almacenes de datos admitidos. Para obtener más información, consulte Información general de la copia de actividad con Azure Data Factory.

Azure Blob Storage o Azure Data Lake Storage Gen2

Los datos de referencia se modelan como una secuencia de blobs en orden ascendente por la fecha y hora especificada en el nombre del blob. Solo se pueden agregar blobs al final de la secuencia con una fecha y hora mayor que la especificada por el último blob de la secuencia. Los blobs se definen en la configuración de entrada.

Para más información, consulte Protección de datos en Azure Stream Analytics.

Configuración de los datos de referencia de blob

Para configurar los datos de referencia, tiene que crear primero una entrada que sea del tipo datos de referencia. En la siguiente tabla, se explica cada propiedad que tendrá que proporcionar al crear la entrada de los datos de referencia con su descripción.

Nombre de propiedad Descripción
Alias de entrada Nombre descriptivo utilizado en la consulta del trabajo para hacer referencia a esta entrada.
Cuenta de almacenamiento El nombre de la cuenta de almacenamiento donde se encuentran los blobs. Si está en la misma suscripción que el trabajo de Stream Analytics, selecciónela en el menú desplegable.
Clave de cuenta de almacenamiento La clave secreta asociada con la cuenta de almacenamiento. Esta clave se rellena automáticamente si la cuenta de almacenamiento está en la misma suscripción que el trabajo de Stream Analytics.
Contenedor de almacenamiento Los contenedores proporcionan una agrupación lógica para los blobs almacenados en Blob Storage. Cuando carga un blob a Blob Storage, debe especificar un contenedor para ese blob.
Patrón de la ruta de acceso Esta propiedad obligatoria se usa para ubicar los blobs dentro del contenedor especificado. Dentro de la ruta de acceso, puede elegir especificar una o más instancias de las variables {date} y {time}.
Ejemplo 1: products/{date}/{time}/product-list.csv
Ejemplo 2: products/{date}/product-list.csv
Ejemplo 3: product-list.csv

Si el blob no existe en la ruta de acceso especificada, el trabajo de Stream Analytics esperará indefinidamente a que el blob esté disponible.
Formato de fecha [opcional] Si ha usado {date} dentro del patrón de ruta de acceso especificado, seleccione el formato de fecha en el que están organizados los blobs en la lista desplegable de formatos admitidos.
Ejemplo: AAAA/MM/DD o MM/DD/AAAA
Formato de hora [opcional] Si ha usado {time} dentro del patrón de ruta de acceso especificado, seleccione el formato de hora en el que están organizados los blobs en la lista desplegable de formatos admitidos.
Ejemplo: HH, HH/mm o HH: mm
Formato de serialización de eventos Para asegurarse de que las consultas funcionen como se espera, Stream Analytics debe saber cuál es el formato de serialización que usa para los flujos de datos entrantes. Para los datos de referencia, los formatos admitidos son CSV y JSON.
Encoding Por el momento, UTF-8 es el único formato de codificación compatible.

Datos de referencia estáticos

Es posible que los datos de referencia no cambien. Para habilitar la compatibilidad con datos de referencia estáticos, especifique una ruta de acceso estática en la configuración de entrada.

Stream Analytics selecciona el blob desde la ruta de acceso especificada. Los tokens de sustitución {date} y {time} no son necesarios. Debido a que los datos de referencia son inmutables en Stream Analytics, no se recomienda sobrescribir un blob de datos de referencia estáticos.

Generación de datos de referencia en una programación

Los datos de referencia pueden ser un conjunto de datos de cambio lento. Para actualizar los datos de referencia, especifique un patrón de ruta de acceso en la configuración de entrada mediante los tokens de sustitución {date} y {time}. Stream Analytics selecciona las definiciones actualizadas de los datos de referencia según este patrón de ruta de acceso.

Por ejemplo, el patrón sample/{date}/{time}/products.csv con el formato de fecha AAAA-MM-DD y el formato de hora HH-mm indica a Stream Analytics que seleccione el blob actualizado sample/2015-04-16/17-30/products.csv a las 5:30 p.m. UTC del 16 de abril de 2015.

Azure Stream Analytics examina automáticamente los blobs de datos de referencia actualizados en un intervalo de un minuto. Un blob con la marca de tiempo 10:30:00 podría cargarse con un pequeño retraso, por ejemplo, a las 10:30:30. Observará un pequeño retraso en el trabajo de Stream Analytics que hace referencia a este blob.

Para evitar estos escenarios, cargue el blob antes de la hora efectiva de destino, que son las 10:30:00 en este ejemplo. El trabajo de Stream Analytics ahora tiene tiempo suficiente para detectar y cargar el blob en memoria y realizar operaciones.

Nota:

Actualmente, los trabajos de Stream Analytics buscan la actualización del blob solo cuando la hora de la máquina avanza hasta la hora codificada en el nombre del blob. Por ejemplo, el trabajo busca sample/2015-04-16/17-30/products.csv tan pronto como sea posible pero no antes de las 5:30 p.m. UTC del 16 de abril de 2015. No buscará nunca un blob con una hora codificada anterior a la última detectada.

Por ejemplo, después de que el trabajo encuentre el blob sample/2015-04-16/17-30/products.csv, omite los archivos con una fecha codificada anterior a las 5:30 p. m. del 16 de abril de 2015. Si se crea el blob sample/2015-04-16/17-25/products.csv de forma tardía en el mismo contenedor, el trabajo no lo usará.

En otro ejemplo, sample/2015-04-16/17-30/products.csv solo se produce el 16 de abril de 2015 a las 10:03 p. m., pero no hay ningún blob con una fecha anterior en el contenedor. A continuación, el trabajo usa este archivo a partir del 16 de abril de 2015 a las 10:03 p. m. y usa los datos de referencia anteriores hasta entonces.

Se produce una excepción a este comportamiento cuando el trabajo tiene que volver a procesar los datos retrocediendo en el tiempo o cuando el trabajo se inicia por primera vez.

En la hora de inicio, el trabajo busca el blob más reciente generado antes de la hora de inicio del trabajo especificada. Este comportamiento garantiza que haya un conjunto de datos de referencia no vacío cuando se inicie el trabajo. Si no se encuentra ninguno, el trabajo muestra el diagnóstico siguiente: Initializing input without a valid reference data blob for UTC time <start time>.

Cuando se actualiza un conjunto de datos de referencia, se genera un registro de diagnóstico: Loaded new reference data from <blob path>. Por muchas razones, es posible que un trabajo deba volver a cargar un conjunto de datos de referencia anterior. A menudo, el motivo es volver a procesar los datos anteriores. En ese momento, se genera el mismo registro de diagnóstico. Esta acción no implica que los datos del flujo actual usen datos de referencia anteriores.

Azure Data Factory puede usarse para organizar la tarea de crear los blobs actualizados requeridos por Stream Analytics para actualizar las definiciones de datos de referencia.

Factoría de datos es un servicio de integración de datos basado en la nube que organiza y automatiza el movimiento y la transformación de datos. Data Factory admite la conexión a un gran número de almacenes de datos locales y en la nube. Puede mover los datos fácilmente según una programación regular que especifique.

Para más información sobre cómo configurar una canalización de Data Factory para generar datos de referencia para Stream Analytics que se actualiza según una programación predefinida, consulte este ejemplo de GitHub.

Sugerencias sobre cómo actualizar los datos de referencia de blob

  • No sobrescriba los blobs de datos de referencia, ya que son inmutables.
  • El método recomendado para actualizar los datos de referencia es:
    • Utilizar {date} y {time} en el patrón de la ruta de acceso.
    • Agregar un nuevo blob con el mismo contenedor y patrón de ruta de acceso definido en la entrada del trabajo.
    • Usar una fecha y hora mayor que la especificada en el último blob de la secuencia.
  • Los blobs de datos de referencia no se ordenan por la hora de la última modificación del blob. Solo se ordenan por la fecha y hora especificadas en el nombre del blob mediante las sustituciones {date} y {time}.
  • Para evitar tener que enumerar un gran número de blobs, elimine los blobs antiguos para los que ya no se va a realizar un procesamiento. Es posible que Stream Analytics tenga que volver a procesar una pequeña cantidad en algunos escenarios, como un reinicio.

Azure SQL Database

El trabajo de Stream Analytics recupera los datos de referencia de SQL Database y los almacena como una instantánea en memoria para su procesamiento. La instantánea de los datos de referencia también se almacena en un contenedor de una cuenta de almacenamiento. Especifique la cuenta de almacenamiento en las opciones de configuración.

El contenedor se crea automáticamente cuando se inicia el trabajo. Si el trabajo se detiene o entra en un estado de error, se eliminan los contenedores creados automáticamente cuando se reinicia el trabajo.

Si los datos de referencia son un conjunto de datos de cambio lento, deberá actualizar periódicamente la instantánea que se usa en el trabajo.

Con Stream Analytics, puede establecer una frecuencia de actualización al configurar la conexión de entrada de SQL Database. El entorno de ejecución de Stream Analytics consulta la instancia de SQL Database con el intervalo especificado por la frecuencia de actualización. La frecuencia de actualización más rápida que se admite es de una vez por minuto. Para cada actualización, Stream Analytics almacena una nueva instantánea en la cuenta de almacenamiento proporcionada.

Stream Analytics proporciona dos opciones para consultar la instancia de SQL Database. Una consulta de instantánea es obligatoria y debe incluirse en cada trabajo. Stream Analytics ejecuta la consulta de la instantánea de forma periódica en función del intervalo de actualización. Usa el resultado de la consulta (la instantánea) como conjunto de datos de referencia.

La consulta de la instantánea debe ajustarse a la mayoría de los escenarios. Si tiene problemas de rendimiento con conjuntos de datos grandes y frecuencias de actualización rápidas, utilice la opción de consulta delta. Las consultas que tarden más de 60 segundos en devolver un conjunto de datos de referencia darán como resultado un error de tiempo de espera.

Con la opción de consulta delta, Stream Analytics ejecuta la consulta de la instantánea inicialmente para obtener un conjunto de datos de referencia de línea base. Después, Stream Analytics ejecuta la consulta delta periódicamente en función del intervalo de actualización para recuperar los cambios incrementales. Estos cambios incrementales se aplican de manera continua al conjunto de datos de referencia para mantenerlo actualizado. El uso de una consulta delta puede ayudar a reducir los costos de almacenamiento y las operaciones de E/S de la red.

Configuración de los datos de referencia de SQL Database

Para configurar los datos de referencia de SQL Database, tiene que crear primero una entrada de datos de referencia. En la siguiente tabla, se explica cada propiedad que tendrá que proporcionar al crear la entrada de los datos de referencia con su descripción. Para más información, consulte Uso de datos de referencia de una instancia de SQL Database para un trabajo de Azure Stream Analytics.

Puede usar Instancia administrada de Azure SQL como entrada de datos de referencia. Debe configurar un punto de conexión público en SQL Managed Instance. A continuación, configure manualmente las siguientes opciones en Stream Analytics. También se admite una máquina virtual de Azure que ejecute SQL Server con una base de datos adjunta mediante la configuración manual de estas opciones.

Nombre de propiedad Descripción
Alias de entrada Nombre descriptivo utilizado en la consulta del trabajo para hacer referencia a esta entrada.
Subscription Su suscripción.
Base de datos Instancia de SQL Database que contiene los datos de referencia. Para SQL Managed Instance, es necesario especificar el puerto 3342. Un ejemplo es sampleserver.public.database.windows.net,3342.
Nombre de usuario Nombre de usuario asociado a la instancia de SQL Database.
Contraseña Contraseña asociada a la instancia de SQL Database.
Actualizar periódicamente Esta opción le permite seleccionar una frecuencia de actualización. Seleccione Activado para especificar la frecuencia de actualización en formato DD:HH:MM.
Consulta de instantánea Esta opción de consulta predeterminada recupera los datos de referencia de la instancia de SQL Database.
Consulta delta Para escenarios avanzados con conjuntos de datos grandes y una frecuencia de actualización corta, agregue una consulta delta.

Limitación del tamaño

Utilice conjuntos de datos de referencia inferiores a 300 MB para obtener el mejor rendimiento. En los trabajos con 6 unidades de streaming o más, se admiten conjuntos de datos de referencia de hasta 5 GB. El uso de un conjunto de datos de referencia grande puede afectar a la latencia integral del trabajo.

La complejidad de las consultas puede aumentar para incluir el procesamiento con estado, como agregados en ventanas, combinaciones temporales y funciones analíticas temporales. Cuando aumenta la complejidad, disminuye el tamaño máximo admitido de los datos de referencia.

Si Stream Analytics no puede cargar los datos de referencia y realizar operaciones complejas, el trabajo se quedará sin memoria y producirá un error. En tales casos, la métrica de porcentaje de uso de las unidades de streaming alcanza el 100 %.

Número de unidades de streaming Tamaño recomendado
1 50 MB, o menos
3 150 MB, o menos
6 y más 5 GB, o menos

La compatibilidad con la compresión no está disponible para los datos de referencia. En el caso de los conjuntos de datos de referencia de más de 300 MB, utilice SQL Database como origen con la opción de consulta delta para obtener un rendimiento óptimo. Si no se usa la consulta delta en estos escenarios, se ven picos en la métrica de retraso de la marca de agua cada vez que se actualice el conjunto de datos de referencia.

Combinación de varios conjuntos de datos de referencia en un trabajo

Solo puede unir una entrada de datos de referencia a una entrada de streaming. Para combinar varios conjuntos de datos de referencia, desglose la consulta en varios pasos. Este es un ejemplo:

With Step1 as (
    --JOIN input stream with reference data to get 'Desc'
    SELECT streamInput.*, refData1.Desc as Desc
    FROM    streamInput
    JOIN    refData1 ON refData1.key = streamInput.key 
)
--Now Join Step1 with second reference data
SELECT *
INTO    output 
FROM    Step1
JOIN    refData2 ON refData2.Desc = Step1.Desc 

Trabajos de IoT Edge

Solo se admiten datos de referencia locales para los trabajos perimetrales de Stream Analytics. Cuando se implementa un trabajo en un dispositivo IoT Edge, este carga los datos de referencia de la ruta de acceso de archivo definida por el usuario. Tenga un archivo de datos de referencia preparado en el dispositivo.

Para un contenedor de Windows, coloque el archivo de datos de referencia en la unidad local y comparta la unidad local con el contenedor de Docker. Para un contenedor de Linux, cree un volumen de Docker y rellene el archivo de datos en el volumen.

En una actualización de IoT Edge, los datos de referencia se desencadenan mediante una implementación. Una vez desencadenada, el módulo de Stream Analytics toma los datos actualizados sin detener el trabajo en ejecución.

Puede actualizar los datos de referencia de dos maneras:

  • Actualizar la ruta de acceso de los datos de referencia en el trabajo de Stream Analytics desde Azure Portal.
  • Actualizar la implementación de IoT Edge.

Pasos siguientes