Aracılığıyla paylaş


Streams uygulama ayrıntıları

Bu bölüm, Stream uygulamasına üst düzey bir genel bakış sağlar. Uygulama düzeyinde görünmeyen kavramları ve ayrıntıları açıklar. Yalnızca akışları kullanmayı planlıyorsanız bu bölümü okumanız gerek değildir.

Terminoloji:

"Kuyruk" sözcüğü, akış olaylarını alan ve olayları çekmek için izin veren veya olayları tüketmek için itme tabanlı bir mekanizma sağlayan herhangi bir dayanıklı depolama teknolojisine başvururuz. Ölçeklenebilirlik sağlamak için bu teknolojiler genellikle parçalı/bölümlenmiş kuyruklar sağlar. Örneğin, Azure Kuyrukları birden çok kuyruk oluşturmanıza ve birden çok hub'Event Hubs oluşturmanıza olanak sağlar.

Kalıcı akışlar

Tüm Kalıcı kalıcı akış sağlayıcıları ortak bir uygulamasını paylaşır PersistentStreamProvider. Bu genel akış sağlayıcılarının teknolojiye özgü bir ile yapılandırılması gerekir IQueueAdapterFactory.

Örneğin, test amacıyla, bir kuyruktan verileri okumak yerine test verilerini oluşturan kuyruk bağdaştırıcıları vardır. Aşağıdaki kodda, özel (oluşturucu) kuyruk bağdaştırıcımızı kullanmak için kalıcı bir akış sağlayıcısını nasıl yapılandırmış oluruz? Bunu, kalıcı akış sağlayıcısını bağdaştırıcıyı oluşturmak için kullanılan bir fabrika işleviyle yapılandırarak yapar.

hostBuilder.AddPersistentStreams(
    StreamProviderName, GeneratorAdapterFactory.Create);

Bir akış üreticisi yeni stream.OnNext()bir akış öğesi oluştursa ve çağırsa, Akış IQueueAdapter çalışma zamanı, öğeyi doğrudan uygun kuyruğa sıraya alan akış sağlayıcısının uygun yöntemini çağırır.

Aracıları çekme

Kalıcı Akış Sağlayıcısı'nın merkezinde çekme aracıları yer almaktadır. Aracıları çekme, bir dizi dayanıklı kuyruktan olayları çeker ve bunları tüketen tanecikler içinde uygulama koduna teslim etmek. Çekme aracılarını bölümlenmiş, yüksek oranda kullanılabilir ve esnek bir dağıtılmış bileşen olan dağıtılmış bir "mikro hizmet" olarak düşünebilirsiniz. Çekme aracıları, uygulama taneciklerini barındıran ve Streaming Runtime tarafından tam olarak yönetilen aynı siloların içinde çalışır.

StreamQueueMapper ve StreamQueueBalancer

Çekme aracıları ve ile parametreli IStreamQueueMapper hale getirildi IStreamQueueBalancer. , IStreamQueueMapper tüm kuyrukların listesini sağlar ve akışları kuyruklara eşlemeden de sorumludur. Bu şekilde, Kalıcı Akış Sağlayıcısı'nın üretici tarafı iletinin kuyruğa alınarak hangi kuyruğa alınması gerektir.

, IStreamQueueBalancer kuyrukların Silo siloları ve aracılar arasında dengelenme yolunu ifade ediyor. Burada amaç, performans sorunlarını önlemek ve esnekliği desteklemek için kuyrukları aracılara dengeli bir şekilde atamaktır. Silo kümesine yeni bir silo ekleniyorsa kuyruklar eski ve yeni silolar arasında otomatik olarak yeniden dengelenir. , StreamQueueBalancer bu işlemi özelleştirmeye olanak sağlar. Farklı dengeleme senaryolarını (büyük ve az sayıda kuyruk) ve farklı ortamları (Azure, on-prem, static) desteklemek için Birçok yerleşik StreamQueueBalancers vardır.

Yukarıdaki test oluşturucu örneğini kullanarak aşağıdaki kod, kuyruk eşleyici ve kuyruk dengeleyiciyi nasıl yapılandırabilir?

hostBuilder
    .AddPersistentStreams(StreamProviderName, GeneratorAdapterFactory.Create,
        providerConfigurator =>
        providerConfigurator.Configure<HashRingStreamQueueMapperOptions>(
            ob => ob.Configure(options => options.TotalQueueCount = 8))
      .UseDynamicClusterConfigDeploymentBalancer());

Yukarıdaki kod, sekiz kuyruklu GeneratorAdapterFactory bir kuyruk eşleci kullanmak üzere yapılandırılan ve kullanarak küme genelinde kuyrukları dengeler DynamicClusterConfigDeploymentBalancer.

Çekme protokolü

