Condividi tramite


Esempio end-to-end di StreamInsight

In questo argomento verranno descritti i diversi componenti e passaggi coinvolti nella creazione di un'applicazione di StreamInsight ed è incluso un esempio end-to-end di un'applicazione. Un'applicazione di StreamInsight combina origini evento, sink di evento e query per implementare uno scenario di elaborazione di eventi complessi. L'API di StreamInsight offre diverse interfacce per supportare vari livelli di controllo e complessità nella creazione e nella gestione di applicazioni di elaborazione di eventi. 

La più piccola unità di distribuzione di un'applicazione è una query, che può essere avviata e arrestata. Nella figura seguente viene illustrata una modalità di compilazione di una query. L'origine evento è rappresentata da un adattatore di input. L'adattatore inserisce un flusso di eventi nell'albero dell'operatore, che rappresenta la logica di query desiderata, specificato dal progettista come modello di query. Il flusso di eventi elaborato comporta quindi la generazione di un sink di evento, in genere un adattatore di output.

Query con adattatori di input e di output

Per gli sviluppatori che non hanno familiarità con la terminologia relativa all'elaborazione di eventi complessi, è consigliabile leggere Concetti relativi al server StreamInsight e Architettura del server StreamInsight.

Processo dell'applicazione

In questa sezione vengono descritti i passaggi che costituiscono l'esperienza tipica di creazione di un'applicazione end-to-end.

Creare un'istanza di un'istanza del server e di un'applicazione

Il processo inizia con la creazione dell'istanza di un'istanza del server StreamInsight e di un'applicazione.

server = Server.Create(”MyInstance”);
Application myApp = server.CreateApplication("MyApp");

