次の方法で共有


.NET カスタム逆シリアライザーを使用して任意の形式の入力を読み取る (プレビュー)

重要

Azure Stream Analytics 用のカスタム .NET 逆シリアライザーは、2024 年 9 月 30 日に廃止されます。 この日付を過ぎると、この機能を使用できなくなります。 この日付までに JSON、AVRO、または CSV の組み込み逆シリアライザーに移行してください。

.NET のカスタム逆シリアライザーを使用すると、Azure Stream Analytics ジョブで、3 つの組み込みデータ形式に含まれない形式からデータを読み取ることができます。 この記事では、シリアル化の形式と、Azure Stream Analytics のクラウド ジョブとエッジ ジョブに対する .NET カスタム逆シリアライザーが定義されているインターフェイスについて説明します。 また、プロトコル バッファーと CSV 形式に対する逆シリアライザーの例もあります。

.NET カスタム逆シリアライザー

以下のコード サンプルのインターフェイスでは、カスタム逆シリアライザーが定義され、StreamDeserializer<T> が実装されています。

UserDefinedOperator は、すべてのカスタム ストリーミング演算子の基底クラスです。 それによって初期化される StreamingContext で提供されるコンテキストには、逆シリアライザーに関するすべての問題をデバッグする必要がある診断を発行するためのメカニズムが含まれます。

    public abstract class UserDefinedOperator
    {
        public abstract void Initialize(StreamingContext streamingContext);
    }

次のコード スニペットは、ストリーミング データに対する逆シリアル化です。

スキップ可能なエラーは、UserDefinedOperator の Initialize メソッドを通して渡される IStreamingDiagnostics を使用して出力する必要があります。 すべての例外はエラーとして扱われ、逆シリアライザーが再作成されます。 いくつかのエラーが発生すると、ジョブは失敗状態になります。

StreamDeserializer<T> により、ストリームは T 型のオブジェクトに逆シリアル化されます。 次の条件を満たす必要があります。

  1. T は、クラスまたは構造体です。
  2. T のすべてのパブリック フィールドは、次のいずれかです
    1. [sbyte、byte、short、ushort、int、uint、long、DateTime、string、float、double] のいずれか、またはそれらと同等の Null 許容型。
    2. 同じ規則に従う別の構造体またはクラス。
    3. 同じ規則に従う T2 型の配列。
    4. T2 が同じ規則に従う IListT2
    5. どのような再帰型も持たない。

パラメーター stream は、シリアル化されたオブジェクトを含むストリームです。 Deserialize からは、T インスタンスのコレクションが返されます。

    public abstract class StreamDeserializer<T> : UserDefinedOperator
    {
        public abstract IEnumerable<T> Deserialize(Stream stream);
    }

StreamingContext からは、ユーザー演算子に対する診断を発行するためのメカニズムが含まれるコンテキストが提供されます。

    public abstract class StreamingContext
    {
        public abstract StreamingDiagnostics Diagnostics { get; }
    }

StreamingDiagnostics は、シリアライザー、逆シリアライザー、ユーザー定義関数などのユーザー定義演算子に対する診断です。

WriteError を使うと、リソース ログにエラー メッセージを書き込み、診断にエラーを送信することができます。

briefMessage は、簡単なエラー メッセージです。 このメッセージは診断に表示され、製品チームによってデバッグのために使用されます。 機密情報を含めないでください。また、メッセージが 200 文字以上にならないようにします

detailedMessage は、ストレージのリソース ログにのみ追加される詳細なエラー メッセージです。 このメッセージは、2000 文字未満でなければなりません。

    public abstract class StreamingDiagnostics
    {
        public abstract void WriteError(string briefMessage, string detailedMessage);
    }

逆シリアライザーの例

このセクションでは、Protobuf と CSV のカスタム逆シリアライザーを記述する方法について説明します。 Event Hubs Capture の AVRO 形式などのその他の例については、GitHub の Azure Stream Analytics を参照してください。

プロトコル バッファー (Protobuf) 形式

これは、プロトコル バッファー形式を使用する例です。

次のプロトコル バッファー定義を想定します。

syntax = "proto3";
// protoc.exe from nuget "Google.Protobuf.Tools" is used to generate .cs file from this schema definition.
// Run below command to generate the csharp class
// protoc.exe --csharp_out=. MessageBodyProto.proto

package SimulatedTemperatureSensor;
message MessageBodyProto {
    message Ambient {
      double temperature = 1;
      int64 humidity = 2;
    }

    message Machine {
      double temperature = 1;
      double pressure = 2;
    }

    Machine machine = 1;
    Ambient ambient = 2;
    string timeCreated = 3;
}

