Odczytywanie danych wejściowych w dowolnym formacie przy użyciu niestandardowych deserializacji platformy .NET (wersja zapoznawcza)

Ważne

Deserializator niestandardowy platformy .net dla usługi Azure Stream Analytics zostanie wycofany 30 września 2024 r. Po tej dacie nie będzie można używać tej funkcji. Przejdź do wbudowanego deserializacji plików JSON, AVRO lub CSV według tej daty.

Niestandardowe deserializatory platformy .NET umożliwiają zadaniu usługi Azure Stream Analytics odczytywanie danych z formatów spoza trzech wbudowanych formatów danych. W tym artykule wyjaśniono format serializacji i interfejsy definiujące niestandardowe deserializatory platformy .NET dla zadań chmury i krawędzi usługi Azure Stream Analytics. Istnieją również przykładowe deserializatory dla buforu protokołu i formatu CSV.

Deserializator niestandardowy platformy .NET

Poniższe przykłady kodu to interfejsy, które definiują niestandardowy deserializator i implementują element StreamDeserializer<T>.

UserDefinedOperator jest klasą bazową dla wszystkich niestandardowych operatorów przesyłania strumieniowego. Inicjuje StreamingContextelement , który udostępnia kontekst, który zawiera mechanizm publikowania diagnostyki, dla którego należy debugować wszelkie problemy z deserializacji.

    public abstract class UserDefinedOperator
    {
        public abstract void Initialize(StreamingContext streamingContext);
    }

Poniższy fragment kodu to deserializacja danych przesyłanych strumieniowo.

Błędy pomijalne powinny być emitowane UserDefinedOperatorprzy użyciu IStreamingDiagnostics metody inicjowania przekazywanej. Wszystkie wyjątki będą traktowane jako błędy, a deserializator zostanie utworzony ponownie. Po wystąpieniu niektórych błędów zadanie przejdzie do stanu niepowodzenia.

StreamDeserializer<T> deserializuje strumień do obiektu typu T. Muszą zostać spełnione następujące warunki:

  1. T jest klasą lub strukturą.
  2. Wszystkie pola publiczne w języku T są albo
    1. Jeden z [sbyte, byte, short, ushort, int, uint, long, DateTime, string, float, double] lub ich odpowiedniki dopuszczane do wartości null.
    2. Inna struktura lub klasa postępująca zgodnie z tymi samymi regułami.
    3. Tablica typu T2 , która jest zgodna z tymi samymi regułami.
    4. IListT2 gdzie T2 jest zgodny z tymi samymi regułami.
    5. Nie ma żadnych typów cyklicznych.

Parametr stream jest strumieniem zawierającym serializowany obiekt. Deserialize zwraca kolekcję T wystąpień.

    public abstract class StreamDeserializer<T> : UserDefinedOperator
    {
        public abstract IEnumerable<T> Deserialize(Stream stream);
    }

StreamingContext udostępnia kontekst, który obejmuje mechanizm publikowania diagnostyki dla operatora użytkownika.

    public abstract class StreamingContext
    {
        public abstract StreamingDiagnostics Diagnostics { get; }
    }

StreamingDiagnostics to diagnostyka operatorów zdefiniowanych przez użytkownika, w tym serializatora, deserializacji i funkcji zdefiniowanych przez użytkownika.

WriteError Zapisuje komunikat o błędzie w dziennikach zasobów i wysyła błąd do diagnostyki.

briefMessage to krótki komunikat o błędzie. Ten komunikat jest wyświetlany w diagnostyce i jest używany przez zespół produktu do celów debugowania. Nie dołączaj informacji poufnych i nie umieszczaj wiadomości mniej niż 200 znaków

detailedMessage to szczegółowy komunikat o błędzie, który jest dodawany tylko do dzienników zasobów w magazynie. Ta wiadomość powinna być mniejsza niż 2000 znaków.

    public abstract class StreamingDiagnostics
    {
        public abstract void WriteError(string briefMessage, string detailedMessage);
    }

Przykłady deserializacji

W tej sekcji przedstawiono sposób pisania niestandardowych deserializatorów dla plików Protobuf i CSV. Aby uzyskać więcej przykładów, takich jak format AVRO dla przechwytywania usługi Event Hubs, odwiedź stronę Azure Stream Analytics w witrynie GitHub.

Format buforu protokołu (Protobuf)

Jest to przykład użycia formatu buforu protokołu.

Przyjmij następującą definicję buforu protokołu.

syntax = "proto3";
// protoc.exe from nuget "Google.Protobuf.Tools" is used to generate .cs file from this schema definition.
// Run below command to generate the csharp class
// protoc.exe --csharp_out=. MessageBodyProto.proto

package SimulatedTemperatureSensor;
message MessageBodyProto {
    message Ambient {
      double temperature = 1;
      int64 humidity = 2;
    }

    message Machine {
      double temperature = 1;
      double pressure = 2;
    }

    Machine machine = 1;
    Ambient ambient = 2;
    string timeCreated = 3;
}

