共用方式為


建立輸入和輸出配接器

本主題針對使用 StreamInsight 平台的複雜事件處理 (CEP) 應用程式提供了建立輸入和輸出配接器所需的一般資訊。配接器是傳遞事件給 StreamInsight 伺服器或從此伺服器傳遞事件的軟體轉換器。

了解事件流程和控制

建立配接器時,請務必了解事件通過 StreamInsight 伺服器的流程,以及輸入和輸出配接器如何控制這個流程。如下圖所示,事件從來源、通過常設查詢而進入接收的流程是單向的。輸入配接器會從來源中讀取事件,以便將事件傳遞給查詢。輸入事件或處理輸入事件所產生的新事件則從某個運算子發送至查詢中的下一個運算子。然後,查詢會將處理過的事件傳遞給輸出配接器,以便將事件傳遞給接收。下圖描述 StreamInsight 查詢繫結至兩個輸入配接器執行個體 a1 和 a2 及輸出配接器執行個體 a4 的案例。

從輸入配接器到輸出配接器的事件流程

雖然從來源到接收的事件流程是單向的,不過元件之間某些互動點之事件擷取和傳送的流程與執行控制可以是雙向的。在上圖中,這些互動點顯示成 READ、ENQUEUE、DEQUEUE 和 WRITE。

您的輸入配接器實作應該使用來源裝置 (例如檔案或資料庫) 所特有的存取機制來執行 READ 作業,並使用配接器 API 執行 ENQUEUE 作業。同樣地,輸出配接器實作應該使用接收裝置所特有的存取機制來執行 WRITE 作業,並使用配接器 API 執行 DEQUEUE 作業。您必須根據配接器狀態轉換圖表所指定的設計模式來實作 ENQUEUE 和 DEQUEUE 作業,本主題稍後會加以描述。

從事件流程控制的觀點,您可以想像事件從提供者發送到取用者 (以從左到右的箭號圖案表示) 或是想像事件由取用者從提供者提取 (以彎形箭號表示)。您的配接器實作可以在 READ 和 WRITE 互動點,針對事件流程控制來採用發送或提取方法。這個互動所要考量的某些因素包括來源或接收能夠處理的事件速率、配接器對來源或接收的流速控制能力,以及您可以實作的任何緩衝處理能力。

如果來源裝置以非常低的延遲將事件吸取出來而且很難進行流速控制,則典型的作法是實作一個配接器,讓來源裝置將事件發送到配接器。這類裝置的範例包括感應器 (機器導向事件)、即時看板及網路連接埠。如果裝置的延遲較高 (檔案、資料庫),請考慮配接器從來源提取資料的實作方式。同樣在輸出端,可以針對以極高輸送量接受事件的裝置來實作輸出配接器,將事件發送到裝置。比較慢的輸出裝置可能會採用一個方法,也就是每當裝置準備取用事件時都輪詢配接器。

在 ENQUEUE 互動點,StreamInsight 伺服器會支援發送模型。這表示,配接器設計模式可讓您在任何時間點,將引擎可以取用的任何數量的事件加入佇列中。在 DEQUEUE 互動點,StreamInsight 伺服器會支援提取模型。這表示,配接器設計模式預期您會依照引擎可以提供的任何速度從伺服器提取事件。

在這個給定的情況下,StreamInsight 伺服器的流速控制原則非常直接。假設有一個簡單的通過查詢沒有任何封鎖作業,則 StreamInsight 伺服器可以在 ENQUEUE 互動點取用輸入配接器中事件的速率只會受到以下的條件所限制:輸出配接器可以在 DEQUEUE 互動點取用伺服器中事件的速率。StreamInsight 伺服器在 ENQUEUE 期間於輸入配接器上發送回的範圍取決於查詢能夠釋放輸出的速度以及輸出配接器能夠取用這個輸出的速度。StreamInsight 提供一組擴充的診斷檢視,幫助您在每一個互動點上測量事件速率。如需詳細資訊,請參閱<監視 StreamInsight 伺服器和查詢>。

配接器開發工作

