共用方式為


StreamInsight 範例:伺服器 - 公開內嵌伺服器

 

此範例示範如何建立與公開內嵌 StreamInsight 伺服器,並且讓用戶端程式能用來當做遠端伺服器。 除了建立伺服器並使其能讓用戶端程式使用之外,此範例還能當做用戶端本身來使用,可讓您建立來源和查詢、繫結至接收,以及利用處理序來執行繫結。 如需有關 StreamInsight 實體的詳細資訊,請參閱<StreamInsight 概念>。

此範例中建立的實體主要是供本節中其他範例所使用。 若要一併使用本節中的範例,請執行下列動作:

  1. 執行此伺服器範例

  2. 執行下列其中一個或兩個用戶端範例:

逐步解說

典型的 StreamInsight 用戶端一般都會遵循下列基本步驟:

  • 建立 StreamInsight 伺服器執行個體

  • 建立或取得 StreamInsight 應用程式

  • 定義或取得來源

  • 針對來源撰寫查詢

  • 定義或取得接收

  • 繫結並執行查詢和接收

在此範例中,程式會建立所需的所有實體,並將這些實體部署至伺服器供用戶端使用。

建立伺服器執行個體

建立 StreamInsight 程式首先是從具現化 StreamInsight 伺服器執行個體開始。 在這個範例中,伺服器是內嵌在程式中。

server = Server.Create("Default");  

您必須使用已經透過 StreamInsight 安裝程序在電腦上註冊的執行個體名稱來建立伺服器 (此範例中的 Default)。 如需詳細資訊,請參閱<安裝 (StreamInsight)>。

接著,公開內嵌伺服器的端點,讓用戶端 StreamInsight 程式可以連接並當做遠端 StreamInsight 伺服器。

  
var host = new ServiceHost(server.CreateManagementService());  
host.AddServiceEndpoint(typeof(IManagementService), new WSHttpBinding(SecurityMode.Message), "https://localhost/MyStreamInsightServer");  
  

用戶端程式也可以使用您在安裝 StreamInsight 時建立的 StreamInsight 主機 Windows 服務。 如需有關連接到 StreamInsight 伺服器時可搭配使用之選項的詳細資訊,請參閱<發行及連接到 StreamInsight 伺服器>。

建立應用程式

「應用程式」(Application) 表示伺服器中的範圍單位, 而其他所有實體都是在應用程式中建立的。

var myApp = server.CreateApplication("serverApp");  

定義和部署來源

接著,定義輸入來源再將其部署至伺服器並賦予名稱,讓其他 StreamInsight 用戶端都能使用此來源。 在此範例中,來源內的資料是每秒所產生之時間點事件的簡單暫時資料流。

  
var mySource = myApp.DefineObservable(() => Observable.Interval(TimeSpan.FromSeconds(1))).ToPointStreamable(x => PointEvent.CreateInsert(DateTimeOffset.Now, x), AdvanceTimeSettings.StrictlyIncreasingStartTime);  
mySource.Deploy("serverSource");  
  

針對來源撰寫查詢

接著,針對輸入來源撰寫查詢。 此查詢使用 LINQ 當做查詢規格語言。 在此範例中,查詢會傳回每個偶數事件的值。

  
var myQuery = from e in mySource  
              where e % 2 == 0  
              select e;  
  

技術上來說,這個定義會轉譯為一個篩選運算子,此運算子會從資料流序列中捨棄所有未滿足篩選述詞 (where e % 2 == 0) 的事件並傳回事件值。 如需有關 LINQ 查詢運算子的詳細資訊,請參閱<使用 StreamInsight LINQ>。

定義和部署接收

接著,建立可繫結至查詢的輸出接收,然後處理產生的伺資料流序列。 此範例只是建立簡單的函式,可將資料流值寫入至主控台。

var mySink = myApp.DefineObserver(() => Observer.Create<long>(x => Console.WriteLine("sink_Server..: {0}", x)));  

接著,再將接收部署至伺服器並賦予名稱。

mySink.Deploy("serverSink");  

繫結並執行查詢和接收

此時您已經可以將可觀察的查詢繫結至觀察器輸出接收,然後在伺服器的處理序中執行。

var proc = myQuery.Bind(mySink).Run("serverProcess");  

在下列完整範例中,此處理序會持續執行,直到使用者在主控台輸入內容將它停止為止。

完整範例

下列範例會結合之前所述的元件來建立完整應用程式。 為求簡化,此範例不會檢查潛在的錯誤狀況。

  
using System;  
using System.ServiceModel;  
using Microsoft.ComplexEventProcessing;  
using Microsoft.ComplexEventProcessing.Linq;  
using Microsoft.ComplexEventProcessing.ManagementService;  
using System.Reactive;  
using System.Reactive.Linq;  
  
namespace StreamInsight21_example_Server  
  
    /* This example:  
     * creates an embedded server instance and makes it available to other clients  
     * defines, deploys, binds, and runs a simple source, query, and sink  
     * waits for the user to stop the server  
     */  
{  
    class Program  
    {  
        static void Main(string[] args)  
        {  
            // Create an embedded StreamInsight server  
            using (var server = Server.Create("Default"))  
            {  
                // Create a local end point for the server embedded in this program  
                var host = new ServiceHost(server.CreateManagementService());  
                host.AddServiceEndpoint(typeof(IManagementService), new WSHttpBinding(SecurityMode.Message), "https://localhost/MyStreamInsightServer");  
                host.Open();  
  
                /* The following entities will be defined and available in the server for other clients:  
                 * serverApp  
                 * serverSource  
                 * serverSink  
                 * serverProcess  
                 */  
  
                // CREATE a StreamInsight APPLICATION in the server  
                var myApp = server.CreateApplication("serverApp");  
  
                // DEFINE a simple SOURCE (returns a point event every second)  
                var mySource = myApp.DefineObservable(() => Observable.Interval(TimeSpan.FromSeconds(1))).ToPointStreamable(x => PointEvent.CreateInsert(DateTimeOffset.Now, x), AdvanceTimeSettings.StrictlyIncreasingStartTime);  
  
                // DEPLOY the source to the server for clients to use  
                mySource.Deploy("serverSource");  
  
                // Compose a QUERY over the source (return every even-numbered event)  
                var myQuery = from e in mySource  
                              where e % 2 == 0  
                              select e;  
  
                // DEFINE a simple observer SINK (writes the value to the server console)  
                var mySink = myApp.DefineObserver(() => Observer.Create<long>(x => Console.WriteLine("sink_Server..: {0}", x)));  
  
                // DEPLOY the sink to the server for clients to use  
                mySink.Deploy("serverSink");  
  
                // BIND the query to the sink and RUN it  
                using (var proc = myQuery.Bind(mySink).Run("serverProcess"))  
                {  
                    // Wait for the user stops the server  
                    Console.WriteLine("----------------------------------------------------------------");  
                    Console.WriteLine("MyStreamInsightServer is running, press Enter to stop the server");  
                    Console.WriteLine("----------------------------------------------------------------");  
                    Console.WriteLine(" ");  
                    Console.ReadLine();  
                }  
                host.Close();  
            }  
        }  
    }  
}  
  

請參閱

StreamInsight 範例
StreamInsight 概念