Delen via


Invoer lezen in elke indeling met behulp van aangepaste .NET-deserializers (preview)

Belangrijk

Custom .net deserializer voor Azure Stream Analytics wordt op 30 september 2024 buiten gebruik gesteld. Na die datum is het niet mogelijk om de functie te gebruiken. Schakel op die datum over naar een ingebouwde JSON-, AVRO- of CSV-deserializer .

Met aangepaste .NET-deserializers kan uw Azure Stream Analytics-taak gegevens lezen uit indelingen buiten de drie ingebouwde gegevensindelingen. In dit artikel worden de serialisatie-indeling en de interfaces beschreven waarmee aangepaste .NET-deserializers worden gedefinieerd voor Azure Stream Analytics-cloud- en edge-taken. Er zijn ook voorbeelden van deserializers voor protocolbuffer- en CSV-indeling.

Aangepaste .NET-deserializer

De volgende codevoorbeelden zijn de interfaces die de aangepaste deserializer definiëren en implementeren StreamDeserializer<T>.

UserDefinedOperator is de basisklasse voor alle aangepaste streamingoperators. Het initialiseert StreamingContext, wat context biedt, waaronder het mechanisme voor het publiceren van diagnostische gegevens waarvoor u eventuele problemen met uw deserializer moet opsporen.

    public abstract class UserDefinedOperator
    {
        public abstract void Initialize(StreamingContext streamingContext);
    }

Het volgende codefragment is de deserialisatie voor streaminggegevens.

Overslaande fouten moeten worden verzonden met behulp van IStreamingDiagnosticsUserDefinedOperatorde initialisatiemethode. Alle uitzonderingen worden behandeld als fouten en de deserializer wordt opnieuw gemaakt. Na enkele fouten wordt de taak naar een mislukte status verzonden.

StreamDeserializer<T> deserializeert een stroom naar object van het type T. Aan de volgende voorwaarden moet worden voldaan:

  1. T is een klasse of een struct.
  2. Alle openbare velden in T zijn beide
    1. Een van [sbyte, byte, short, ushort, int, uint, long, DateTime, string, float, double] of hun nullable equivalenten.
    2. Een andere struct of klasse die dezelfde regels volgt.
    3. Matrix van het type T2 dat volgt op dezelfde regels.
    4. IListT2 waarbij T2 dezelfde regels volgt.
    5. Heeft geen recursieve typen.

De parameter stream is de stroom die het geserialiseerde object bevat. Deserialize retourneert een verzameling T exemplaren.

    public abstract class StreamDeserializer<T> : UserDefinedOperator
    {
        public abstract IEnumerable<T> Deserialize(Stream stream);
    }

StreamingContext biedt context, waaronder het mechanisme voor het publiceren van diagnostische gegevens voor gebruikersoperator.

    public abstract class StreamingContext
    {
        public abstract StreamingDiagnostics Diagnostics { get; }
    }

StreamingDiagnostics is de diagnostische gegevens voor door de gebruiker gedefinieerde operators, waaronder serializer, deserializer en door de gebruiker gedefinieerde functies.

WriteError schrijft een foutbericht naar resourcelogboeken en verzendt de fout naar diagnostische gegevens.

briefMessage is een kort foutbericht. Dit bericht wordt weergegeven in diagnostische gegevens en wordt gebruikt door het productteam voor foutopsporingsdoeleinden. Neem geen gevoelige informatie op en bewaar het bericht minder dan 200 tekens

detailedMessage is een gedetailleerd foutbericht dat alleen wordt toegevoegd aan uw resourcelogboeken in uw opslag. Dit bericht mag uit minder dan 2000 tekens bestaan.

    public abstract class StreamingDiagnostics
    {
        public abstract void WriteError(string briefMessage, string detailedMessage);
    }

Voorbeelden van deserializer

In deze sectie wordt beschreven hoe u aangepaste deserializers schrijft voor Protobuf en CSV. Ga naar Azure Stream Analytics op GitHub voor meer voorbeelden, zoals AVRO-indeling voor Event Hubs Capture.

Protocolbufferindeling (Protobuf)

Dit is een voorbeeld met de protocolbufferindeling.

Ga ervan uit dat de volgende protocolbufferdefinitie wordt gebruikt.

syntax = "proto3";
// protoc.exe from nuget "Google.Protobuf.Tools" is used to generate .cs file from this schema definition.
// Run below command to generate the csharp class
// protoc.exe --csharp_out=. MessageBodyProto.proto

package SimulatedTemperatureSensor;
message MessageBodyProto {
    message Ambient {
      double temperature = 1;
      int64 humidity = 2;
    }

    message Machine {
      double temperature = 1;
      double pressure = 2;
    }

    Machine machine = 1;
    Ambient ambient = 2;
    string timeCreated = 3;
}

