Notes
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Ce guide vous montre une façon rapide de configurer et d’utiliser Orleans Streams. Pour en savoir plus sur les détails des fonctionnalités de diffusion en continu, lisez d’autres parties de cette documentation.
Configurations requises
Dans ce guide, vous utilisez un flux en mémoire qui recourt à la messagerie basée sur le grain pour transmettre les données de streaming aux abonnés. Vous utilisez le fournisseur de stockage en mémoire pour stocker des listes d’abonnements. L’utilisation de mécanismes basés sur la mémoire pour le streaming et le stockage est destinée uniquement au développement et aux tests locaux, et non pour les environnements de production.
Sur le silo, là où silo
est un ISiloBuilder, appelez AddMemoryStreams :
silo.AddMemoryStreams("StreamProvider")
.AddMemoryGrainStorage("PubSubStore");
Sur le client de cluster, où client
est un IClientBuilder, appel AddMemoryStreams.
client.AddMemoryStreams("StreamProvider");
Dans ce guide, utilisez un flux simple basé sur des messages, utilisant la messagerie de grain pour envoyer des données de flux aux abonnés. Utilisez le fournisseur de stockage en mémoire pour stocker des listes d’abonnements ; ce n’est pas un choix judicieux pour les applications de production réelles.
Sur le silo, là où hostBuilder
est un ISiloHostBuilder
, appelez AddSimpleMessageStreamProvider :
hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
.AddMemoryGrainStorage("PubSubStore");
Sur le client de cluster, où clientBuilder
est un IClientBuilder
, appel AddSimpleMessageStreamProvider.
clientBuilder.AddSimpleMessageStreamProvider("SMSProvider");
Remarque
Par défaut, les messages transmis sur le flux de messages simples sont considérés comme immuables et peuvent être passés par référence à d’autres grains. Pour désactiver ce comportement, configurez le fournisseur SMS pour désactiver SimpleMessageStreamProviderOptions.OptimizeForImmutableData.
siloBuilder
.AddSimpleMessageStreamProvider(
"SMSProvider",
options => options.OptimizeForImmutableData = false);
Vous pouvez créer des flux, envoyer des données en tant que producteurs et recevoir des données en tant qu’abonnés.
Produire des événements
Il est relativement facile de produire des événements pour les flux. Tout d’abord, accédez au fournisseur de flux défini dans la configuration précédemment ("StreamProvider"
), puis choisissez un flux et envoyez-y des données.
// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);
Il est relativement facile de produire des événements pour les flux. Tout d’abord, accédez au fournisseur de flux défini dans la configuration précédemment ("SMSProvider"
), puis choisissez un flux et envoyez-y des données.
// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");
Comme vous pouvez le voir, le flux a un GUID et un espace de noms. Cela facilite l’identification des flux uniques. Par exemple, l’espace de noms d’une salle de conversation peut être « Salles » et le GUID peut être le GUID du propriétaire RoomGrain
.
Ici, utilisez le GUID d’une salle de conversation connue. À l’aide de la OnNextAsync
méthode du flux, envoyez des données à celui-ci. Procédons ainsi à l’intérieur d’un minuteur à l’aide de nombres aléatoires. Vous pouvez également utiliser n’importe quel autre type de données pour le flux.
RegisterTimer(_ =>
{
return stream.OnNextAsync(Random.Shared.Next());
},
null,
TimeSpan.FromMilliseconds(1_000),
TimeSpan.FromMilliseconds(1_000));
S’abonner aux données de diffusion en continu et les recevoir
Pour recevoir des données, vous pouvez utiliser des abonnements implicites et explicites, décrits plus en détail dans les abonnements explicites et implicites. Cet exemple utilise des abonnements implicites, qui sont plus faciles. Lorsqu’un type de grain souhaite s’abonner implicitement à un flux, il utilise l’attribut [ImplicitStreamSubscription(namespace)]
.
Pour votre cas, définissez un ReceiverGrain
exemple comme suit :
[ImplicitStreamSubscription("RANDOMDATA")]
public class ReceiverGrain : Grain, IRandomReceiver
Chaque fois que les données sont envoyées aux flux dans l’espace de noms RANDOMDATA
(comme dans l’exemple du minuteur), un grain de type ReceiverGrain
ayant le même Guid
que le flux reçoit le message. Même si aucune activation du grain n’existe actuellement, le runtime en crée automatiquement un nouveau et lui envoie le message.
Pour que cela fonctionne, effectuez le processus d’abonnement en définissant la OnNextAsync
méthode de réception des données. Pour ce faire, le ReceiverGrain
doit appeler quelque chose comme ceci dans son OnActivateAsync
:
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();
// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);
// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
async (data, token) =>
{
Console.WriteLine(data);
await Task.CompletedTask;
});
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();
// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");
// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
async (data, token) =>
{
Console.WriteLine(data);
await Task.CompletedTask;
});
Vous êtes tous prêts ! Maintenant, la seule exigence est que quelque chose déclenche la création du grain du producteur. Ensuite, il inscrit le minuteur et commence à envoyer des entiers aléatoires à toutes les parties intéressées.
Là encore, ce guide ignore de nombreux détails et fournit uniquement une vue d’ensemble générale. Lisez d’autres parties de ce manuel et d’autres ressources sur Rx pour mieux comprendre ce qui est disponible et comment il fonctionne.
La programmation réactive peut être une approche puissante pour résoudre de nombreux problèmes. Par exemple, vous pouvez utiliser LINQ dans l’abonné pour filtrer les numéros et effectuer différentes opérations intéressantes.