共用方式為


StreamInsight 範例:用戶端 B - 建立主旨

此範例示範如何建立 StreamInsight 用戶端,這個用戶端會使用遠端伺服器和其中定義的實體。 此範例會特別示範如何建立繫結至多個來源和接收的主旨。 主旨會使用從來源資料流傳遞而來的資料,並將該資料傳遞至接收。 如需有關 StreamInsight 實體的詳細資訊,請參閱<StreamInsight 概念>。

此範例會使用本節所建立之伺服器範例中的遠端伺服器和實體。 若要一併使用本節中的範例,請執行下列動作:

  1. 執行伺服器範例<StreamInsight 範例:伺服器 - 公開內嵌伺服器>。

  2. 執行下列其中一個或兩個用戶端範例:

逐步解說

典型的 StreamInsight 用戶端一般都會遵循下列基本步驟:

  • 建立 StreamInsight 伺服器執行個體

  • 建立或取得 StreamInsight 應用程式

  • 定義或取得來源

  • 針對來源撰寫查詢

  • 定義或取得接收

  • 繫結並執行查詢和接收

在此範例中,用戶端會從伺服器取得現有的應用程式和實體,但是程式也會建立主旨以及其他來源和接收。

連接到伺服器

建立 StreamInsight 用戶端程式首先是從具現化 StreamInsight 伺服器執行個體開始。 在此範例中,用戶端會連接到名為 "MyStreamInsightServer" 的遠端伺服器。

var server = Server.Connect(new System.ServiceModel.EndpointAddress(@"https://localhost/MyStreamInsightServer"))

如需有關連接到 StreamInsight 伺服器時可搭配使用之選項的詳細資訊,請參閱<發行及連接到 StreamInsight 伺服器>。

取得伺服器應用程式

在此範例中,用戶端會使用已在遠端伺服器中建立的 StreamInsight 應用程式, 而其中已定義此用戶端會用到的所有伺服器實體,並且也將會建立新實體。

myApp = server.Applications["serverApp"];

取得伺服器來源並定義新來源

接著,取得此伺服器上已定義的來源,以及定義新來源。 在此範例中,第二個來源內的資料是每秒所產生之時間點事件的簡單暫時資料流。

var mySource = myApp.GetObservable<int>("serverSource");
var mySourceB = myApp.DefineObservable(() => Observable.Interval(TimeSpan.FromSeconds(1))).ToPointStreamable(x => PointEvent.CreateInsert(DateTimeOffset.Now, x), AdvanceTimeSettings.StrictlyIncreasingStartTime);

針對來源撰寫查詢

接著,針對這兩個來源撰寫查詢。 在此範例中,第一個查詢會從伺服器來源擷取每個偶數資料值,並且傳回該值加上 2000 後的值;而第二個查詢則會從第二個來源擷取每個奇數資料值,並且傳回該值加上 3000 後的值。

var myQuery = from e in mySource
              where e % 2 == 0
              select e + 2000;
var myQueryB = from e in mySourceB
               where e % 2 == 1
               select e + 3000;

取得伺服器接收並定義新接收

接著,取得此伺服器上已定義的接收,以及定義新接收。 在此範例中,第二個接收只是簡單的函式,可將資料流值寫入至主控台。

var mySink = myApp.GetObserver<int>("serverSink");
var mySinkB = myApp.DefineObserver(() => Observer.Create<long>(x => Console.WriteLine("sink_Client_B: {0}", x)));

建立主旨

接著,建立主旨。 主旨是伺服器中的物件,可以繫結至來源和接收,以取用來自來源的資料以及將資料傳遞至接收。

var mySubject = myApp.CreateSubject<long,long>("serverSubject_Client_B", () => new Subject<long>());

繫結並執行查詢和接收

現在,將主旨繫結至每個接收 (包含之前在伺服器中定義的接收,以及此用戶端定義的接收)。 執行繫結時,每個繫結都是各自獨立的處理序。 因為尚未繫結來源,所以當主旨繫結至接收時,資料並不會流動。 如果主旨在繫結至接收前就先繫結至來源,則資料會立即開始流向主旨而導致遺失。

var procB1 = mySubject.Bind(mySink).Run("serverProcess_Client_B_1");
var procB2 = mySubject.Bind(mySinkB).Run("serverProcess_Client_B_2");

接著,將主旨繫結至查詢。 執行下列處理序時,資料隨即開始從來源經由查詢流向每個接收。

