Share via


Lesen von Eingaben in beliebigen Formaten mithilfe benutzerdefinierter .NET-Deserialisierer (Vorschau)

Wichtig

Der benutzerdefinierte .net-Deserializer für Azure Stream Analytics wird am 30. September 2024 eingestellt. Nach diesem Datum ist es nicht möglich, das Feature zu verwenden. Wechseln Sie nach diesem Datum zu einem integrierten JSON-, AVRO- oder CSV-Deserializer.

Mithilfe benutzerdefinierter .NET-Deserialisierer können in Azure Stream Analytics-Aufträgen Daten in anderen Formaten als den drei integrierten Datenformaten gelesen werden. In diesem Artikel werden das Serialisierungsformat und die Schnittstellen erläutert, mit denen benutzerdefinierte .NET-Deserialisierer für Cloud- und Edgeaufträge in Azure Stream Analytics definiert werden. Außerdem enthält dieser Artikel Beispieldeserialisierer für das Protokollpuffer- und CSV-Format.

Benutzerdefinierter .NET-Deserialisierer

Die folgenden Codebeispiele sind die Schnittstellen, über die der benutzerdefinierte Deserialisierer definiert und StreamDeserializer<T> implementiert wird.

UserDefinedOperator ist die Basisklasse für alle benutzerdefinierten Streamingoperatoren. Sie initialisiert StreamingContext für den Kontext mit dem Mechanismus zum Veröffentlichen der Diagnose, die Sie zum Debuggen von Problemen mit dem Deserialisierer benötigen.

    public abstract class UserDefinedOperator
    {
        public abstract void Initialize(StreamingContext streamingContext);
    }

Der nachfolgende Codeausschnitt betrifft die Deserialisierung für Streamingdaten.

Überspringbare Fehler müssen mithilfe des Elements IStreamingDiagnostics ausgegeben werden, das über die Initialize-Methode von UserDefinedOperator übergeben wird. Alle Ausnahmen werden als Fehler behandelt, und der Deserialisierer wird neu erstellt. Nach ein paar Fehlern wechselt der Auftrag in einen Fehlerstatus.

StreamDeserializer<T> deserialisiert einen Stream in ein Objekt vom Typ T. Die folgenden Bedingungen müssen erfüllt sein:

  1. „T“ ist eine Klasse oder eine Struktur.
  2. Für alle öffentlichen Felder in „T“ gilt Folgendes:
    1. Sie weisen einen der Typen [sbyte, byte, short, ushort, int, uint, long, DateTime, string, float, double] bzw. Entsprechungen mit zugelassenen Nullwerten auf.
    2. Sie sind eine andere Struktur oder Klasse, für die dieselben Regeln gelten.
    3. Sie sind ein Array vom Typ T2, für das dieselben Regeln gelten.
    4. IListT2, wobei für „T2“ dieselben Regeln gelten.
    5. Sie weisen keine rekursiven Typen auf.

Der Parameter stream ist der Stream, der das serialisierte Objekt enthält. Deserialize gibt eine Sammlung von T-Instanzen zurück.

    public abstract class StreamDeserializer<T> : UserDefinedOperator
    {
        public abstract IEnumerable<T> Deserialize(Stream stream);
    }

StreamingContext enthält den Kontext mit dem Mechanismus zum Veröffentlichen der Diagnose für benutzerdefinierte Operatoren.

    public abstract class StreamingContext
    {
        public abstract StreamingDiagnostics Diagnostics { get; }
    }

StreamingDiagnostics ist die Diagnose für benutzerdefinierte Operatoren, einschließlich Serialisierungsmodul, Deserialisierer und benutzerdefinierter Funktionen.

WriteError schreibt eine Fehlermeldung in Ressourcenprotokolle und sendet den Fehler an die Diagnose.

briefMessage ist eine kurze Fehlermeldung. Diese Meldung wird in der Diagnose angezeigt und vom Produktteam für das Debuggen verwendet. Fügen Sie keine vertraulichen Informationen ein, und beschränken Sie die Meldung auf maximal 200 Zeichen.

detailedMessage ist eine ausführliche Fehlermeldung, die nur in den Ressourcenprotokollen im Speicher hinzugefügt wird. Diese Meldung darf maximal 2.000 Zeichen umfassen.

    public abstract class StreamingDiagnostics
    {
        public abstract void WriteError(string briefMessage, string detailedMessage);
    }

Beispiele für Deserialisierer

In diesem Abschnitt wird erläutert, wie Sie benutzerdefinierte Deserialisierer für Protobuf und CSV schreiben. Weitere Beispiele, z. B. AVRO-Format für Event Hubs Capture, finden Sie in GitHub unter Azure Stream Analytics.

Protokollpufferformat (Protobuf)

Es folgt ein Beispiel für die Verwendung des Protokollpufferformats.

Es wird von der folgenden Protokollpufferdefinition ausgegangen.

syntax = "proto3";
// protoc.exe from nuget "Google.Protobuf.Tools" is used to generate .cs file from this schema definition.
// Run below command to generate the csharp class
// protoc.exe --csharp_out=. MessageBodyProto.proto

package SimulatedTemperatureSensor;
message MessageBodyProto {
    message Ambient {
      double temperature = 1;
      int64 humidity = 2;
    }

    message Machine {
      double temperature = 1;
      double pressure = 2;
    }

    Machine machine = 1;
    Ambient ambient = 2;
    string timeCreated = 3;
}

