Bagikan melalui


Pemrograman asinkron di Azure SDK untuk Java

Artikel ini menjelaskan model pemrograman asinkron di Azure SDK untuk Java.

Azure SDK awalnya hanya berisi API non-pemblokiran dan asinkron untuk berinteraksi dengan layanan Azure. API ini memungkinkan Anda menggunakan Azure SDK untuk membangun aplikasi yang dapat diskalakan yang menggunakan sumber daya sistem secara efisien. Namun, Azure SDK untuk Java juga berisi klien sinkron untuk melayani audiens yang lebih luas, dan juga membuat pustaka klien kami dapat didekati bagi pengguna yang tidak terbiasa dengan pemrograman asinkron. (Lihat Dapat didekati dalam panduan desain Azure SDK.) Dengan demikian, semua pustaka klien Java di Azure SDK untuk Java menawarkan klien asinkron dan sinkron. Namun, sebaiknya gunakan klien asinkron untuk sistem produksi untuk memaksimalkan penggunaan sumber daya sistem.

Aliran reaktif

Jika Anda melihat bagian Klien Layanan Asinkron di Panduan Desain Java Azure SDK, Anda akan melihat bahwa, alih-alih menggunakan CompletableFuture yang disediakan oleh Java 8, API asinkron kami menggunakan jenis reaktif. Mengapa kita memilih jenis reaktif daripada jenis yang tersedia secara asli di JDK?

Java 8 memperkenalkan fitur seperti Streams, Lambdas, dan CompletableFuture. Fitur-fitur ini menyediakan banyak kemampuan, tetapi memiliki beberapa batasan.

CompletableFuture menyediakan kemampuan non-pemblokiran berbasis panggilan balik, dan antarmuka CompletionStage memungkinkan komposisi yang mudah dari serangkaian operasi asinkron. Lambda membuat API berbasis push ini lebih mudah dibaca. Streams menyediakan operasi yang bergaya pemrograman fungsional untuk menangani kumpulan elemen data. Namun, stream bersifat sinkron dan tidak dapat digunakan kembali. CompletableFuture memungkinkan Anda membuat satu permintaan, memberikan dukungan untuk panggilan balik, dan mengharapkan satu respons. Namun, banyak layanan cloud memerlukan kemampuan untuk mengalirkan data - Azure Event Hubs misalnya.

Aliran reaktif dapat membantu mengatasi batasan ini dengan mengalirkan elemen-elemen dari sumber ke penyimpan. Saat pelanggan meminta data dari sumber, sumber mengirim sejumlah hasil kembali. Tidak perlu mengirim semuanya sekaligus. Transfer terjadi selama jangka waktu tertentu, setiap kali sumber memiliki data untuk dikirim.

Dalam model ini, pelanggan mendaftarkan penanganan aktivitas untuk memproses data ketika tiba. Interaksi berbasis push ini memberi tahu pelanggan melalui sinyal yang berbeda:

  • Panggilan onSubscribe() menunjukkan bahwa transfer data akan dimulai.
  • Panggilan onError() menunjukkan ada kesalahan, yang juga menandai akhir transfer data.
  • Panggilan onComplete() menunjukkan keberhasilan penyelesaian transfer data.

Tidak seperti Java Streams, aliran reaktif memperlakukan kesalahan sebagai peristiwa kelas satu. Aliran reaktif memiliki saluran khusus bagi sumber untuk mengomunikasikan kesalahan apa pun kepada pelanggan. Selain itu, aliran reaktif memungkinkan pelanggan untuk menegosiasikan tingkat transfer data untuk mengubah aliran ini menjadi model push-pull.

Spesifikasi Aliran Reaktif menyediakan standar tentang bagaimana transfer data harus terjadi. Pada tingkat tinggi, spesifikasi menentukan empat antarmuka berikut dan menentukan aturan tentang bagaimana antarmuka ini harus diimplementasikan.

  • Publisher adalah sumber aliran data.
  • Pelanggan adalah konsumen aliran data.
  • Langganan mengelola status transfer data antara pengirim dan penerima.
  • Prosesor adalah penerbit dan pelanggan.

Ada beberapa pustaka Java terkenal yang menyediakan implementasi spesifikasi ini, seperti RxJava, Akka Streams, Vert.x, dan Project Reactor.

Azure SDK untuk Java mengadopsi Project Reactor untuk menyediakan API async-nya. Faktor utama yang mendorong keputusan ini adalah memberikan integrasi yang lancar dengan Spring Webflux, yang juga menggunakan Project Reactor. Faktor lain yang berkontribusi untuk memilih Project Reactor daripada RxJava adalah bahwa Project Reactor menggunakan Java 8 tetapi RxJava, pada saat itu, masih berada di Java 7. Project Reactor juga menawarkan serangkaian operator kaya yang dapat disusun dan memungkinkan Anda menulis kode deklaratif untuk membangun alur pemrosesan data. Hal lain yang bagus tentang Project Reactor adalah bahwa ia memiliki adaptor untuk mengonversi jenis Project Reactor ke jenis implementasi populer lainnya.