請使用下列檢查清單來開發配接器。

  • 判斷您所需要的配接器類型 (輸入或輸出)。

    輸入配接器會使用所提供的格式來讀取內送事件,並將這項資料轉換成可由 StreamInsight 伺服器取用的格式。

    輸出配接器會接收 StreamInsight 伺服器所處理的事件、將事件轉換成輸出裝置所預期的格式,並將資料發出到該裝置。

  • 判斷事件類型。

    如果是輸入配接器,請定義事件類型來描述來源所提供之事件的裝載。如果是輸出配接器,請指定事件類型來描述接收所取用之事件的裝載。如需有關事件裝載的詳細資訊,請參閱<StreamInsight 伺服器概念>。

    您會針對一律產生或取用固定裝載格式之事件的來源或接收來指定及建立具類型的配接器 (這樣會事先知道欄位的數目和其類型)。具類型的配接器的主要優點在於建立要加入 StreamInsight 伺服器佇列之事件的實作方法相當簡單。因為已經知道欄位類型,所以您可以使用 Visual Studio 中的 IntelliSense (或是另一個整合式開發環境中的同等功能) 來填入欄位。

    如果來源或接收產生或取用不同裝載格式,您會指定及建立不具類型的配接器。不具類型的配接器的主要優點在於它會在查詢繫結時提供指定事件類型的彈性,而不會將配接器實作繫結至特定的事件類型。與具類型的配接器相較之下,不具類型的配接器的實作涉入程度較深。不具型別的輸入配接器的撰寫方式,必須能夠讓它根據查詢繫結時間內所提供的組態參數來判斷每個欄位的類型、一次填入一個欄位,然後將事件加入佇列。同樣地,不具型別的輸出配接器必須能夠根據輸出上所提供的組態資訊,從清除佇列的事件中擷取查詢處理的結果。

    請務必注意一件事,繫結至查詢的配接器執行個體 (不論具不具型別) 一定會發出包含一個特定類型之裝載的事件。如需詳細資訊,請參閱<建立事件類型>。

  • 判斷事件模型。

    判斷輸入和輸出事件的事件模型。StreamInsight 可支援三種事件模型:點、間隔和邊緣。如果來源提供固定事件模型的事件,您可以單獨針對該事件模型來設計輸入配接器。同樣地,如果接收需要特定模型的事件,您也可以單獨針對該事件模型來設計輸出配接器。但是,大多數應用程式可能需要特定事件類型的所有事件模型。我們建議您最好針對每一個事件模型來建立具型別或不具類型的配接器。如需有關事件模型的詳細資訊,請參閱<StreamInsight 伺服器概念>。

    輸入和輸出 AdapterFactory 類別可讓您一起封裝這些配接器。您可以在查詢繫結時,根據組態參數具現化正確的配接器。

  • 選擇對應的配接器基底類別。

    根據事件類型和模型,選取適當的配接器基底類別。類別命名法的模式如下:[Typed][Point | Interval | Edge][Input | Output]。不具類型的配接器沒有具型別的前置詞。

    配接器類型

    輸入配接器基底類別

    輸出配接器基底類別

    具型別的點

    TypedPointInputAdapter

    TypedPointOutputAdapter

    不具型別的點

    PointInputAdapter

    PointOutputAdapter

    具型別的間隔

    TypedIntervalInputAdapter

    TypedIntervalOutputAdapter

    不具型別的間隔

    IntervalInputAdapter

    IntervalOutputAdapter

    具型別的邊緣

    TypedEdgeInputAdapter

    TypedEdgeOutputAdapter

    不具型別的邊緣

    EdgeInputAdapter

    EdgeOutputAdapter

    如需詳細資訊,請參閱<Microsoft.ComplexEventProcessing.Adapters>。

  • 設計輸入和輸出 AdapterFactory 類別。

    AdapterFactory 是配接器的容器類別。您必須實作 Factory 類別。基底 Factory 類別的組織方式如下所示。

    配接器類型

    輸入配接器基底類別

    輸出配接器基底類別

    具型別

    ITypedInputAdapterFactory

    ITypedOutputAdapterFactory

    不具型別

    IInputAdapterFactory

    IOutputAdapterFactory

    具型別且支援恢復功能

    IHighWaterMarkTypedInputAdapterFactory

    IHighWaterMarkTypedOutputAdapterFactory

    不具型別且支援恢復功能

    IHighWaterMarkInputAdapterFactory

    IHighWaterMarkOutputAdapterFactory

    此 Factory 類別會提供下列用途:

    • 它會針對給定的裝置類別 (CSV 檔案、SQL Server 資料庫、Web 伺服器通用記錄格式) 或應用程式需求,啟用不同配接器實作之間的資源共用,並加速組態參數傳遞給配接器建構函式的程序。例如,應用程式可能需要所有的這三種事件模型 (點、間隔和邊緣)。單一 Factory 可以支援三種配接器實作,每一個事件模型各一種。另一個範例為,雖然應用程式可能具有相同的事件來源 (例如資料庫資料表),不過來源會根據所執行的查詢從相同的來源產生多個事件裝載結構。在此情況下,單一 Factory 就可以支援配接器實作來處理每個裝載結構。

    • 它會提供伺服器執行階段的配接器閘道。配接器開發人員必須在配接器類別的配接器 Factory 中實作 Create() 和 Dispose() 方法。伺服器會在查詢啟動和關閉期間叫用這些方法。

    • 它會提供預備執行階段組態資訊的配接器閘道。這對於不具類型的配接器而言特別重要,因為這些配接器必須根據查詢繫結時間內所提供的組態參數判斷結構中每個欄位的類型。您可以在 Factory 類別中定義組態結構,並且透過 Create() 方法,將這個組態結構傳遞給配接器類別的建構函式方法。這個組態結構會使用 DataContractSerialization 加以序列化。除了這個條件約束以外,此開發方法會提供您定義及使用這個組態結構的完整彈性 (根據您擴展它以及在配接器建構函式中使用它的層面來看)。

    • 它會提供一種方式來產生目前時間增量 (CTI),而不需要透過輸入配接器將它們明確加入佇列。藉由在配接器 Factory 類別中實作 ITypedDeclareAdvanceTimePolicy (適用於具類型的配接器 Factory) 和 IDeclareAdvanceTimePolicy (適用於不具類型的配接器 Factory) 介面,使用者就可以指定 CTI 頻率和時間戳記。這樣做可簡化配接器程式碼,而且可能會影響 Factory 透過其配接器執行個體所產生的每個事件資料流。如需詳細資訊,請參閱<[AdvanceTimeSettingsClass]>。

    • 在具有恢復功能的應用程式中,它支援恢復功能,方法是,提供上限標準給輸入配接器以重新執行遺漏的事件,以及提供上限標準與位移給輸出配接器以排除重複的事件。如需詳細資訊,請參閱<StreamInsight 恢復功能>。

  • 建立並測試配接器。

    將配接器編譯和建立為 .NET 組件。您可以針對不包含任何複雜查詢處理的簡單通過查詢 (從輸入配接器中讀取事件並將它輸出至輸出配接器) 測試配接器的基本作業。這樣做將驗證配接器是否能夠從裝置中讀取和寫入,以及是否能夠將事件加入佇列或從佇列清除。

