StreamInsight 範例:用戶端 A - 使用遠端伺服器
此範例示範如何建立 StreamInsight 用戶端,這個用戶端會使用遠端伺服器和其中定義的實體。 如需有關 StreamInsight 實體的詳細資訊,請參閱<StreamInsight 概念>。
此範例會使用本節所建立之伺服器範例中的遠端伺服器和實體。 若要一併使用本節中的範例,請執行下列動作:
執行伺服器範例<StreamInsight 範例:伺服器 - 公開內嵌伺服器>。
執行下列其中一個或兩個用戶端範例:
逐步解說
典型的 StreamInsight 用戶端一般都會遵循下列基本步驟:
建立 StreamInsight 伺服器執行個體
建立或取得 StreamInsight 應用程式
定義或取得來源
針對來源撰寫查詢
定義或取得接收
繫結並執行查詢和接收
在此範例中,用戶端會從伺服器取得現有的應用程式和實體。
連接到伺服器
建立 StreamInsight 用戶端程式首先是從具現化 StreamInsight 伺服器執行個體開始。 在此範例中,用戶端會連接到名為 "MyStreamInsightServer" 的遠端伺服器。
var server = Server.Connect
(new System.ServiceModel.EndpointAddress(@"https://localhost/MyStreamInsightServer"))
如需有關連接到 StreamInsight 伺服器時可搭配使用之選項的詳細資訊,請參閱<發行及連接到 StreamInsight 伺服器>。
取得伺服器應用程式
在此範例中,用戶端會使用已在遠端伺服器中建立的 StreamInsight 應用程式, 而其中已定義此用戶端會用到的所有伺服器實體。
myApp = server.Applications["serverApp"];
取得來源
接著,取得此伺服器上已定義的來源。
var mySource = myApp.GetObservable<int>("serverSource");
針對來源撰寫查詢
接著,針對輸入來源撰寫查詢。 此查詢使用 LINQ 當做查詢規格語言。 在此範例中,查詢會傳回每個偶數事件加上 1000 之後的值。
var myQuery = from e in mySource
where e % 2 == 0
select e + 1000;
取得接收
接著,取得此伺服器上已定義的接收。
var mySink = myApp.GetObserver<int>("serverSink");
繫結並執行查詢和接收
此時您已經可以將查詢繫結至接收,然後在伺服器的處理序中執行。
var proc = myQuery.Bind(mySink).Run("serverProcess_Client_A");
此處理序是該用戶端在伺服器中建立的唯一實體。
完整範例
下列範例會結合之前所述的元件來建立完整應用程式。 為求簡便,此範例不會檢查潛在的錯誤狀況,並且會假設<StreamInsight 範例:伺服器 - 公開內嵌伺服器>範例中建立的伺服器已正在執行,而且所需的實體也都已經建立。
using System;
using System.ServiceModel;
using Microsoft.ComplexEventProcessing;
using Microsoft.ComplexEventProcessing.Linq;
using Microsoft.ComplexEventProcessing.ManagementService;
using System.Reactive;
using System.Reactive.Linq;
namespace StreamInsight21_example_ClientA
/* This example:
* connects to a remote server
* gets the app, source, and sink defined in the server
* creates a simple query over the source, binds it to the sink, and runs it
* waits for the user to stop the program
*/
{
class Program
{
static void Main(string[] args)
{
// Connect to the StreamInsight server
using (var server = Server.Connect(new System.ServiceModel.EndpointAddress(@"https://localhost/MyStreamInsightServer")))
{
/* The following entities are expected to be defined in the server:
* serverApp0
* serverSource0
* serverSink0
*/
/* The following entity will be defined in the server by this client:
* serverProcess_Client_A
*/
// Get the existing StreamInsight APPLICATION
var myApp = server.Applications["serverApp"];
// GET the SOURCE from the server
var mySource = myApp.GetStreamable<long>("serverSource");
// Compose a QUERY on the source (return every even-numbered item + 1000)
var myQuery = from e in mySource
where e % 2 == 0
select e + 1000;
// GET the SINK from the server
var mySink = myApp.GetObserver<long>("serverSink");
// BIND the QUERY to the SINK and RUN it
using (var proc = myQuery.Bind(mySink).Run("serverProcess_Client_A"))
{
// Wait for the user to stop the program
Console.WriteLine("----------------------------------------------------------------");
Console.WriteLine("Client A is running, press Enter to exit the client");
Console.WriteLine("----------------------------------------------------------------");
Console.WriteLine(" ");
Console.ReadLine();
}
}
}
}
}
請參閱
StreamInsight 範例:伺服器 - 公開內嵌伺服器
StreamInsight 範例
StreamInsight 概念