Google.Protobuf.Tools NuGet から protoc.exe を実行すると、定義を含む .cs ファイルが生成されます。 生成されたファイルは、ここには記載されていません。 Stream Analytics プロジェクトで使用する Protobuf NuGet のバージョンが、入力の生成に使用された Protobuf のバージョンと一致していることを確認する必要があります。

次のコード スニペットは、生成されたファイルがプロジェクトに含まれていることを想定した逆シリアライザーの実装です。 この実装は、生成されたファイルに対するシン ラッパーにすぎません。

    public class MessageBodyDeserializer : StreamDeserializer<SimulatedTemperatureSensor.MessageBodyProto>
    {
        public override IEnumerable<SimulatedTemperatureSensor.MessageBodyProto> Deserialize(Stream stream)
        {
            while (stream.Position < stream.Length)
            {
                yield return SimulatedTemperatureSensor.MessageBodyProto.Parser.ParseDelimitedFrom(stream);
            }
        }

        public override void Initialize(StreamingContext streamingContext)
        {
        }
    }

CSV

次のコード スニペットは、エラーの伝達も示す簡単な CSV の逆シリアライザーです。

using System.Collections.Generic;
using System.IO;

using Microsoft.Azure.StreamAnalytics;
using Microsoft.Azure.StreamAnalytics.Serialization;

namespace ExampleCustomCode.Serialization
{
    public class CustomCsvDeserializer : StreamDeserializer<CustomEvent>
    {
        private StreamingDiagnostics streamingDiagnostics;

        public override IEnumerable<CustomEvent> Deserialize(Stream stream)
        {
            using (var sr = new StreamReader(stream))
            {
                string line = sr.ReadLine();
                while (line != null)
                {
                    if (line.Length > 0 && !string.IsNullOrWhiteSpace(line))
                    {
                        string[] parts = line.Split(',');
                        if (parts.Length != 3)
                        {
                            streamingDiagnostics.WriteError("Did not get expected number of columns", $"Invalid line: {line}");
                        }
                        else
                        {
                            yield return new CustomEvent()
                            {
                                Column1 = parts[0],
                                Column2 = parts[1],
                                Column3 = parts[2]
                            };
                        }
                    }

                    line = sr.ReadLine();
                }
            }
        }

        public override void Initialize(StreamingContext streamingContext)
        {
            this.streamingDiagnostics = streamingContext.Diagnostics;
        }
    }

    public class CustomEvent
    {
        public string Column1 { get; set; }

        public string Column2 { get; set; }

        public string Column3 { get; set; }
    }
}

REST API のシリアル化形式

Stream Analytics のすべての入力には、シリアル化形式があります。 入力オプションについて詳しくは、入力 REST API のドキュメントをご覧ください。

次の JavaScript コードは、REST API を使用する場合の .NET 逆シリアライザーのシリアル化形式の例です。

{    
   "properties":{    
      "type":"stream",  
      "serialization":{    
         "type":"CustomCLR",  
         "properties":{    
            "serializationDllPath":"<path to the dll inside UserCustomCode\CLR\ folder>", 
            "serializationClassName":"<Full name of the deserializer class name>" 
         }  
      }
   }  
}  

serializationClassName は、StreamDeserializer<T> を実装するクラスでなければなりません。 これについては、次のセクションで説明します。

リージョンのサポート

Standard SKU を使用する場合、この機能は次のリージョンで利用できます。

  • 米国中西部
  • 北ヨーロッパ
  • 米国東部
  • 米国西部
  • 米国東部 2
  • 西ヨーロッパ

追加リージョンのサポートを要求することができます。 ただし、Stream Analytics クラスターを使用する場合、このようなリージョンの制限はありません。

よく寄せられる質問

この機能がすべての Azure リージョンで使用できるようになるのはいつですか?

この機能は、6 つのリージョンで使用できます。 別のリージョンでこの機能を使用することに関心がある場合は、要求を提出することができます。 すべての Azure リージョンでのサポートは、ロードマップ上にあります。

GetMetadataPropertyValue 関数と同様の入力から MetadataPropertyValue にアクセスすることはできますか?

この機能はサポートされていません。 この機能が必要な場合は、UserVoice でこの要求に投票することができます。

他のユーザーが利用できるように、逆シリアライザーの実装をコミュニティと共有できますか?

逆シリアライザーを実装した後は、コミュニティと共有することにより、他のユーザーに役立てることができます。 コードを Azure Stream Analytics GitHub リポジトリに送信してください。

Stream Analytics でカスタム デシリアライザーを使用する場合、他にどのような制限がありますか?

入力が MapField 型を含むスキーマの Protobuf 形式の場合、カスタム デシリアライザーを実装することはできません。 また、カスタム デシリアライザーでは、サンプル データやプレビュー データはサポートされません。

次の手順