Compartir a través de


Ejemplo completo de un origen y receptor del evento enumerable (StreamInsight)

Este ejemplo completo simple muestra el uso de un origen y un receptor del evento que implementan la interfaz IEnumerable para crear una aplicación StreamInsight completa:

  1. Step 1 - Provide an observable or enumerable event source

  2. Step 2 - Transform the input to a stream and describe the temporal characteristics

  3. Step 3 - Write a query

  4. Step 4 - Convert the query output to an observable or enumerable stream

  5. Step 5 - Consume the output

En este ejemplo se usa StreamInsight en combinación con SQL Server y ADO.NET Entity Framework para responder a una consulta relacionada con el tiempo sobre los datos históricos de la base de datos de ejemplo Northwind. La consulta busca intervalos de tiempo en que hubo más de tres pedidos activos en una región.

Este ejemplo usa un origen del evento IEnumerable. Los pasos para usar un origen del evento que implementa IObservable son similares. Sin embargo, una salida observable inserta datos en el observador; el consumidor no tiene que extraer los datos, como sucede cuando se llama a foreach en un origen enumerable.

Paso 1: proporcionar un origen del evento observable o enumerable

En primer lugar, defina los datos de origen para la consulta emitiendo una consulta LINQ to Entities en la base de datos Northwind. De forma predeterminada, el resultado (databaseQuery) implementa la interfaz IEnumerable.

// Connect to the Northwind sample database on SQL Server. Use the default Entity Model
// generated by the Entity Framework for the Northwind sample database.
using (NorthwindEntities northwind = new NorthwindEntities())
{
    // Query all Orders where there is a known order date, ship date and ship region.
    var databaseQuery = from o in northwind.Orders
                        where o.OrderDate.HasValue && o.ShippedDate.HasValue && o.ShipRegion != null
                        orderby o.OrderDate.Value
                        select o;
}

Paso 2: transformar la entrada en un flujo y describir las características temporales del flujo

A continuación, transforme el resultado de la consulta en un flujo de eventos de intervalo:

// Transform the query results into a stream of interval events whose start and end 
// times are defined by the order and ship timestamps. Keep track of the shipping region.
var streamSource = databaseQuery
    .ToStream(application, AdvanceTimeSettings.IncreasingStartTime, 
        o => IntervalEvent.CreateInsert(
            o.OrderDate.Value,
            o.ShippedDate.Value, 
            new { o.ShipRegion }));

Este código usa un elemento auxiliar en la clase AdvanceTimeSettings, IncreasingStartTime, para insertar eventos CTI después de cada evento con un valor de delay igual a 0 (cero). También se puede usar StrictlyIncreasingStartTime para especificar un valor de delay igual a -1 tic (este modo el CTI se coloca un tic después de la hora de inicio del evento), o UnorderedTimestamps para especificar un valor personalizado para delay.

Posteriormente, el método CreateInsert de la clase IntervalEvent convierte los datos de origen en un flujo de eventos proporcionando OrderDate como hora de inicio, ShippedDate como hora de finalización del intervalo y ShipRegion como carga del evento.

La clase PointEvent proporciona un método CreateInsert análogo, mientras que la clase EdgeEvent tiene los métodos CreateStart y CreateEnd. Las tres clases de eventos tienen un método CreateCti para insertar eventos CTI mediante procedimientos, en lugar de mediante el uso declarativo de AdvanceTimeSettings.

Paso 3: escribir una consulta

A continuación, escriba la consulta de StreamInsight relacionada con el tiempo que resulte apropiada para el flujo de entrada de eventos:

// Find time intervals during which more than 3 orders are in process within a region.
var streamQuery = from o in streamSource
                  group o by o.ShipRegion into g
                  from window in g.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                  select new { OrderCount = window.Count(), ShipRegion = g.Key } into agg
                  where agg.OrderCount > 3
                  select agg;

Esta consulta agrupa por región los eventos en cada intervalo de tiempo y, a continuación, selecciona solo los intervalos en que hubo más de tres eventos activos. Proyecta los resultados en un nuevo flujo con una carga totalmente diferente que incluye el número de pedidos activos y el identificador de ShipRegion.

Para obtener más información sobre cómo escribir plantillas de consulta, vea:

Paso 4: convertir la salida de la consulta en un receptor de eventos observable o enumerable

A continuación, transforme el flujo de salida de la consulta en un resultado enumerable:

// Convert temporal query results into an enumerable result of interval events. This example
// filters out CTI events, and projects the relevant portions of the interval event.
var results = from intervalEvent in streamQuery.ToIntervalEnumerable()
              where intervalEvent.EventKind != EventKind.CTI
              select new 
              { 
                  intervalEvent.StartTime, 
                  intervalEvent.EndTime, 
                  intervalEvent.Payload.OrderCount,
                  intervalEvent.Payload.ShipRegion,
              };

Esta consulta filtra los eventos CTI y proyecta solo los eventos de inserción en un flujo enumerable de eventos de intervalo. Un nuevo tipo anónimo con cuatro campos contiene la carga de los eventos.

Además del método ToIntervalEnumerable, entre los métodos de extensión relacionados se incluyen:

  • ToPointEnumerable y ToEdgeEnumerable

  • ToPointObservable, ToIntervalObservable y ToEdgeObservable

Estos métodos devuelven las interfaces ICepEnumerable o ICepObservable que extienden las interfaces IEnumerable e IObservable base proporcionando un nombre de consulta y una descripción de la misma para identificarla a efectos de administración y depuración.

Las interfaces ICepEnumerable o ICepObservable también proporcionan métodos auxiliares que filtran los eventos de salida mediante un proceso de selección (Where) o de proyección (Select). Por ejemplo:

observableOutput = result
    .ToPointObservable()
    .Where( e => e.EventKind != EventKind.Cti)
    .Select(e => e.Payload);

Paso 5: consumir la salida

Finalmente, consuma los resultados de la consulta. Tenga en cuenta que debido al modelo de evaluación diferida del proveedor LINQ habitual, las consultas no se evalúan hasta que el consumidor empieza a enumerar u observar los resultados:

// Enumerating the results triggers the underlying SQL Server and StreamInsight queries.
foreach (var activeInterval in results)
{
    Console.WriteLine("Between {0} and {1}, {2} orders were active in the '{3}' region.", 
        activeInterval.StartTime, 
        activeInterval.EndTime, 
        activeInterval.OrderCount,
        activeInterval.ShipRegion);
}