È necessario creare un server con un nome di istanza registrato nel computer tramite il processo di installazione di StreamInsight (nell'esempio precedente, MyInstance). Per ulteriori informazioni, vedere Installazione (StreamInsight).

Un'applicazione rappresenta un'unità di definizione dell'ambito nel server che contiene altre entità di metadati.

Nell'esempio precedente è stata creata un'istanza del server nello stesso processo. Un altro tipo di distribuzione comune prevede tuttavia la connessione a un server remoto e l'utilizzo di un'applicazione esistente da tale posizione. Nell'esempio seguente viene illustrato come connettersi a un server remoto e accedere a un'applicazione esistente.

server = Server.Connect(new System.ServiceModel.EndpointAddress(@"https://localhost/StreamInsight/MyInstance"));
Application myApp = server.Applications["ExistingApp"];

Per ulteriori informazioni sui server locali e remoti, vedere Pubblicazione e connessione al server StreamInsight.

Creare un flusso di input

Viene quindi creato un flusso di input in un'implementazione di un adattatore esistente. Più precisamente, è necessario specificare il factory di adattatori come illustrato nell'esempio seguente.

var inputstream = CepStream<MyDataType>.Create("inputStream",
                                               typeof(MyInputAdapterFactory),
                                               new InputAdapterConfig { someFlag = true },
                                               EventShape.Point);

In questo modo, viene creato un oggetto CepStream, che rappresenta un flusso di eventi prodotto, dopo l'avvio della query, da un adattatore di cui è stata creata un'istanza tramite la classe factory specificata. Al flusso viene assegnato un nome che può essere utilizzato in seguito per recuperare i dati diagnostici specifici del flusso. Viene inoltre fornita un'istanza della struttura di configurazione per il factory di adattatori. La struttura di configurazione passa informazioni specifiche di runtime al factory, insieme alla forma di evento desiderata (modello di evento). Per ulteriori informazioni sull'utilizzo dei parametri da parte del factory, vedere Creazione di adattatori di input e di output.

Definire la query

L'oggetto CepStream viene utilizzato come base per la definizione della logica di query effettiva. La query utilizza LINQ come linguaggio di specifica di query:

var filtered = from e in inputstream
               where e.Value > 95
               select e;

In questo esempio si presuppone che la classe o lo struct denominato MyDataType definito nell'esempio precedente per creare l'oggetto flusso di input contenga un campo denominato Value. Questa definizione produce un operatore di filtro che rimuove dal flusso tutti gli eventi che non soddisfano il predicato di filtro where e.Value > 95. Per ulteriori informazioni sugli operatori di query LINQ, vedere Scrittura di modelli di query in LINQ.

Creare un adattatore di output

A questo punto, il tipo di variabile filtered è ancora CepStream. In questo modo, il flusso può essere trasformato in una query che è possibile avviare. Per produrre un'istanza di query che possa essere avviata, è necessario specificare un adattatore di output, come illustrato nell'esempio seguente.

var query = filtered.ToQuery(myApp,
                             "filterQuery",
                             "Filter out Values over 95",
                             typeof(MyOutputAdapterFactory),
                             new OutputAdapterConfig { someString = "foo" },
                             EventShape.Point,
                             StreamEventOrder.FullyOrdered);

Analogamente al flusso di input, l'adattatore di output richiede la specifica di un factory di adattatori di output, un oggetto configurazione, la forma del flusso di output desiderata e un ordine temporale.

La specifica della forma di evento garantisce la relativa forma di evento nel risultato della query:

  1. EventShape.Point: la durata di qualsiasi evento risultato viene ridotta a un evento punto.

  2. EventShape.Interval: qualsiasi evento risultato viene interpretato come evento intervallo. Questo significa che l'output viene restituito solo se viene effettuato il commit della durata completa da parte di un evento CTI (Current Time Increment).

  3. EventShape.Edge: qualsiasi evento risultato viene interpretato come evento Edge. Questo significa che l'ora di inizio viene restituita come Edge iniziale e l'ora di fine come il corrispondente Edge finale.

Il parametro dell'ordine degli eventi del flusso influisce sulla dinamicità dei flussi di output dell'evento intervallo. FullyOrdered indica dire che gli eventi intervallo vengono restituiti sempre in base all'ordine delle ore di inizio, mentre ChainOrdered produce una sequenza di output ordinata in base alle ore di fine dell'intervallo.

È inoltre necessario fornire un oggetto applicazione come primo parametro, che a questo punto contiene la query, e un nome e una descrizione della query, per identificare ulteriormente la query nell'archivio di metadati.

Avviare la query

L'ultimo passaggio consiste nell'avviare la query. In questo esempio la query viene arrestata dalla pressione di un tasto da parte dell'utente.

query.Start();

Console.ReadLine();

query.Stop();

In questo esempio end-to-end viene illustrato come utilizzare un'associazione implicita di un'origine evento con un modello di query tramite gli overload CepStream.Create() e ToQuery() per creare in modo rapido una query funzionante. Per un controllo più esplicito sull'associazione di oggetti CEP, vedere Utilizzo dello strumento di associazione di query.

Esempio completo

Nell'esempio seguente vengono combinati i componenti descritti in precedenza per creare un'applicazione completa.

Server server = null;

using (Server server = Server.Create(”MyInstance”))
{
    try
    {
        Application myApp = server.CreateApplication("MyApp");

        var inputstream = CepStream<MyDataType>.Create("inputStream",
                                                       typeof(MyInputAdapterFactory),
                                                       new InputAdapterConfig { someFlag = true },
                                                       EventShape.Point);

        var filtered = from e in inputstream
                       where e.Value > 95
                       select e;

        var query = filtered.ToQuery(myApp,
                                     "filterQuery",
                                     "Filter out Values over 95",
                                     typeof(MyOutputAdapterFactory),
                                     new OutputAdapterConfig { someString = "foo" },
                                     EventShape.Point,
                                     StreamEventOrder.FullyOrdered);

        query.Start();
        Console.ReadLine();
        query.Stop();
    }
    catch (Exception e)
    {
        Console.WriteLine(e.ToString());
    }
}

Vedere anche

Concetti

Utilizzo dello strumento di associazione di query