StreamInsight 端對端範例
本主題描述與建立 StreamInsight 應用程式有關的各種元件和步驟,並包含應用程式的端對端範例。StreamInsight 應用程式結合了事件來源、事件接收和查詢,以便實作複雜的事件處理案例。StreamInsight API 會提供各種介面來支援建立及維護事件處理應用程式的各種控制和複雜性等級。
應用程式部署的最小單位為可以啟動及停止的「查詢」(Query)。下圖顯示建立查詢的一個方式。事件來源是由「輸入配接器」(Input Adapter) 表示。此配接器會將事件資料流送到運算子樹狀結構,此樹狀結構表示所要的查詢邏輯,由設計者以「查詢範本」(Query Template) 的形式來指定。然後處理過的事件資料流會載入事件接收中,通常是「輸出配接器」(Output Adapter)。
不熟悉複雜事件處理方法的開發人員應該詳讀<StreamInsight 伺服器概念>和<StreamInsight 伺服器架構>。
應用程式程序
本節會逐步說明建立端對端應用程式的一般使用經驗。
具現化伺服器執行個體和應用程式
此程序從 StreamInsight 伺服器執行個體和應用程式的具現化開始。
server = Server.Create(”MyInstance”);
Application myApp = server.CreateApplication("MyApp");
您必須使用已經透過 StreamInsight 安裝程序在電腦上註冊的執行個體名稱來建立伺服器 (上述範例中的 MyInstance)。如需詳細資訊,請參閱<安裝 (StreamInsight)>。
「應用程式」(Application) 表示伺服器中包含其他中繼資料實體的範圍單位。
上一個範例會在相同程序中建立伺服器執行個體。但是,另一個常見的部署是連接到遠端伺服器,並在那裡處理現有的應用程式。下列範例示範如何連接到遠端伺服器並存取現有的應用程式。
server = Server.Connect(new System.ServiceModel.EndpointAddress(@"https://localhost/StreamInsight/MyInstance"));
Application myApp = server.Applications["ExistingApp"];
如需有關本機和遠端伺服器的詳細資訊,請參閱<發行及連接到 StreamInsight 伺服器>。
建立輸入資料流
接下來,將會在現有配接器實作的上層建立輸入資料流。為了更精確,必須指定配接器 Factory,如下列範例所示。
var inputstream = CepStream<MyDataType>.Create("inputStream",
typeof(MyInputAdapterFactory),
new InputAdapterConfig { someFlag = true },
EventShape.Point);
這樣會建立表示事件資料流的 CepStream 物件,該物件是由透過給定 Factory 類別具現化的配接器所產生 (一旦啟動查詢之後)。系統會提供一個名稱給此資料流,之後可以使用此名稱來擷取資料流特有的診斷。此外,也會提供配接器 Factory 之組態結構的執行個體。組態結構會將執行階段特有的資訊傳遞給此 Factory 以及所要的事件圖形 (事件模型)。如需有關此 Factory 如何使用這些參數的詳細資訊,請參閱<建立輸入和輸出配接器>。
定義查詢
CepStream 物件會當做實際查詢邏輯定義的基礎來使用。此查詢使用 LINQ 當做查詢規格語言:
var filtered = from e in inputstream
where e.Value > 95
select e;
在此範例中,我們假設上一個範例定義的 MyDataType 類別或結構 (用來建立輸入資料流物件) 包含名為 Value 的欄位。這個定義會轉譯為一個篩選運算子,此運算子會從資料流中捨棄所有未滿足 where e.Value > 95 篩選述詞的事件。如需有關 LINQ 查詢運算子的詳細資訊,請參閱<以 LINQ 撰寫查詢範本>。
建立輸出配接器
此時,變數 filtered 的類型依然是 CepStream。如此可讓資料流轉換成可以啟動的查詢。為了產生可以啟動的查詢執行個體,必須指定輸出配接器,如下列範例所示。
var query = filtered.ToQuery(myApp,
"filterQuery",
"Filter out Values over 95",
typeof(MyOutputAdapterFactory),
new OutputAdapterConfig { someString = "foo" },
EventShape.Point,
StreamEventOrder.FullyOrdered);
輸出配接器需要指定輸出配接器 Factory、組態物件、想要的輸出資料流圖形及暫時順序,這一點與輸入資料流一樣。
事件圖形規格會確保查詢輸出的個別事件圖形:
EventShape.Point:任何結果事件存留期間都會縮減為點事件。
EventShape.Interval:任何結果事件都會解譯為間隔事件。也就是說,如果其完整存留期間是由目前時間增量 (CTI) 事件所認可,則只有輸出。
EventShape.Edge:任何結果事件都會解譯為邊緣事件。也就是說,其開始時間會輸出為開始邊緣,其結束時間會輸出為對應的結束邊緣。
資料流事件順序參數會影響間隔事件輸出資料流的有效性。FullyOrdered 表示間隔事件一定會是依據其開始時間順序的輸出,而 ChainOrdered 則會產生由間隔結束時間所排序的輸出順序。
此外,應用程式物件必須當做第一個參數來提供,此物件現在包含查詢以及查詢名稱和描述,這些內容會進一步在中繼資料存放區內識別這個查詢。
開始查詢
最後一個步驟是開始查詢。在此範例中,查詢是由使用者提供的按鍵所停止。
query.Start();
Console.ReadLine();
query.Stop();
這個端對端範例示範如何透過 CepStream.Create() 和 ToQuery() 多載來搭配查詢範本使用事件來源的隱含繫結,以快速建立工作查詢。如果需要對 CEP 物件繫結擁有更多的明確控制權,請參閱<使用查詢繫結器>。
完整範例
下列範例會結合之前所述的元件來建立完整應用程式。
Server server = null;
using (Server server = Server.Create(”MyInstance”))
{
try
{
Application myApp = server.CreateApplication("MyApp");
var inputstream = CepStream<MyDataType>.Create("inputStream",
typeof(MyInputAdapterFactory),
new InputAdapterConfig { someFlag = true },
EventShape.Point);
var filtered = from e in inputstream
where e.Value > 95
select e;
var query = filtered.ToQuery(myApp,
"filterQuery",
"Filter out Values over 95",
typeof(MyOutputAdapterFactory),
new OutputAdapterConfig { someString = "foo" },
EventShape.Point,
StreamEventOrder.FullyOrdered);
query.Start();
Console.ReadLine();
query.Stop();
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
}
}