Ler entradas em qualquer formato usando desserializadores .NET personalizados (versão prévia)

Importante

O desserializador do .NET personalizado para o Azure Stream Analytics será desativado em 30 de setembro de 2024. Após essa data, não será possível usar o recurso. Faça a transição para um desserializador interno de JSON, AVRO ou CSV até essa data.

Os desserializadores .NET personalizados permitem que o trabalho do Azure Stream Analytics leia dados de formatos fora dos três formatos de dados internos. Este artigo explica o formato de serialização e as interfaces que definem desserializadores .NET personalizados para trabalhos de nuvem e borda do Azure Stream Analytics. Também há exemplos de desserializadores para buffer de protocolo e formato CSV.

Desserializador .NET personalizado

Os exemplos de código a seguir são as interfaces que definem o desserializador personalizado e implementam o StreamDeserializer<T>.

UserDefinedOperator é a classe base para todos os operadores de streaming personalizados. Ele inicializa o StreamingContext, que fornece um contexto que inclui o mecanismo de publicação de diagnósticos cujos problemas você precisará depurar com o desserializador.

    public abstract class UserDefinedOperator
    {
        public abstract void Initialize(StreamingContext streamingContext);
    }

O trecho de código a seguir é a desserialização para streaming de dados.

Os erros ignoráveis devem ser emitidos usando IStreamingDiagnostics passado pelo método Initialize do UserDefinedOperator. Todas as exceções serão tratadas como erros, e o desserializador será recriado. Após alguns de erros, o trabalho irá para um status de falha.

StreamDeserializer<T> desserializa um fluxo em um objeto do tipo T. As seguintes condições devem ser atendidas:

  1. T é uma classe ou struct.
  2. Todos os campos públicos em T são
    1. Um de [sbyte, byte, short, ushort, int, uint, long, DateTime, string, float, double] ou seus equivalentes anuláveis.
    2. Outra struct ou classe seguindo as mesmas regras.
    3. Matriz do tipo T2 que segue as mesmas regras.
    4. IListT2 em que T2 segue as mesmas regras.
    5. Não tem nenhum tipo recursivo.

O parâmetro stream é o fluxo que contém o objeto serializado. Deserialize retorna um conjunto de instâncias de T.

    public abstract class StreamDeserializer<T> : UserDefinedOperator
    {
        public abstract IEnumerable<T> Deserialize(Stream stream);
    }

StreamingContext fornece o contexto que inclui o mecanismo de publicação de diagnóstico do operador de usuário.

    public abstract class StreamingContext
    {
        public abstract StreamingDiagnostics Diagnostics { get; }
    }

StreamingDiagnostics é o diagnóstico para operadores definidos pelo usuário, incluindo serializador, desserializador e funções definidas pelo usuário.

WriteError grava uma mensagem de erro nos logs de recursos e envia o erro para diagnóstico.

briefMessage é uma breve mensagem de erro. Essa mensagem aparece no diagnóstico e é usada pela equipe do produto para fins de depuração. Não inclua informações confidenciais e mantenha a mensagem com menos de 200 caracteres

detailedMessage é uma mensagem de erro detalhada que só é adicionada aos logs de recursos em seu armazenamento. A mensagem deve ter menos de 2000 caracteres.

    public abstract class StreamingDiagnostics
    {
        public abstract void WriteError(string briefMessage, string detailedMessage);
    }

Exemplos de desserializador

Esta seção mostra como gravar desserializadores personalizados para Protobuf e CSV. Para ver mais exemplos, como o formato AVRO para a Captura dos Hubs de Eventos, visite Azure Stream Analytics no GitHub.

Formato de buffer de protocolo (Protobuf)

Este é um exemplo que usa o formato de buffer de protocolo.

Suponha a seguinte definição de buffer de protocolo.

syntax = "proto3";
// protoc.exe from nuget "Google.Protobuf.Tools" is used to generate .cs file from this schema definition.
// Run below command to generate the csharp class
// protoc.exe --csharp_out=. MessageBodyProto.proto

package SimulatedTemperatureSensor;
message MessageBodyProto {
    message Ambient {
      double temperature = 1;
      int64 humidity = 2;
    }

    message Machine {
      double temperature = 1;
      double pressure = 2;
    }

    Machine machine = 1;
    Ambient ambient = 2;
    string timeCreated = 3;
}

A execução de protoc.exe do NuGet Google.Protobuf.Tools gera um arquivo .cs com a definição. O arquivo gerado não aparece aqui. Você precisa garantir que a versão do Protobuf NuGet usada no projeto do Stream Analytics corresponda à versão do Protobuf usada para gerar a entrada.

