StreamInsight 전체 예
이 항목에서는 StreamInsight 응용 프로그램 작성과 관련된 다양한 구성 요소 및 단계에 대해 설명하며, 응용 프로그램의 전체 예를 제공합니다. StreamInsight응용 프로그램에서는 CEP(복합 이벤트 처리) 시나리오 구현을 위해 이벤트 원본, 이벤트 싱크 및 쿼리가 조합되어 사용됩니다. StreamInsight API는 이벤트 처리 응용 프로그램을 만들고 유지 관리하는 과정을 다양한 방식으로 제어하고 복잡한 작업을 진행할 수 있도록 다수의 인터페이스를 제공합니다.
응용 프로그램 배포 시의 최소 단위는 시작 및 중지가 가능한 쿼리입니다. 다음 그림에서는 쿼리를 작성하는 한 방식을 보여 줍니다. 이벤트 원본은 입력 어댑터로 표시됩니다. 어댑터는 연산자 트리로 이벤트 스트림을 공급합니다. 연산자 트리는 쿼리 템플릿 형식의 디자이너로 지정되는 적절한 쿼리 논리를 나타냅니다. 처리된 이벤트 스트림은 이벤트 싱크(보통 출력 어댑터)로 이동합니다.
CEP(복합 이벤트 처리) 기술에 익숙한 개발자는 StreamInsight 서버 개념 및 StreamInsight 서버 아키텍처의 내용을 확인하십시오.
응용 프로그램 프로세스
이 섹션에서는 일반적인 전체 응용 프로그램 작성 작업을 단계별로 설명합니다.
서버 인스턴스 및 응용 프로그램 인스턴스화
프로세스에서 가장 먼저 수행하는 작업은 StreamInsight 서버 인스턴스 및 응용 프로그램 인스턴스화입니다.
server = Server.Create(”MyInstance”);
Application myApp = server.CreateApplication("MyApp");
StreamInsight 설치 프로세스를 통해 컴퓨터에 등록된 인스턴스 이름(위의 예에서는 MyInstance)을 사용하여 서버를 만들어야 합니다. 자세한 내용은 설치(StreamInsight)를 참조하십시오.
응용 프로그램은 다른 메타데이터 엔터티를 포함하는 서버의 범위 지정 단위를 나타냅니다.
앞서 나온 예에서는 같은 프로세스에서 서버 인스턴스를 만듭니다. 그러나 원격 서버에 연결한 다음 해당 서버에서 기존 응용 프로그램에 대해 작업을 수행하는 배포 방식도 널리 사용됩니다. 다음 예에서는 원격 서버에 연결한 다음 기존 응용 프로그램에 액세스하는 방법을 보여 줍니다.
server = Server.Connect(new System.ServiceModel.EndpointAddress(@"https://localhost/StreamInsight/MyInstance"));
Application myApp = server.Applications["ExistingApp"];
로컬 및 원격 서버에 대한 자세한 내용은 StreamInsight Server 서버에 게시 및 연결을 참조하십시오.
입력 스트림 만들기
다음으로는 기존 어댑터 구현을 기반으로 입력 스트림을 만듭니다. 작업을 정확하게 수행하려면 다음 예와 같이 어댑터 팩터리를 지정해야 합니다.
var inputstream = CepStream<MyDataType>.Create("inputStream",
typeof(MyInputAdapterFactory),
new InputAdapterConfig { someFlag = true },
EventShape.Point);
그러면 이벤트 스트림을 나타내는 CepStream 개체가 작성됩니다. 이벤트 스트림은 쿼리가 시작되고 나면 지정된 팩터리 클래스를 통해 인스턴스화된 어댑터에서 생성됩니다. 스트림에는 나중에 스트림 관련 진단을 검색하는 데 사용할 수 있는 이름이 지정됩니다. 또한 어댑터 팩터리의 구성 구조 인스턴스도 제공됩니다. 구성 구조는 런타임 관련 정보와 필요한 이벤트 셰이프(이벤트 모델)를 팩터리로 전달합니다. 팩터리에서 이러한 매개 변수를 사용하는 방법에 대한 자세한 내용은 입력 어댑터 및 출력 어댑터 만들기를 참조하십시오.
쿼리 정의
CepStream 개체는 실제 쿼리 논리 정의를 위한 기준으로 사용됩니다. 쿼리는 LINQ를 쿼리 사양 언어로 사용합니다.
var filtered = from e in inputstream
where e.Value > 95
select e;
이 예에서는 입력 스트림 개체를 만들기 위해 이전 예에서 정의했던 MyDataType 클래스 또는 구조체에 Value 필드가 포함된다고 가정합니다. 이 정의는 필터 조건자 where e.Value > 95를 충족하지 않는 모든 이벤트를 스트림에서 삭제하는 필터 연산자로 변환됩니다. LINQ 쿼리 연산자에 대한 자세한 내용은 LINQ에서 쿼리 템플릿 작성을 참조하십시오.
출력 어댑터 만들기
이 시점에서 filtered 변수 유형은 아직 CepStream입니다. 따라서 스트림을 시작 가능한 쿼리로 바꿀 수 있습니다. 시작 가능한 쿼리 인스턴스를 생성하려면 다음 예와 같이 출력 어댑터를 지정해야 합니다.
var query = filtered.ToQuery(myApp,
"filterQuery",
"Filter out Values over 95",
typeof(MyOutputAdapterFactory),
new OutputAdapterConfig { someString = "foo" },
EventShape.Point,
StreamEventOrder.FullyOrdered);
입력 스트림과 동일하게 출력 어댑터에 대해서도 출력 어댑터 팩터리, 구성 개체, 필요한 출력 스트림 셰이프 및 임시 순서 지정 사양이 필요합니다.
이벤트 셰이프 사양이 있으면 쿼리 출력에서 개별 이벤트 셰이프가 사용됩니다.
EventShape.Point: 모든 결과 이벤트 수명이 특정 시점 이벤트로 감소합니다.
EventShape.Interval: 모든 결과 이벤트가 간격 이벤트로 해석됩니다. 즉, 이벤트의 전체 수명이 CTI(현재 시간 증분) 이벤트로 커밋되는 경우에만 이벤트가 출력됩니다.
EventShape.Edge: 모든 결과 이벤트가 가장자리 이벤트로 해석됩니다. 즉, 이벤트 시작 시간이 가장자리 시작으로 출력되고 종료 시간이 해당하는 가장자리 종료로 출력됩니다.
스트림 이벤트 순서 매개 변수는 간격 이벤트 출력 스트림 활성화 여부에 영향을 줍니다. 매개 변수가 FullyOrdered이면 간격 이벤트가 항상 해당 시작 시간순으로 출력되고, ChainOrdered이면 간격 종료 시간으로 순서가 지정되는 출력 시퀀스가 생성됩니다.
또한 응용 프로그램 개체를 첫 번째 매개 변수로 제공해야 하는데, 이제 이 개체에는 쿼리와 메타데이터 저장소에서 이 쿼리를 보다 상세하게 식별하는 쿼리 이름 및 설명이 포함됩니다.
쿼리 시작
마지막 단계에서는 쿼리를 시작합니다. 이 예에서는 사용자가 키를 입력하여 쿼리를 중지합니다.
query.Start();
Console.ReadLine();
query.Stop();
이 전체 예에서는 CepStream.Create() 및 ToQuery() 오버로드를 통해 쿼리 템플릿이 포함된 이벤트 원본의 암시적 바인딩을 사용함으로써 작업 쿼리를 빠르게 만드는 방법을 보여 줍니다. CEP 개체의 바인딩을 보다 명시적으로 제어하려면 쿼리 바인더 사용을 참조하십시오.
전체 예
다음 예에서는 앞서 설명한 구성 요소를 조합해 완전한 응용 프로그램을 작성합니다.
Server server = null;
using (Server server = Server.Create(”MyInstance”))
{
try
{
Application myApp = server.CreateApplication("MyApp");
var inputstream = CepStream<MyDataType>.Create("inputStream",
typeof(MyInputAdapterFactory),
new InputAdapterConfig { someFlag = true },
EventShape.Point);
var filtered = from e in inputstream
where e.Value > 95
select e;
var query = filtered.ToQuery(myApp,
"filterQuery",
"Filter out Values over 95",
typeof(MyOutputAdapterFactory),
new OutputAdapterConfig { someString = "foo" },
EventShape.Point,
StreamEventOrder.FullyOrdered);
query.Start();
Console.ReadLine();
query.Stop();
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
}
}