Membandingkan API operasi sinkron dan asinkron

Kami membahas klien sinkron dan opsi untuk klien asinkron. Tabel di bawah ini meringkas seperti apa API yang dirancang menggunakan opsi ini:

Jenis API Tidak ada nilai Nilai tunggal Beberapa nilai
Java Standar - API Sinkronisasi void T Iterable<T>
Java Standar - API yang Asinkron CompletableFuture<Void> CompletableFuture<T> CompletableFuture<List<T>>
Antarmuka Aliran Reaktif Publisher<Void> Publisher<T> Publisher<T>
Implementasi Project Reactor dari Aliran Reaktif Mono<Void> Mono<T> Flux<T>

Demi kelengkapan, perlu disebutkan bahwa Java 9 memperkenalkan kelas Alur yang mencakup empat antarmuka aliran reaktif. Namun, kelas ini tidak menyertakan implementasi apa pun.

Menggunakan API asinkron di Azure SDK untuk Java

Spesifikasi aliran reaktif tidak membedakan antara jenis penerbit. Dalam spesifikasi aliran reaktif, penerbit hanya menghasilkan nol atau lebih elemen data. Dalam banyak kasus, ada perbedaan yang berguna antara penerbit yang memproduksi paling banyak satu elemen data versus elemen yang menghasilkan nol atau lebih. Dalam API berbasis cloud, perbedaan ini menunjukkan apakah permintaan mengembalikan respons bernilai tunggal atau koleksi. Project Reactor menyediakan dua jenis untuk membuat perbedaan ini - Mono dan Flux. API yang mengembalikan Mono akan berisi respons yang memiliki paling banyak satu nilai, dan API yang mengembalikan Flux akan berisi respons yang memiliki nol atau lebih nilai.

Misalnya, Anda menggunakan ConfigurationAsyncClient untuk mengambil konfigurasi yang disimpan menggunakan layanan Azure App Configuration. (Untuk informasi selengkapnya, lihat Apa itu Azure App Configuration?.)

Jika Anda membuat ConfigurationAsyncClient dan memanggil getConfigurationSetting() klien, ia mengembalikan Mono, yang menunjukkan bahwa respons berisi satu nilai. Namun, memanggil metode ini saja tidak melakukan apa-apa. Klien belum membuat permintaan ke layanan Azure App Configuration. Pada tahap ini, Mono<ConfigurationSetting> yang dikembalikan oleh API ini hanyalah sebuah "rangkaian" dari alur pemrosesan data. Artinya, penyiapan yang diperlukan untuk mengkonsumsi data selesai. Untuk benar-benar memicu transfer data (yaitu, untuk membuat permintaan ke layanan dan mendapatkan respons), Anda harus berlangganan ke hasil yang dikembalikan Mono. Jadi, ketika berhadapan dengan aliran reaktif ini, Anda harus ingat untuk memanggil subscribe() karena tidak ada yang terjadi sampai Anda melakukannya.

Contoh berikut menunjukkan cara berlangganan Mono dan mencetak nilai konfigurasi ke konsol.

ConfigurationAsyncClient asyncClient = new ConfigurationClientBuilder()
    .connectionString("<your connection string>")
    .buildAsyncClient();

asyncClient.getConfigurationSetting("<your config key>", "<your config value>").subscribe(
    config -> System.out.println("Config value: " + config.getValue()),
    ex -> System.out.println("Error getting configuration: " + ex.getMessage()),
    () -> System.out.println("Successfully retrieved configuration setting"));

System.out.println("Done");

Perhatikan bahwa setelah memanggil getConfigurationSetting() pada klien, kode contoh berlangganan pada hasilnya dan menyediakan tiga lambda terpisah. Lambda pertama memproses data yang diterima dari layanan, yang dipicu oleh respons sukses. Lambda kedua dipicu jika ada kesalahan saat mengambil konfigurasi. Lambda ketiga dipanggil ketika aliran data selesai, yang berarti tidak ada lagi elemen data yang diharapkan dari aliran ini.

Nota

Seperti semua pemrograman asinkron, setelah langganan dibuat, eksekusi berlangsung seperti biasa. Jika tidak ada yang perlu menjaga program tetap aktif dan dijalankan, program dapat berakhir sebelum operasi asinkron selesai. Utas utama yang menjalankan subscribe() tidak akan menunggu hingga Anda melakukan panggilan jaringan ke Azure App Configuration dan menerima respons. Dalam sistem produksi, Anda mungkin terus memproses sesuatu yang lain, tetapi dalam contoh ini Anda dapat menambahkan penundaan kecil dengan memanggil Thread.sleep() atau menggunakan CountDownLatch untuk memberi operasi asinkron kesempatan untuk diselesaikan.