配接器的狀態機器

對輸入和輸出配接器而言,定義配接器與 StreamInsight 伺服器之間互動的狀態機器都是相同的。這一點很重要,因為狀態機器會提供您一致的開發模型。下圖顯示狀態機器。

配接器加入佇列與清除佇列狀態圖表

讓此狀態機器得以運作的主要功能和需求如下:

  • Start() 和 Resume() 是由 StreamInsight 伺服器呼叫的方法,而且您必須以配接器開發人員的身分來實作它們。此外,您也必須針對配接器類別和 Dispose() 方法實作建構函式方法,該方法繼承自基底類別。

  • 接下來,您的配接器實作必須呼叫配接器 SDK 所提供的下列方法:

    • 輸入配接器的 Enqueue()。這樣會傳回 EnqueueOperationResult.Success 或 EnqueueOperationResult.Full 值。

    • 輸出配接器的 Dequeue()。這樣會傳回 DequeueOperationResult.Success 或 DequeueOperationResult.Empty 值。

    • Ready().這樣會傳回布林值 TRUE 或 FALSE。

    • Stopped().這樣會傳回布林值 TRUE 或 FALSE。

  • 當系統管理員或查詢開發人員透過伺服器 API 中的方法來停止執行查詢時,StreamInsight 伺服器會代表使用者來以非同步方式呼叫內部方法 (表示為 StopQuery())。

  • 當配接器處於下列其中一個狀態時,呼叫 Enqueue() 和 Dequeue() 會分別傳回 Full 和 Empty 狀態:

    • 已暫停

    • 正在停止

  • 當配接器處於下列其中一個狀態時,呼叫 Enqueue() 和 Dequeue() 會引發例外狀況:

    • 已建立

    • 已停止

  • 呼叫 Ready() 會在配接器處於下列其中一個狀態時引發例外狀況:

    • 已建立

    • 執行中

    • 已停止

  • 配接器在它的操作過程中會在部分或所有五個狀態之間轉換 (已建立、執行中、已暫停、正在停止和已停止)。狀態轉換會在 StreamInsight 伺服器呼叫 Start() 或 Resume() 之前以及配接器呼叫 Enqueue()、Dequeue()、Ready() 和 Stopped() 之後發生。

  • StreamInsight 伺服器和配接器絕對不會共用相同的執行緒。伺服器一定會在個別的工作者執行緒上呼叫 Start() 或 Resume()。伺服器會代表配接器從作業系統執行緒集區取得這個執行緒。這意味著 Start() 和 Resume() 方法擁有所需的完整能力及彈性來使用工作者執行緒 (例如,繁衍更多的執行緒以供非同步讀取或寫入)。在這個給定的情況下,當您從這個執行緒使用系統資源時,一定要非常謹慎而且熟練。

  • 此 API 不需要 Start() 與 Resume() 作業 (執行緒) 之間原本的同步處理。伺服器一定只能在配接器呼叫 Ready() 之後呼叫 Resume()。但是請注意,面對裝置的工作可能需要同步處理,例如事件的讀取、寫入或緩衝處理等工作,特別是在非同步 I/O 情況下。建議的最佳作法是使用非封鎖的 I/O。

  • 如果配接器可以閒置,配接器應該定期檢查此狀態,以判斷系統是否要求它停止。

