Comparteix a través de


Lectura de entradas en cualquier formato mediante deserializadores personalizados de .NET (versión preliminar)

Importante

El deserializador personalizado de .net para Azure Stream Analytics se retirará el 30 de septiembre de 2024. Después de esa fecha, no será posible usar la característica. Por favor, pase a un deserializador incorporado JSON, AVRO o CSV para esa fecha.

Los deserializadores de .NET personalizados permiten que el trabajo de Azure Stream Analytics lea los datos en formatos distintos de los tres formatos de datos integrados. En este artículo se explica el formato de serialización y las interfaces que definen los deserializadores de .NET personalizados para trabajos perimetrales y en la nube de Azure Stream Analytics. También hay deserializadores de ejemplo para el búfer de protocolo y el formato CSV.

Deserializador personalizado de .NET

Los siguientes ejemplos de código son las interfaces que definen al deserializador personalizado e implementan StreamDeserializer<T>.

UserDefinedOperator es la clase base para todos los operadores de streaming personalizados. Inicializa StreamingContext, que proporciona contexto, incluido el mecanismo para publicar diagnósticos para los que necesitará depurar cualquier problema con el deserializador.

    public abstract class UserDefinedOperator
    {
        public abstract void Initialize(StreamingContext streamingContext);
    }

El siguiente fragmento de código es la deserialización de los datos de streaming.

Los errores que se pueden omitir deben emitirse mediante el elemento IStreamingDiagnostics pasado a través del método de inicialización de UserDefinedOperator. Todas las excepciones se tratarán como errores y se volverá a crear el deserializador. Después de un número determinado de errores, el trabajo pasará a un estado de error.

StreamDeserializer<T> deserializa una secuencia en un objeto de tipo T. Se deben cumplir las condiciones siguientes:

  1. T es una clase o una estructura.
  2. Todos los campos públicos de T son
    1. Uno de los valores [sbyte, byte, corto, ushort, entero, uint, largo, DateTime, cadena, float, doble] o sus equivalentes que aceptan valores NULL.
    2. Otra estructura o clase que sigue las mismas reglas.
    3. Matriz de tipo T2 que sigue las mismas reglas.
    4. IListT2, donde T2 aplica las mismas reglas.
    5. No tiene ningún tipo recursivo.

El parámetro stream es la secuencia que contiene el objeto serializado. Deserialize devuelve una colección de instancias de T.

    public abstract class StreamDeserializer<T> : UserDefinedOperator
    {
        public abstract IEnumerable<T> Deserialize(Stream stream);
    }

StreamingContext proporciona el contexto, incluido el mecanismo para publicar diagnósticos para el operador de usuario.

    public abstract class StreamingContext
    {
        public abstract StreamingDiagnostics Diagnostics { get; }
    }

StreamingDiagnostics es el diagnóstico de los operadores definidos por el usuario, incluidos el serializador, el deserializador y las funciones definidas por el usuario.

WriteError escribe un mensaje de error en los registros de recursos y envía el error al diagnóstico.

briefMessage es un breve mensaje de error. Este mensaje se muestra en los diagnósticos y el equipo del producto lo usa para la depuración. No incluya información confidencial y mantenga la longitud del mensaje por debajo de 200 caracteres.

detailedMessage es un mensaje de error detallado que solo se agrega a los registros de recursos en el almacenamiento. El mensaje debe tener menos de 2000 caracteres.

    public abstract class StreamingDiagnostics
    {
        public abstract void WriteError(string briefMessage, string detailedMessage);
    }

Ejemplos de deserializador

En esta sección se muestra cómo codificar deserializadores personalizados para Protobuf y CSV. Para ver ejemplos adicionales, como el formato AVRO para Event Hubs Capture, visite Azure Stream Analytics en GitHub.

Formato de búfer de protocolo (Protobuf)

Este es un ejemplo que usa el formato de búfer de protocolo.

Asuma la siguiente definición de búfer de protocolo.

syntax = "proto3";
// protoc.exe from nuget "Google.Protobuf.Tools" is used to generate .cs file from this schema definition.
// Run below command to generate the csharp class
// protoc.exe --csharp_out=. MessageBodyProto.proto

package SimulatedTemperatureSensor;
message MessageBodyProto {
    message Ambient {
      double temperature = 1;
      int64 humidity = 2;
    }

    message Machine {
      double temperature = 1;
      double pressure = 2;
    }

    Machine machine = 1;
    Ambient ambient = 2;
    string timeCreated = 3;
}

La ejecución de protoc.exe desde el paquete NuGet Google.Protobuf.Tools genera un archivo. CS con la definición. El archivo generado no se muestra aquí. Debe asegurarse de que la versión de Protobuf NuGet que usa en el proyecto Stream Analytics coincide con la versión de Protobuf que se usó para generar la entrada.

