使用查詢繫結器
「查詢繫結器」(Query Binder) 是一種用戶端開發模型,它會在建立 StreamInsight 應用程式時提供最大的彈性和可重複使用性等級。在這個模型中,配接器和查詢範本會註冊為個別的中繼資料物件,這些物件之後可以繫結在一起來具現化查詢。如此可讓開發人員使用物件模型 API 上層的明確查詢繫結來完整控制他的應用程式和開發環境。
明確伺服器開發模型的一般使用案例包括需要以下項目的 StreamInsight 應用程式:
對 StreamInsight 伺服器的完整控制和存取權。
透過靜態或動態查詢撰寫來重複使用查詢,或是重複使用協力廠商所定義的配接器、事件類型和查詢範本。
查詢繫結器開發模型的主要特性
查詢繫結器模型具有以下的主要特性:
開發人員必須明確建立所有中繼資料物件,並將其註冊到 StreamInsight 伺服器。
此模型支援多個物件的建立和使用 (查詢範本、查詢、應用程式和配接器)。所有物件都必須在應用程式之下註冊。
查詢範本和查詢執行個體必須先明確註冊到伺服器,然後才可執行查詢。輸入和輸出配接器必須先明確註冊之後,查詢範本或查詢才可參考這些物件。此外,所有物件都必須在應用程式之下註冊。配接器和查詢範本使用的事件類型會以隱含方式註冊。
範例
此範例會在伺服器上建立 StreamInsight 伺服器物件以及名為 myApp 的應用程式物件。然後它會建立及註冊匯入、處理及匯出事件資料流所需的所有必要 StreamInsight 物件。
首先會建立伺服器和應用程式物件。
server = Server.Create(“MyInstance”);
Application myApp = server.CreateApplication("MyApp");
接下來會在應用程式中註冊輸入和輸出配接器。
InputAdapter inputAdapter = myApp.CreateInputAdapter<MyInputAdapterFactory>("DataSource", "Description of the input adapter");
OutputAdapter outputAdapter = myApp.CreateOutputAdapter<MyOutputAdapterFactory>("Output", " Description of the output adapter ");
查詢範本會在未繫結資料流上層指定。建立未繫結資料流所需的唯一參數是資料流名稱,之後的配接器繫結需要使用它。
var inputstream = CepStream<MyDataType>.Create("filterInput");
var filtered = from e in inputstream
where e.Value > 95
select e;
QueryTemplate filterQT = myApp.CreateQueryTemplate("filterLogic", filtered);
- 最後一個呼叫會在應用程式中註冊查詢範本。已註冊的查詢範本現在可以在多個繫結中重複使用,因此會在多個查詢中具現化,每一個查詢都可能繫結到不同的輸入和輸出配接器。適用於已註冊之查詢範本的這些繫結會透過 QueryBinder 物件來定義:
QueryBinder queryBinder = new QueryBinder(filterQT);
queryBinder.BindProducer<MyDataType>("filterInput",
inputAdapter,
new InputAdapterConfig { someFlag = true },
EventShape.Point);
queryBinder.AddConsumer("filterOutput",
outputAdapter,
new OutputAdapterConfig { someString = "foo" },
EventShape.Point,
StreamEventOrder.FullyOrdered);
Query query = myApp.CreateQuery("filterQuery", "My Filter Query", queryBinder);
BindProducer() 方法會將輸入配接器物件 (必須在應用程式中註冊) 繫結到具有指定之名稱的資料流,在這裡就是 "filterInput"。如此可讓您區分查詢範本的多個進入點。需要繫結特有的參數 (配接器組態及所需的事件圖形) 以及輸入配接器。
AddConsumer() 方法會將輸出配接器物件 (必須在應用程式內註冊) 繫結到查詢範本的單一外送資料流。提供的輸出資料流名稱 (這裡的 "validated") 可用來識別資料流進行診斷。如果是輸入配接器,將會針對輸出配接器提供繫結特有的參數。
根據查詢繫結器、查詢識別碼和文字描述來建立查詢物件。最後一個步驟是開始查詢。
query.Start();
具有多個輸入資料流的查詢
下列範例說明如何建立一個使用多個輸入資料流的查詢範本。查詢範本可以有多個進入點,每一個都是從不同的資料來源提供,例如必須聯結兩個資料流時。適當的資料流關聯會透過資料流名稱的指定來進行,如下列範例所示。
CepStream<SensorReading> sensorStream = CepStream<SensorReading>.Create("sensorInput");
CepStream<LocationData> locationStream = CepStream<LocationData>.Create("locationInput");
// Define query template in LINQ on top of sensorStream and locationStream
// ...
// Create query binder like in the previous example
// ...
InputAdapter inputAdapter = application.CreateInputAdapter<TextFileInputFactory>("CSVInput", "Reading tuples from a CSV file");
qb.BindProducer<SensorReading>("sensorInput", inputAdapter, sensorInputConf, EventShape.Interval);
qb.BindProducer<LocationData>("locationInput", inputAdapter, locationInputConf, EventShape.Edge);
修改現有的應用程式
請注意,您會在查詢繫結器模型中搭配查詢範本和配接器物件一起工作,不一定要在相同的應用程式中建立它們。下列範例假設已經連接到現有的伺服器,並透過 StreamInsight 物件模型 API 擷取現有的中繼資料實體,而不是建立這些實體。
Application myApp = server.Applications["app1"];
QueryTemplate myQueryTemplate = myApp.QueryTemplates["qt1"];
InputAdapter myInputAdapter = myApp.InputAdapters["sensorAdapter5"];
使用保存的中繼資料存放區
當建立 StreamInsight 伺服器時, Server.Create() 方法的選擇性參數就是要使用的中繼資料存放區類型。根據預設,中繼資料會儲存在記憶體中。您也可以選擇透過 SQL Server Compact 3.5 資料庫將中繼資料保存在磁碟上。下列範例示範如何將 SQL Server Compact 3.5 資料庫指定為中繼資料存放區。
SqlCeMetadataProviderConfiguration metadataConfiguration = new SqlCeMetadataProviderConfiguration();
metadataConfiguration.DataSource = "SIMetadata.sdf";
metadataConfiguration.CreateDataSourceIfMissing = streamHostConfig.CreateDataSourceIfMissing;
server = Server.Create(”MyInstance”, metadataConfiguration);
Application myApp = server.CreateApplication("MyApp");
請注意,建立伺服器時指定現有的中繼資料庫將會從指定的檔案讀取所有中繼資料。然後可以透過 StreamInsight 物件模型 API 來擷取中繼資料實體。
完整範例
using (Server server = Server.Create("MyInstance"))
{
try
{
Application myApp = server.CreateApplication("MyApp");
InputAdapter inputAdapter = myApp.CreateInputAdapter<MyInputAdapterFactory>("DataSource", "Description of the input adapter");
OutputAdapter outputAdapter = myApp.CreateOutputAdapter<MyOutputAdapterFactory>("Output", " Description of the output adapter ");
var inputstream = CepStream<MyDataType>.Create("filterInput");
var filtered = from e in inputstream
where e.Value > 95
select e;
QueryTemplate filterQT = myApp.CreateQueryTemplate("filterLogic", "Description of the query template", filtered);
QueryBinder queryBinder = new QueryBinder(filterQT);
queryBinder.BindProducer<MyDataType>("filterInput",
inputAdapter,
new InputAdapterConfig { someFlag = true },
EventShape.Point);
queryBinder.AddConsumer("filterOutput",
outputAdapter,
new OutputAdapterConfig { someString = "foo" },
EventShape.Point,
StreamEventOrder.FullyOrdered);
Query query = myApp.CreateQuery("filterQuery", "My Filter Query", queryBinder);
query.Start();
Console.ReadLine();
query.Stop();
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
}
}