Aracılığıyla paylaş


Java için Azure SDK'da zaman uyumsuz programlama

Bu makalede Java için Azure SDK'sında zaman uyumsuz programlama modeli açıklanmaktadır.

Azure SDK başlangıçta yalnızca Azure hizmetleriyle etkileşime yönelik engelleyici olmayan, zaman uyumsuz API'ler içeriyordu. Bu API'ler, sistem kaynaklarını verimli bir şekilde kullanan ölçeklenebilir uygulamalar oluşturmak için Azure SDK'sını kullanmanıza olanak tanır. Bununla birlikte , Java için Azure SDK daha geniş bir kitleye hitap eden zaman uyumlu istemciler de içerir ve istemci kitaplıklarımızı zaman uyumsuz programlama hakkında bilgi sahibi olmayan kullanıcılar için erişilebilir hale getirir. (Bkz. Azure SDK tasarım yönergelerinde Ulaşılabilir .) Bu nedenle, Java için Azure SDK'daki tüm Java istemci kitaplıkları hem zaman uyumsuz hem de zaman uyumlu istemciler sunar. Ancak, sistem kaynaklarının kullanımını en üst düzeye çıkarmak için üretim sistemleri için zaman uyumsuz istemcileri kullanmanızı öneririz.

Reaktif akışlar

Java Azure SDK Tasarım Yönergeleri'ninZaman Uyumsuz Hizmet İstemcileri bölümüne bakarsanız, Java 8 tarafından sağlananları kullanmak CompletableFuture yerine zaman uyumsuz API'lerimizin reaktif türler kullandığını fark edersiniz. JDK'da yerel olarak kullanılabilen türler yerine neden reaktif türleri seçtik?

Java 8' de Streams, Lambdas ve CompletableFuture gibi özellikler sunulmuştur. Bu özellikler birçok özellik sağlar, ancak bazı sınırlamaları vardır.

CompletableFuture geri çağırma tabanlı ve engellemesiz özellikler sağlar ve CompletionStage arabirimi, bir dizi zaman uyumsuz işlemin kolayca birleştirilmesine izin verir. Lambdalar, bu gönderme tabanlı API'leri daha okunabilir hale getirir. Akışlar, veri öğelerinin bir koleksiyonunu işlemek için işlevsel stil işlemleri sağlar. Ancak akışlar eşzamanlıdır ve yeniden kullanılamaz. CompletableFuture tek bir istekte bulunmanıza olanak tanır, geri arama için destek sağlar ve tek bir yanıt bekler. Ancak birçok bulut hizmeti, örneğin Event Hubs gibi veri akışı yapabilmeyi gerektirir.

Reaktif akışlar, öğeleri kaynaktan aboneye aktararak bu sınırlamaların üstesinden gelmeye yardımcı olabilir. Abone bir kaynaktan veri istediğinde, kaynak herhangi bir sayıda sonucu geri gönderir. Hepsini aynı anda göndermesi gerekmez. Kaynakta gönderilecek veriler olduğunda aktarım belirli bir süre boyunca gerçekleşir.

Bu modelde abone, olay işleyicilerini geldiğinde verileri işlemek üzere kaydeder. Bu gönderme tabanlı etkileşimler, aboneyi farklı sinyallerle bilgilendirir:

  • Çağrı onSubscribe() , veri aktarımının başlamak üzere olduğunu gösterir.
  • Çağrı onError() , veri aktarımının sonunu da işaretleyen bir hata olduğunu gösterir.
  • Çağrı onComplete() , veri aktarımının başarıyla tamamlanmasını gösterir.

Java Akışlarından farklı olarak, reaktif akışlar hataları birinci sınıf olaylar olarak ele alır. Reaktif akışlar, kaynağın hataları aboneye iletmesi için ayrılmış bir kanala sahiptir. Ayrıca, reaktif akışlar abonenin bu akışları bir anında çekme modeline dönüştürmek için veri aktarım hızı konusunda anlaşmasına olanak sağlar.

