Aracılığıyla paylaş


Orleans akışa hızlı başlangıç

Bu kılavuzda Orleans Streams'i ayarlamanın ve kullanmanın hızlı bir yolu gösterilmektedir. Akış özelliklerinin ayrıntıları hakkında daha fazla bilgi edinmek için bu belgelerin diğer bölümlerini okuyun.

Gerekli yapılandırmalar

Bu kılavuzda, abonelere akış verilerini göndermek için tanecik mesajlaşması kullanan, belleğe dayalı bir akış kullanacaksınız. Abonelik listelerini depolamak için bellek içi depolama sağlayıcısını kullanırsınız. Akış ve depolama için bellek tabanlı mekanizmaların kullanılması, üretim ortamları için değil yalnızca yerel geliştirme ve test amaçlıdır.

Orleans akış, Microsoft.Orleans.Streaming NuGet paketini gerektirir. Bu paket, bu kılavuzda kullanılan uzantı yöntemi de dahil olmak üzere AddMemoryStreams hem istemci hem de sunucu için akış işlevselliği sağlar.

Siloda, silo bir ISiloBuilder olan ISiloBuilder, 'yi çağırın.

silo.AddMemoryStreams("StreamProvider")
    .AddMemoryGrainStorage("PubSubStore");

Küme istemcisinde, burada client bir IClientBuilder olduğunda, AddMemoryStreams çağır.

client.AddMemoryStreams("StreamProvider");

Bu kılavuzda, abonelere akış verileri göndermek için grain messaging kullanan basit bir ileti tabanlı akış kullanın. Abonelik listelerini depolamak için bellek içi depolama sağlayıcısını kullanın; bu, gerçek üretim uygulamaları için akıllıca bir seçim değildir.

Siloda, silo bir ISiloBuilder olan ISiloBuilder, 'yi çağırın.

hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
           .AddMemoryGrainStorage("PubSubStore");

Küme istemcisinde, burada clientBuilder bir IClientBuilder olduğunda, AddSimpleMessageStreamProvider çağır.

clientBuilder.AddSimpleMessageStreamProvider("SMSProvider");

Uyarı

Varsayılan olarak, Basit İleti Akışı üzerinden geçirilen iletiler değişmez kabul edilir ve diğer birimlere başvuru ile aktarılabilir. Bu davranışı kapatmak için, SMS sağlayıcısını SimpleMessageStreamProviderOptions.OptimizeForImmutableData özelliğini kapatacak şekilde yapılandırın.

siloBuilder
    .AddSimpleMessageStreamProvider(
        "SMSProvider",
        options => options.OptimizeForImmutableData = false);

Akışlar oluşturabilir, bunları üretici olarak kullanarak veri gönderebilir ve verileri abone olarak alabilirsiniz.

Olay oluşturma

Akışlar için etkinlik oluşturmak oldukça kolaydır. İlk olarak, daha önce"StreamProvider" () yapılandırmada tanımlanan akış sağlayıcısına erişim elde edin, ardından bir akış seçin ve ona veri gönderin.

// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);

Akışlar için etkinlik oluşturmak oldukça kolaydır. İlk olarak, daha önce"SMSProvider" () yapılandırmada tanımlanan akış sağlayıcısına erişim elde edin, ardından bir akış seçin ve ona veri gönderin.

// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");

Gördüğünüz gibi, akışın bir GUID'i ve bir ad alanı vardır. Bu, benzersiz akışları tanımlamayı kolaylaştırır. Örneğin, bir sohbet odasının ad alanı "Odalar" ve GUID ise RoomGrain'nin sahibi olduğu GUID olabilir.

Burada, bilinen bir sohbet odasının GUID'sini kullanın. Akışın OnNextAsync yöntemini kullanarak ona veri gönderin. Şimdi bunu rastgele sayılar kullanarak bir süreölçer içinde yapalım. Akış için başka herhangi bir veri türünü de kullanabilirsiniz.

RegisterTimer(_ =>
{
    return stream.OnNextAsync(Random.Shared.Next());
},
null,
TimeSpan.FromMilliseconds(1_000),
TimeSpan.FromMilliseconds(1_000));

Akış verilerine abone olma ve veri alma

Verileri almak için, Açık ve örtük abonelikler bölümünde daha ayrıntılı olarak açıklanan örtük ve açık abonelikleri kullanabilirsiniz. Bu örnekte daha kolay olan örtük abonelikler kullanılır. Bir tanecik türü örtük olarak bir akışa abone olmak istediğinde özniteliğini [ImplicitStreamSubscription(namespace)]kullanır.

Sizin durumunuz için şunun gibi bir ReceiverGrain tanımlayın:

[ImplicitStreamSubscription("RANDOMDATA")]
public class ReceiverGrain : Grain, IRandomReceiver

Veriler RANDOMDATA ad alanındaki akışlara gönderildiğinde (zamanlayıcı örneğinde olduğu gibi), akışla aynı ReceiverGrain türündeki Guid mesajı alır. Tahıl aktivasyonları mevcut olmasa bile, çalışma süresi otomatik olarak yeni bir tanesini oluşturur ve iletiyi ona iletir.

Bunun çalışması için, veri alma yöntemini ayarlayarak OnNextAsync abonelik işlemini tamamlayın. Bunu yapmak için, ReceiverGrain içinde OnActivateAsync benzeri bir şey çağırmalıdır.

// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();

// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("StreamProvider");

// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);

// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
    async (data, token) =>
    {
        Console.WriteLine(data);
        await Task.CompletedTask;
    });
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();

// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("SMSProvider");

// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");

// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
    async (data, token) =>
    {
        Console.WriteLine(data);
        await Task.CompletedTask;
    });

Artık hazırsınız! Şimdi tek gereksinim, üretici taneciğinin oluşturulmasını tetikleyen bir şey olması. Ardından zamanlayıcıyı kaydeder ve tüm ilgili taraflara rastgele tamsayılar göndermeye başlar.

Bu kılavuz birçok ayrıntıyı atlar ve yalnızca üst düzey bir genel bakış sağlar. Nelerin kullanılabilir olduğunu ve nasıl çalıştığını iyi anlamak için bu kılavuzun diğer bölümlerini ve Rx'te diğer kaynakları okuyun.

Reaktif programlama, birçok sorunu çözmek için güçlü bir yaklaşım olabilir. Örneğin, abonede LINQ kullanarak sayıları filtreleyip çeşitli ilginç işlemler gerçekleştirebilirsiniz.

Ayrıca bkz.

Orleans Streams Programlama API'leri