使用 .NET 自訂還原序列化程式讀取任何格式的輸入 (預覽)

.NET 自訂還原序列化程式可讓 Azure 串流分析作業從三種內建資料格式以外的格式讀取資料。 本文說明序列化格式,以及可為 Azure 串流分析雲端和邊緣作業定義 .NET 自訂還原序列化程式的介面。 另外還有通訊協定緩衝區和 CSV 格式的範例還原序列化程式。

.NET 自訂還原序列化程式

下列程式碼範例是可定義自訂還原序列化程式及實作 StreamDeserializer<T> 的介面。

UserDefinedOperator 是所有自訂串流運算子的基底類別。 它會初始化 StreamingContext ,它提供內容,其中包含發佈診斷的機制,您必須針對還原序列化程式進行偵錯。

    public abstract class UserDefinedOperator
    {
        public abstract void Initialize(StreamingContext streamingContext);
    }

下列程式碼片段是串流資料的還原序列化。

可略過錯誤應使用透過 UserDefinedOperator 的 Initialize 方法傳遞的 IStreamingDiagnostics 來發出。 所有例外狀況都會被視為錯誤,而且會重新建立還原序列化程序。 發生某些錯誤之後,作業會進入失敗狀態。

StreamDeserializer<T> 可將資料流還原序列化為 T 類型的物件。 必須符合下列條件:

  1. T 是類別或結構。
  2. T 中的所有公用欄位不是
    1. [sbyte、byte、short、ushort、int、uint、long、DateTime、string、float、double] 其中一項,就是其可為 Null 的對等項目。
    2. 另一個遵循相同規則的結構或類別。
    3. 遵循相同規則的 T2 類型陣列。
    4. IListT2 其中 T2 遵循相同的規則。
    5. 沒有任何遞迴類型。

參數 stream 是包含序列化物件的資料流。 Deserialize 可傳回 T 執行個體的集合。

    public abstract class StreamDeserializer<T> : UserDefinedOperator
    {
        public abstract IEnumerable<T> Deserialize(Stream stream);
    }

StreamingContext 提供內容,其中包含發佈使用者操作員診斷的機制。

    public abstract class StreamingContext
    {
        public abstract StreamingDiagnostics Diagnostics { get; }
    }

StreamingDiagnostics 是使用者定義運算子 (包括序列化程式、還原序列化程式和使用者定義的函式) 的診斷。

WriteError 可將錯誤訊息寫入資源記錄,並將錯誤傳送至診斷。

briefMessage 是簡短的錯誤訊息。 此訊息會顯示在診斷中,並由產品小組用來進行偵錯。 請勿包含敏感性資訊,並將訊息保留少於 200 個字元

detailedMessage 是詳細的錯誤訊息,其只會新增至儲存體中的資源記錄。 此訊息不得超過 2000 個字元。

    public abstract class StreamingDiagnostics
    {
        public abstract void WriteError(string briefMessage, string detailedMessage);
    }

還原序列化程式範例

本節說明如何為 Protobuf 和 CSV 撰寫自訂還原序列化程式。 如需更多範例,例如事件中樞擷取的 AVRO 格式,請流覽 GitHub 上的 Azure 串流分析

通訊協定緩衝區 (Protobuf) 格式

這是使用通訊協定緩衝區格式的範例。

假設下列通訊協定緩衝區定義。

syntax = "proto3";
// protoc.exe from nuget "Google.Protobuf.Tools" is used to generate .cs file from this schema definition.
// Run below command to generate the csharp class
// protoc.exe --csharp_out=. MessageBodyProto.proto

package SimulatedTemperatureSensor;
message MessageBodyProto {
    message Ambient {
      double temperature = 1;
      int64 humidity = 2;
    }

    message Machine {
      double temperature = 1;
      double pressure = 2;
    }

    Machine machine = 1;
    Ambient ambient = 2;
    string timeCreated = 3;
}

Google.Protobuf.Tools NuGet 執行 protoc.exe 可產生具有定義的 .cs 檔案。 此處未顯示產生的檔案。 您必須確定您在串流分析專案中使用的 Protobuf NuGet 版本符合用來產生輸入的 Protobuf 版本。