Her silo bir dizi çekme aracısı çalıştırır, her aracı bir kuyruktan çektir. Aracıları çekme, SystemTarget adlı bir iç çalışma zamanı bileşeni tarafından uygulanır. SystemTarget'ler temelde çalışma zamanı taneleridir, tek iş parçacıklı eşzamanlılık durumuna tabi olur, düzenli tanecik mesajlaşmayı kullanabilir ve tanecikler kadar hafiftir. Taneciklerin aksine SystemTarget'ler sanal değildir: bunlar açıkça oluşturulur (çalışma zamanı tarafından) ve konum saydam değildir. Çekme aracılarını SystemTargets olarak uygulayan Streaming Runtime, yerleşik Olan Bazı özelliklere güvenerek çok sayıda kuyruğa ölçeklendirebilirsiniz çünkü yeni bir çekme aracısı oluşturmak yeni bir tane oluşturmak kadar ucuzdur.

Her çekme aracısı, yöntemini kullanarak kuyruktan çeken düzenli aralıklarla bir zamanlayıcı IQueueAdapterReceiver.GetQueueMessagesAsync çalıştırır. Döndürülen iletiler, adlı aracı başına iç veri yapısına yer verilir IQueueCache. Her ileti, hedef akışını bulmak için inceler. Aracı, Pub-Sub bu akışa abone olan akış tüketicilerinin listesini bulmak için bu aracıyı kullanır. Tüketici listesi alındıktan sonra, aracı bunu yerel olarak (pub-sub önbelleğinde) depolar, bu nedenle her iletiye Pub-Sub olması gerek değildir. Aracı ayrıca bu akışa abone olan yeni tüketicilerin bildirimlerini almak için pub-sub'a abone olur. Aracı ile pub-sub arasındaki bu el sıkışması güçlü akış aboneliği semantiği sağlar: Tüketici akışa abone olduktan sonra abone olduktan sonra oluşturulan tüm olayları görebilir. Ayrıca, kullanmak geçmişte StreamSequenceToken abone olma özelliğine olanak sağlar.

Kuyruk önbelleği

IQueueCache , yeni olayları kuyruktan bölümleme ve bunları tüketicilere teslime olanak sağlayan iç aracı başına veri yapısıdır. Ayrıca, farklı akışlara ve farklı tüketicilere teslimin ayrımlarını sağlar.

Imagine akışın 3 akış tüketicisi olduğu ve bunlardan birinin yavaş olduğu bir durumdur. Dikkat alınmazsa, bu yavaş tüketici aracı ilerlemesini, bu akışın diğer tüketicilerinin tüketimini yavaşlatarak ve hatta diğer akışlar için olaylarınqueuyulan ve teslimi yavaşlatarak etkilemektedir. Bunu önlemek ve aracıda en yüksek paralelliklere izin vermek için kullanıriz IQueueCache.

IQueueCache , akış olaylarını arabelleğe akar ve aracıya olayları her tüketiciye kendi hızıyla teslim etmek için bir yol sağlar. Tüketici başına teslim, tüketici başına ilerlemeyi takip eden adlı IQueueCacheCursoriç bileşen tarafından uygulanır. Bu şekilde her tüketici olayları kendi hızıyla alır: Hızlı tüketiciler olayları kuyruktan alındıklarından daha hızlı alırken yavaş tüketiciler daha sonra bunları alır. İleti tüm tüketicilere teslim edildiktan sonra önbellekten silinebilir.

Geripressure

StreamIng Runtime'daki geri basınç iki yerde uygulanır: kuyruktan aracıya akış olayları getirme ve olayları aracıdan akış tüketicilerine teslim.

İkinci seçenek, yerleşik Olan İleti teslim mekanizması tarafından sağlanır. Her akış olayı, aracıdan tüketicilere standart Grain mesajlaşması aracılığıyla tek tek teslim edilir. Diğer bir ifadeyle, aracılar her akış tüketicisi için bir olay (veya sınırlı boyutlu bir olay toplu işi) gönderir ve bu çağrıyı bekler. Bir sonraki olay, önceki olayın Görevi çözümlenene veya bozuk olana kadar teslim olmaya başlamaz. Bu şekilde tüketici başına teslim oranını doğal olarak aynı anda bir iletiyle sınırlandıracağız.

Stream olayları kuyruktan aracıya getiriken, Streaming yeni bir özel Backpressure mekanizması sağlar. Aracı, olayların kuyruktan kuyruktan kesilmelerini ve tüketicilere teslimi nedeniyle tek IQueueCache bir yavaş tüketici o kadar geride kaldı ki doyacak kadar geride kaldı. Süresiz IQueueCache olarak büyümeyi önlemek için boyutunu sınırlandırmış oluruz (boyut sınırı yapılandırılabilir). Ancak aracı hiçbir zaman teslim edilmeyen olayları atmıyor.

Bunun yerine, önbellek doldurulmaya başladığında, aracılar kuyruktan olay kuyruktan kuyruğa alınma oranını yavaşlatıyor. Bu şekilde kuyruktan tüketeceğimiz hızı ("geripressure") ayarlayarak yavaş teslim dönemlerini "atabilir" ve daha sonra hızlı tüketim oranlarına geri dönebilirsiniz. "Yavaş teslim" vadisini algılamak için IQueueCache , olayların bireysel akış tüketicilerine teslimi ilerlemesini takip eden önbellek demetlerinin iç veri yapısını kullanır. Bu, çok hızlı yanıt veren ve kendi kendini ayarlayarak bir sistemle sonuç verir.