Condividi tramite


Orleans avvio rapido per lo streaming

Questa guida illustra un modo rapido per configurare e usare Stream Orleans. Per altre informazioni sui dettagli delle funzionalità di streaming, leggere altre parti di questa documentazione.

Configurazioni necessarie

In questa guida si userà un flusso basato sulla memoria che usa la messaggistica granulare per inviare dati di flusso ai sottoscrittori. Si userà il provider di archiviazione in memoria per archiviare elenchi di sottoscrizioni. L'uso di meccanismi basati sulla memoria per lo streaming e l'archiviazione è destinato solo allo sviluppo e ai test locali e non è destinato agli ambienti di produzione.

Nel silo, dove silo è un ISiloBuilder, chiamare AddMemoryStreams:

silo.AddMemoryStreams("StreamProvider")
    .AddMemoryGrainStorage("PubSubStore");

Nel client del cluster, dove client è IClientBuilder, chiamare AddMemoryStreams.

client.AddMemoryStreams("StreamProvider");

In questa guida si userà un semplice flusso basato su messaggi che usa la messaggistica granulare per inviare dati di flusso ai sottoscrittori. Si userà il provider di archiviazione in memoria per archiviare elenchi di sottoscrizioni, scelta quindi non propriamente saggia per le applicazioni destinate alla produzione reale.

Nel silo, dove hostBuilder è un ISiloHostBuilder, chiamare AddSimpleMessageStreamProvider:

hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
           .AddMemoryGrainStorage("PubSubStore");

Nel client del cluster, dove clientBuilder è IClientBuilder, chiamare AddSimpleMessageStreamProvider.

clientBuilder.AddSimpleMessageStreamProvider("SMSProvider");

Nota

Per impostazione predefinita, i messaggi trasmessi sul flusso di messaggi semplici sono considerati non modificabili e possono essere trasmessi per riferimento ad altri grani. Per disattivare questo comportamento, è necessario configurare il provider SMS per disattivare SimpleMessageStreamProviderOptions.OptimizeForImmutableData

siloBuilder
    .AddSimpleMessageStreamProvider(
        "SMSProvider",
        options => options.OptimizeForImmutableData = false);

È possibile creare flussi, inviare dati usandoli come producer e ricevere dati anche come sottoscrittori.

Generare eventi

È relativamente facile produrre eventi per i flussi. È prima necessario ottenere l'accesso al provider di flusso definito nella configurazione precedente ("StreamProvider"), quindi scegliere un flusso e eseguirne il push.

// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);

È relativamente facile produrre eventi per i flussi. È prima necessario ottenere l'accesso al provider di flusso definito nella configurazione precedente ("SMSProvider"), quindi scegliere un flusso e eseguirne il push.

// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");

Come si può notare, il flusso è dotato di un GUID e di uno spazio dei nomi. In questo modo sarà facile identificare flussi univoci. Ad esempio, lo spazio dei nomi di una chat room può essere "Rooms" e il GUID può essere il GUID del RoomGrain proprietario.

In questo caso viene utilizzato il GUID di alcune note chat room. Usando il metodo OnNextAsync del flusso è possibile eseguire il push dei dati. Facciamolo all'interno di un timer, usando numeri casuali. Per il flusso, è possibile usare anche qualsiasi altro tipo di dati.

RegisterTimer(_ =>
{
    return stream.OnNextAsync(Random.Shared.Next());
},
null,
TimeSpan.FromMilliseconds(1_000),
TimeSpan.FromMilliseconds(1_000));

Sottoscrivere e ricevere dati di streaming

Per la ricezione di dati, è possibile usare sottoscrizioni implicite ed esplicite, descritte in modo più dettagliato in Sottoscrizioni esplicite e implicite. In questo esempio vengono usate sottoscrizioni implicite, che sono più semplici. Quando un tipo di granularità vuole sottoscrivere in modo implicito un flusso, usa l'attributo [ImplicitStreamSubscription(namespace)].

Per il proprio caso, definire un ReceiverGrain simile al seguente:

[ImplicitStreamSubscription("RANDOMDATA")]
public class ReceiverGrain : Grain, IRandomReceiver

Ogni volta che i dati vengono inseriti nei flussi dello spazio dei nomi RANDOMDATA, come nel timer, un granulare di tipo ReceiverGrain con lo stesso Guid flusso riceverà il messaggio. Anche se non esistono attivazioni del livello di granularità, il runtime ne creerà automaticamente uno nuovo e invierà il messaggio.

Per il funzionamento, è necessario completare il processo di sottoscrizione impostando il metodo OnNextAsync per la ricezione dei dati. A tale scopo, il ReceiverGrain dovrebbe chiamare qualcosa di simile nel suo OnActivateAsync

// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();

// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("StreamProvider");

// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);

// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
    async (data, token) =>
    {
        Console.WriteLine(data);
        await Task.CompletedTask;
    });
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();

// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("SMSProvider");

// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");

// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
    async (data, token) =>
    {
        Console.WriteLine(data);
        await Task.CompletedTask;
    });

Accesso completato. Ora l'unico requisito è che qualcosa attivi la creazione dell’intervallo di tempo del produttore e quindi registrerà il timer e inizierà a inviare int casuali a tutte le parti interessate.

Anche in questo caso, la presente guida non entra molto in dettaglio ed è utile solo per dare una rappresentazione del quadro generale. Leggere le altre parti di questo manuale e le altre risorse su RX per capire bene cosa sia disponibile e come accedervi.

La programmazione reattiva può essere un approccio potentissimo per risolvere molti problemi. È ad esempio possibile usare LINQ nel sottoscrittore per filtrare i numeri e fare ogni sorta di cose interessanti.

Vedi anche

Orleans API di programmazione dei flussi