Reactive Streams belirtimi, veri aktarımının nasıl gerçekleşmesi gerektiğine ilişkin bir standart sağlar. Belirtim, üst düzeyde aşağıdaki dört arabirimi tanımlar ve bu arabirimlerin nasıl uygulanması gerektiğine ilişkin kuralları belirtir.

  • Publisher bir veri akışının kaynağıdır.
  • Abone , bir veri akışının tüketicisidir.
  • Abonelik , yayımcı ve abone arasındaki veri aktarımının durumunu yönetir.
  • İşlemci hem yayımcı hem de abonedir.

RxJava, Akka Streams, Vert.x ve Project Reactor gibi bu belirtimin uygulamalarını sağlayan bazı iyi bilinen Java kitaplıkları vardır.

Java için Azure SDK, asenkron API'lerini sunmak için Project Reactor'ı benimsedi. Bu kararı veren ana faktör, Project Reactor'ı da kullanan Spring Webflux ile sorunsuz tümleştirme sağlamaktı. Project Reactor'ı RxJava yerine seçmeye katkıda bulunan bir diğer faktör de Project Reactor'ın Java 8 kullanması ama O zamanlar RxJava'nın hala Java 7'de olmasıydı. Project Reactor ayrıca birleştirilebilir ve veri işleme işlem hatları oluşturmak için bildirim temelli kod yazmanıza olanak sağlayan zengin bir işleç kümesi sunar. Project Reactor'ın bir diğer güzel özelliği de Project Reactor türlerini diğer popüler uygulama türlerine dönüştürmek için bağdaştırıcılara sahip olmasıdır.

Zaman uyumlu ve zaman uyumsuz işlemlerin API'lerini karşılaştırma

Zaman uyumlu istemcileri ve zaman uyumsuz istemciler için seçenekleri ele aldık. Aşağıdaki tabloda, şu seçenekler kullanılarak tasarlanan API'lerin nasıl göründüğü özetlenmiştir:

API Türü Değer yok Tek değer Birden çok değer
Standart Java - Zaman Uyumlu API'ler void T Iterable<T>
Standart Java - Zaman Uyumsuz API'ler CompletableFuture<Void> CompletableFuture<T> CompletableFuture<List<T>>
Reaktif Akış Arabirimleri Publisher<Void> Publisher<T> Publisher<T>
Reaktif Akışların Proje Reaktörü uygulaması Mono<Void> Mono<T> Flux<T>

Eksiksizlik açısından, Java 9'un dört reaktif akış arabirimi içeren Flow sınıfını tanıttığını belirtmek gerekir. Ancak, bu sınıf herhangi bir uygulama içermez.

Java için Azure SDK'da zaman uyumsuz API'ler kullanma

Reaktif akış tanımı, yayımcı türleri arasında ayrım yapmaz. Reaktif akışlar belirtiminde yayımcılar basitçe sıfır veya daha fazla veri öğesi üretir. Çoğu durumda, en fazla bir veri öğesi üreten yayımcı ile sıfır veya daha fazla veri üreten yayımcı arasında yararlı bir ayrım vardır. Bulut tabanlı API'lerde bu fark, isteğin tek değerli bir yanıt mı yoksa koleksiyon mu döndürdüğüne işaret eder. Project Reactor bu ayrımı yapmak için iki tür sağlar - Mono ve Flux. Bir API Mono döndürdüğünde, yanıtı en fazla bir değer içerir ve bir API Flux döndürdüğünde, yanıtı sıfır veya daha fazla değer içerir.

Örneğin, Azure Uygulama Yapılandırma hizmeti kullanılarak depolanan bir yapılandırmayı almak için ConfigurationAsyncClient kullandığınızı varsayalım. (Daha fazla bilgi için bkz. Azure Uygulama Yapılandırması nedir?.)