O trecho de código a seguir é a implementação do desserializador, supondo que o arquivo gerado esteja incluído no projeto. Essa implementação é apenas um wrapper fino no arquivo gerado.

    public class MessageBodyDeserializer : StreamDeserializer<SimulatedTemperatureSensor.MessageBodyProto>
    {
        public override IEnumerable<SimulatedTemperatureSensor.MessageBodyProto> Deserialize(Stream stream)
        {
            while (stream.Position < stream.Length)
            {
                yield return SimulatedTemperatureSensor.MessageBodyProto.Parser.ParseDelimitedFrom(stream);
            }
        }

        public override void Initialize(StreamingContext streamingContext)
        {
        }
    }

CSV

O trecho de código a seguir é um desserializador CSV simples que também demonstra erros de propagação.

using System.Collections.Generic;
using System.IO;

using Microsoft.Azure.StreamAnalytics;
using Microsoft.Azure.StreamAnalytics.Serialization;

namespace ExampleCustomCode.Serialization
{
    public class CustomCsvDeserializer : StreamDeserializer<CustomEvent>
    {
        private StreamingDiagnostics streamingDiagnostics;

        public override IEnumerable<CustomEvent> Deserialize(Stream stream)
        {
            using (var sr = new StreamReader(stream))
            {
                string line = sr.ReadLine();
                while (line != null)
                {
                    if (line.Length > 0 && !string.IsNullOrWhiteSpace(line))
                    {
                        string[] parts = line.Split(',');
                        if (parts.Length != 3)
                        {
                            streamingDiagnostics.WriteError("Did not get expected number of columns", $"Invalid line: {line}");
                        }
                        else
                        {
                            yield return new CustomEvent()
                            {
                                Column1 = parts[0],
                                Column2 = parts[1],
                                Column3 = parts[2]
                            };
                        }
                    }

                    line = sr.ReadLine();
                }
            }
        }

        public override void Initialize(StreamingContext streamingContext)
        {
            this.streamingDiagnostics = streamingContext.Diagnostics;
        }
    }

    public class CustomEvent
    {
        public string Column1 { get; set; }

        public string Column2 { get; set; }

        public string Column3 { get; set; }
    }
}

Formato de serialização para APIs REST

Cada entrada do Azure Stream Analytics tem um formato de serialização. Para obter mais informações sobre opções de entrada, consulte a documentação API REST de entrada.

O seguinte código JavaScript é um exemplo do formato de serialização do desserializador do .NET ao usar a API REST:

{    
   "properties":{    
      "type":"stream",  
      "serialization":{    
         "type":"CustomCLR",  
         "properties":{    
            "serializationDllPath":"<path to the dll inside UserCustomCode\CLR\ folder>", 
            "serializationClassName":"<Full name of the deserializer class name>" 
         }  
      }
   }  
}  

serializationClassName deve ser uma classe que implementa StreamDeserializer<T>. Isso é descrito na seção a seguir.

Suporte de regiões

Esse recurso está disponível nas seguintes regiões ao usar o SKU padrão:

  • Centro-Oeste dos EUA
  • Norte da Europa
  • Leste dos EUA
  • Oeste dos EUA
  • Leste dos EUA 2
  • Europa Ocidental

Você pode solicitar suporte para mais regiões. No entanto, não há nenhuma restrição de região ao usar os clusters do Stream Analytics.

Perguntas frequentes

Quando esse recurso estará disponível em todas as regiões do Azure?

Esse recurso está disponível em 6 regiões. Se quiser usar essa funcionalidade em outra região, envie uma solicitação. Estamos trabalhando para oferecer suporte a todas as regiões do Azure.

Posso acessar o MetadataPropertyValue de minhas entradas semelhantes à função GetMetadataPropertyValue?

Não há suporte para essa funcionalidade. Se você precisar desse recurso, vote nessa solicitação em UserVoice.

Posso compartilhar minha implementação de desserializador com a comunidade para que outras pessoas se beneficiem?

Depois de implementar o desserializador, você poderá ajudar outras pessoas compartilhando-o com a comunidade. Envie seu código para o repositório do Azure Stream Analytics no GitHub.

Quais são as outras limitações do uso de desserializadores personalizados no Stream Analytics?

Se a entrada for do formato Protobuf com um esquema que contenha o tipo MapField, você não poderá implementar um desserializador personalizado. Além disso, os desserializadores personalizados não dão suporte a dados de exemplo ou dados de visualização.

Próximas etapas