Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Uygulamalar, .NET'teki iyi bilinen Reaktif Uzantılar'a (Rx) çok benzeyen API'ler aracılığıyla akışlarla etkileşim kurar. Temel fark, Orleans ' dağıtılmış ve ölçeklenebilir işlem dokusunda işlemeyi daha verimli hale getirmek için akış Orleans olmasıdır.
Zaman uyumsuz akış
Akışa bir referans almak için bir akış sağlayıcısı kullanmaya başlarsınız. Bir akış sağlayıcısını, uygulayıcıların akış davranışını ve semantiğini özelleştirmesine olanak tanıyan bir akış fabrikası olarak düşünebilirsiniz:
public async Task SetupStream()
{
IStreamProvider streamProvider = this.GetStreamProvider("SimpleStreamProvider");
StreamId streamId = StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
IAsyncStream<string> stream = streamProvider.GetStream<string>(streamId);
}
IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
IAsyncStream<T> stream = streamProvider.GetStream<T>(Guid, "MyStreamNamespace");
Taneciğin içindeyken Grain.GetStreamProvider yöntemini çağırarak veya istemci örneğinde GetStreamProvider yöntemini çağırarak akış sağlayıcısına başvurabilirsiniz.
Orleans.Streams.IAsyncStream<T>, bir sanal akışa yönelik mantıksal, güçlü türlendirilmiş bir tanıtıcıdır ve Orleans Tanecik Referansı'na benzer.
GetStreamProvider ve GetStream çağrıları tamamen yereldir.
GetStream için bağımsız değişkenler, bir GUID ve ek bir dize (null olabilen bir akış ad alanı) içerir. GUID ve ad alanı dizesi birlikte akış kimliğini oluşturur (bu, IGrainFactory.GetGrain için bağımsız değişkenlere benzer şekilde). Bu birleşim, akış kimliklerini belirleme konusunda ek esneklik sağlar. Tıpkı bir tane 7'nin PlayerGrain türü içinde var olabileceği ve farklı bir tane 7'nin ChatRoomGrain türü içinde bulunabileceği gibi, Akış 123, PlayerEventsStream ad alanı içinde bulunabilir ve farklı bir akış 123, ChatRoomMessagesStream ad alanı içinde bulunabilir.
Üretim ve tüketim
IAsyncStream<T> hem hem de IAsyncObserver<T>IAsyncObservable<T> arabirimlerini uygular. Bu, uygulamanızın akışı kullanarak ya yeni olaylar üretmesine IAsyncObserver<T> ya da olaylara abone olup olayları IAsyncObservable<T> tüketmesine olanak tanır.
public interface IAsyncObserver<in T>
{
Task OnNextAsync(T item, StreamSequenceToken token = null);
Task OnCompletedAsync();
Task OnErrorAsync(Exception ex);
}
public interface IAsyncObservable<T>
{
Task<StreamSubscriptionHandle<T>> SubscribeAsync(IAsyncObserver<T> observer);
}
Akışında olaylar üretmek için uygulamanız şunları çağırır:
await stream.OnNextAsync<T>(event)
Bir akışa abone olmak için uygulamanız şunları çağırır:
StreamSubscriptionHandle<T> subscriptionHandle = await stream.SubscribeAsync(IAsyncObserver)
SubscribeAsync bağımsız değişkeni, IAsyncObserver<T> arabirimini uygulayan bir nesne veya gelecek olayları işlemek için bir dizi lambda işlevi olabilir. Daha fazla seçenek SubscribeAsync için AsyncObservableExtensions sınıfı aracılığıyla kullanılabilir. SubscribeAsync, akıştan aboneliği kaldırmak için kullanılan bir opak tanıtıcı olan StreamSubscriptionHandle<T> döndürür (zaman uyumsuz bir sürümü olan IDisposable'e benzer).
await subscriptionHandle.UnsubscribeAsync()
Aboneliğin etkinleştirme için değil, bir tahıl için olduğunu unutmayın. Tahıl kodu akışa abone olduktan sonra, bu abonelik bu etkinleştirmenin ömrünü aşıyor ve tahıl kodu (potansiyel olarak farklı bir etkinleştirmede) açıkça abonelikten kaldırılana kadar sonsuza kadar dayanıklı kalır. Bu, sanal akış soyutlamasının özüdür: Tüm akışlar her zaman mantıksal olarak var olmakla kalmaz, bir akış aboneliği de dayanıklıdır ve onu oluşturan belirli fiziksel etkinleştirmeden sonra da varlığını sürdürür.
Çokluk
Bir Orleans akışın birden çok üreticisi ve birden çok tüketicisi olabilir. Bir üretici tarafından yayımlanan bir ileti, ileti yayımlanmadan önce akışa abone olan tüm tüketicilere teslim edilir.
Ayrıca, bir tüketici aynı akışa birden çok kez abone olabilir. Her abone olduğunda her seferinde benzersiz bir StreamSubscriptionHandle<T> alır. Bir bileşen (veya istemci) aynı akışa X kez abone olursa, her abonelik için bir kez olmak üzere X kez aynı etkinliği alır. Tüketici ayrıca bireysel abonelik aboneliğinden de çıkabilir. Şu çağrıyı yaparak tüm geçerli abonelikleri bulabilirsiniz:
IList<StreamSubscriptionHandle<T>> allMyHandles =
await IAsyncStream<T>.GetAllSubscriptionHandles();
Hatalardan kurtarma
Bir akışın üreticisi ölürse (veya bileşeni devre dışı bırakılırsa) hiçbir şey yapması gerekmez. Bu tahıl daha fazla olay üretmek istediğinde akış tutamacını yeniden alabilir ve her zamanki gibi yeni olaylar üretebilir.
Tüketici mantığı biraz daha karmaşıktır. Daha önce belirtildiği gibi, bir tüketici birimi bir akışa abone olduktan sonra, bu abonelik, açıkça abonelikten çıkana kadar geçerlidir. Akışın tüketicisi ölürse (veya bileşeni devre dışı bırakılırsa) ve akışta yeni bir olay oluşturulursa, tüketici bileşeni otomatik olarak yeniden etkinleştirilir (tıpkı bir ileti gönderildiğinde her normal Orleans bileşenin otomatik olarak etkinleşmesi gibi). Kodun şu anda yapması gereken tek şey, verileri işlemek için bir IAsyncObserver<T> sağlamaktır. Tüketicinin yöntemin bir parçası olarak işleme mantığını yeniden eklemesi OnActivateAsync() gerekir. Bunu yapmak için şunları çağırabilir:
StreamSubscriptionHandle<int> newHandle =
await subscriptionHandle.ResumeAsync(IAsyncObserver);
Tüketici, "işlemeye devam etmek" amacıyla ilk abonelik sırasında elde edilen önceki kimliği kullanır. ResumeAsync Mevcut aboneliği yalnızca yeni mantık örneğiyle güncelleştirdiğini ve bu tüketicinin IAsyncObserver<T> bu akışa zaten abone olduğu gerçeğini değiştirmediğini unutmayın.
Tüketici eski olanı nasıl elde eder subscriptionHandle? İki seçenek vardır. Tüketici, özgün SubscribeAsync işlemden döndürülen tanıtıcıyı saklamış olabilir ve artık kullanabilir. Alternatif olarak, tüketici tanıtıcıya sahip değilse IAsyncStream<T> çağrısını yaparak tüm etkin abonelik tanıtıcılarını isteyebilir.
IList<StreamSubscriptionHandle<T>> allMyHandles =
await IAsyncStream<T>.GetAllSubscriptionHandles();
Tüketici daha sonra bunların tümünü sürdürebilir veya isterse bazılarından aboneliğini kaldırabilir.
İpucu
Tüketici arabirimini IAsyncObserver<T> doğrudan (public class MyGrain<T> : Grain, IAsyncObserver<T>) uygularsa, teorik olarak IAsyncObserver<T>'yi yeniden eklemesine gerek kalmaz ve bu nedenle ResumeAsync çağrısına gerek kalmaz. Akış çalışma zamanı, öğenin bu IAsyncObserver<T> yöntemlerini zaten uyguladığını ve bu IAsyncObserver<T> yöntemlerini çağırdığını otomatik olarak anlamalıdır. Ancak, akış çalıştırma zamanı şu anda bunu desteklememektedir ve tahıl doğrudan ResumeAsync uygulasa bile, tahıl kodunun açıkça IAsyncObserver<T> çağrısı yapması gerekir.
Açık ve örtük abonelikler
Varsayılan olarak, bir akış tüketicisinin akışa açıkça abone olması gerekir. Bu abonelik genellikle, tahılın (veya istemcinin) abone olmasını ifade eden harici bir mesajla tetiklenir. Örneğin, bir sohbet hizmetinde, bir kullanıcı sohbet odasına katıldığında, kullanıcıya ait grain sohbet adıyla birlikte bir JoinChatGroup mesajı alır ve bu da kullanıcı grainin bu sohbet akışına abone olmasına neden olur.
Ayrıca, Orleans akışlar örtük abonelikleri destekler. Bu modelde, tahıl belirgin bir şekilde dahil edilmez. Otomatik olarak ve örtük olarak kendi tanecik kimliğine ve bir ImplicitStreamSubscriptionAttributeöğesine göre abone olur. Örtük aboneliklerin temel değeri, akış etkinliğinin otomatik olarak tahıl etkinleştirmesini (ve dolayısıyla aboneliği) tetiklesine izin vermektir. Örneğin, SMS akışlarını kullanarak, bir tahıl bir akış üretmek ve başka bir tahıl bunu işlemek istiyorsa, üretici, tüketici tahılın kimliğine ihtiyaç duyar ve abone olmasını söyleyen bir çağrı yapması gerekir. Ancak o zaman olay göndermeye başlayabilirdi. Bunun yerine örtük aboneliklerle üretici bir akışa olay üretmeye başlayabilir ve tüketici dilimi otomatik olarak etkinleştirilir ve abone olur. Bu durumda yapımcının olayları kimin okuduğu hakkında bilgi sahibi olması gerekmez.
Grain uygulaması MyGrainType bir özniteliğini [ImplicitStreamSubscription("MyStreamNamespace")]bildirebilir. Bu, akış çalışma zamanına, kimlik GUID'si XXX ve ad alanı "MyStreamNamespace" olan bir akışta bir olay oluşturulduğunda, kimliği XXX ve türü MyGrainType olan taneciğe teslim edilmesi gerektiğini bildirir. Başka bir ifadeyle, çalışma zamanı akışı <XXX, MyStreamNamespace> tüketici dilimine eşler <XXX, MyGrainType>.
varlığı ImplicitStreamSubscription , akış çalışma zamanının bu dilimi akışa otomatik olarak abone yapıp akış olaylarını ona teslim etmesine neden olur. Ancak, tahıl kodunun yine de çalışma zamanına olayların nasıl işlenmesini istediğini bildirmesi gerekir. Temelde, öğesini eklemesi IAsyncObserver<T>gerekir. Bu nedenle, tahıl etkinleştirildiğinde, içindeki OnActivateAsync tahıl kodunun şunları çağırması gerekir:
public override async Task OnActivateAsync(CancellationToken cancellationToken)
{
IStreamProvider streamProvider =
this.GetStreamProvider("SimpleStreamProvider");
StreamId streamId =
StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
IAsyncStream<string> stream =
streamProvider.GetStream<string>(streamId);
StreamSubscriptionHandle<string> subscription =
await stream.SubscribeAsync(new MyStreamObserver());
}
IStreamProvider streamProvider =
base.GetStreamProvider("SimpleStreamProvider");
IAsyncStream<T> stream =
streamProvider.GetStream<T>(this.GetPrimaryKey(), "MyStreamNamespace");
StreamSubscriptionHandle<T> subscription =
await stream.SubscribeAsync(IAsyncObserver<T>);
Abonelik mantığı yazma
Aşağıda çeşitli durumlar için abonelik mantığı yazma yönergeleri verilmiştir: açık ve örtük abonelikler, geri alınabilen ve geri alınamayan akışlar. Örtük aboneliklerdeki temel fark, her akış ad alanının daima yalnızca bir örtük aboneliğe sahip olmasıdır. Birden çok abonelik oluşturmanın yolu yok (abonelik çokluğu yoktur), abonelikten çıkmanın bir yöntemi yoktur ve 'grain mantığı' yalnızca işleme mantığını eklemeye ihtiyaç duyar. Bu, örtük bir aboneliği sürdürmeye gerek olmadığı anlamına da gelir. Öte yandan, açık abonelikler için aboneliği sürdürmeniz gerekir; aksi takdirde, yeniden abone olmak tahılın birden çok kez abone olmasına neden olur.
Örtük abonelikler:
Örtük abonelikler için, işleme mantığını eklemek için tahılın yine de abone olması gerekir.
IStreamSubscriptionObserver ve IAsyncObserver<T> arabirimlerini uygulayarak, bu işlemi tüketici diliminde yapabilir ve böylece tahılın abone olmaktan ayrı olarak etkinleştirilmesini sağlayabilirsiniz. Akışa abone olmak için, tahıl bir tanıtıcı oluşturur ve yöntemini çağırır await handle.ResumeAsync(this)OnSubscribed(...) .
İletileri işlemek için IAsyncObserver<T>.OnNextAsync(...) yöntemini uygulayın ve akış verilerini ve bir sıra belirtecini alın. Alternatif olarak, ResumeAsync yöntemi arabiriminin yöntemlerini IAsyncObserver<T> temsil eden bir temsilci kümesi alabilir: onNextAsync, onErrorAsyncve onCompletedAsync.
public Task OnNextAsync(string item, StreamSequenceToken? token = null)
{
_logger.LogInformation("Received an item from the stream: {Item}", item);
return Task.CompletedTask;
}
public async Task OnSubscribed(IStreamSubscriptionHandleFactory handleFactory)
{
var handle = handleFactory.Create<string>();
await handle.ResumeAsync(this);
}
public override async Task OnActivateAsync()
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var stream =
streamProvider.GetStream<string>(
this.GetPrimaryKey(), "MyStreamNamespace");
await stream.SubscribeAsync(OnNextAsync);
}
Açık abonelikler:
Açık abonelikler için, akışa abone olmak için bir dilimin çağrısı SubscribeAsync yapması gerekir. Bu bir abonelik oluşturur ve işleme mantığını ekler. Belirtik abonelik, tahıl aboneliği kaldırılana kadar var olur. Bir tanecik devre dışı bırakılır ve yeniden etkinleştirilirse, yine de açıkça abone kalır, ancak işleme mantığı bağlı değildir. Bu durumda, tahılın işleme mantığını yeniden eklemesi gerekir. Bunu yapmak için, OnActivateAsync içinde tahılın önce IAsyncStream<T>.GetAllSubscriptionHandles() çağrısını yaparak aboneliklerini bulması gerekir. Tahıl, işlemeye devam etmek istediği her tutamaçta ResumeAsync yürütmelidir veya yaptığı tüm tutamaçlardan UnsubscribeAsync. Öz, isteğe bağlı olarak StreamSequenceToken öğesini ResumeAsync çağrılarına bağımsız değişken olarak belirleyebilir ve bu açık abonelik, bu belirteçten kullanmaya başlamasına neden olur.
public override async Task OnActivateAsync(CancellationToken cancellationToken)
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var streamId = StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
var stream = streamProvider.GetStream<string>(streamId);
var subscriptionHandles = await stream.GetAllSubscriptionHandles();
foreach (var handle in subscriptionHandles)
{
await handle.ResumeAsync(this);
}
}
public async override Task OnActivateAsync()
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var stream =
streamProvider.GetStream<string>(this.GetPrimaryKey(), "MyStreamNamespace");
var subscriptionHandles = await stream.GetAllSubscriptionHandles();
if (!subscriptionHandles.IsNullOrEmpty())
{
subscriptionHandles.ForEach(
async x => await x.ResumeAsync(OnNextAsync));
}
}
Akış sırası ve sıralı belirteçler
Tek bir üretici ve tüketici arasındaki olay tesliminin sırası akış sağlayıcısına bağlıdır.
SMS ile üretici, tüketici tarafından görülen olayların sırasını açıkça denetleyerek bunları nasıl yayımladıklarını denetler. Varsayılan olarak (SMS sağlayıcısı seçeneği SimpleMessageStreamProviderOptions.FireAndForgetDelivery ise false) ve üretici her OnNextAsync çağrıyı bekliyorsa, olaylar FIFO sırasına göre gelir. SMS'de üretici, bir Task hatasının arızalı olduğunu belirten ve OnNextAsync çağrısı tarafından döndürülen teslim hatalarını nasıl yönetileceğine karar verir.
Temel alınan Azure Kuyrukları hata durumlarında sırayı garanti etmediği için (hatasız yürütmelerde FIFO sırasını garanti etseler bile) Azure Kuyruk akışları FIFO sırasını garanti etmez. Bir üretici Azure Kuyruğuna olay gönderdiğinde, kuyruk işlemi başarısız olursa, üreticinin başka bir kuyruk denemesi ve daha sonra olası yinelenen iletileri çözümlemesi gerekir. Teslim tarafında Akış Orleans çalışma zamanı olayı sıralar ve tüketicilere işlenmek üzere teslim etmeye çalışır. Çalışma zamanı, yalnızca başarılı bir işleme sürecinden sonra olayı kuyruktan siler. Teslim veya işleme başarısız olursa olay kuyruktan silinmez ve daha sonra otomatik olarak yeniden görünür. Streaming çalışma zamanı, potansiyel olarak FIFO sırasını bozabilecek şekilde yeniden teslim etmeye çalışır. Bu davranış, Azure Kuyruklarının normal semantiğiyle eşleşir.
Uygulama tanımlı sipariş: Yukarıdaki sıralama sorunlarını işlemek için uygulamanız isteğe bağlı olarak sıralamasını belirtebilir. Olayları sıralamak için kullanılan bir opak StreamSequenceToken nesnesi kullanarak IComparablebunu elde edin. Bir üretici isteğe bağlı StreamSequenceToken bir çağrıyı OnNextAsync geçirebilir. Bu StreamSequenceToken, tüketiciye aktarılır ve etkinlikle birlikte teslim edilir. Bu şekilde, uygulamanız akış çalışma zamanından bağımsız olarak sıralamasını gerekçelendirebilir ve yeniden oluşturabilir.
Geri sarılabilir akışlar
Bazı akışlar yalnızca zamanın en son noktasından başlayarak abone olmaya izin verirken, diğerleri "zamanda geriye gitme" seçeneğine izin verir. Bu özellik, temel alınan kuyruğa alma teknolojisine ve belirli akış sağlayıcısına bağlıdır. Örneğin, Azure Queues yalnızca en son sıralanan olayların tüketilmesine izin verirken, Event Hubs belirli bir noktadan itibaren olayların (belirli bir süreye kadar) yeniden yürütülmesine izin verir. Zamanda geri dönmeyi destekleyen akışlara geri sarılabilir akışlar denir.
Geri sarılabilir bir akışın tüketicisi çağrısına StreamSequenceToken bir SubscribeAsync geçirebilir. Çalışma zamanı, StreamSequenceToken öğesinden başlayarak olayları teslim eder. Null belirteç, tüketicinin en son sürümden başlayarak olayları almak istediği anlamına gelir.
Akışı geri sarma özelliği kurtarma senaryolarında çok yararlıdır. Örneğin, bir akışa abone olan ve durumunu en son sıra belirteciyle birlikte düzenli aralıklarla kaydeden bir nesne düşünün. Bir hatadan kurtarılırken, grain, son checkpoint'in sıralı belirteçinden aynı veri akışına yeniden abone olabilir ve bu süreçte son checkpoint'ten bu yana üretilen olayları kaybetmeden kurtulabilir.
Event Hubs sağlayıcısı geri alınabilir. Kodunu GitHub'da bulabilirsiniz: Orleans/Azure/Orleans. Streaming.EventHubs. SMS (şimdi Yayın Kanalı) ve Azure Kuyruğu sağlayıcıları geri sarılamaz.
Durum bilgisi olmayan otomatik olarak ölçeklendirilen işleme
Varsayılan olarak, Orleans çok sayıda nispeten küçük akışı destekleyen akış hedefleri, her biri bir veya daha fazla durum bilgisine sahip tanecik tarafından işlenir. Toplu olarak, tüm akışların işlenmesi birçok normal (durum bilgisi olan) tane arasında parçalanır. Uygulama kodunuz, veri akışı kimlikleri ve tanecik kimlikleri atayarak ve doğrudan abone olarak bu parçalanmayı denetler. Amaç, durum bilgisi olan parçalı işlemedir.
Ancak, otomatik olarak ölçeklendirilen durum bilgisi olmayan işlemenin ilginç bir senaryosu da vardır. Bu senaryoda, bir uygulamanın az sayıda akışı (hatta bir büyük akışı) vardır ve hedef durumsuz işlemdir. Küresel bir olay akışı, her olayın kodunun çözülmesini ve durum bilgisi gerektiren daha fazla işlem için diğer akışlara iletilmesini içeren işlemlerle bir örnek oluşturur. Durumsuz ölçeklendirilmiş akış işleme, Orleans içinde StatelessWorkerAttribute bileşenler aracılığıyla desteklenebilir.
Durum bilgisi gerektirmeyen otomatik ölçeklenebilir işlemenin mevcut durumu: Henüz uygulanmadı. StatelessWorkerAttribute tanecikten bir akışa abone olma girişimi tanımsız davranışla sonuçlanır. Bu seçeneği desteklemeyi düşünüyoruz.
Tahıllar ve Orleans istemciler
Orleans Akışlar, tahıllar ve Orleans istemciler arasında düzgün çalışır. Bu, olayları üretmek ve tüketmek için bir grain içinde ve Orleans istemcide aynı API'leri kullanabileceğiniz anlamına gelir. Bu, uygulama mantığını büyük ölçüde basitleştirerek Grain Observers gibi özel istemci tarafı API'lerini yedekli hale getirir.
Tam olarak yönetilen ve güvenilir akış pub-sub
Akış aboneliklerini izlemek için Orleans, akış tüketicileri ve üreticileri için bir randevu noktası görevi gören Streaming Pub-Sub adlı bir çalışma zamanı bileşeni kullanır. Pub-sub tüm akış aboneliklerini izler, kalıcı olarak saklar ve akış tüketicilerini akış üreticileriyle eşleştirir.
Uygulamalar Pub-Sub verilerinin nerede ve nasıl depolandığını seçebilir. Pub-Sub bileşeninin kendisi, bildirim temelli kalıcılık kullanan PubSubRendezvousGrain tanecikler (olarak adlandırılırOrleans) olarak uygulanır.
PubSubRendezvousGrain adlı PubSubStoredepolama sağlayıcısını kullanır. Her dilimde olduğu gibi, bir depolama sağlayıcısı için bir uygulama belirleyebilirsiniz. Streaming Pub-Sub için, siloyu inşa ederken "silo ana bilgisayar oluşturucusunu" kullanarak PubSubStore uygulamasını değiştirebilirsiniz.
Aşağıdaki Pub-Sub durumunu Azure tablolarında depolamak için yapılandırmaktadır.
var endpoint = new Uri(configuration["AZURE_TABLE_STORAGE_ENDPOINT"]!);
var credential = new DefaultAzureCredential();
hostBuilder.UseOrleans(siloBuilder =>
{
siloBuilder.AddAzureTableGrainStorage("PubSubStore",
options => options.TableServiceClient = new TableServiceClient(endpoint, credential));
});
hostBuilder.AddAzureTableGrainStorage("PubSubStore",
options => options.ConnectionString = "<Secret>");
Bu şekilde, Pub-Sub veriler Azure Tablosu'nda durabilir bir şekilde depolanır. İlk geliştirme için bellek depolamayı da kullanabilirsiniz. Pub-Sub'a ek olarak Streaming Runtime, Orleans üreticilerden tüketicilere olaylar sunar, etkin olarak kullanılan akışlara ayrılan tüm çalışma zamanı kaynaklarını yönetir ve kullanılmayan akışlardan çalışma zamanı kaynaklarını saydam bir şekilde çöp toplar.
Yapılandırma
Akışları kullanmak için silo konağı veya küme istemci oluşturucuları aracılığıyla akış sağlayıcılarını etkinleştirmeniz gerekir. Örnek akış sağlayıcısı kurulumu:
var tableEndpoint = new Uri(configuration["AZURE_TABLE_STORAGE_ENDPOINT"]!);
var queueEndpoint = new Uri(configuration["AZURE_QUEUE_STORAGE_ENDPOINT"]!);
var credential = new DefaultAzureCredential();
hostBuilder.UseOrleans(siloBuilder =>
{
siloBuilder.AddMemoryStreams("StreamProvider")
.AddAzureQueueStreams("AzureQueueProvider",
optionsBuilder => optionsBuilder.ConfigureAzureQueue(
options => options.Configure(
opt => opt.QueueServiceClient = new QueueServiceClient(queueEndpoint, credential))))
.AddAzureTableGrainStorage("PubSubStore",
options => options.TableServiceClient = new TableServiceClient(tableEndpoint, credential));
});
hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
.AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
optionsBuilder => optionsBuilder.Configure(
options => options.ConnectionString = "<Secret>"))
.AddAzureTableGrainStorage("PubSubStore",
options => options.ConnectionString = "<Secret>");