Partager via


Exemple de bout en bout de source et de récepteur d'événements énumérables (StreamInsight)

Ce scénario simple de bout en bout illustre l'utilisation d'une source d'événement et d'un récepteur d'événements qui implémentent l'interface IEnumerable afin de créer une application StreamInsight complète :

  1. Step 1 - Provide an observable or enumerable event source

  2. Step 2 - Transform the input to a stream and describe the temporal characteristics

  3. Step 3 - Write a query

  4. Step 4 - Convert the query output to an observable or enumerable stream

  5. Step 5 - Consume the output

Cet exemple utilise StreamInsight avec SQL Server et ADO.NET Entity Framework pour répondre à une requête temporelle sur des données historiques à partir de l'exemple de base de données Northwind. La requête recherche des intervalles de temps durant lesquels plus de 3 commandes ont été actives dans une région.

Cet exemple utilise une source d'événement IEnumerable. Les étapes pour l'utilisation d'une source d'événement qui implémente IObservable sont similaires. Toutefois, une sortie observable envoie les données (push) à l'observateur ; le consommateur n'est pas obligé d'extraire les données, comme il le fait lorsqu'il appelle foreach sur une source énumérable.

Étape 1 - Fournir une source d'événement observable ou énumérable

Tout d'abord, définissez la source de l'événement pour la requête à l'aide d'une requête LINQ to Entities sur la base de données Northwind. Le résultat (databaseQuery) implémente par défaut l'interface IEnumerable.

// Connect to the Northwind sample database on SQL Server. Use the default Entity Model
// generated by the Entity Framework for the Northwind sample database.
using (NorthwindEntities northwind = new NorthwindEntities())
{
    // Query all Orders where there is a known order date, ship date and ship region.
    var databaseQuery = from o in northwind.Orders
                        where o.OrderDate.HasValue && o.ShippedDate.HasValue && o.ShipRegion != null
                        orderby o.OrderDate.Value
                        select o;
}

Étape 2 - Transformer l'entrée en flux et décrire les caractéristiques temporelles de ce flux

Transformez ensuite le résultat de la requête en un flux d'événements intervalle :

// Transform the query results into a stream of interval events whose start and end 
// times are defined by the order and ship timestamps. Keep track of the shipping region.
var streamSource = databaseQuery
    .ToStream(application, AdvanceTimeSettings.IncreasingStartTime, 
        o => IntervalEvent.CreateInsert(
            o.OrderDate.Value,
            o.ShippedDate.Value, 
            new { o.ShipRegion }));

Ce code utilise une application auxiliaire sur la classe AdvanceTimeSettings - IncreasingStartTime pour insérer des événements CTI après chaque événement dont le delay est de 0 (zéro). Vous pouvez également utiliser StrictlyIncreasingStartTime pour spécifier un cycle d'un delay de -1 (plaçant ainsi le CTI un cycle après l'heure de début de l'événement), ou UnorderedTimestamps pour spécifier une valeur personnalisée pour le delay.

La méthode CreateInsert de la classe IntervalEvent convertit ensuite les données sources en un flux d'événements en fournissant l'heure de début et l'heure de fin de l'intervalle, respectivement OrderDate et Shipped Date, et la charge utile de l'événement, ShipRegion.

Une méthode CreateInsert analogue est fournie par la classe PointEvent et la classe EdgeEvent comporte les méthodes CreateStart et CreateEnd. Ces trois classes d'événements comportent une méthode CreateCti pour insérer les événements CTI de façon procédurale, et non par l'utilisation déclarative de AdvanceTimeSettings.

Étape 3 - Écrire une requête

Écrivez ensuite la requête StreamInsight prenant en charge les données temporelles adaptée aux événements du flux entrant :

// Find time intervals during which more than 3 orders are in process within a region.
var streamQuery = from o in streamSource
                  group o by o.ShipRegion into g
                  from window in g.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                  select new { OrderCount = window.Count(), ShipRegion = g.Key } into agg
                  where agg.OrderCount > 3
                  select agg;

Cette requête regroupe les événements de chaque intervalle de temps par région, puis sélectionne uniquement les intervalles dans lesquels plus de 3 événements ont été actifs. Les résultats sont projetés dans un nouveau flux avec une charge utile totalement différente qui inclut le nombre de commandes actives et l'identificateur de ShipRegion.

Pour plus d'informations sur l'écriture de modèles de requête, consultez :

Étape 4 - Convertir le résultat de la requête en un récepteur d'événements observables ou énumérables

Transformez ensuite le flux de sortie de la requête en un résultat énumérable :

// Convert temporal query results into an enumerable result of interval events. This example
// filters out CTI events, and projects the relevant portions of the interval event.
var results = from intervalEvent in streamQuery.ToIntervalEnumerable()
              where intervalEvent.EventKind != EventKind.CTI
              select new 
              { 
                  intervalEvent.StartTime, 
                  intervalEvent.EndTime, 
                  intervalEvent.Payload.OrderCount,
                  intervalEvent.Payload.ShipRegion,
              };

Cette requête filtre les événements CTI et projète uniquement les événements insérés dans un flux énumérable d'événements intervalle. Un nouveau type anonyme comportant 4 champs contient la charge utile des événements.

Outre la métode ToIntervalEnumerable, les méthodes d'extension associées incluent :

  • ToPointEnumerable et ToEdgeEnumerable

  • ToPointObservable, ToIntervalObservable et ToEdgeObservable

Ces méthodes retournent les interfaces ICepEnumerable ou ICepObservable qui étendent les interfaces de base IEnumerable et IObservable en fournissant le nom et la description de la requête à des fins de gestion et de débogage.

Les interfaces ICepEnumerable ou ICepObservable fournissent également des méthodes d'assistance qui filtrent les événements de sortie par le biais de la sélection (Where) ou de la projection (Select). Par exemple :

observableOutput = result
    .ToPointObservable()
    .Where( e => e.EventKind != EventKind.Cti)
    .Select(e => e.Payload);

Étape 5 - Consommer le résultat

Enfin, consommez le résultat de la requête. Notez qu'en raison du modèle d'évaluation différée du fournisseur LINQ standard, les requêtes ne sont évaluées que lorsque le consommateur commence à énumérer ou observer les résultats :

// Enumerating the results triggers the underlying SQL Server and StreamInsight queries.
foreach (var activeInterval in results)
{
    Console.WriteLine("Between {0} and {1}, {2} orders were active in the '{3}' region.", 
        activeInterval.StartTime, 
        activeInterval.EndTime, 
        activeInterval.OrderCount,
        activeInterval.ShipRegion);
}