El siguiente fragmento de código es la implementación del deserializador, suponiendo que el archivo generado se incluye en el proyecto. Esta implementación es simplemente un contenedor fino sobre el archivo generado.

    public class MessageBodyDeserializer : StreamDeserializer<SimulatedTemperatureSensor.MessageBodyProto>
    {
        public override IEnumerable<SimulatedTemperatureSensor.MessageBodyProto> Deserialize(Stream stream)
        {
            while (stream.Position < stream.Length)
            {
                yield return SimulatedTemperatureSensor.MessageBodyProto.Parser.ParseDelimitedFrom(stream);
            }
        }

        public override void Initialize(StreamingContext streamingContext)
        {
        }
    }

CSV

El siguiente fragmento de código es un deserializador de CSV sencillo que también muestra errores propagados.

using System.Collections.Generic;
using System.IO;

using Microsoft.Azure.StreamAnalytics;
using Microsoft.Azure.StreamAnalytics.Serialization;

namespace ExampleCustomCode.Serialization
{
    public class CustomCsvDeserializer : StreamDeserializer<CustomEvent>
    {
        private StreamingDiagnostics streamingDiagnostics;

        public override IEnumerable<CustomEvent> Deserialize(Stream stream)
        {
            using (var sr = new StreamReader(stream))
            {
                string line = sr.ReadLine();
                while (line != null)
                {
                    if (line.Length > 0 && !string.IsNullOrWhiteSpace(line))
                    {
                        string[] parts = line.Split(',');
                        if (parts.Length != 3)
                        {
                            streamingDiagnostics.WriteError("Did not get expected number of columns", $"Invalid line: {line}");
                        }
                        else
                        {
                            yield return new CustomEvent()
                            {
                                Column1 = parts[0],
                                Column2 = parts[1],
                                Column3 = parts[2]
                            };
                        }
                    }

                    line = sr.ReadLine();
                }
            }
        }

        public override void Initialize(StreamingContext streamingContext)
        {
            this.streamingDiagnostics = streamingContext.Diagnostics;
        }
    }

    public class CustomEvent
    {
        public string Column1 { get; set; }

        public string Column2 { get; set; }

        public string Column3 { get; set; }
    }
}

Formato de serialización para las API REST

Cada entrada de Stream Analytics tiene un formato de serialización. Para obtener más información sobre las opciones de entrada, consulte la documentación sobre la API REST de entradas.

El siguiente código de JavaScript es un ejemplo del formato de serialización del deserializador de .NET cuando se usa la API REST:

{    
   "properties":{    
      "type":"stream",  
      "serialization":{    
         "type":"CustomCLR",  
         "properties":{    
            "serializationDllPath":"<path to the dll inside UserCustomCode\CLR\ folder>", 
            "serializationClassName":"<Full name of the deserializer class name>" 
         }  
      }
   }  
}  

serializationClassName debe ser una clase que implemente StreamDeserializer<T>. Estos pasos se describen en la siguiente sección.

Regiones admitidas

Esta característica está disponible en las siguientes regiones cuando se usa la SKU estándar:

  • Centro-Oeste de EE. UU.
  • Norte de Europa
  • Este de EE. UU.
  • Oeste de EE. UU.
  • Este de EE. UU. 2
  • Oeste de Europa

Puede solicitar al equipo de soporte técnico que agregue otras regiones. Sin embargo, no existe esta restricción de región al usar clústeres de Stream Analytics.

Preguntas más frecuentes

¿Cuándo estará disponible esta característica en todas las regiones de Azure?

Esta característica está disponible en 6 regiones. Si está interesado en usar esta funcionalidad en cualquier otra región, puede enviar una solicitud. La compatibilidad con todas las regiones de Azure está en el plan de desarrollo.

¿Puedo acceder a MetadataPropertyValue desde mis entradas de forma similar a la función GetMetadataPropertyValue?

Esta funcionalidad no se admite. Si necesita esta funcionalidad, puede votar por esta solicitud en UserVoice.

¿Puedo compartir mi implementación de deserializador con la comunidad para que otros puedan beneficiarse?

Una vez que haya implementado el deserializador, puede ayudar a otros usuarios al compartirlo en la comunidad. Envíe el código al repositorio de GitHub de Azure Stream Analytics.

¿Cuáles son las otras limitaciones de usar deserializadores personalizados en Stream Analytics?

Si la entrada tiene el formato Protobuf con un esquema que contiene el tipo MapField, no podrá implementar un deserializador personalizado. Además, los deserializadores personalizados no admiten datos de muestra ni de vista previa.

Pasos siguientes