Uruchomienie protoc.exe z narzędzia Google.Protobuf.Tools NuGet generuje plik .cs z definicją. Wygenerowany plik nie jest tutaj wyświetlany. Upewnij się, że wersja narzędzia NuGet używanego w projekcie usługi Stream Analytics jest zgodna z wersją protobuf, która została użyta do wygenerowania danych wejściowych.

Poniższy fragment kodu to implementacja deserializacji przy założeniu, że wygenerowany plik jest uwzględniony w projekcie. Ta implementacja jest po prostu cienką otoką wygenerowanego pliku.

    public class MessageBodyDeserializer : StreamDeserializer<SimulatedTemperatureSensor.MessageBodyProto>
    {
        public override IEnumerable<SimulatedTemperatureSensor.MessageBodyProto> Deserialize(Stream stream)
        {
            while (stream.Position < stream.Length)
            {
                yield return SimulatedTemperatureSensor.MessageBodyProto.Parser.ParseDelimitedFrom(stream);
            }
        }

        public override void Initialize(StreamingContext streamingContext)
        {
        }
    }

CSV

Poniższy fragment kodu to prosty deserializator CSV, który pokazuje również propagację błędów.

using System.Collections.Generic;
using System.IO;

using Microsoft.Azure.StreamAnalytics;
using Microsoft.Azure.StreamAnalytics.Serialization;

namespace ExampleCustomCode.Serialization
{
    public class CustomCsvDeserializer : StreamDeserializer<CustomEvent>
    {
        private StreamingDiagnostics streamingDiagnostics;

        public override IEnumerable<CustomEvent> Deserialize(Stream stream)
        {
            using (var sr = new StreamReader(stream))
            {
                string line = sr.ReadLine();
                while (line != null)
                {
                    if (line.Length > 0 && !string.IsNullOrWhiteSpace(line))
                    {
                        string[] parts = line.Split(',');
                        if (parts.Length != 3)
                        {
                            streamingDiagnostics.WriteError("Did not get expected number of columns", $"Invalid line: {line}");
                        }
                        else
                        {
                            yield return new CustomEvent()
                            {
                                Column1 = parts[0],
                                Column2 = parts[1],
                                Column3 = parts[2]
                            };
                        }
                    }

                    line = sr.ReadLine();
                }
            }
        }

        public override void Initialize(StreamingContext streamingContext)
        {
            this.streamingDiagnostics = streamingContext.Diagnostics;
        }
    }

    public class CustomEvent
    {
        public string Column1 { get; set; }

        public string Column2 { get; set; }

        public string Column3 { get; set; }
    }
}

Format serializacji dla interfejsów API REST

Każde dane wejściowe usługi Stream Analytics ma format serializacji. Aby uzyskać więcej informacji na temat opcji wejściowych, zobacz dokumentację interfejsu API REST danych wejściowych.

Poniższy kod JavaScript jest przykładem formatu serializacji deserializatora platformy .NET podczas korzystania z interfejsu API REST:

{    
   "properties":{    
      "type":"stream",  
      "serialization":{    
         "type":"CustomCLR",  
         "properties":{    
            "serializationDllPath":"<path to the dll inside UserCustomCode\CLR\ folder>", 
            "serializationClassName":"<Full name of the deserializer class name>" 
         }  
      }
   }  
}  

serializationClassName powinna być klasą implementającą StreamDeserializer<T>element . Opisano to w poniższej sekcji.

Obsługa regionów

Ta funkcja jest dostępna w następujących regionach w przypadku korzystania z jednostki SKU w warstwie Standardowa:

  • Zachodnio-środkowe stany USA
  • Europa Północna
  • Wschodnie stany USA
  • Zachodnie stany USA
  • Wschodnie stany USA 2
  • West Europe

Możesz poprosić o pomoc techniczną dla większej liczby regionów. Nie ma jednak takiego ograniczenia regionu w przypadku korzystania z klastrów usługi Stream Analytics.

Często zadawane pytania

Kiedy ta funkcja będzie dostępna we wszystkich regionach świadczenia usługi Azure?

Ta funkcja jest dostępna w 6 regionach. Jeśli interesuje Cię użycie tej funkcji w innym regionie, możesz przesłać żądanie. Obsługa wszystkich regionów platformy Azure znajduje się w harmonogramie działania.

Czy mogę uzyskać dostęp do właściwości MetadataPropertyValue z moich danych wejściowych podobnych do funkcji GetMetadataPropertyValue?

Ta funkcja nie jest obsługiwana. Jeśli potrzebujesz tej możliwości, możesz głosować na to żądanie w witrynie UserVoice.

Czy mogę udostępnić swoją implementację deserializacji społeczności, aby inni mogli korzystać?

Po zaimplementowaniu deserializatora możesz pomóc innym osobom, udostępniając je społeczności. Prześlij kod do repozytorium GitHub usługi Azure Stream Analytics.

Jakie są inne ograniczenia dotyczące używania deserializacji niestandardowych w usłudze Stream Analytics?

Jeśli dane wejściowe mają format Protobuf ze schematem zawierającym MapField typ, nie będzie można zaimplementować niestandardowego deserializacji. Ponadto niestandardowe deserializatory nie obsługują przykładowych danych ani danych podglądu.

Następne kroki