Compartilhar via


Exemplo completo de uma origem de evento enumerável e de um coletor de eventos (StreamInsight)

Este ponta a ponta simples demonstra o uso de uma origem de eventos e um coletor de eventos que implementam a interface IEnumerable para criar um aplicativo StreamInsight completo:

  1. Step 1 - Provide an observable or enumerable event source

  2. Step 2 - Transform the input to a stream and describe the temporal characteristics

  3. Step 3 - Write a query

  4. Step 4 - Convert the query output to an observable or enumerable stream

  5. Step 5 - Consume the output

Este exemplo usa o StreamInsight em combinação com o SQL Server e o ADO.NET Entity Framework para responder a uma consulta relacionada a tempo em dados históricos do banco de dados de exemplo do Northwind. A consulta encontra intervalos de tempo durante os quais mais de 3 ordens estiveram ativas dentro de uma região.

Este exemplo usa uma origem de evento IEnumerable. As etapas para usar uma origem de evento que implementa IObservable são semelhantes. No entanto, uma saída observável envia dados ao observador: o consumidor não tem que receber os dados, como faz quando chama foreach em uma origem enumerável.

Etapa 1 - Fornecer uma origem de evento observável ou enumerável

Primeiro, defina os dados de origem para a consulta emitindo um LINQ para consulta de Entidades no banco de dados Northwind. O resultado (databaseQuery) por padrão implementa a interface IEnumerable.

// Connect to the Northwind sample database on SQL Server. Use the default Entity Model
// generated by the Entity Framework for the Northwind sample database.
using (NorthwindEntities northwind = new NorthwindEntities())
{
    // Query all Orders where there is a known order date, ship date and ship region.
    var databaseQuery = from o in northwind.Orders
                        where o.OrderDate.HasValue && o.ShippedDate.HasValue && o.ShipRegion != null
                        orderby o.OrderDate.Value
                        select o;
}

Etapa 2 - Transformar a entrada em um fluxo e descrever as características temporais de um fluxo

Em seguida, transformar o resultado da consulta em um fluxo de eventos de intervalo:

// Transform the query results into a stream of interval events whose start and end 
// times are defined by the order and ship timestamps. Keep track of the shipping region.
var streamSource = databaseQuery
    .ToStream(application, AdvanceTimeSettings.IncreasingStartTime, 
        o => IntervalEvent.CreateInsert(
            o.OrderDate.Value,
            o.ShippedDate.Value, 
            new { o.ShipRegion }));

Este código usa um auxiliar na classe AdvanceTimeSettings - IncreasingStartTime - para inserir eventos CTI depois de cada evento com um delay de 0 (zero). Como alternativa, você pode usar StrictlyIncreasingStartTime para especificar um delay de tique -1 (colocando a CTI um tique após o horário de início do evento), ou UnorderedTimestamps para especificar um valor personalizado para o delay.

Em seguida, o método CreateInsert da classe IntervalEvent converte os dados de origem em um fluxo de eventos fornecendo OrderDate como a hora de início, a Data de Envio como a hora de término do intervalo e ShipRegion como a carga do evento.

Um método CreateInsert análogo é fornecido pela classe PointEvent, enquanto a classe EdgeEvent tem os métodos CreateStart e CreateEnd. Todas as 3 classes de eventos têm um método CreateCti para inserir eventos CTI de maneira procedural, em vez de através do uso declarativo de AdvanceTimeSettings.

Etapa 3 - Escrever uma consulta

Em seguida, grave a consulta StreamInsight com reconhecimento de tempo que é apropriada para os fluxos de eventos de entrada:

// Find time intervals during which more than 3 orders are in process within a region.
var streamQuery = from o in streamSource
                  group o by o.ShipRegion into g
                  from window in g.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                  select new { OrderCount = window.Count(), ShipRegion = g.Key } into agg
                  where agg.OrderCount > 3
                  select agg;

Essa consulta agrupa os eventos em cada intervalo de tempo por região e seleciona somente os intervalos em que mais de 3 eventos estiverem ativos. Ela projeta os resultados em um novo fluxo com uma carga completamente diferente que inclui a contagem de ordens ativas e o identificador para a ShipRegion.

Para obter mais informações sobre como gravar modelos, consulte:

Etapa 4 - Converter a saída da consulta em um coletor de eventos observável e enumerável

Em seguida, transformar o fluxo de saída de uma consulta em um resultado enumerável:

// Convert temporal query results into an enumerable result of interval events. This example
// filters out CTI events, and projects the relevant portions of the interval event.
var results = from intervalEvent in streamQuery.ToIntervalEnumerable()
              where intervalEvent.EventKind != EventKind.CTI
              select new 
              { 
                  intervalEvent.StartTime, 
                  intervalEvent.EndTime, 
                  intervalEvent.Payload.OrderCount,
                  intervalEvent.Payload.ShipRegion,
              };

Esta consulta filtra os eventos CTI e projeta somente os eventos de inserção em um fluxo enumerável de eventos de intervalo. Um novo tipo anônimo com 4 campos contém a carga dos eventos.

Além do método ToIntervalEnumerable, os métodos de extensão relacionados incluem:

  • ToPointEnumerable e ToEdgeEnumerable

  • ToPointObservable, ToIntervalObservable e ToEdgeObservable

Estes métodos retornam interfaces ICepEnumerable ou ICepObservable que estendem as interfaces base IEnumerable e IObservable fornecendo um nome e uma descrição da consulta para identificá-la para fins de gerenciamento e depuração.

As interfaces ICepEnumerable ou ICepObservable também fornecem métodos auxiliares que filtram os eventos de saída através de seleção (Where) ou projeção (Select). Por exemplo:

observableOutput = result
    .ToPointObservable()
    .Where( e => e.EventKind != EventKind.Cti)
    .Select(e => e.Payload);

Etapa 5 - Consumir a saída

Por fim, consuma os resultados da consulta. Observe que, devido ao modelo de avaliação adiada do provedor LINQ típico, as consultas não são avaliadas até que o consumidor comece a enumerar ou observar os resultados:

// Enumerating the results triggers the underlying SQL Server and StreamInsight queries.
foreach (var activeInterval in results)
{
    Console.WriteLine("Between {0} and {1}, {2} orders were active in the '{3}' region.", 
        activeInterval.StartTime, 
        activeInterval.EndTime, 
        activeInterval.OrderCount,
        activeInterval.ShipRegion);
}