NuGet wordt uitgevoerd protoc.exe vanuit Google.Protobuf.Tools genereert een .cs bestand met de definitie. Het gegenereerde bestand wordt hier niet weergegeven. U moet ervoor zorgen dat de versie van Protobuf NuGet die u in uw Stream Analytics-project gebruikt, overeenkomt met de Protobuf-versie die is gebruikt om de invoer te genereren.

Het volgende codefragment is de implementatie van de deserializer, ervan uitgaande dat het gegenereerde bestand is opgenomen in het project. Deze implementatie is slechts een dunne wrapper over het gegenereerde bestand.

    public class MessageBodyDeserializer : StreamDeserializer<SimulatedTemperatureSensor.MessageBodyProto>
    {
        public override IEnumerable<SimulatedTemperatureSensor.MessageBodyProto> Deserialize(Stream stream)
        {
            while (stream.Position < stream.Length)
            {
                yield return SimulatedTemperatureSensor.MessageBodyProto.Parser.ParseDelimitedFrom(stream);
            }
        }

        public override void Initialize(StreamingContext streamingContext)
        {
        }
    }

CSV

Het volgende codefragment is een eenvoudige CSV-deserializer waarmee ook fouten worden doorgegeven.

using System.Collections.Generic;
using System.IO;

using Microsoft.Azure.StreamAnalytics;
using Microsoft.Azure.StreamAnalytics.Serialization;

namespace ExampleCustomCode.Serialization
{
    public class CustomCsvDeserializer : StreamDeserializer<CustomEvent>
    {
        private StreamingDiagnostics streamingDiagnostics;

        public override IEnumerable<CustomEvent> Deserialize(Stream stream)
        {
            using (var sr = new StreamReader(stream))
            {
                string line = sr.ReadLine();
                while (line != null)
                {
                    if (line.Length > 0 && !string.IsNullOrWhiteSpace(line))
                    {
                        string[] parts = line.Split(',');
                        if (parts.Length != 3)
                        {
                            streamingDiagnostics.WriteError("Did not get expected number of columns", $"Invalid line: {line}");
                        }
                        else
                        {
                            yield return new CustomEvent()
                            {
                                Column1 = parts[0],
                                Column2 = parts[1],
                                Column3 = parts[2]
                            };
                        }
                    }

                    line = sr.ReadLine();
                }
            }
        }

        public override void Initialize(StreamingContext streamingContext)
        {
            this.streamingDiagnostics = streamingContext.Diagnostics;
        }
    }

    public class CustomEvent
    {
        public string Column1 { get; set; }

        public string Column2 { get; set; }

        public string Column3 { get; set; }
    }
}

Serialisatie-indeling voor REST API's

Elke Stream Analytics-invoer heeft een serialisatie-indeling. Zie de documentatie over de Input REST API voor meer informatie over invoeropties.

De volgende JavaScript-code is een voorbeeld van de serialisatie-indeling van .NET deserializer wanneer u de REST API gebruikt:

{    
   "properties":{    
      "type":"stream",  
      "serialization":{    
         "type":"CustomCLR",  
         "properties":{    
            "serializationDllPath":"<path to the dll inside UserCustomCode\CLR\ folder>", 
            "serializationClassName":"<Full name of the deserializer class name>" 
         }  
      }
   }  
}  

serializationClassName moet een klasse zijn die implementeert StreamDeserializer<T>. Dit wordt beschreven in de volgende sectie.

Ondersteuning voor regio

Deze functie is beschikbaar in de volgende regio's wanneer u standard-SKU gebruikt:

  • VS - west-centraal
  • Europa - noord
  • VS - oost
  • VS - west
  • VS - oost 2
  • Europa -west

U kunt ondersteuning aanvragen voor meer regio's. Er is echter geen dergelijke regiobeperking bij het gebruik van Stream Analytics-clusters.

Veelgestelde vragen

Wanneer is deze functie beschikbaar in alle Azure-regio's?

Deze functie is beschikbaar in 6 regio's. Als u deze functionaliteit in een andere regio wilt gebruiken, kunt u een aanvraag indienen. Ondersteuning voor alle Azure-regio's staat op de roadmap.

Heb ik toegang tot MetadataPropertyValue vanuit mijn invoer die vergelijkbaar is met de functie GetMetadataPropertyValue?

Deze functionaliteit wordt niet ondersteund. Als u deze mogelijkheid nodig hebt, kunt u stemmen op deze aanvraag op UserVoice.

Kan ik mijn deserializer-implementatie delen met de community, zodat anderen kunnen profiteren?

Zodra u uw deserializer hebt geïmplementeerd, kunt u anderen helpen door deze te delen met de community. Dien uw code in bij de GitHub-opslagplaats van Azure Stream Analytics.

Wat zijn de andere beperkingen voor het gebruik van aangepaste deserializers in Stream Analytics?

Als uw invoer een Protobuf-indeling heeft met een schema met MapField een type, kunt u geen aangepaste deserializer implementeren. Aangepaste deserializers bieden ook geen ondersteuning voor voorbeeldgegevens of voorbeeldgegevens.

Volgende stappen