Condividi tramite


Streaming con Orleans

Orleans v.1.0.0 ha aggiunto il supporto per le estensioni di streaming al modello di programmazione. Le estensioni di streaming offrono un set di astrazioni e API che rendono più semplice e affidabile l'uso di flussi. Le estensioni di streaming consentono agli sviluppatori di scrivere applicazioni reattive che operano su una sequenza di eventi in modo strutturato. Il modello di estendibilità dei provider di flusso rende il modello di programmazione compatibile con e portabile in un'ampia gamma di tecnologie di accodamento esistenti, ad esempio Hub eventi, ServiceBus, Code di Azure e Apache Kafka. Non è necessario scrivere codice speciale o eseguire processi dedicati per interagire con tali code.

Perché dovrei curarmi?

Se si conosce già tutte le informazioni sull'elaborazione di flusso e si ha familiarità con le tecnologie come Hub eventi, Kafka, Analisi di flusso di Azure, Apache Storm, Apache Spark Streaming e Estensioni reattive (Rx) in .NET, è possibile chiedere perché prestare attenzione. Perché è necessario un altro sistema di elaborazione di flusso e come gli attori sono correlati all'Flussi?"Perché Orleans Flussi?" è destinato a rispondere a tale domanda.

Modello di programmazione

Esistono diversi principi dietro a Orleans Flussi Modello di programmazione:

  1. I flussi di Orleans sono virtuali. Vale a dire, esiste sempre un flusso. Non viene creata o eliminata in modo esplicito e non può mai non riuscire.
  2. Flussi sono identificati dagli ID di flusso, che sono solo nomi logici costituiti da GUID e stringhe.
  3. Orleans Flussi consente di separare la generazione di dati dalla sua elaborazione, sia in tempo che nello spazio. Ciò significa che il produttore di flusso e il consumer di flusso possono trovarsi in server diversi o in fusi orari diversi e resistere agli errori.
  4. I flussi di Orleans sono leggeri e dinamici. Orleans Streaming Runtime è progettato per gestire un numero elevato di flussi che vengono e passano a una frequenza elevata.
  5. Le associazioni di flusso di Orleans sono dinamiche. Orleans Streaming Runtime è progettato per gestire i casi in cui i cereali si connettono e si disconnetteno dai flussi a una frequenza elevata.
  6. Orleans Streaming Runtime gestisce in modo trasparente il ciclo di vita del consumo di flusso. Dopo che un'applicazione sottoscrive un flusso, riceverà quindi gli eventi del flusso, anche in presenza di errori.
  7. I flussi di Orleans funzionano uniformemente tra i cereali e i client Orleans.

API di programmazione

Le applicazioni interagiscono con i flussi usando Orleans.Streams.IAsyncStream<T>, che implementa le Orleans.Streams.IAsyncObserver<T> interfacce e Orleans.Streams.IAsyncObservable<T> . Queste API sono simili alle note estensioni reattive (Rx) in .NET.

In un esempio tipico seguente, un dispositivo genera alcuni dati, che vengono inviati come richiesta HTTP al servizio in esecuzione nel cloud. Il client Orleans in esecuzione nel server front-end riceve questa chiamata HTTP e pubblica i dati in un flusso di dispositivi corrispondente:

public async Task OnHttpCall(DeviceEvent deviceEvent)
{
     // Post data directly into the device's stream.
     IStreamProvider streamProvider =
        GrainClient.GetStreamProvider("MyStreamProvider");

    IAsyncStream<DeviceEventData> deviceStream =
        streamProvider.GetStream<DeviceEventData>(
            deviceEvent.DeviceId, "MyNamespace");

     await deviceStream.OnNextAsync(deviceEvent.Data);
}

In un altro esempio seguente, un utente di chat (implementato come Orleans Grain) partecipa a una chat room, ottiene un handle per un flusso di messaggi di chat generati da tutti gli altri utenti in questa stanza e lo sottoscrive. Si noti che l'utente della chat non deve conoscere il granularità della chat room stessa (potrebbe non essere tale grana nel nostro sistema) o circa altri utenti di tale gruppo che producono messaggi. Non è necessario dire, per pubblicare nel flusso di chat, gli utenti non devono sapere chi è attualmente sottoscritto al flusso. Questo dimostra come gli utenti della chat possano essere completamente disaccoppiati nel tempo e nello spazio.

public class ChatUser: Grain
{
    public async Task JoinChat(Guid chatGroupId)
    {
        IStreamProvider streamProvider =
            base.GetStreamProvider("MyStreamProvider");

        IAsyncStream<string> chatStream =
            streamProvider.GetStream<string>(chatGroupId, "MyNamespace");

        await chatStream.SubscribeAsync(
            async (message, token) => Console.WriteLine(message))
    }
}

Esempio di avvio rapido

L'esempio di avvio rapido è una panoramica rapida del flusso di lavoro complessivo dell'uso dei flussi nell'applicazione. Dopo la lettura, è necessario leggere le API di programmazione Flussi per ottenere una comprensione più approfondita dei concetti.

API di programmazione Flussi

Un Flussi API di programmazione fornisce una descrizione dettagliata delle API di programmazione.

Provider di flusso

Flussi possono venire tramite canali fisici di varie forme e forme e possono avere semantiche diverse. Orleans Streaming è progettato per supportare questa diversità tramite il concetto di provider di flusso, che è un punto di estendibilità nel sistema. Orleans ha attualmente implementazioni di provider di due flussi: provider di flusso di messaggi semplice basato su TCP e provider di flusso di code di Azure basato su Code di Azure. Altri dettagli sui provider di flusso sono disponibili in Provider di flusso.

Semantica di flusso

Semantica della sottoscrizione di flusso:

Orleans Flussi garantire coerenza sequenziale per le operazioni di sottoscrizione di flusso. In particolare, quando un consumer sottoscrive un flusso, una volta Task risolta l'operazione di sottoscrizione, il consumer visualizzerà tutti gli eventi generati dopo la sottoscrizione. Inoltre, i flussi rewindable consentono di sottoscrivere da un punto arbitrario nel tempo passato usando StreamSequenceToken. Per altre informazioni, vedere Provider di flusso Di Orleans.

Garanzie di distribuzione di eventi di flusso individuali:

Le garanzie di recapito di eventi individuali dipendono da singoli provider di flusso. Alcuni forniscono solo un impegno ottimale per il recapito alla maggior parte delle volte (ad esempio Simple Message Flussi (SMS)), mentre altri forniscono un recapito almeno una volta (ad esempio La coda di Azure Flussi). È anche possibile creare un provider di streaming che garantisce esattamente una volta il recapito (non abbiamo ancora un provider di questo tipo, ma è possibile crearne uno).

Ordine di recapito eventi:

L'ordine di evento dipende anche da un determinato provider di flusso. In SMS flussi, il produttore controlla in modo esplicito l'ordine degli eventi visualizzati dal consumer controllando il modo in cui li pubblica. I flussi della coda di Azure non garantiscono l'ordine FIFO, poiché le code di Azure sottostanti non garantiscono l'ordine nei casi di errore. Le applicazioni possono anche controllare l'ordinamento di recapito del flusso usando StreamSequenceToken.

implementazione Flussi

L'implementazione di Orleans Flussi offre una panoramica generale dell'implementazione interna.

Esempi di codice

Altri esempi di come usare le API di streaming all'interno di una granularità sono disponibili qui. Si prevede di creare altri esempi in futuro.

Vedi anche