Compartilhar via


Início rápido de fluxo do Orleans

Este guia mostra uma maneira rápida de configurar e usar Orleans o Streams. Para saber mais sobre os detalhes dos recursos de streaming, leia outras partes desta documentação.

Configurações necessárias

Neste guia, você usará um fluxo baseado em memória que usa mensagens granuladas para enviar dados de fluxo aos assinantes. Você usa o provedor de armazenamento na memória para armazenar listas de assinaturas. O uso de mecanismos baseados em memória para streaming e armazenamento destina-se apenas ao desenvolvimento e teste locais, não para ambientes de produção.

No silo, em que silo é uma ISiloBuilder, chame AddMemoryStreams:

silo.AddMemoryStreams("StreamProvider")
    .AddMemoryGrainStorage("PubSubStore");

No cliente do cluster, em que client é um IClientBuilder, chame AddMemoryStreams.

client.AddMemoryStreams("StreamProvider");

Neste guia, use um fluxo simples baseado em mensagens que usa mensagens granuladas para enviar dados de fluxo aos assinantes. Use o provedor de armazenamento na memória para armazenar listas de assinaturas; essa não é uma escolha sábia para aplicativos de produção reais.

No silo, em que hostBuilder é uma ISiloHostBuilder, chame AddSimpleMessageStreamProvider:

hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
           .AddMemoryGrainStorage("PubSubStore");

No cliente do cluster, em que clientBuilder é um IClientBuilder, chame AddSimpleMessageStreamProvider.

clientBuilder.AddSimpleMessageStreamProvider("SMSProvider");

Observação

Por padrão, as mensagens passadas sobre o Fluxo de Mensagens Simples são consideradas imutáveis e podem ser passadas por referência a outros grãos. Para desativar esse comportamento, configure o provedor de SMS para desativar SimpleMessageStreamProviderOptions.OptimizeForImmutableData.

siloBuilder
    .AddSimpleMessageStreamProvider(
        "SMSProvider",
        options => options.OptimizeForImmutableData = false);

Você pode criar fluxos, enviar dados usando-os como produtores e receber dados como assinantes.

Produzir eventos

É relativamente fácil produzir eventos para fluxos. Primeiro, obtenha acesso ao provedor de fluxo definido na configuração anteriormente ("StreamProvider"), depois escolha um fluxo e envie dados por push para ele.

// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);

É relativamente fácil produzir eventos para fluxos. Primeiro, obtenha acesso ao provedor de fluxo definido na configuração anteriormente ("SMSProvider"), depois escolha um fluxo e envie dados por push para ele.

// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");

Como você pode ver, o fluxo tem um GUID e um namespace. Isso facilita a identificação de fluxos exclusivos. Por exemplo, o namespace de uma sala de chat pode ser "Salas", e o GUID pode ser o RoomGrainGUID proprietário.

Aqui, use o GUID de uma sala de chat conhecida. Usando o método do fluxo OnNextAsync, envie dados para ele. Vamos fazer isso dentro de um temporizador usando números aleatórios. Também é possível usar qualquer outro tipo de dados para o fluxo.

RegisterTimer(_ =>
{
    return stream.OnNextAsync(Random.Shared.Next());
},
null,
TimeSpan.FromMilliseconds(1_000),
TimeSpan.FromMilliseconds(1_000));

Assinar e receber dados de transmissão

Para receber dados, você pode usar assinaturas implícitas e explícitas, descritas com mais detalhes em assinaturas explícitas e implícitas. Esse exemplo usa assinaturas implícitas, que são mais fáceis. Quando um tipo de grão deseja assinar implicitamente um fluxo, ele usa o atributo [ImplicitStreamSubscription(namespace)].

No seu caso, defina uma ReceiverGrain desta maneira:

[ImplicitStreamSubscription("RANDOMDATA")]
public class ReceiverGrain : Grain, IRandomReceiver

Sempre que os dados são enviados por push para fluxos no RANDOMDATA namespace (como no exemplo do temporizador), um grão de tipo ReceiverGrain com o mesmo Guid que o fluxo recebe a mensagem. Mesmo que nenhuma ativação do grão exista no momento, o runtime cria automaticamente uma nova e envia a mensagem para ela.

Para que isso funcione, conclua o processo de assinatura definindo o OnNextAsync método para receber dados. Para fazer isso, o ReceiverGrain deve chamar algo assim em seu OnActivateAsync:

// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();

// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("StreamProvider");

// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);

// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
    async (data, token) =>
    {
        Console.WriteLine(data);
        await Task.CompletedTask;
    });
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();

// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("SMSProvider");

// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");

// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
    async (data, token) =>
    {
        Console.WriteLine(data);
        await Task.CompletedTask;
    });

Você está pronto! Agora, o único requisito é que algo dispare a criação do grão de produção. Em seguida, ele registra o temporizador e começa a enviar inteiros aleatórios para todas as partes interessadas.

Novamente, este guia ignora muitos detalhes e fornece apenas uma visão geral de alto nível. Leia outras partes deste manual e outros recursos no Rx para obter uma boa compreensão do que está disponível e como ele funciona.

A programação reativa pode ser uma abordagem poderosa para resolver muitos problemas. Por exemplo, você pode usar LINQ no assinante para filtrar números e executar várias operações interessantes.

Consulte também

Orleans APIs de Programação de Fluxos