StreamInsight 範例:伺服器 - 公開內嵌伺服器
此範例示範如何建立與公開內嵌 StreamInsight 伺服器,並且讓用戶端程式能用來當做遠端伺服器。 除了建立伺服器並使其能讓用戶端程式使用之外,此範例還能當做用戶端本身來使用,可讓您建立來源和查詢、繫結至接收,以及利用處理序來執行繫結。 如需有關 StreamInsight 實體的詳細資訊,請參閱<StreamInsight 概念>。
此範例中建立的實體主要是供本節中其他範例所使用。 若要一併使用本節中的範例,請執行下列動作:
執行此伺服器範例
執行下列其中一個或兩個用戶端範例:
逐步解說
典型的 StreamInsight 用戶端一般都會遵循下列基本步驟:
建立 StreamInsight 伺服器執行個體
建立或取得 StreamInsight 應用程式
定義或取得來源
針對來源撰寫查詢
定義或取得接收
繫結並執行查詢和接收
在此範例中,程式會建立所需的所有實體,並將這些實體部署至伺服器供用戶端使用。
建立伺服器執行個體
建立 StreamInsight 程式首先是從具現化 StreamInsight 伺服器執行個體開始。 在這個範例中,伺服器是內嵌在程式中。
server = Server.Create("Default");
您必須使用已經透過 StreamInsight 安裝程序在電腦上註冊的執行個體名稱來建立伺服器 (此範例中的 Default
)。 如需詳細資訊,請參閱<安裝 (StreamInsight)>。
接著,公開內嵌伺服器的端點,讓用戶端 StreamInsight 程式可以連接並當做遠端 StreamInsight 伺服器。
var host = new ServiceHost(server.CreateManagementService());
host.AddServiceEndpoint(typeof(IManagementService), new WSHttpBinding(SecurityMode.Message), "https://localhost/MyStreamInsightServer");
用戶端程式也可以使用您在安裝 StreamInsight 時建立的 StreamInsight 主機 Windows 服務。 如需有關連接到 StreamInsight 伺服器時可搭配使用之選項的詳細資訊,請參閱<發行及連接到 StreamInsight 伺服器>。
建立應用程式
「應用程式」(Application) 表示伺服器中的範圍單位, 而其他所有實體都是在應用程式中建立的。
var myApp = server.CreateApplication("serverApp");
定義和部署來源
接著,定義輸入來源再將其部署至伺服器並賦予名稱,讓其他 StreamInsight 用戶端都能使用此來源。 在此範例中,來源內的資料是每秒所產生之時間點事件的簡單暫時資料流。
var mySource = myApp.DefineObservable(() => Observable.Interval(TimeSpan.FromSeconds(1))).ToPointStreamable(x => PointEvent.CreateInsert(DateTimeOffset.Now, x), AdvanceTimeSettings.StrictlyIncreasingStartTime);
mySource.Deploy("serverSource");
針對來源撰寫查詢
接著,針對輸入來源撰寫查詢。 此查詢使用 LINQ 當做查詢規格語言。 在此範例中,查詢會傳回每個偶數事件的值。
var myQuery = from e in mySource
where e % 2 == 0
select e;
技術上來說,這個定義會轉譯為一個篩選運算子,此運算子會從資料流序列中捨棄所有未滿足篩選述詞 (where e % 2 == 0
) 的事件並傳回事件值。 如需有關 LINQ 查詢運算子的詳細資訊,請參閱<使用 StreamInsight LINQ>。
定義和部署接收
接著,建立可繫結至查詢的輸出接收,然後處理產生的伺資料流序列。 此範例只是建立簡單的函式,可將資料流值寫入至主控台。
var mySink = myApp.DefineObserver(() => Observer.Create<long>(x => Console.WriteLine("sink_Server..: {0}", x)));
接著,再將接收部署至伺服器並賦予名稱。
mySink.Deploy("serverSink");
繫結並執行查詢和接收
此時您已經可以將可觀察的查詢繫結至觀察器輸出接收,然後在伺服器的處理序中執行。
var proc = myQuery.Bind(mySink).Run("serverProcess");
在下列完整範例中,此處理序會持續執行,直到使用者在主控台輸入內容將它停止為止。
完整範例
下列範例會結合之前所述的元件來建立完整應用程式。 為求簡化,此範例不會檢查潛在的錯誤狀況。
using System;
using System.ServiceModel;
using Microsoft.ComplexEventProcessing;
using Microsoft.ComplexEventProcessing.Linq;
using Microsoft.ComplexEventProcessing.ManagementService;
using System.Reactive;
using System.Reactive.Linq;
namespace StreamInsight21_example_Server
/* This example:
* creates an embedded server instance and makes it available to other clients
* defines, deploys, binds, and runs a simple source, query, and sink
* waits for the user to stop the server
*/
{
class Program
{
static void Main(string[] args)
{
// Create an embedded StreamInsight server
using (var server = Server.Create("Default"))
{
// Create a local end point for the server embedded in this program
var host = new ServiceHost(server.CreateManagementService());
host.AddServiceEndpoint(typeof(IManagementService), new WSHttpBinding(SecurityMode.Message), "https://localhost/MyStreamInsightServer");
host.Open();
/* The following entities will be defined and available in the server for other clients:
* serverApp
* serverSource
* serverSink
* serverProcess
*/
// CREATE a StreamInsight APPLICATION in the server
var myApp = server.CreateApplication("serverApp");
// DEFINE a simple SOURCE (returns a point event every second)
var mySource = myApp.DefineObservable(() => Observable.Interval(TimeSpan.FromSeconds(1))).ToPointStreamable(x => PointEvent.CreateInsert(DateTimeOffset.Now, x), AdvanceTimeSettings.StrictlyIncreasingStartTime);
// DEPLOY the source to the server for clients to use
mySource.Deploy("serverSource");
// Compose a QUERY over the source (return every even-numbered event)
var myQuery = from e in mySource
where e % 2 == 0
select e;
// DEFINE a simple observer SINK (writes the value to the server console)
var mySink = myApp.DefineObserver(() => Observer.Create<long>(x => Console.WriteLine("sink_Server..: {0}", x)));
// DEPLOY the sink to the server for clients to use
mySink.Deploy("serverSink");
// BIND the query to the sink and RUN it
using (var proc = myQuery.Bind(mySink).Run("serverProcess"))
{
// Wait for the user stops the server
Console.WriteLine("----------------------------------------------------------------");
Console.WriteLine("MyStreamInsightServer is running, press Enter to stop the server");
Console.WriteLine("----------------------------------------------------------------");
Console.WriteLine(" ");
Console.ReadLine();
}
host.Close();
}
}
}
}