var procB3 = myQuery.Bind(mySubject).Run("serverProcess_Client_B_3");
var procB4 = myQueryB.Bind(mySubject).Run("serverProcess_Client_B_4");

此用戶端目前已在伺服器中定義下列數個實體:

  • serverSubject_Client_B

  • serverProcess_Client_B_1

  • serverProcess_Client_B_2

  • serverProcess_Client_B_3

  • serverProcess_Client_B_4

完整範例

下列範例會結合之前所述的元件來建立完整應用程式。 為求簡便,此範例不會檢查潛在的錯誤狀況,並且會假設<StreamInsight 範例:伺服器 - 公開內嵌伺服器>範例中建立的伺服器已正在執行,而且所需的實體也都已經建立。

using System;
using System.ServiceModel;
using Microsoft.ComplexEventProcessing;
using Microsoft.ComplexEventProcessing.Linq;
using Microsoft.ComplexEventProcessing.ManagementService;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace StreamInsight21_example_Client_B
    /* This example:
     * connects to a remote server
     * gets the app and source defined in the server
     * defines a second source
     * creates simple queries over the 2 sources
     * gets the sink defined in the server
     * defines a second sink
     * binds and runs the subject to both sinks
     * binds and runs the subject to both queries
     * waits for the user to stop the program
     */
{
    class Program
    {
        static void Main(string[] args)
        {
            // Connect to the StreamInsight server
            using (var server = Server.Connect(new System.ServiceModel.EndpointAddress(@"https://localhost/MyStreamInsightServer")))
            {
                /* The following entities are expected to be defined in the server:
                 * serverApp
                 * serverSource
                 * serverSink
                 */
                /* The following entities will be defined in the server by this client:
                 * serverSubject_Client_B
                 * serverProcess_Client_B_1
                 * serverProcess_Client_B_2
                 * serverProcess_Client_B_3
                 * serverProcess_Client_B_4
                 */

                // Get the existing StreamInsight APPLICATION
                var myApp = server.Applications["serverApp"];
                   
                // GET the SOURCE from the server
                var mySource = myApp.GetStreamable<long>("serverSource");

                // DEFINE a second SOURCE (returns a point event every second)
                var mySourceB = myApp.DefineObservable(() => Observable.Interval(TimeSpan.FromSeconds(1))).ToPointStreamable(x => PointEvent.CreateInsert(DateTimeOffset.Now, x), AdvanceTimeSettings.StrictlyIncreasingStartTime);

                // COMPOSE a QUERY on the server source (return every even-numbered item + 2000)
                var myQuery = from e in mySource
                              where e % 2 == 0
                              select e + 2000;

                // COMPOSE a QUERY on the second source (return every odd-numbered item + 3000)
                var myQueryB = from e in mySourceB
                               where e % 2 == 1
                               select e + 3000;
                    
                // GET the SINK from the server
                var mySink = myApp.GetObserver<long>("serverSink");

                // DEFINE a second SINK
                var mySinkB = myApp.DefineObserver(() => Observer.Create<long>(x => Console.WriteLine("sink_Client_B: {0}", x)));

                // CREATE a SUBJECT
                var mySubject = myApp.CreateSubject<long,long>("serverSubject_Client_B", () => new Subject<long>());

                // BIND the SINKS to the SUBJECT
                var procB1 = mySubject.Bind(mySink).Run("serverProcess_Client_B_1");
                var procB2 = mySubject.Bind(mySinkB).Run("serverProcess_Client_B_2");
                
                // BIND the SOURCES to the SUBJECT
                var procB3 = myQuery.Bind(mySubject).Run("serverProcess_Client_B_3");
                var procB4 = myQueryB.Bind(mySubject).Run("serverProcess_Client_B_4");
                
                // Wait for the user to stop the program
                Console.WriteLine("----------------------------------------------------------------");
                Console.WriteLine("Client B is running, press Enter to exit the client");
                Console.WriteLine("----------------------------------------------------------------");
                Console.WriteLine(" ");
                Console.ReadLine();

                // Remove the entities we created
                myApp.Entities["serverSubject_Client_B"].Delete();
                procB1.Dispose();
                procB2.Dispose();
                procB3.Dispose();
                procB4.Dispose();
            }
        }
    }
}

請參閱

其他資源

StreamInsight 範例:伺服器 - 公開內嵌伺服器

StreamInsight 範例

StreamInsight 概念