配接器與伺服器互動的週期

StreamInsight 伺服器與配接器之間的信號交換一定是同步的。因此,在其執行的任何時間點,配接器都可以檢查其狀態,並據此來進行互動。配接器與 StreamInsight 伺服器互動的週期是由下列作業所組成,這些作業對應到之前圖例所顯示的狀態機器。

  • 已建立

    當開始查詢時,配接器執行個體會開始與 StreamInsight 伺服器互動 (其方式是在 StreamInsight 伺服器 API 中發出對應的呼叫)。

  • 執行中

    伺服器會將配接器置於執行中狀態,並以非同步方式在配接器上呼叫 Start(),而且保證只會發出這個呼叫一次。當配接器處於執行中狀態時,配接器可以將事件加入伺服器佇列中,或是從伺服器佇列中清除事件。

    在理想情況下,配接器大多數的時間都會置於執行中狀態。建議的設計模式是從 Start() 方法叫用讀取器或寫入器常式 (最好在個別的執行緒中),然後從 Start() 常式傳回,藉此快速放棄工作者執行緒。

    讀取器常式 (假設它稱為 ProduceEvents()) 會從來源讀取事件,並呼叫 Enqueue() 將事件發送到伺服器。如果是輸出配接器,寫入器常式 (假設它稱為 ConsumeEvents()) 會呼叫 Dequeue() 來從伺服器提取事件,並將事件寫入接收中。

  • 已暫停

    當伺服器無法接收加入佇列的事件或輸出要從佇列清除的事件時,輸入或輸出配接器就會置於已暫停狀態。這樣會叫用 Enqueue() 和 Dequeue() 來分別傳回 FULL 和 EMPTY 狀態。您可以在已暫停狀態下實作清理作業,例如儲存上次從資料庫中的記錄讀取的位置或檔案中的字行。在這個選擇性區段的結尾,您必須叫用 Ready() 方法來與伺服器溝通,告知配接器已經準備好繼續進行。如果此常式在相同的工作者執行緒上當做 Start() 本身來執行,您必須從 Start() 常式本身傳回。

  • 為了回應 Ready() 的叫用,伺服器會讓配接器返回執行中狀態,而且一定會在另一個工作者執行緒上以非同步方式呼叫 Resume()。您可以設計 Resume() 來讓上一次失敗的反覆運算加入佇列或從佇列中清除,然後呼叫 ProduceEvents() 或 ConsumeEvents(),這個模式可以一直繼續到配接器轉換為已停止或正在停止的狀態為止。

  • 正在停止

    在執行中或已暫停狀態下的任何時間點,伺服器都可以將配接器移到正在停止狀態,以回應停止查詢的非同步要求。在此狀態下,叫用 Enqueue() 或 Dequeue() 也會分別傳回 FULL 或 EMPTY 狀態。

    正在停止狀態會提供一個臨時區域給配接器實作,正確讓它自己做好停止的準備。您可以實作配接器來放棄它取得的所有資源 (執行緒、記憶體),然後叫用 Stopped() 方法。在呼叫這個方法之前,伺服器不會停止配接器。

    請注意,配接器可能會以非同步方式轉換到正在停止的狀態。配接器需要某些方法來偵測它已經進入正在停止的狀態。如同上面所討論的內容,配接器的設計模式是在暫停的狀態下叫用 Ready()。為了回應,伺服器會再一次叫用 Resume() 方法,藉此在 Resume() 方法中偵測正在停止的狀態。我們建議的最佳作法是將正在停止狀態的檢查當做第一個程式碼區塊放在 Start() 和 Resume() 實作內。

  • 已停止

    配接器程式碼可以在任何時間點呼叫 Stopped(),這樣會將配接器置於已停止狀態。我們建議您先清除配接器取得的資源,然後再呼叫 Stopped(),這是很好的設計作法。

    重要事項重要事項

    呼叫 Stopped() 方法失敗會造成與查詢相關聯的最後一頁記憶體仍然維持配置狀態。這樣會造成少量的記憶體遺漏,而且如果有許多啟動和停止查詢循環位於處理序中,這樣的記憶體遺漏情形會隨著時間而累積。

    在已停止狀態下,配接器無法參考任何 StreamInsight 伺服器特有的建構或事件記憶體,或是執行加入佇列或清除佇列作業。這類動作將會引發例外狀況。但是,作業系統和面對裝置的清除活動仍然可以繼續。

範例

如需各種輸入和輸出配接器以及配接器 Factory 的範例,請參閱 StreamInsight 範例 (英文) 上所提供的範例。

請參閱

概念

StreamInsight 伺服器概念

StreamInsight 伺服器架構