Seperti yang ditunjukkan dalam contoh berikut, API yang mengembalikan Flux juga mengikuti pola serupa. Perbedaannya adalah bahwa panggilan balik pertama yang disediakan untuk subscribe() metode ini dipanggil beberapa kali untuk setiap elemen data dalam respons. Kesalahan atau panggilan balik penyelesaian dipanggil tepat sekali dan dianggap sebagai sinyal terminal. Tidak ada panggilan balik lain yang dilakukan jika penerbit mengirimkan salah satu sinyal ini.

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(
    event -> System.out.println("Sequence number of received event: " + event.getData().getSequenceNumber()),
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

Tekanan Balik (Backpressure)

Apa yang terjadi ketika sumber memproduksi data dengan laju yang lebih cepat daripada yang dapat ditangani pelanggan? Pelanggan bisa kewalahan dengan data, yang dapat menyebabkan kesalahan kehabisan memori. Pelanggan memerlukan cara untuk berkomunikasi kembali ke penerbit untuk memperlambat ketika tidak dapat mengikuti. Secara bawaan, saat Anda memanggil subscribe() pada Flux seperti yang ditunjukkan pada contoh di atas, pelanggan meminta aliran data yang tidak terbatas, menunjukkan kepada penerbit untuk mengirim data secepat mungkin. Perilaku ini tidak selalu diinginkan, dan pengguna mungkin perlu mengontrol laju penerbitan dengan menggunakan "backpressure". Backpressure memungkinkan pelanggan untuk mengontrol aliran elemen data. Pelanggan akan meminta sejumlah elemen data terbatas yang dapat mereka tangani. Setelah pelanggan selesai memproses elemen-elemen ini, pelanggan dapat meminta lebih banyak. Dengan menggunakan tekanan balik (backpressure), Anda dapat mengubah model dorongan untuk transfer data menjadi model dorong-tarik.

Contoh berikut menunjukkan bagaimana Anda dapat mengontrol tingkat di mana peristiwa diterima oleh konsumen Azure Event Hubs:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.request(1); // request another event when the subscriber is ready
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

Ketika subscriber pertama kali "terhubung" ke penerbit, penerbit memberikan subscriber sebuah instans Subscription, yang mengelola status transfer data. Ini Subscription adalah media di mana pelanggan dapat menerapkan tekanan balik dengan memanggil request() untuk menentukan berapa banyak elemen data yang dapat ditanganinya.

Jika pelanggan meminta lebih dari satu elemen data setiap kali memanggil onNext(), request(10) misalnya, penerbit akan segera mengirim 10 elemen berikutnya jika tersedia atau saat tersedia. Elemen-elemen ini terakumulasi dalam buffer di sisi pelanggan, dan karena setiap onNext() panggilan akan meminta 10 lagi, penumpukan backlog terus bertambah sampai penerbit tidak memiliki lagi elemen data untuk dikirim, atau buffer pelanggan meluap, berujung pada kesalahan kehabisan memori.

Batalkan langganan

Langganan mengelola status transfer data antara penerbit dan pelanggan. Langganan aktif sampai penerbit selesai mentransfer semua data ke pelanggan atau pelanggan tidak lagi tertarik untuk menerima data. Ada beberapa cara untuk membatalkan langganan seperti yang ditunjukkan di bawah ini.

Contoh berikut membatalkan langganan dengan membuang pelanggan:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

Disposable disposable = asyncClient.receive().subscribe(
    partitionEvent -> {
        Long num = partitionEvent.getData().getSequenceNumber()
        System.out.println("Sequence number of received event: " + num);
    },
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

// much later on in your code, when you are ready to cancel the subscription,
// you can call the dispose method, as such:
disposable.dispose();

Contoh berikut membatalkan langganan dengan memanggil cancel() metode pada Subscription:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.cancel(); // Cancels the subscription. No further event is received.
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

Kesimpulan

Thread adalah sumber daya mahal yang tidak seharusnya Anda sia-siakan hanya untuk menunggu respons dari panggilan layanan jarak jauh. Ketika adopsi arsitektur layanan mikro meningkat, kebutuhan untuk menskalakan dan menggunakan sumber daya secara efisien menjadi penting. API asinkron menguntungkan ketika ada operasi yang terikat jaringan. Azure SDK for Java menawarkan serangkaian API yang kaya untuk operasi asinkron untuk membantu memaksimalkan sumber daya sistem Anda. Kami sangat mendorong Anda untuk mencoba klien asinkron kami.

Untuk informasi selengkapnya tentang operator yang paling sesuai dengan tugas tertentu Anda, lihat Operator mana yang saya butuhkan? di Panduan Referensi Reaktor 3.

Langkah selanjutnya

Sekarang setelah Anda lebih memahami berbagai konsep pemrograman asinkron, penting untuk mempelajari cara melakukan iterasi atas hasilnya. Untuk informasi selengkapnya tentang strategi iterasi terbaik, dan detail cara kerja penomoran halaman, lihat Penomoran halaman dan iterasi di Azure SDK untuk Java.