"ConfigurationAsyncClient oluşturup istemcide getConfigurationSetting() çağrısı yaparsanız, yanıtın tek bir değer içerdiğini gösteren bir Mono döndürülür." Ancak, bu yöntemi tek başına çağırmak hiçbir şey yapmaz. İstemci henüz Azure Uygulama Yapılandırma hizmetine bir istekte bulunmadı. Bu aşamada, bu API tarafından döndürülen Mono<ConfigurationSetting> yalnızca veri işleme hattının bir "bileşimi"dir. Bu, verileri tüketmek için gerekli kurulumun tamamlandığını gösterir. Veri aktarımını gerçekten tetikleyebilmek için (hizmete istekte bulunmak ve yanıtı almak için), döndürülen Monoöğesine abone olmanız gerekir. Bu nedenle, bu reaktif akışlarla ilgilenirken, subscribe() fonksiyonunu aramayı hatırlamalısınız çünkü bunu yapana kadar hiçbir şey olmaz.

Aşağıdaki örnek, Mono yapılandırma değerine nasıl abone olunacağını ve bu değerin konsola nasıl yazdırılacağını göstermektedir.

ConfigurationAsyncClient asyncClient = new ConfigurationClientBuilder()
    .connectionString("<your connection string>")
    .buildAsyncClient();

asyncClient.getConfigurationSetting("<your config key>", "<your config value>").subscribe(
    config -> System.out.println("Config value: " + config.getValue()),
    ex -> System.out.println("Error getting configuration: " + ex.getMessage()),
    () -> System.out.println("Successfully retrieved configuration setting"));

System.out.println("Done");

İstemcide çağrıldıktan getConfigurationSetting() sonra örnek kodun sonuba abone olduğuna ve üç ayrı lambda sağladığına dikkat edin. İlk lambda, hizmetten alınan verileri tüketir ve başarılı bir yanıt sonrası tetiklenir. Yapılandırma alınırken bir hata olduğunda ikinci lambda tetikleniyor. Üçüncü lambda, veri akışı tamamlandığında çağrılır, yani bu akıştan başka veri öğesi beklenmez.

Uyarı

Tüm zaman uyumsuz programlamalarda olduğu gibi, abonelik oluşturulduktan sonra yürütme her zamanki gibi devam eder. Programın etkinliğini ve yürütülmesini sürdürecek bir şey yoksa, zaman uyumsuz işlem tamamlanmadan önce program sonlanabilir. Çağıran subscribe() ana iş parçacığı, Azure Uygulama Yapılandırması'na ağ çağrısı yapıp bir cevap almanıza kadar beklemeyecek. Üretim sistemlerinde başka bir şeyi işlemeye devam edebilirsiniz, ancak bu örnekte, Thread.sleep() çağırarak bir miktar bekleme süresi ekleyebilir veya zaman uyumsuz işlemin tamamlanmasına fırsat vermek için bir CountDownLatch kullanabilirsiniz.