Durch Ausführen von protoc.exe über Google.Protobuf.Tools wird in NuGet eine CS-Datei mit der Definition generiert. Die generierte Datei wird hier nicht gezeigt. Sie müssen sicherstellen, dass die in Ihrem Stream Analytics-Projekt verwendete Version von Protobuf NuGet mit der Protobuf-Version übereinstimmt, die zum Generieren der Eingabe verwendet wurde.

Der folgende Codeausschnitt gilt für die Implementierung des Deserialisierers. Dabei wird vorausgesetzt, dass die generierte Datei im Projekt enthalten ist. Diese Implementierung ist nur ein einfacher Wrapper für die generierte Datei.

    public class MessageBodyDeserializer : StreamDeserializer<SimulatedTemperatureSensor.MessageBodyProto>
    {
        public override IEnumerable<SimulatedTemperatureSensor.MessageBodyProto> Deserialize(Stream stream)
        {
            while (stream.Position < stream.Length)
            {
                yield return SimulatedTemperatureSensor.MessageBodyProto.Parser.ParseDelimitedFrom(stream);
            }
        }

        public override void Initialize(StreamingContext streamingContext)
        {
        }
    }

CSV

Der folgende Codeausschnitt gilt für einen einfachen CSV-Deserialisierer und gibt außerdem Verteilungsfehler an.

using System.Collections.Generic;
using System.IO;

using Microsoft.Azure.StreamAnalytics;
using Microsoft.Azure.StreamAnalytics.Serialization;

namespace ExampleCustomCode.Serialization
{
    public class CustomCsvDeserializer : StreamDeserializer<CustomEvent>
    {
        private StreamingDiagnostics streamingDiagnostics;

        public override IEnumerable<CustomEvent> Deserialize(Stream stream)
        {
            using (var sr = new StreamReader(stream))
            {
                string line = sr.ReadLine();
                while (line != null)
                {
                    if (line.Length > 0 && !string.IsNullOrWhiteSpace(line))
                    {
                        string[] parts = line.Split(',');
                        if (parts.Length != 3)
                        {
                            streamingDiagnostics.WriteError("Did not get expected number of columns", $"Invalid line: {line}");
                        }
                        else
                        {
                            yield return new CustomEvent()
                            {
                                Column1 = parts[0],
                                Column2 = parts[1],
                                Column3 = parts[2]
                            };
                        }
                    }

                    line = sr.ReadLine();
                }
            }
        }

        public override void Initialize(StreamingContext streamingContext)
        {
            this.streamingDiagnostics = streamingContext.Diagnostics;
        }
    }

    public class CustomEvent
    {
        public string Column1 { get; set; }

        public string Column2 { get; set; }

        public string Column3 { get; set; }
    }
}

Serialisierungsformat für REST-APIs

Jede Stream Analytics-Eingabe hat ein Serialisierungsformat. Weitere Informationen zu Eingabeoptionen finden Sie in der Dokumentation zu Eingabe-REST-APIs.

Der folgende JavaScript-Code ist ein Beispiel für das Serialisierungsformat des .NET-Deserialisierers bei Verwendung der REST-API:

{    
   "properties":{    
      "type":"stream",  
      "serialization":{    
         "type":"CustomCLR",  
         "properties":{    
            "serializationDllPath":"<path to the dll inside UserCustomCode\CLR\ folder>", 
            "serializationClassName":"<Full name of the deserializer class name>" 
         }  
      }
   }  
}  

serializationClassName muss eine Klasse sein, die StreamDeserializer<T> implementiert. Dies wird im folgenden Abschnitt beschrieben.

Unterstützung für Regionen

Diese Funktion ist bei Verwendung der Standard-SKU in den folgenden Regionen verfügbar:

  • USA, Westen-Mitte
  • Nordeuropa
  • East US
  • USA (Westen)
  • USA (Ost) 2
  • Europa, Westen

Sie können Unterstützung anfordern für weitere Regionen. Bei Verwendung von Stream Analytics-Clustern gibt es jedoch keine solche Regionseinschränkung.

Häufig gestellte Fragen

Wann ist diese Funktion in allen Azure-Regionen verfügbar?

Diese Funktion steht in 6 Regionen zur Verfügung. Wenn Sie diese Funktion in einer anderen Region verwenden möchten, können Sie eine entsprechende Anforderung senden. Die Unterstützung für alle Azure-Regionen ist in Planung.

Kann ich ähnlich wie bei der GetMetadataPropertyValue-Funktion in meinen Eingaben auf MetadataPropertyValue zugreifen?

Diese Funktionalität wird nicht unterstützt. Wenn Sie diese Funktion benötigen, können Sie in UserVoice für diese Anforderung abstimmen.

Kann ich meine Implementierung des Deserialisierers für die Community freigeben, sodass andere Benutzer sie verwenden können?

Nach der Implementierung Ihres Deserialisierers können Sie anderen helfen, indem Sie ihn für die Community freigeben. Übermitteln Sie den Code an das Azure Stream Analytics-Repository in GitHub.

Welche sonstigen Einschränkungen gelten für die Verwendung benutzerdefinierter Deserialisierer in Stream Analytics?

Wenn Ihre Eingabe im Protobuf-Format mit einem Schema ist, das den MapField-Typ enthält, können Sie keinen benutzerdefinierten Deserialisierer implementieren. Außerdem unterstützen benutzerdefinierte Deserialisierer weder Beispiel- noch Vorschaudaten.

Nächste Schritte