Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Bu makalede Java için Azure SDK'sında zaman uyumsuz programlama modeli açıklanmaktadır.
Azure SDK başlangıçta yalnızca Azure hizmetleriyle etkileşime yönelik engelleyici olmayan, zaman uyumsuz API'ler içeriyordu. Bu API'ler, sistem kaynaklarını verimli bir şekilde kullanan ölçeklenebilir uygulamalar oluşturmak için Azure SDK'sını kullanmanıza olanak tanır. Ancak, Java için
Java için Azure SDK reaktif akışlar
Java Azure SDK Tasarım Yönergeleri içindeki Async Service Clients bölümüne bakarsanız, eşzamansız API'lerin Java 8 tarafından sağlanan CompletableFuture kullanmak yerine reaktif türleri kullandığını görürsünüz. Azure SDK ekibi neden JDK'da yerel olarak kullanılabilen türler yerine reaktif türleri seçti?
Java 8' de Streams, Lambdas ve CompletableFuture gibi özellikler sunulmuştur. Bu özellikler birçok özellik sağlar, ancak bazı sınırlamaları vardır.
CompletableFuture geri çağırma tabanlı, engelleyici olmayan özellikler sağlar ve CompletionStage arabirim bir dizi zaman uyumsuz işlem oluşturmayı kolaylaştırır. Lambdalar, bu gönderme tabanlı API'leri daha okunabilir hale getirir. Akışlar, veri öğelerinin bir koleksiyonunu işlemek için işlevsel stil işlemleri sağlar. Ancak akışlar eşzamanlıdır ve yeniden kullanılamaz.
CompletableFuture tek bir istekte bulunmanıza olanak tanır, geri arama için destek sağlar ve tek bir yanıt bekler. Ancak birçok bulut hizmeti, örneğin Event Hubs gibi veri akışı yapabilmeyi gerektirir.
Reaktif akışlar, öğeleri kaynaktan aboneye aktararak bu sınırlamaların üstesinden gelmeye yardımcı olabilir. Abone bir kaynaktan veri istediğinde, kaynak herhangi bir sayıda sonucu geri gönderir. Hepsini aynı anda göndermesi gerekmez. Kaynakta gönderilecek veriler olduğunda aktarım belirli bir süre boyunca gerçekleşir.
Bu modelde abone, olay işleyicilerini geldiğinde verileri işlemek üzere kaydeder. Bu gönderme tabanlı etkileşimler, aboneyi farklı sinyallerle bilgilendirir:
- Çağrı
onSubscribe(), veri aktarımının başlamak üzere olduğunu gösterir. - Çağrı
onError(), veri aktarımının sonunu da işaretleyen bir hata olduğunu gösterir. - Çağrı
onComplete(), veri aktarımının başarıyla tamamlanmasını gösterir.
Java Akışlarından farklı olarak, reaktif akışlar hataları birinci sınıf olaylar olarak ele alır. Reaktif akışlar, kaynağın hataları aboneye iletmesi için ayrılmış bir kanala sahiptir. Ayrıca, reaktif akışlar abonenin bu akışları bir anında çekme modeline dönüştürmek için veri aktarım hızı konusunda anlaşmasına olanak sağlar.
Reactive Streams belirtimi, veri aktarımının nasıl gerçekleşmesi gerektiğine ilişkin bir standart sağlar. Belirtim, üst düzeyde aşağıdaki dört arabirimi tanımlar ve bu arabirimlerin nasıl uygulanması gerektiğine ilişkin kuralları belirtir.
- Publisher bir veri akışının kaynağıdır.
- Abone , bir veri akışının tüketicisidir.
- Abonelik , yayımcı ve abone arasındaki veri aktarımının durumunu yönetir.
- İşlemci hem yayımcı hem de abonedir.
Bazı iyi bilinen Java kitaplıkları bu belirtimin uygulamalarını sağlar, RxJava, Akka Streams, Vert.x ve Project Reactor gibi.
Java için Azure SDK, asenkron API'lerini sunmak için Project Reactor'ı benimsedi. Bu kararı veren ana faktör, Project Reactor'ı da kullanan Spring Webflux ile sorunsuz tümleştirme sağlamaktı. Project Reactor'ı RxJava yerine seçmeye katkıda bulunan bir diğer faktör de Project Reactor'ın Java 8 kullanması ama O zamanlar RxJava'nın hala Java 7'de olmasıydı. Project Reactor ayrıca birleştirilebilir ve veri işleme işlem hatları oluşturmak için bildirim temelli kod yazmanıza olanak sağlayan zengin bir işleç kümesi sunar. Project Reactor'ın bir diğer güzel özelliği de Project Reactor türlerini diğer popüler uygulama türlerine dönüştürmek için bağdaştırıcılara sahip olmasıdır.
Zaman uyumlu ve zaman uyumsuz işlemlerin API'lerini karşılaştırma
Eşzamanlı istemciler ve eşzamansız istemciler için seçenekler hakkında bilgi edindiniz. Aşağıdaki tabloda, bu seçenekler kullanılarak tasarlanan API'lerin nasıl göründüğü özetlenmiştir:
| API Türü | Değer yok | Tek değer | Birden çok değer |
|---|---|---|---|
| Standart Java - Zaman Uyumlu API'ler | void |
T |
Iterable<T> |
| Standart Java - Zaman Uyumsuz API'ler | CompletableFuture<Void> |
CompletableFuture<T> |
CompletableFuture<List<T>> |
| Reaktif Akış Arabirimleri | Publisher<Void> |
Publisher<T> |
Publisher<T> |
| Reaktif Akışların Proje Reaktörü uygulaması | Mono<Void> |
Mono<T> |
Flux<T> |
Eksiksizlik açısından, Java 9'un dört reaktif akış arabirimi içeren Flow sınıfını tanıttığını belirtmek gerekir. Ancak, bu sınıf herhangi bir uygulama içermez.
Java için Azure SDK'da zaman uyumsuz API'ler kullanma
Reaktif akış tanımı, yayımcı türleri arasında ayrım yapmaz. Reaktif akışlar belirtiminde yayımcılar basitçe sıfır veya daha fazla veri öğesi üretir. Çoğu durumda, en fazla bir veri öğesi üreten yayımcı ile sıfır veya daha fazla veri üreten yayımcı arasında yararlı bir ayrım vardır. Bulut tabanlı API'lerde bu fark, isteğin tek değerli bir yanıt mı yoksa koleksiyon mu döndürdüğüne işaret eder. Project Reactor bu ayrımı yapmak için iki tür sağlar - Mono ve Flux. Bir API Mono döndürdüğünde, yanıtı en fazla bir değer içerir ve bir API Flux döndürdüğünde, yanıtı sıfır veya daha fazla değer içerir.
Örneğin, Azure Uygulama Yapılandırma hizmeti kullanılarak depolanan bir yapılandırmayı almak için ConfigurationAsyncClient kullandığınızı varsayalım. (Daha fazla bilgi için bkz. Azure Uygulama Yapılandırması nedir?.)
"ConfigurationAsyncClient oluşturup istemcide getConfigurationSetting() çağrısı yaparsanız, yanıtın tek bir değer içerdiğini gösteren bir Mono döndürülür." Ancak, bu yöntemi tek başına çağırmak hiçbir şey yapmaz. İstemci henüz Azure Uygulama Yapılandırması hizmetine istekte bulunmadı. Bu aşamada, bu API tarafından döndürülen Mono<ConfigurationSetting> yalnızca veri işleme hattının bir "bileşimi"dir. Bu mimari, verileri tüketmek için gereken kurulumun tamamlandığını gösterir. Veri aktarımını gerçekten tetikleyebilmek için (hizmete istekte bulunmak ve yanıtı almak için), döndürülen Monoöğesine abone olmanız gerekir. Bu nedenle, bu reaktif akışlarla ilgilenirken, subscribe() fonksiyonunu aramayı hatırlamalısınız çünkü bunu yapana kadar hiçbir şey olmaz.
Aşağıdaki örnek, Mono yapılandırma değerine nasıl abone olunacağını ve bu değerin konsola nasıl yazdırılacağını göstermektedir.
ConfigurationAsyncClient asyncClient = new ConfigurationClientBuilder()
.connectionString("<your connection string>")
.buildAsyncClient();
asyncClient.getConfigurationSetting("<your config key>", "<your config value>").subscribe(
config -> System.out.println("Config value: " + config.getValue()),
ex -> System.out.println("Error getting configuration: " + ex.getMessage()),
() -> System.out.println("Successfully retrieved configuration setting"));
System.out.println("Done");
İstemcide çağrıldıktan getConfigurationSetting() sonra örnek kodun sonuba abone olduğuna ve üç ayrı lambda sağladığına dikkat edin. İlk lambda, hizmetten alınan verileri tüketir ve başarılı bir yanıt sonrası tetiklenir. Yapılandırma alınırken bir hata olduğunda ikinci lambda tetikleniyor. Üçüncü lambda, veri akışı tamamlandığında çağrılır, yani bu akıştan başka veri öğesi beklenmez.
Uyarı
Tüm zaman uyumsuz programlamalarda olduğu gibi, abonelik oluşturulduktan sonra yürütme her zamanki gibi devam eder. Programı etkin ve çalışır durumda tutacak hiçbir şey yoksa, eşzamansız işlem tamamlanmadan önce program sonlanabilir.
subscribe() öğesini çağıran ana iş parçacığı, siz Azure Uygulama Yapılandırması’a ağ çağrısı yapıp bir yanıt alana kadar beklemez. Üretim sistemlerinde başka bir şeyi işlemeye devam edebilirsiniz, ancak bu örnekte, Thread.sleep() çağırarak bir miktar bekleme süresi ekleyebilir veya zaman uyumsuz işlemin tamamlanmasına fırsat vermek için bir CountDownLatch kullanabilirsiniz.
Aşağıdaki örnekte gösterildiği gibi, Flux döndüren bir API de benzer bir yapı izler. Aradaki fark, yönteme sağlanan ilk geri çağırmanın subscribe() yanıttaki her veri öğesi için birden çok kez çağrılır. Hata veya tamamlama geri çağırmaları tam olarak bir kez çağrılır ve terminal sinyalleri olarak kabul edilir. Yayımcıdan bu sinyallerden biri alınırsa başka geri çağırma fonksiyonları çalıştırılmaz.
EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
.connectionString("<your connection string>")
.consumerGroup("<your consumer group>")
.buildAsyncConsumerClient();
asyncClient.receive().subscribe(
event -> System.out.println("Sequence number of received event: " + event.getData().getSequenceNumber()),
ex -> System.out.println("Error receiving events: " + ex.getMessage()),
() -> System.out.println("Successfully completed receiving all events"));
Geri basınç
Kaynak verileri abonenin işleyebileceğinden daha hızlı ürettiğinde ne olur? Abone verilerden etkilenebilir ve bu da yetersiz bellek hatalarına yol açabilir. Abonenin, yetişemediğinde yavaşlayabilmesi için yayımcıyla geri iletişim kurması için bir yönteme ihtiyacı vardır. Varsayılan olarak, önceki örnekte gösterildiği gibi bir subscribe() çağrısı Flux yaptığınızda abone, yayımcıya verileri mümkün olan en kısa sürede göndermesini belirten, ilişkisiz bir veri akışı isteğinde bulunur. Bu davranış her zaman arzu edilmez ve abonenin "backpressure" aracılığıyla yayımlama hızını denetlemesi gerekebilir. Geri baskı, abonenin veri öğelerinin akışını denetlemesini sağlar. Abone, işleyebileceği sınırlı sayıda veri öğesi istemektedir. Abone bu öğeleri işlemeyi tamamladıktan sonra daha fazla istekte bulunabilir. Geri basınç kullanarak, veri aktarımı için bir itme modelini itme-çekme modeline dönüştürebilirsiniz.
Aşağıdaki örnekte Event Hubs tüketicisi tarafından olayların alınma hızını nasıl denetleyebileceğiniz gösterilmektedir:
EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
.connectionString("<your connection string>")
.consumerGroup("<your consumer group>")
.buildAsyncConsumerClient();
asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1); // request 1 data element to begin with
}
@Override
public void onNext(PartitionEvent partitionEvent) {
System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
this.subscription.request(1); // request another event when the subscriber is ready
}
@Override
public void onError(Throwable throwable) {
System.out.println("Error receiving events: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Successfully completed receiving all events")
}
});
Abone yayımcıya ilk kez "bağlandığında", yayımcı aboneye veri aktarımının durumunu yöneten bir Subscription örnek iletir. Bu Subscription, abonenin işleyebileceği veri öğesi sayısını belirtmek için request() çağrısı yaparak geri basınç uygulayabileceği bir araçtır.
Abone, onNext() öğesini her çağırdığında örneğin request(10) için birden fazla veri öğesi isterse, yayımcı sonraki 10 öğeyi mevcutsa hemen, değilse kullanılabilir hâle geldiklerinde gönderir. Bu öğeler abone tarafındaki bir arabellekte birikir ve her onNext() çağrısı 10 öğe daha istediğinden, yayımcının gönderecek başka veri öğesi kalmayana veya abonenin arabelleği taşarak yetersiz bellek hatalarına yol açana kadar birikme artmaya devam eder.
Reaktif akış aboneliğini iptal etme
Abonelik, yayımcı ve abone arasındaki veri aktarımının durumunu yönetir. Yayımcı tüm verileri aboneye aktarmayı bitirene veya abone artık veri almakla ilgilenmeyene kadar abonelik etkin kalır. Aşağıdaki örneklerde gösterildiği gibi bir aboneliği birkaç şekilde iptal edebilirsiniz.
Aşağıdaki örnek, aboneyi çıkararak aboneliği iptal eder.
EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
.connectionString("<your connection string>")
.consumerGroup("<your consumer group>")
.buildAsyncConsumerClient();
Disposable disposable = asyncClient.receive().subscribe(
partitionEvent -> {
Long num = partitionEvent.getData().getSequenceNumber()
System.out.println("Sequence number of received event: " + num);
},
ex -> System.out.println("Error receiving events: " + ex.getMessage()),
() -> System.out.println("Successfully completed receiving all events"));
// much later on in your code, when you are ready to cancel the subscription,
// you can call the dispose method, as such:
disposable.dispose();
Aşağıdaki örnek, cancel() yöntemini Subscription üzerinde çağırarak aboneliği iptal eder:
EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
.connectionString("<your connection string>")
.consumerGroup("<your consumer group>")
.buildAsyncConsumerClient();
asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1); // request 1 data element to begin with
}
@Override
public void onNext(PartitionEvent partitionEvent) {
System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
this.subscription.cancel(); // Cancels the subscription. No further event is received.
}
@Override
public void onError(Throwable throwable) {
System.out.println("Error receiving events: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Successfully completed receiving all events")
}
});
Sonuç
İş parçacıkları, uzak hizmet çağrılarından gelen yanıtları beklerken boşa harcamamanız gereken pahalı kaynaklardır. Mikro hizmet mimarilerinin benimsenmesi arttıkça kaynakları verimli bir şekilde ölçeklendirme ve kullanma gereksinimi de önemli hale gelir. Ağ bağlantılı süreçler olduğunda zaman uyumsuz API'ler tercih edilir. Java için Azure SDK, sistem kaynaklarınızı en üst düzeye çıkarmak için zaman uyumsuz işlemler için zengin bir API kümesi sunar. Asenkron istemcileri deneyin.
Belirli görevlerinize en uygun operatörler hakkında daha fazla bilgi için Reactor 3 Referans Kılavuzu'ndahangi operatöre ihtiyacım var? bölümüne bakın.
Sonraki Adımlar
Çeşitli zaman uyumsuz programlama kavramlarını daha iyi anladığınıza göre, sonuçlar üzerinde yineleme yapmayı öğrenmek önemlidir. En iyi yineleme stratejileri ve sayfalandırmanın nasıl çalıştığı hakkında daha fazla bilgi için bkz. Java için Azure SDK'da sayfalandırma ve yineleme.