Creating Input and Output Adapters
This topic provides the general information you need to create input and output adapters for your complex event processing (CEP) application using the StreamInsight platform. Adapters are software transformers that deliver events into or out of a StreamInsight server.
Understanding Event Flow and Control
When creating adapters, it is important to understand the flow of events through the StreamInsight server and how the input and output adapters control this flow. As shown in the following illustration, the flow of events from the source, through the standing query, and to the sink is unidirectional. Events are read from a source by the input adapter, which delivers them to the query. The input events, or new events resulting from the processing of the input events, are pushed from one operator to the next in the query. The query delivers the processed events to the output adapter, which delivers the events to the sink. The illustration depicts a scenario in which a StreamInsight query is bound to two input adapter instances a1 and a2, and output adapter instance a4.
While the event flow is unidirectional from source to sink, the flow and execution control for event retrieval and transfer can be bidirectional at some of the interaction points between the components. These interaction points are shown in the illustration as READ, ENQUEUE, DEQUEUE, and WRITE.
Your input adapter implementation should perform the READ operation using access mechanisms specific to the source device (such as a file or database) and perform the ENQUEUE operation using the adapter APIs. Similarly, the output adapter implementation should perform the WRITE operation using access mechanisms specific to the sink device, and perform the DEQUEUE operation using the adapter APIs. You must implement the ENQUEUE and DEQUEUE operations according to a design pattern specified by an adapter state transition diagram, described later in this topic.
From an event flow control perspective, you can imagine events being pushed from a provider to a consumer (denoted by block arrows from left to right) or being pulled by the consumer from a provider (denoted by the hooked arrows). At the READ and WRITE interaction points, your adapter implementation can adopt either the push or the pull approach for event flow control. Factors to consider for this interaction include the event rates that the source or sink can sustain, your adapter's ability to throttle the source or sink, and any buffering capabilities that you can implement.
For source devices that pump out events at very low latency and are difficult to throttle, a typical approach is to implement an adapter where the source device pushes events into the adapter. Examples of such devices are sensors (machine-driven events), ticker plants, and network ports. For devices with higher latencies (files, databases), consider an implementation where the adapter pulls data from the source. Similarly, on the output side, an output adapter for a device that can accept events at very high throughput can be implemented to push events into the device. Slower output devices may adopt an approach where the device polls the adapter whenever it is ready to consume events.
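The following Python sketch contrasts the two READ styles described above. It is a conceptual model only and does not use the StreamInsight .NET API. PullReader and PushReceiver are hypothetical names. The choice to drop records when the push buffer overflows is an assumption made for illustration; a real adapter might block the device or spill to disk instead.

```python
# Hypothetical sketch: pull-style vs push-style READ.
# PullReader and PushReceiver are illustrative names, not StreamInsight types.
import io
from collections import deque


class PullReader:
    """The adapter pulls records from a slower source it can throttle, such as a file."""

    def __init__(self, source):
        self.source = iter(source)  # any iterable of text lines

    def read_batch(self, max_records):
        # The adapter decides when to read and how much.
        if max_records <= 0:
            return []
        batch = []
        for line in self.source:
            batch.append(line.rstrip("\n"))
            if len(batch) == max_records:
                break
        return batch


class PushReceiver:
    """The device pushes records into the adapter, which buffers them."""

    def __init__(self, capacity):
        self.buffer = deque()
        self.capacity = capacity
        self.dropped = 0

    def on_event(self, record):
        # Invoked by the device (sensor, ticker plant, socket); must return quickly.
        if len(self.buffer) >= self.capacity:
            self.dropped += 1  # assumption: drop on overflow
            return
        self.buffer.append(record)

    def drain(self):
        # The adapter's own loop later hands these records to Enqueue().
        while self.buffer:
            yield self.buffer.popleft()


if __name__ == "__main__":
    reader = PullReader(io.StringIO("a\nb\nc\n"))
    print(reader.read_batch(2), reader.read_batch(2))
```

With the pull reader, the adapter controls the pace of reading. With the push receiver, the device controls the pace, and the adapter can only decide how much to buffer.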
At the ENQUEUE interaction point, the StreamInsight server supports a push model. This means that the adapter design pattern allows you to enqueue as many events as the engine can consume at any point in time. At the DEQUEUE interaction point, the StreamInsight server supports a pull model. This means that the adapter design pattern expects you to pull events from the server as fast as the engine can provide them.
Given this, the throttling policy for the StreamInsight server is very straightforward. Assuming a simple pass-through query with no blocking operations, the rate at which a StreamInsight server can consume events from an input adapter at the ENQUEUE interaction point is only limited by the rate at which the output adapter can consume events from the server at the DEQUEUE interaction point. The extent to which the StreamInsight server pushes back on the input adapter during ENQUEUE is determined by how fast the query is able to release the output, and how fast the output adapter is able to consume this output. StreamInsight offers an extensive set of diagnostic views that help you measure the event rates at each of these interaction points. For more information, see Monitoring the StreamInsight Server and Queries.
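To make the throttling argument concrete, the following Python sketch simulates a pass-through query as a bounded buffer sitting between ENQUEUE and DEQUEUE. It is a toy model that assumes a fixed internal capacity; the names are hypothetical and it does not reflect how StreamInsight buffers events internally. In each tick, the input side pushes until it receives "Full", and then the output side pulls a fixed number of events.

```python
# Hypothetical model of ENQUEUE (push) / DEQUEUE (pull) backpressure.
# PassThroughServer stands in for a pass-through query; names are illustrative.
from collections import deque

FULL, SUCCESS = "Full", "Success"


class PassThroughServer:
    def __init__(self, capacity):
        self.capacity = capacity
        self.pending = deque()

    def enqueue(self, event):
        # Push side: accepted only while the query has room.
        if len(self.pending) >= self.capacity:
            return FULL
        self.pending.append(event)
        return SUCCESS

    def dequeue(self):
        # Pull side: None plays the role of an Empty result.
        return self.pending.popleft() if self.pending else None


def simulate(ticks, offered_per_tick, drained_per_tick, capacity):
    """Return how many events the input side managed to enqueue."""
    server = PassThroughServer(capacity)
    accepted = 0
    next_event = 0
    for _ in range(ticks):
        for _ in range(offered_per_tick):
            if server.enqueue(next_event) == FULL:
                break  # the input adapter is pushed back and must wait
            accepted += 1
            next_event += 1
        for _ in range(drained_per_tick):
            if server.dequeue() is None:
                break
    return accepted
```

For example, `simulate(10, 100, 5, 20)` returns 65. The first tick fills the 20-slot buffer, and each of the remaining nine ticks admits only the 5 events that the output side drained. The input rate is therefore bounded by the output rate, not by how fast the source can offer events.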
Adapter Development Tasks
Use the following checklist to develop the adapter.
Determine the type of adapter (input or output) that you need.
An input adapter reads the incoming events in the format in which they are supplied and transforms this data into a format that is consumable by the StreamInsight server.
An output adapter receives events processed by the StreamInsight server, transforms the events into a format expected by the output device, and emits the data to that device.
Determine the event type.
For an input adapter, define the event type that describes the payload of events provided by the source. For an output adapter, specify the event type that describes the payload of events consumed by the sink. For more information about event payloads, see StreamInsight Server Concepts.
You specify and build a typed adapter for a source or sink that always produces or consumes events of a fixed payload format in which the number of fields and their types are known in advance. The main advantage of the typed adapter is that the implementation of creating events for enqueue into the StreamInsight server is relatively easy. Because the field types are already known, you can use IntelliSense in Visual Studio (or equivalent feature in another integrated development environment) to populate the fields.
You specify and build an untyped adapter if the source or sink produces or consumes different payload formats. The main advantage of an untyped adapter is the flexibility to specify the event type at query bind time, rather than tying the adapter implementation to a specific event type. Compared to the typed adapter, the implementation of the untyped adapter is more involved. The untyped input adapter must determine the type of each field from configuration parameters provided at query bind time, populate the fields one at a time, and then enqueue the event. Similarly, the untyped output adapter must retrieve the result of query processing from a dequeued event based on configuration information provided to the output adapter.
It is important to note that an adapter instance (typed or untyped) that is bound to the query always produces or consumes events whose payloads are of one specific type. For more information, see Creating Event Types.
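The following Python sketch shows how an untyped input adapter might turn a raw CSV line into a payload whose field names and types are known only from bind-time configuration. The (name, type) list format and the UntypedCsvParser name are hypothetical. This is not how StreamInsight represents untyped events; it only illustrates the "determine each field's type from configuration, then populate the fields one at a time" pattern.

```python
# Hypothetical sketch of untyped field population driven by bind-time configuration.
# UntypedCsvParser and the (name, type_name) config format are illustrative only.
from datetime import datetime

CONVERTERS = {
    "int": int,
    "float": float,
    "str": str,
    "datetime": datetime.fromisoformat,
}


class UntypedCsvParser:
    def __init__(self, field_config):
        # field_config: list of (field_name, type_name) pairs supplied at bind time.
        self.fields = [(name, CONVERTERS[type_name]) for name, type_name in field_config]

    def to_payload(self, line):
        values = line.split(",")
        if len(values) != len(self.fields):
            raise ValueError(f"expected {len(self.fields)} fields, got {len(values)}")
        payload = {}
        # Populate the fields one at a time, converting each by its configured type.
        for (name, convert), raw in zip(self.fields, values):
            payload[name] = convert(raw.strip())
        return payload
```

The same class handles `[("SensorId", "int"), ("Reading", "float")]` and `[("Ts", "datetime"), ("Label", "str")]` without code changes. A typed adapter would fix one payload structure in code instead.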
Determine the event model.
Determine the event model for the input and output events. StreamInsight supports three event models: point, interval, and edge. If the source provides events of a fixed event model, you can design an input adapter for that event model alone. Similarly, if the sink requires events of a particular model, you can design an output adapter for that event model alone. However, many applications need all event models for a particular event type. In that case, we recommend that you build a typed or untyped adapter for each of the event models. For more information about event models, see StreamInsight Server Concepts.
The input and output AdapterFactory classes enable you to package these adapters together. The correct adapter can be instantiated at the time of query binding based on configuration parameters.
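The following Python sketch shows one factory packaging a point, an interval, and an edge adapter, and selecting among them from a bind-time configuration value. All class names and the `event_shape` field are hypothetical. The real StreamInsight factories are .NET types whose Create() signatures differ from this model.

```python
# Hypothetical sketch: one factory packaging point, interval, and edge adapters.
# These are plain Python stand-ins, not the StreamInsight .NET factory types.
from dataclasses import dataclass


@dataclass
class AdapterConfig:
    source_path: str
    event_shape: str  # "point", "interval", or "edge", chosen at query bind time


class _CsvAdapterBase:
    def __init__(self, config):
        self.config = config


class CsvPointAdapter(_CsvAdapterBase):
    pass


class CsvIntervalAdapter(_CsvAdapterBase):
    pass


class CsvEdgeAdapter(_CsvAdapterBase):
    pass


class CsvInputFactory:
    _by_shape = {
        "point": CsvPointAdapter,
        "interval": CsvIntervalAdapter,
        "edge": CsvEdgeAdapter,
    }

    def __init__(self):
        self.created = []  # state shared across every adapter this factory builds

    def create(self, config):
        adapter_cls = self._by_shape.get(config.event_shape)
        if adapter_cls is None:
            raise ValueError(f"unsupported event shape: {config.event_shape!r}")
        adapter = adapter_cls(config)
        self.created.append(adapter)
        return adapter

    def dispose(self):
        self.created.clear()  # release shared resources at shutdown
```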
Choose the corresponding adapter base class.
Based on the event type and model, select the appropriate adapter base class. The class names follow the pattern [Typed][Point | Interval | Edge][Input | Output]Adapter. Untyped adapters omit the Typed prefix.
Adapter type | Input adapter base class | Output adapter base class
Typed Point | TypedPointInputAdapter | TypedPointOutputAdapter
Untyped Point | PointInputAdapter | PointOutputAdapter
Typed Interval | TypedIntervalInputAdapter | TypedIntervalOutputAdapter
Untyped Interval | IntervalInputAdapter | IntervalOutputAdapter
Typed Edge | TypedEdgeInputAdapter | TypedEdgeOutputAdapter
Untyped Edge | EdgeInputAdapter | EdgeOutputAdapter
For more information, see Microsoft.ComplexEventProcessing.Adapters.
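The naming pattern is regular enough to state as a function. This illustrative Python helper only builds the class name strings listed in the base class table. It does not load or reference any .NET type.

```python
# Illustrative helper for the [Typed][Point | Interval | Edge][Input | Output]Adapter
# naming pattern. It builds strings only; it does not touch StreamInsight assemblies.
def adapter_base_class(typed, shape, direction):
    shapes = {"point": "Point", "interval": "Interval", "edge": "Edge"}
    directions = {"input": "InputAdapter", "output": "OutputAdapter"}
    prefix = "Typed" if typed else ""
    return prefix + shapes[shape] + directions[direction]
```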
Design the input and output AdapterFactory classes.
An AdapterFactory is a container class for adapters. You must implement a factory class for your adapters. The factory interfaces that your factory class implements are organized as follows.
Adapter type | Input adapter factory interface | Output adapter factory interface
Typed | ITypedInputAdapterFactory | ITypedOutputAdapterFactory
Untyped | IInputAdapterFactory | IOutputAdapterFactory
Typed with resiliency support | IHighWaterMarkTypedInputAdapterFactory | IHighWaterMarkTypedOutputAdapterFactory
Untyped with resiliency support | IHighWaterMarkInputAdapterFactory | IHighWaterMarkOutputAdapterFactory
The factory class serves the following purposes:
It enables sharing of resources between different adapter implementations for a given class of devices (CSV file, SQL Server database, web server Common Log Format) or application requirement, and facilitates passing configuration parameters to the adapter constructor. For example, an application may require all three event models (point, interval, and edge). A single factory can support three adapter implementations, one for each event model. As another example, the application may have the same event source such as a database table, but the source generates multiple event payload structures from the same source based on the queries that are executed. In this case, a single factory can support adapter implementations to handle each payload structure.
It provides a gateway for the adapter to the server runtime. The adapter developer must implement the Create() and Dispose() methods in the adapter factory for the adapter class. These methods are invoked by the server during query startup and shutdown.
It provides a gateway for the adapter to pre-runtime configuration information. This is especially important for untyped adapters, which must determine the type of each field in the structure from configuration parameters provided at query bind time. You can define the configuration structure in the factory class and pass it through the Create() method to the constructor of your adapter class. This configuration structure is serialized by the DataContractSerializer, so it must be serializable by that mechanism. Beyond this constraint, you have full flexibility in how you define the configuration structure, how you populate it, and how you use it in the adapter constructor.
It provides a way to produce current time increments (CTIs) without explicitly enqueueing them through the input adapter. By implementing the ITypedDeclareAdvanceTimePolicy interface (for a typed adapter factory) or the IDeclareAdvanceTimePolicy interface (for an untyped adapter factory) in the adapter factory class, you can specify the CTI frequency and time stamps. This simplifies the adapter code and can affect every event stream that the factory produces through its adapter instances. For more information, see AdvanceTimeSettings Class.
In resilient applications, it supports resiliency by providing the high-water mark to the input adapter for replay of missed events, and by providing the high-water mark and offset to the output adapter for the elimination of duplicate events. For more information, see StreamInsight Resiliency.
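To illustrate what a factory-declared CTI policy does to a stream, the following Python sketch emits a CTI after every `every_n` events. Each CTI is stamped with the latest start time seen, minus `delay`. This is a simplified model and not a description of StreamInsight internals. The parameter names are assumptions, and dropping events that arrive earlier than the last CTI is only one possible policy, chosen here for illustration.

```python
# Hypothetical model of count-based CTI generation declared by a factory.
# every_n, delay, and the drop-late-events policy are illustrative assumptions.
def with_ctis(start_times, every_n, delay=0):
    """Interleave ("cti", t) markers into a list of ("event", t) tuples."""
    out = []
    last_cti = None
    latest = None
    count = 0
    for t in start_times:
        if last_cti is not None and t < last_cti:
            continue  # would violate the time guarantee of the last CTI; drop it
        out.append(("event", t))
        latest = t if latest is None else max(latest, t)
        count += 1
        if count % every_n == 0:
            cti = latest - delay
            if last_cti is None or cti > last_cti:  # CTIs must move forward
                out.append(("cti", cti))
                last_cti = cti
    return out
```

Because the policy lives in the factory, the adapter's reader loop enqueues only data events, and every stream created through that factory receives the same CTI behavior.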
Build and test the adapter.
Compile and build the adapter as a .NET assembly. Test the adapter's basic operations against a simple pass-through query that reads events from an input adapter and delivers them to the output adapter without any complex query processing. This validates that the adapter reads from and writes to its devices correctly and that it can enqueue and dequeue events.
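The following Python sketch shows the shape of such a pass-through test. Known records go in through a stand-in source, flow through a bounded queue that plays the role of the server, and are checked at the sink. ListSource, ListSink, and run_passthrough are hypothetical names. In a real test, the queue would be a StreamInsight pass-through query bound to your actual adapters.

```python
# Hypothetical pass-through test harness; names are illustrative, not StreamInsight APIs.
from collections import deque


class ListSource:
    def __init__(self, records):
        self.records = deque(records)

    def read(self):
        return self.records.popleft() if self.records else None


class ListSink:
    def __init__(self):
        self.written = []

    def write(self, record):
        self.written.append(record)


def run_passthrough(source, sink, capacity=4):
    queue = deque()  # stands in for the server between ENQUEUE and DEQUEUE
    exhausted = False
    while not exhausted or queue:
        # Input side: enqueue until the queue is "full" or the source runs dry.
        while not exhausted and len(queue) < capacity:
            record = source.read()
            if record is None:
                exhausted = True
            else:
                queue.append(record)
        # Output side: dequeue until "empty" and write to the sink.
        while queue:
            sink.write(queue.popleft())
    return sink.written
```

The assertion to make is that the sink receives exactly the records the source supplied, in order, including when the queue fills and drains several times.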
State Machine of the Adapter
The state machine that defines the interaction between an adapter and the StreamInsight server is the same for both input and output adapters. This is significant because the state machine provides you with a consistent development model. The state machine is shown in the following illustration.
The main features and requirements to make this state machine work are as follows:
Start() and Resume() are methods called by the StreamInsight server and must be implemented by you as the adapter developer. In addition, you must also implement the constructor method for your adapter class and the Dispose() method, which is inherited from the base class.
In turn, your adapter implementation must call the following methods provided by the adapter SDK:
Enqueue() for the input adapter. This returns the values EnqueueOperationResult.Success or EnqueueOperationResult.Full.
Dequeue() for the output adapter. This returns the values DequeueOperationResult.Success or DequeueOperationResult.Empty.
Ready(). This returns a Boolean value TRUE or FALSE.
Stopped(). This returns a Boolean value TRUE or FALSE.
The StreamInsight server asynchronously calls the internal method (denoted as StopQuery()) on behalf of the user when an administrator or query developer stops query execution through methods in the server API.
Calls to Enqueue() and Dequeue() return the status Full and Empty, respectively, when the adapter is in one of the following states:
Suspended
Stopping
Calls to Enqueue() and Dequeue() cause an exception to be raised when the adapter is in one of the following states:
Created
Stopped
Calls to Ready() cause an exception to be raised when the adapter is in one of the following states:
Created
Running
Stopped
An adapter transitions through some or all of the five states (Created, Running, Suspended, Stopping and Stopped) during its course of operation. A state transition occurs before the StreamInsight server calls Start() or Resume() and after the adapter calls Enqueue(), Dequeue(), Ready(), and Stopped().
The StreamInsight server and the adapter never share the same thread. The server always calls Start() or Resume() on a separate worker thread, which it obtains from an operating system thread pool on behalf of the adapter. This implies that the Start() and Resume() methods have full flexibility in how they use the worker thread (for example, spawning additional threads for asynchronous reads or writes). Because of this, follow best practices and use system resources carefully from this thread.
The API removes the need to synchronize the Start() and Resume() operations (threads) with each other, because the server always calls Resume() after, and only after, the adapter calls Ready(). However, synchronization might still be required for the device-facing tasks of reading, writing, or buffering events, especially in asynchronous I/O scenarios. We recommend using nonblocking I/O as a best practice.
If the adapter can be idle, the adapter should periodically check the state to determine whether it was asked to stop.
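The rules above can be captured as a small state model. The following Python sketch encodes the documented behavior of Enqueue(), Ready(), and Stopped() in each of the five states. AdapterStateModel and its method names are hypothetical, and the `server_has_room` flag is an assumption that stands in for the server's internal decision to suspend the adapter. Dequeue() would mirror Enqueue(), returning Empty instead of Full.

```python
# Hypothetical model of the adapter state machine; not the StreamInsight API.
from enum import Enum


class State(Enum):
    CREATED = "Created"
    RUNNING = "Running"
    SUSPENDED = "Suspended"
    STOPPING = "Stopping"
    STOPPED = "Stopped"


class AdapterStateModel:
    def __init__(self):
        self.state = State.CREATED
        self.resume_calls = 0  # how often the server would call Resume()

    # Transitions driven by the server.
    def server_start(self):
        if self.state is not State.CREATED:
            raise RuntimeError("the server calls Start() only once")
        self.state = State.RUNNING

    def server_stop_query(self):
        # An asynchronous stop request moves a Running or Suspended adapter to Stopping.
        if self.state in (State.RUNNING, State.SUSPENDED):
            self.state = State.STOPPING

    # Calls made by the adapter.
    def enqueue(self, server_has_room):
        if self.state in (State.CREATED, State.STOPPED):
            raise RuntimeError(f"Enqueue() is not allowed in the {self.state.value} state")
        if self.state in (State.SUSPENDED, State.STOPPING):
            return "Full"
        if not server_has_room:
            self.state = State.SUSPENDED  # the server cannot accept the event
            return "Full"
        return "Success"

    def ready(self):
        if self.state in (State.CREATED, State.RUNNING, State.STOPPED):
            raise RuntimeError(f"Ready() is not allowed in the {self.state.value} state")
        if self.state is State.SUSPENDED:
            self.state = State.RUNNING
        # From Suspended or Stopping, the server answers Ready() with one Resume() call.
        self.resume_calls += 1

    def stopped(self):
        # The adapter may call Stopped() at any point.
        self.state = State.STOPPED
```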
Life Cycle of Adapter Interaction with the Server
The handshake between the StreamInsight server and the adapter is always synchronous. As a result, the adapter can check its state at any point in its execution and react accordingly. The life cycle of the adapter interaction with the StreamInsight server consists of the following operations, which correspond to the state machine shown in the earlier illustration.
Created
An adapter instance begins interacting with the StreamInsight server when the query is started (by making a corresponding call in the StreamInsight server API).
Running
The server puts the adapter in the Running state and calls Start() on the adapter asynchronously, and it guarantees that it makes this call only once. When the adapter is in the Running state, the adapter can enqueue events into the server or dequeue events from it.
Ideally, the adapter will be in the Running state most of the time. The recommended design pattern is to invoke the reader or writer routine, preferably in a separate thread, from the Start() method, and return from the Start() routine, thereby quickly relinquishing the worker thread.
The reader routine (assume it is called ProduceEvents() as an example) reads events from the source and calls Enqueue() to push events into the server. In the case of an output adapter, a writer routine (assume it is called ConsumeEvents() as an example) calls Dequeue() to pull events from the server and writes them into the sink.
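The following Python sketch shows the recommended Start() pattern. Start() hands the reader loop to a separate thread and returns immediately, giving the worker thread back to the server. FakeServer and FileLikeInputAdapter are hypothetical stand-ins. In a real adapter, Enqueue() is inherited from the StreamInsight base class, and the Full and end-of-source branches would lead to Ready() and Stopped(), as described in the following sections.

```python
# Hypothetical sketch of Start() spawning ProduceEvents() on its own thread.
# FakeServer and FileLikeInputAdapter are illustrative, not StreamInsight types.
import threading


class FakeServer:
    def __init__(self):
        self.received = []
        self.lock = threading.Lock()

    def enqueue(self, event):
        with self.lock:
            self.received.append(event)
        return "Success"


class FileLikeInputAdapter:
    def __init__(self, server, records):
        self.server = server
        self.records = list(records)
        self.reader = None

    def start(self):
        # Called by the server on a worker thread; do not read the source here.
        self.reader = threading.Thread(target=self.produce_events, daemon=True)
        self.reader.start()
        # Returning promptly relinquishes the worker thread to the server.

    def produce_events(self):
        for record in self.records:
            if self.server.enqueue(record) != "Success":
                return  # Full: a real adapter would now call Ready()
        # End of source: a real adapter would clean up and call Stopped() here.
```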
Suspended
When the server is unable to accept an enqueued event, or has no event available for dequeue, the input or output adapter is put into the Suspended state. In this state, calls to Enqueue() and Dequeue() return the status Full and Empty, respectively. While the adapter is suspended, you can optionally perform housekeeping operations, such as saving the position of the last record read from the database or the last line read from the file. After any such operations, you must invoke the Ready() method to tell the server that the adapter is ready to be resumed. If this routine is executing on the same worker thread as Start(), you must then return from Start().
In response to a Ready() invocation, the server returns the adapter to the Running state and always calls Resume() asynchronously on a different worker thread. You can design Resume() to retry the enqueue or dequeue that failed in the last iteration, and then call ProduceEvents() or ConsumeEvents(). This pattern can continue until the adapter transitions to the Stopping or Stopped state.
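The following Python sketch shows the suspend and resume cycle for an input adapter. When Enqueue() returns Full, the adapter records the event that failed, calls Ready(), and returns. Resume() retries that event before reading anything new. ToyServer and ResumableInputAdapter are hypothetical names. This single-threaded toy calls resume() directly, whereas the real server calls Resume() asynchronously on another worker thread.

```python
# Hypothetical sketch of Suspended -> Ready() -> Resume(); not the StreamInsight API.
from collections import deque


class ToyServer:
    """Stands in for the server: a bounded queue plus a counter of Ready() calls."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.queue = deque()
        self.ready_calls = 0

    def enqueue(self, event):
        if len(self.queue) >= self.capacity:
            return "Full"
        self.queue.append(event)
        return "Success"

    def ready(self):
        self.ready_calls += 1  # the real server would schedule Resume() on a worker thread


class ResumableInputAdapter:
    def __init__(self, server, records):
        self.server = server
        self.source = iter(records)
        self.pending = None  # the event whose Enqueue() returned Full, if any

    def start(self):
        self.produce_events()

    def resume(self):
        # Retry the enqueue that failed in the last iteration before reading new data.
        if self.pending is not None:
            if self.server.enqueue(self.pending) == "Full":
                self.server.ready()  # still no room: ask to be resumed again
                return
            self.pending = None
        self.produce_events()

    def produce_events(self):
        for record in self.source:
            if self.server.enqueue(record) == "Full":
                self.pending = record  # housekeeping: remember what to retry
                self.server.ready()
                return
```

Keeping the failed event in `pending` ensures that no record is lost or reordered across the suspend and resume boundary.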
Stopping
At any point in the Running or Suspended state, the server can move the adapter to the Stopping state in response to an asynchronous request to stop the query. In this state, calls to Enqueue() and Dequeue() also return the status Full and Empty, respectively.
The Stopping state provides the adapter implementation a staging area to correctly prepare itself to stop. You can implement the adapter to relinquish all the resources that it has obtained (threads, memory) and then invoke the Stopped() method. Until this method is called, the server will not stop the adapter.
Note that the adapter may be transitioned into a Stopping state asynchronously. The adapter requires some means to detect that it has entered the Stopping state. As discussed above, the design pattern is for the adapter to invoke Ready() when it is suspended. In response, the server invokes the Resume() method one more time, thereby enabling the detection of the Stopping state in the Resume() method. As a best practice, we recommend placing the check for the Stopping state as the first block of code in your Start() and Resume() implementation.
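The following Python sketch shows the recommended structure: the Stopping check is the first block of both Start() and Resume(). If a stop was requested, the adapter releases its resources and calls Stopped() instead of doing more work. StubServer, AdapterState, and StoppableAdapter are hypothetical. In a real adapter, the state comes from the base class and Stopped() is a base class method.

```python
# Hypothetical sketch: check for Stopping first in Start() and Resume().
# StubServer, AdapterState, and StoppableAdapter are illustrative names.
from enum import Enum


class AdapterState(Enum):
    RUNNING = 1
    SUSPENDED = 2
    STOPPING = 3
    STOPPED = 4


class StubServer:
    def __init__(self):
        self.state = AdapterState.RUNNING

    def stopped(self):
        self.state = AdapterState.STOPPED


class StoppableAdapter:
    def __init__(self, server):
        self.server = server
        self.resources_open = True
        self.work_done = 0

    def _stop_if_requested(self):
        if self.server.state is AdapterState.STOPPING:
            self.resources_open = False  # close files, connections, helper threads
            self.server.stopped()        # only now may the server stop the adapter
            return True
        return False

    def start(self):
        if self._stop_if_requested():
            return
        self.work_done += 1  # stands in for ProduceEvents() or ConsumeEvents()

    def resume(self):
        if self._stop_if_requested():
            return
        self.work_done += 1
```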
Stopped
The adapter code can call Stopped() at any point. This puts the adapter in the Stopped state. As a good design practice, we recommend that you clean up the resources that the adapter obtained before you call Stopped().
Important
Failure to call the Stopped() method causes the last page of memory that is associated with the query to remain allocated. This can cause small memory leaks that can accumulate over time if there are many query start-and-stop cycles in a process.
In the Stopped state, the adapter cannot refer to any StreamInsight server–specific constructs or event memory, or perform enqueue or dequeue operations. Such actions will raise an exception. However, operating-system and device-facing cleanup activities can continue.
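Because a missing Stopped() call leaks memory, it helps to structure the reader loop so that cleanup and Stopped() run even when the device fails. The following Python sketch uses try/finally for that purpose. It also shows that enqueueing after Stopped() raises an error. StubServer and GuardedAdapter are hypothetical, and the sketch omits Full handling to stay short.

```python
# Hypothetical sketch: guarantee cleanup and Stopped() with try/finally.
# StubServer and GuardedAdapter are illustrative; Full handling is omitted.
class StubServer:
    def __init__(self):
        self.is_stopped = False
        self.received = []

    def enqueue(self, event):
        if self.is_stopped:
            raise RuntimeError("Enqueue() after Stopped()")
        self.received.append(event)
        return "Success"

    def stopped(self):
        self.is_stopped = True


class GuardedAdapter:
    def __init__(self, server, read_next):
        self.server = server
        self.read_next = read_next  # callable returning the next record, or None at end
        self.device_open = True

    def produce_events(self):
        try:
            while (record := self.read_next()) is not None:
                self.server.enqueue(record)
        finally:
            self.device_open = False  # device-facing cleanup first
            self.server.stopped()     # then tell the server the adapter is done
```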
Examples
For examples of a variety of input and output adapters and adapter factories, see the samples available at StreamInsight Samples.