Creating and Binding Query Templates

[This topic is pre-release documentation and is subject to change in future releases. Blank topics are included as placeholders.]

This topic provides development guidance for creating query templates and binding the templates to event sources or sinks. Query templates are the fundamental unit of query composition in a complex event processing (CEP) application. A query template defines the business logic required to continuously analyze and process the event streams submitted to and emitted by the CEP server. For example, if the event stream contains power usage (wattage) data from several power meters, the query logic may filter for those meters that have a wattage value that meets or exceeds a specified threshold. The query logic may then perform some additional computation on the wattage values before sending the processed events to an output device.

The query template is a design-time (logical) concept, whereas the query itself is a run-time object. A query is obtained by binding a query template to event sources and sinks, and it is instantiated at run time to perform the actual event processing.

Defining Query Templates

The query template defines the event-processing logic to be performed by the CEP server. Defining a query template involves the following steps:

  1. Create the event stream object.
  2. Create the query template definition. (That is, write the query logic in terms of CEP query operators on top of the event stream object.)

Creating an Event Stream Object

Query templates are written in LINQ. A LINQ expression is always defined over an object of type CepStream<PayloadType>, which represents the stream source for the query template. An event stream can be created in one of the following ways:

  • From an unbound stream.
    The static method Create() of the class CepStream<PayloadType> produces a stream that has only a shape defined and no binding information, as shown in the following example. Use this approach to develop an unbound query template, for instance one that is explicitly registered into a CEP server.

    CepStream<PayloadType> inputStream = CepStream<PayloadType>.Create("inputStream");
    
  • From a user-defined input adapter factory.
    The CepStream type implements static methods that take an input adapter factory and an input configuration and produce an input stream of a given shape, as shown in the following example. The adapter factory must be able to instantiate the adapter with the given stream shape. This approach corresponds to the implicit server development model, in which adapters do not need to be registered as a separate step.

    CepStream<PayloadType> inputStream1 =
        CepStream<PayloadType>.Create(streamName, typeof(AdapterFactory), myconfig, EventShape.Edge);
    
  • From an IObservable object.
    The ToCepStream() method converts an IObservable data source into a CEP stream, as shown in the following example. ToCepStream() can take either a lambda expression that defines the timestamps of the events or a lambda expression that defines an entire type mapping. In this example, the timestamps are taken from a payload field of the events. Because only a single timestamp is defined, the resulting events are implicitly point events. For more information about point events, see CEP Server Concepts.

    CepStream<PayloadType> inputStream1 = myObservableSource.ToCepStream<PayloadType>(e => e.myTimeStamp);
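To make the observable case more concrete, the following is a minimal sketch of the kind of push-based source that could play the role of myObservableSource. It uses only the standard IObservable<T> and IObserver<T> interfaces and does not call the CEP API, so ToCepStream() itself is not shown. The PayloadType members and the ListSource, TimestampCollector, and ObservableSourceSketch names are assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical payload type; myTimeStamp mirrors the field used in the example above.
public class PayloadType
{
    public DateTime myTimeStamp { get; set; }
    public int value { get; set; }
}

// Minimal push-based source that replays a fixed list to each subscriber.
public class ListSource : IObservable<PayloadType>
{
    private readonly IList<PayloadType> items;

    public ListSource(IList<PayloadType> items) { this.items = items; }

    public IDisposable Subscribe(IObserver<PayloadType> observer)
    {
        foreach (PayloadType item in items)
        {
            observer.OnNext(item);
        }
        observer.OnCompleted();
        return new Unsubscriber();
    }

    // Nothing to release because all items are pushed synchronously in Subscribe.
    private sealed class Unsubscriber : IDisposable
    {
        public void Dispose() { }
    }
}

// Observer that records the timestamp of every event it receives.
public class TimestampCollector : IObserver<PayloadType>
{
    public readonly List<DateTime> Timestamps = new List<DateTime>();
    public bool Completed { get; private set; }

    public void OnNext(PayloadType e) { Timestamps.Add(e.myTimeStamp); }
    public void OnError(Exception error) { throw error; }
    public void OnCompleted() { Completed = true; }
}

public static class ObservableSourceSketch
{
    public static void Main()
    {
        var start = new DateTime(2010, 1, 1, 0, 0, 0, DateTimeKind.Utc);
        var source = new ListSource(new List<PayloadType>
        {
            new PayloadType { myTimeStamp = start, value = 3 },
            new PayloadType { myTimeStamp = start.AddSeconds(1), value = 12 },
        });

        var collector = new TimestampCollector();
        source.Subscribe(collector);

        // Prints "2 events, completed: True"
        Console.WriteLine(collector.Timestamps.Count + " events, completed: " + collector.Completed);
    }
}
```

Each event carries its own timestamp in the payload, which is the property that the timestamp lambda in the example above relies on.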
    

Creating the Query Template Definition

The query logic is specified in terms of CEP operators through LINQ statements built on top of CepStream objects, as shown in the following example. For more information about LINQ query operators, see Using LINQ in Query Templates.

CepStream<PayloadType> outputStream = from e in inputStream
                                      where e.value < 10
                                      select e;
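The filter above uses ordinary LINQ query syntax, so its logic can be tried out on an in-memory sequence before a CEP server is involved. The following sketch applies the power-meter scenario from the introduction to a plain IEnumerable. It does not use CepStream, and the MeterReading type, the ThresholdFilterSketch class, and the threshold value are assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical payload type for the power-meter scenario.
public class MeterReading
{
    public string MeterId { get; set; }
    public int Wattage { get; set; }
}

public static class ThresholdFilterSketch
{
    // Keeps only the readings whose wattage meets or exceeds the threshold.
    public static List<MeterReading> AtOrAbove(IEnumerable<MeterReading> readings, int threshold)
    {
        var result = from e in readings
                     where e.Wattage >= threshold
                     select e;
        return result.ToList();
    }

    public static void Main()
    {
        var readings = new List<MeterReading>
        {
            new MeterReading { MeterId = "A", Wattage = 450 },
            new MeterReading { MeterId = "B", Wattage = 1200 },
            new MeterReading { MeterId = "C", Wattage = 1000 },
        };

        foreach (MeterReading r in AtOrAbove(readings, 1000))
        {
            Console.WriteLine(r.MeterId + " " + r.Wattage);
        }
    }
}
```

The difference in a query template is that the same expression is evaluated continuously over a stream of timestamped events rather than once over a finite list.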

In the explicit server development model, in which an explicit CEP application is created, the specified query logic can be registered as a QueryTemplate object into the application, as shown in the following example.

QueryTemplate qt = application.CreateQueryTemplate("samplequerytemplate", outputStream);

The registered query template can now be reused in multiple bindings and, hence, instantiated in multiple queries each bound to potentially different input and output adapters. These bindings for registered query templates are defined through the QueryBinder object, which is described in the following section.

Binding Query Templates

How a runnable query is obtained from a query template depends on the development model.

Specifying the Event Producers and Consumers Through Binding

When using the explicit server development model, explicit adapter objects are created and the output stream is bound through the query binder, as part of the process of defining a query, as shown in the following example.

// Create input adapter object from existing factory
InputAdapter myInputAdapter = application.CreateInputAdapter<TextFileInputFactory>("CSV Input", "Reading tuples from a CSV file");
// Create output adapter object from existing factory
OutputAdapter myOutputAdapter = application.CreateOutputAdapter<TextFileOutputFactory>("CSV Output", "Writing result events to a file");

// Create a query binder, wrapping the defined query template object
QueryBinder myQuerybinder = new QueryBinder(qt);

// Bind input adapter to the query template's stream that was created with the name "querysource"
// and apply the runtime configuration.
myQuerybinder.BindProducer("querysource", myInputAdapter, inputConf, EventShape.Point);
// Bind output adapter to query and apply the runtime configuration.
myQuerybinder.AddConsumer("queryresult", myOutputAdapter, outputConf, EventShape.Point, StreamEventOrder.FullyOrdered);

// Create query in the application, from the query binder.
Query query = application.CreateQuery("query", myQuerybinder, "description for query");

The bindings of input and output require instances of the InputAdapter and OutputAdapter objects, which are obtained from their respective factories, as shown above.

Note that the explicit server development model allows for the separation of the following steps:

  1. Defining the query template and storing it on the server.
  2. Binding an existing query template to specific input and output adapters (using the binder), thus creating a query that can be run.

The second step can be carried out many times for a single query template, potentially creating multiple, distinct run-time instances of the same query logic.

A query can have one output adapter as the consumer, but multiple input adapters as event producers. The binding of the input streams, therefore, must refer to the stream names that were used when creating the CepStream objects in their respective query template, as shown in the following example. The example uses the same input adapter object in the input binding, but with different run-time configuration objects. When starting a query based on this binder, the CEP server creates two different runtime instances of the input adapter, each with a different event type and runtime configuration. Note that the actual query template, the creation of the binder, and the output adapter object are not shown in this example.

CepStream<SensorReading> sensorStream = CepStream<SensorReading>.Create("sensorInput");
CepStream<LocationData> locationStream = CepStream<LocationData>.Create("locationInput");

// Define query template in LINQ on top of sensorStream and locationStream
// ...
// Create query binder qb like above
// ...

InputAdapter csvInput = application.CreateInputAdapter<TextFileInputFactory>("CSV Input", "Reading tuples from a CSV file");

// Bind the same adapter object twice, each time with its own event type, configuration, and shape
qb.BindProducer<SensorReading>("sensorInput", csvInput, sensorInputConf, EventShape.Interval);
qb.BindProducer<LocationData>("locationInput", csvInput, locationInputConf, EventShape.Edge);

Advance time settings, which are described in the following section for BindProducer, can also be specified in the implicit model (as a parameter of CepStream<PayloadType>.Create()) and in the observable model (as a parameter of ToCepStream()).

Defining CTI Behavior in the Binding

Current time increments (CTIs) are necessary in order to produce any output from a CEP query. However, they do not need to be enqueued by the input adapter code. Instead, CTI generation can be specified declaratively in the query binding through an optional AdvanceTimeSettings parameter of BindProducer:

var ats = new AdvanceTimeSettings(5, TimeSpan.FromSeconds(0), AdvanceTimePolicy.Drop);
queryBinder.BindProducer<SensorReading>("sensorInput", inputAdapter, sensorInputConf, EventShape.Interval, ats);

In this example, the CTI behavior is defined to be as follows:

- Send a CTI every five events.

- Set the timestamp of the CTI to be the timestamp of the most recent event.

- If the adapter later enqueues an event that would violate the CTI (by having an earlier end timestamp), drop the event.
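The three rules above can be illustrated with a small simulation. This is a simplified model written in plain C#, not the CEP server's implementation: it handles only point events with integer timestamps, and it assumes that dropped events do not count toward the five-event frequency. The CountBasedCtiSketch class and its Run method are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

public static class CountBasedCtiSketch
{
    // Logs what a simplified engine does with each point event when a CTI is
    // issued after every `frequency` accepted events and violating events are dropped.
    public static List<string> Run(IEnumerable<int> eventTimes, int frequency)
    {
        var log = new List<string>();
        int acceptedSinceLastCti = 0;
        int? currentCti = null; // timestamp of the most recent CTI, if any

        foreach (int t in eventTimes)
        {
            if (currentCti.HasValue && t < currentCti.Value)
            {
                // Drop policy: the event lies before the CTI, so it is discarded.
                log.Add("drop " + t);
                continue;
            }

            log.Add("event " + t);
            acceptedSinceLastCti++;

            if (acceptedSinceLastCti == frequency)
            {
                // Delay of zero: the CTI carries the timestamp of the most recent event.
                currentCti = t;
                log.Add("cti " + t);
                acceptedSinceLastCti = 0;
            }
        }
        return log;
    }

    public static void Main()
    {
        List<string> log = Run(new[] { 1, 2, 3, 4, 5, 3, 6 }, 5);

        // Prints "event 1, event 2, event 3, event 4, event 5, cti 5, drop 3, event 6"
        Console.WriteLine(string.Join(", ", log));
    }
}
```

The late event with timestamp 3 arrives after the CTI at 5 and is therefore discarded, whereas the event at 6 is accepted.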

The following is another example of an AdvanceTimeSettings definition:

var ats = new AdvanceTimeSettings(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(3), AdvanceTimePolicy.Adjust);

This example specifies the following CTI behavior:

- Send a CTI every ten seconds.

- Set the timestamp of the CTI to be the timestamp of the most recent event minus three seconds. The CTI therefore lies in the past, which allows a grace period of three seconds for subsequent late events.

- If the adapter later enqueues an event that would violate the CTI, adjust the event timestamp to the CTI's timestamp, so that the event becomes "valid".
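The effect of the delay and of the Adjust policy can be sketched in the same simplified way. The model below is plain C# and is not the CEP server's implementation. It assumes point events with integer timestamps in seconds, and it assumes that the ten-second period is measured against event timestamps rather than wall-clock time. The TimeBasedCtiSketch class and its Run method are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

public static class TimeBasedCtiSketch
{
    // Logs what a simplified engine does when a CTI is issued once event time has
    // advanced by `period` seconds, the CTI is stamped `delay` seconds before the
    // most recent event, and violating events are moved forward to the CTI.
    public static List<string> Run(IEnumerable<int> eventTimes, int period, int delay)
    {
        var log = new List<string>();
        int? lastTrigger = null; // event time at which the last CTI was issued
        int? currentCti = null;  // timestamp of the most recent CTI, if any

        foreach (int original in eventTimes)
        {
            int t = original;
            if (currentCti.HasValue && t < currentCti.Value)
            {
                // Adjust policy: move the late event to the CTI timestamp.
                t = currentCti.Value;
                log.Add("adjust " + original + "->" + t);
            }
            else
            {
                log.Add("event " + t);
            }

            if (!lastTrigger.HasValue)
            {
                lastTrigger = t; // the first event starts the first period
            }
            else if (t - lastTrigger.Value >= period)
            {
                currentCti = t - delay; // CTI lies `delay` seconds in the past
                lastTrigger = t;
                log.Add("cti " + currentCti.Value);
            }
        }
        return log;
    }

    public static void Main()
    {
        List<string> log = Run(new[] { 0, 4, 10, 8, 6, 21 }, 10, 3);

        // Prints "event 0, event 4, event 10, cti 7, event 8, adjust 6->7, event 21, cti 18"
        Console.WriteLine(string.Join(", ", log));
    }
}
```

The event at 8 arrives late but still falls inside the three-second grace period after the CTI at 7, so it is accepted unchanged. The event at 6 falls outside the grace period and is moved to 7.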

Consuming an Event Stream Directly from the Query Template

When using the implicit server development model, once the query logic is defined, the output adapter can consume the event stream directly from the query, as shown in the following example. The CepStream.ToQuery() method takes a CepStreamConsumer object, which can be created from an output adapter factory through the CepStreamConsumer.Create() method.

// define the query logic in LINQ

CepStream<PayloadType> outputStream = from e in inputStream
                                      where e.value < 10
                                      select e;

// Create a query from the query template and feed it into the consumer.
Query query = outputStream.ToQuery<ResultType>(typeof(MyOutputAdapterFactory),
                                               outputConf,
                                               EventShape.Interval,
                                               StreamEventOrder.FullyOrdered);

Note that the output binding used in the IObservable/IObserver development model follows a different process. For more information, see Using Observables as Event Sources.

See Also

Concepts

CEP Server Concepts

Other Resources

Using LINQ in Query Templates