StreamInsight エンド ツー エンドの例
このトピックでは、さまざまなコンポーネントについて説明します。また、StreamInsight アプリケーションの作成に必要な手順についても説明します。アプリケーションのエンド ツー エンドの例も含まれています。StreamInsight アプリケーションは複雑なイベント処理のシナリオを実装するために、イベント ソース、イベント シンク、およびクエリを一体化します。StreamInsight API は、イベント処理アプリケーションの作成と保守において、さまざまなレベルの制御と複雑性をサポートする各種インターフェイスを提供します。
アプリケーション配置の最小単位は、開始と終了ができるクエリです。次の図は、クエリを作成する方法の 1 つを示しています。イベント ソースは入力アダプターで表されます。このアダプターはイベント ストリームを操作ツリーにフィードし、デザイナーが指定したクエリ ロジックをクエリ テンプレートという形で表します。処理されたイベント ストリームはイベント シンク (通常は出力アダプター) に入ります。
Complex Event Processing という用語に慣れていない開発者は、「StreamInsight サーバーの概念」および「StreamInsight サーバー アーキテクチャ」を確認してください。
アプリケーション プロセス
このセクションでは、エンド ツー エンドのアプリケーションを作成するときの一般的な手順を説明します。
サーバー インスタンスとアプリケーションのインスタンスを作成する
このプロセスは StreamInsight サーバー インスタンスとアプリケーションのインスタンスの作成から開始します。
server = Server.Create(”MyInstance”);
Application myApp = server.CreateApplication("MyApp");
サーバーは、StreamInsight のセットアップ プロセスでコンピューターに登録したインスタンス名 (前の例では MyInstance) を使用して作成する必要があります。詳細については、「インストール (StreamInsight)」を参照してください。
アプリケーションはサーバーのスコープ決定単位を表し、他のメタデータ エンティティを含んでいます。
前の例では、同じプロセスでサーバー インスタンスを作成していますが、リモート サーバーに接続して、そこにある既存のアプリケーションを操作する配置方法もよく使用されます。次の例では、リモート サーバーに接続して既存のアプリケーションにアクセスする方法を示します。
server = Server.Connect(new System.ServiceModel.EndpointAddress(@"https://localhost/StreamInsight/MyInstance"));
Application myApp = server.Applications["ExistingApp"];
ローカル サーバーとリモート サーバーの詳細については、「StreamInsight サーバーへのパブリッシュおよび接続」を参照してください。
入力ストリームを作成する
次に、既存のアダプター実装の上に入力ストリームを作成します。つまり、次の例で示すようにアダプター ファクトリを指定する必要があります。
var inputstream = CepStream<MyDataType>.Create("inputStream",
typeof(MyInputAdapterFactory),
new InputAdapterConfig { someFlag = true },
EventShape.Point);
これで CepStream オブジェクトが作成されます。このオブジェクトは、特定のファクトリ クラスを使用してインスタンス化されたアダプターによって生成されるイベント ストリーム (クエリが開始されると生成される) を表します。このストリームには名前が付けられ、後でストリーム固有の診断を取得するために使用できます。さらに、アダプター ファクトリの構成構造のインスタンスも提供されます。構成構造は、実行時に固有の情報と共にイベントの形状 (イベント モデル) もファクトリに渡します。これらのパラメーターをファクトリが使用する方法については、「入力アダプターと出力アダプターの作成」を参照してください。
クエリを定義する
CepStream オブジェクトは、実際のクエリ ロジックの定義の基盤として使用されます。クエリ指定言語としては LINQ が使用されます。
var filtered = from e in inputstream
where e.Value > 95
select e;
この例では、前の例で入力ストリーム オブジェクトを作成するために定義した MyDataType というクラスまたは構造体に、Value というフィールドが含まれていると想定します。この定義はフィルター演算子に変換され、フィルターの述語 where e.Value > 95 を満たさないイベントをすべてストリームから削除します。LINQ クエリ演算子の詳細については、「LINQ でのクエリ テンプレートの記述」を参照してください。
出力アダプターを作成する
この時点で、変数 filtered の型はまだ CepStream です。これで、開始できるクエリにストリームを変換できます。開始できるクエリ インスタンスを生成するには、次の例のように出力アダプターを指定する必要があります。
var query = filtered.ToQuery(myApp,
"filterQuery",
"Filter out Values over 95",
typeof(MyOutputAdapterFactory),
new OutputAdapterConfig { someString = "foo" },
EventShape.Point,
StreamEventOrder.FullyOrdered);
入力ストリームと同様に、出力アダプターにも出力アダプター ファクトリ、構成オブジェクト、出力ストリームの形状、および一時的な順序付けの指定が必要です。
イベントの形状指定によって、次のようにクエリ出力時のそれぞれのイベント形状が確実になります。
EventShape.Point: 結果イベント lifetime はすべてポイント イベントに縮小されます。
EventShape.Interval: 結果イベントはすべて間隔イベントと解釈されます。つまり、その完全な lifetime が Current Time Increment (CTI) イベントによってコミットされる場合にのみ出力されます。
EventShape.Edge: 結果イベントはすべてエッジ イベントと解釈されます。つまり、その開始時刻が開始エッジ、終了時刻が終了エッジとして出力されます。
ストリーム イベントの順序パラメーターは、間隔イベント出力ストリームの活動状態に影響を及ぼします。FullyOrdered では、間隔イベントが常に開始時刻の順に出力されますが、ChainOrdered では、間隔終了時刻の順に出力シーケンスが生成されます。
また、最初のパラメーターとしてアプリケーション オブジェクトを指定する必要があります。これにはクエリとクエリ名、およびメタデータ ストアでこのクエリをさらに絞り込む説明が含まれています。
クエリを開始する
最後の手順はクエリの開始です。この例では、クエリはユーザーがキーを入力して終了します。
query.Start();
Console.ReadLine();
query.Stop();
このエンド ツー エンドの例では、CepStream.Create() と ToQuery() のオーバーロードを介してクエリ テンプレートでイベント ソースの暗黙的なバインドを使用して、実用クエリを簡単に作成する方法を示します。CEP オブジェクトのバインドを明示的に制御する方法の詳細については、「クエリ バインダーの使用」を参照してください。
完全なサンプル コード
次の例では、前に説明したコンポーネントを組み合わせて完全なアプリケーションを作成します。
Server server = null;
using (Server server = Server.Create(”MyInstance”))
{
try
{
Application myApp = server.CreateApplication("MyApp");
var inputstream = CepStream<MyDataType>.Create("inputStream",
typeof(MyInputAdapterFactory),
new InputAdapterConfig { someFlag = true },
EventShape.Point);
var filtered = from e in inputstream
where e.Value > 95
select e;
var query = filtered.ToQuery(myApp,
"filterQuery",
"Filter out Values over 95",
typeof(MyOutputAdapterFactory),
new OutputAdapterConfig { someString = "foo" },
EventShape.Point,
StreamEventOrder.FullyOrdered);
query.Start();
Console.ReadLine();
query.Stop();
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
}
}