StreamInsight 範例:用戶端 B - 建立主旨
此範例示範如何建立 StreamInsight 用戶端,這個用戶端會使用遠端伺服器和其中定義的實體。 此範例會特別示範如何建立繫結至多個來源和接收的主旨。 主旨會使用從來源資料流傳遞而來的資料,並將該資料傳遞至接收。 如需有關 StreamInsight 實體的詳細資訊,請參閱<StreamInsight 概念>。
此範例會使用本節所建立之伺服器範例中的遠端伺服器和實體。 若要一併使用本節中的範例,請執行下列動作:
執行伺服器範例<StreamInsight 範例:伺服器 - 公開內嵌伺服器>。
執行下列其中一個或兩個用戶端範例:
逐步解說
典型的 StreamInsight 用戶端一般都會遵循下列基本步驟:
建立 StreamInsight 伺服器執行個體
建立或取得 StreamInsight 應用程式
定義或取得來源
針對來源撰寫查詢
定義或取得接收
繫結並執行查詢和接收
在此範例中,用戶端會從伺服器取得現有的應用程式和實體,但是程式也會建立主旨以及其他來源和接收。
連接到伺服器
建立 StreamInsight 用戶端程式首先是從具現化 StreamInsight 伺服器執行個體開始。 在此範例中,用戶端會連接到名為 "MyStreamInsightServer" 的遠端伺服器。
var server = Server.Connect(new System.ServiceModel.EndpointAddress(@"https://localhost/MyStreamInsightServer"))
如需有關連接到 StreamInsight 伺服器時可搭配使用之選項的詳細資訊,請參閱<發行及連接到 StreamInsight 伺服器>。
取得伺服器應用程式
在此範例中,用戶端會使用已在遠端伺服器中建立的 StreamInsight 應用程式, 而其中已定義此用戶端會用到的所有伺服器實體,並且也將會建立新實體。
myApp = server.Applications["serverApp"];
取得伺服器來源並定義新來源
接著,取得此伺服器上已定義的來源,以及定義新來源。 在此範例中,第二個來源內的資料是每秒所產生之時間點事件的簡單暫時資料流。
var mySource = myApp.GetObservable<int>("serverSource");
var mySourceB = myApp.DefineObservable(() => Observable.Interval(TimeSpan.FromSeconds(1))).ToPointStreamable(x => PointEvent.CreateInsert(DateTimeOffset.Now, x), AdvanceTimeSettings.StrictlyIncreasingStartTime);
針對來源撰寫查詢
接著,針對這兩個來源撰寫查詢。 在此範例中,第一個查詢會從伺服器來源擷取每個偶數資料值,並且傳回該值加上 2000 後的值;而第二個查詢則會從第二個來源擷取每個奇數資料值,並且傳回該值加上 3000 後的值。
var myQuery = from e in mySource
where e % 2 == 0
select e + 2000;
var myQueryB = from e in mySourceB
where e % 2 == 1
select e + 3000;
取得伺服器接收並定義新接收
接著,取得此伺服器上已定義的接收,以及定義新接收。 在此範例中,第二個接收只是簡單的函式,可將資料流值寫入至主控台。
var mySink = myApp.GetObserver<int>("serverSink");
var mySinkB = myApp.DefineObserver(() => Observer.Create<long>(x => Console.WriteLine("sink_Client_B: {0}", x)));
建立主旨
接著,建立主旨。 主旨是伺服器中的物件,可以繫結至來源和接收,以取用來自來源的資料以及將資料傳遞至接收。
var mySubject = myApp.CreateSubject<long,long>("serverSubject_Client_B", () => new Subject<long>());
繫結並執行查詢和接收
現在,將主旨繫結至每個接收 (包含之前在伺服器中定義的接收,以及此用戶端定義的接收)。 執行繫結時,每個繫結都是各自獨立的處理序。 因為尚未繫結來源,所以當主旨繫結至接收時,資料並不會流動。 如果主旨在繫結至接收前就先繫結至來源,則資料會立即開始流向主旨而導致遺失。
var procB1 = mySubject.Bind(mySink).Run("serverProcess_Client_B_1");
var procB2 = mySubject.Bind(mySinkB).Run("serverProcess_Client_B_2");
接著,將主旨繫結至查詢。 執行下列處理序時,資料隨即開始從來源經由查詢流向每個接收。
var procB3 = myQuery.Bind(mySubject).Run("serverProcess_Client_B_3");
var procB4 = myQueryB.Bind(mySubject).Run("serverProcess_Client_B_4");
此用戶端目前已在伺服器中定義下列數個實體:
serverSubject_Client_B
serverProcess_Client_B_1
serverProcess_Client_B_2
serverProcess_Client_B_3
serverProcess_Client_B_4
完整範例
下列範例會結合之前所述的元件來建立完整應用程式。 為求簡便,此範例不會檢查潛在的錯誤狀況,並且會假設<StreamInsight 範例:伺服器 - 公開內嵌伺服器>範例中建立的伺服器已正在執行,而且所需的實體也都已經建立。
using System;
using System.ServiceModel;
using Microsoft.ComplexEventProcessing;
using Microsoft.ComplexEventProcessing.Linq;
using Microsoft.ComplexEventProcessing.ManagementService;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
namespace StreamInsight21_example_Client_B
/* This example:
* connects to a remote server
* gets the app and source defined in the server
* defines a second source
* creates simple queries over the 2 sources
* gets the sink defined in the server
* defines a second sink
* binds and runs the subject to both sinks
* binds and runs the subject to both queries
* waits for the user to stop the program
*/
{
class Program
{
static void Main(string[] args)
{
// Connect to the StreamInsight server
using (var server = Server.Connect(new System.ServiceModel.EndpointAddress(@"https://localhost/MyStreamInsightServer")))
{
/* The following entities are expected to be defined in the server:
* serverApp
* serverSource
* serverSink
*/
/* The following entities will be defined in the server by this client:
* serverSubject_Client_B
* serverProcess_Client_B_1
* serverProcess_Client_B_2
* serverProcess_Client_B_3
* serverProcess_Client_B_4
*/
// Get the existing StreamInsight APPLICATION
var myApp = server.Applications["serverApp"];
// GET the SOURCE from the server
var mySource = myApp.GetStreamable<long>("serverSource");
// DEFINE a second SOURCE (returns a point event every second)
var mySourceB = myApp.DefineObservable(() => Observable.Interval(TimeSpan.FromSeconds(1))).ToPointStreamable(x => PointEvent.CreateInsert(DateTimeOffset.Now, x), AdvanceTimeSettings.StrictlyIncreasingStartTime);
// COMPOSE a QUERY on the server source (return every even-numbered item + 2000)
var myQuery = from e in mySource
where e % 2 == 0
select e + 2000;
// COMPOSE a QUERY on the second source (return every odd-numbered item + 3000)
var myQueryB = from e in mySourceB
where e % 2 == 1
select e + 3000;
// GET the SINK from the server
var mySink = myApp.GetObserver<long>("serverSink");
// DEFINE a second SINK
var mySinkB = myApp.DefineObserver(() => Observer.Create<long>(x => Console.WriteLine("sink_Client_B: {0}", x)));
// CREATE a SUBJECT
var mySubject = myApp.CreateSubject<long,long>("serverSubject_Client_B", () => new Subject<long>());
// BIND the SINKS to the SUBJECT
var procB1 = mySubject.Bind(mySink).Run("serverProcess_Client_B_1");
var procB2 = mySubject.Bind(mySinkB).Run("serverProcess_Client_B_2");
// BIND the SOURCES to the SUBJECT
var procB3 = myQuery.Bind(mySubject).Run("serverProcess_Client_B_3");
var procB4 = myQueryB.Bind(mySubject).Run("serverProcess_Client_B_4");
// Wait for the user to stop the program
Console.WriteLine("----------------------------------------------------------------");
Console.WriteLine("Client B is running, press Enter to exit the client");
Console.WriteLine("----------------------------------------------------------------");
Console.WriteLine(" ");
Console.ReadLine();
// Remove the entities we created
myApp.Entities["serverSubject_Client_B"].Delete();
procB1.Dispose();
procB2.Dispose();
procB3.Dispose();
procB4.Dispose();
}
}
}
}