Aşağıdaki örnekte gösterildiği gibi, Flux döndüren bir API de benzer bir yapı izler. Aradaki fark, yönteme sağlanan ilk geri çağırmanın subscribe() yanıttaki her veri öğesi için birden çok kez çağrılır. Hata veya tamamlama geri çağırmaları tam olarak bir kez çağrılır ve terminal sinyalleri olarak kabul edilir. Yayımcıdan bu sinyallerden biri alınırsa başka geri çağırma fonksiyonları çalıştırılmaz.

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(
    event -> System.out.println("Sequence number of received event: " + event.getData().getSequenceNumber()),
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

Geri basınç

Kaynak verileri abonenin işleyebileceğinden daha hızlı ürettiğinde ne olur? Abone verilerden etkilenebilir ve bu da yetersiz bellek hatalarına yol açabilir. Abonenin, yetişemediğinde yavaşlayabilmesi için yayımcıyla geri iletişim kurması için bir yönteme ihtiyacı vardır. Varsayılan olarak, yukarıdaki örnekte gösterildiği gibi bir subscribe() çağrısını Flux üzerinde yaptığınızda, abone, yayımcıya verileri mümkün olan en kısa sürede göndermesini belirten, sınırsız bir veri akışı isteğinde bulunur. Bu davranış her zaman arzu edilmez ve abonenin "backpressure" aracılığıyla yayımlama hızını denetlemesi gerekebilir. Geri baskı, abonenin veri öğelerinin akışını denetlemesini sağlar. Abone, işleyebileceği sınırlı sayıda veri öğesi isteyecektir. Abone bu öğeleri işlemeyi tamamladıktan sonra, abone daha fazla istekte bulunabilir. Geri basınç kullanarak, veri aktarımı için bir itme modelini itme-çekme modeline dönüştürebilirsiniz.

Aşağıdaki örnekte Event Hubs tüketicisi tarafından olayların alınma hızını nasıl denetleyebileceğiniz gösterilmektedir:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.request(1); // request another event when the subscriber is ready
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

Abone yayımcıya ilk kez "bağlandığında", yayımcı aboneye veri aktarımının durumunu yöneten bir Subscription örnek iletir. Bu Subscription, abonenin işleyebileceği veri öğesi sayısını belirtmek için request() çağrısı yaparak geri basınç uygulayabileceği bir araçtır.

Abone her çağırdığında onNext()request(10) birden fazla veri öğesi isterse (örneğin, yayımcı mümkün olduğunda veya mevcut olduklarında sonraki 10 öğeyi hemen gönderecektir). Bu öğeler abonenin tarafındaki bir arabellekte birikir ve her onNext() çağrısı 10 öğe daha talep ettiği için birikim, yayımcının gönderecek başka veri öğesi kalmayınca ya da abonenin arabellek taşması sonucu boş alan yetersizliği hataları oluşana kadar büyümeye devam eder.

Aboneliği iptal etme

Abonelik, yayımcı ve abone arasındaki veri aktarımının durumunu yönetir. Yayımcı tüm verileri aboneye aktarmayı tamamlayana veya abone artık veri almakla ilgilenmeyene kadar abonelik etkindir. Aşağıda gösterildiği gibi aboneliği iptal etmenin birkaç yolu vardır.

Aşağıdaki örnek, aboneyi çıkararak aboneliği iptal eder.

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

Disposable disposable = asyncClient.receive().subscribe(
    partitionEvent -> {
        Long num = partitionEvent.getData().getSequenceNumber()
        System.out.println("Sequence number of received event: " + num);
    },
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

// much later on in your code, when you are ready to cancel the subscription,
// you can call the dispose method, as such:
disposable.dispose();

Aşağıdaki örnek, cancel() yöntemini Subscription üzerinde çağırarak aboneliği iptal eder:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.cancel(); // Cancels the subscription. No further event is received.
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

Sonuç

İş parçacıkları, uzak hizmet çağrılarından gelen yanıtları beklerken boşa harcamamanız gereken pahalı kaynaklardır. Mikro hizmet mimarilerinin benimsenmesi arttıkça kaynakları verimli bir şekilde ölçeklendirme ve kullanma gereksinimi de önemli hale gelir. Ağ bağlantılı süreçler olduğunda zaman uyumsuz API'ler tercih edilir. Java için Azure SDK, sistem kaynaklarınızı en üst düzeye çıkarmak için zaman uyumsuz işlemler için zengin bir API kümesi sunar. Eşzamansız istemcilerimizi denemenizi şiddetle öneririz.

Belirli görevlerinize en uygun operatörler hakkında daha fazla bilgi için Reactor 3 Referans Kılavuzu'ndahangi operatöre ihtiyacım var? bölümüne bakın.

Sonraki Adımlar

Çeşitli zaman uyumsuz programlama kavramlarını daha iyi anladığınıza göre, sonuçlar üzerinde yineleme yapmayı öğrenmek önemlidir. En iyi yineleme stratejileri ve sayfalandırmanın nasıl çalıştığı hakkında daha fazla bilgi için bkz. Java için Azure SDK'da sayfalandırma ve yineleme.