下列程式碼片段是假設所產生檔案包含在專案中的還原序列化程式實作。 此實作就是所產生檔案的精簡包裝函式。

    public class MessageBodyDeserializer : StreamDeserializer<SimulatedTemperatureSensor.MessageBodyProto>
    {
        public override IEnumerable<SimulatedTemperatureSensor.MessageBodyProto> Deserialize(Stream stream)
        {
            while (stream.Position < stream.Length)
            {
                yield return SimulatedTemperatureSensor.MessageBodyProto.Parser.ParseDelimitedFrom(stream);
            }
        }

        public override void Initialize(StreamingContext streamingContext)
        {
        }
    }

CSV

下列程式碼片段是簡單的 CSV 還原序列化程式,其也會示範如何傳播錯誤。

using System.Collections.Generic;
using System.IO;

using Microsoft.Azure.StreamAnalytics;
using Microsoft.Azure.StreamAnalytics.Serialization;

namespace ExampleCustomCode.Serialization
{
    public class CustomCsvDeserializer : StreamDeserializer<CustomEvent>
    {
        private StreamingDiagnostics streamingDiagnostics;

        public override IEnumerable<CustomEvent> Deserialize(Stream stream)
        {
            using (var sr = new StreamReader(stream))
            {
                string line = sr.ReadLine();
                while (line != null)
                {
                    if (line.Length > 0 && !string.IsNullOrWhiteSpace(line))
                    {
                        string[] parts = line.Split(',');
                        if (parts.Length != 3)
                        {
                            streamingDiagnostics.WriteError("Did not get expected number of columns", $"Invalid line: {line}");
                        }
                        else
                        {
                            yield return new CustomEvent()
                            {
                                Column1 = parts[0],
                                Column2 = parts[1],
                                Column3 = parts[2]
                            };
                        }
                    }

                    line = sr.ReadLine();
                }
            }
        }

        public override void Initialize(StreamingContext streamingContext)
        {
            this.streamingDiagnostics = streamingContext.Diagnostics;
        }
    }

    public class CustomEvent
    {
        public string Column1 { get; set; }

        public string Column2 { get; set; }

        public string Column3 { get; set; }
    }
}

REST API 的序列化格式

每個串流分析輸入都有序列化格式。 如需輸入選項的詳細資訊,請參閱輸入 REST API 文件。

下列 Javascript 程式碼是使用 REST API 時 .NET 還原序列化程式序列化格式的範例:

{    
   "properties":{    
      "type":"stream",  
      "serialization":{    
         "type":"CustomCLR",  
         "properties":{    
            "serializationDllPath":"<path to the dll inside UserCustomCode\CLR\ folder>", 
            "serializationClassName":"<Full name of the deserializer class name>" 
         }  
      }
   }  
}  

serializationClassName 應該是可實作 StreamDeserializer<T> 的類別。 以下章節會加以說明。

區域支援

使用標準 SKU 時,下列區域提供這項功能:

  • 美國中西部
  • 北歐
  • 美國東部
  • 美國西部
  • 美國東部 2
  • 西歐

您可以要求更多區域 的支援 。 不過,使用 串流分析叢集時沒有這類區域限制。

常見問題集

此功能何時會在所有 Azure 區域中提供?

這項功能會在 6 個區域中提供。 如果您有興趣在另一個區域中使用這項功能,您可以 提交要求。 所有 Azure 區域的支援都在藍圖規劃中。

我可以從類似 GetMetadataPropertyValue 函式的輸入存取 MetadataPropertyValue 嗎?

不支援此功能。 如果您需要這項功能,可以在 UserVoice 上投票同意此要求。

我可與社群分享我的還原序列化程式實作,讓其他人因此受惠嗎?

在實作還原序列化程式之後,您可以透過與社群共用來協助其他人。 將您的程式碼提交至 Azure 串流分析 GitHub 存放庫

在串流分析中使用自訂還原序列化程式的其他限制為何?

如果您的輸入是包含型別之架構 MapField 的 Protobuf 格式,您將無法實作自訂還原序列化程式。 此外,自訂還原序列化器不支援範例資料或預覽資料。

後續步驟