Pemrograman asinkron di SDK Azure untuk Java
Artikel ini menjelaskan model pemrograman asinkron di SDK Azure untuk Java.
SDK Azure awalnya hanya berisi API asinkron non-pemblokiran untuk berinteraksi dengan layanan Azure. API ini memungkinkan Anda menggunakan SDK Azure untuk membuat aplikasi yang dapat diskalakan yang menggunakan sumber daya sistem secara efisien. Namun, SDK Azure untuk Java juga berisi klien sinkron untuk melayani audiens yang lebih luas, dan juga membuat pustaka klien kami dapat didekati bagi pengguna yang tidak terbiasa dengan pemrograman asinkron. (Lihat Dapat didekati dalam panduan desain Azure SDK.) Dengan demikian, semua pustaka klien Java di Azure SDK untuk Java menawarkan klien asinkron dan sinkron. Namun, kami sarankan menggunakan klien asinkron untuk sistem produksi untuk memaksimalkan penggunaan sumber daya sistem.
Jika Anda melihat bagian Klien Layanan Async di Pedoman Desain Java Azure SDK, Anda akan melihat bahwa, alih-alih menggunakan CompletableFuture
yang disediakan oleh Java 8, API asinkron kami menggunakan tipe reaktif. Mengapa kami memilih jenis reaktif daripada jenis yang tersedia secara native di JDK?
Java 8 memperkenalkan fitur seperti Streams, Lambdas, dan CompletableFuture. Fitur ini menyediakan banyak kemampuan, tetapi memiliki beberapa keterbatasan.
CompletableFuture
menyediakan kemampuan berbasis callback, non-blocking, dan antarmuka CompletionStage
memungkinkan komposisi yang mudah dari serangkaian operasi asinkron. Lambda membuat API berbasis push ini lebih mudah dibaca. Aliran menyediakan operasi gaya fungsional untuk menangani pengumpulan elemen data. Namun, aliran tidak sinkron dan tidak dapat digunakan kembali. CompletableFuture
memungkinkan Anda membuat satu permintaan, memberikan dukungan untuk panggilan balik, dan mengharapkan respons tunggal. Namun, banyak layanan cloud memerlukan kemampuan untuk melakukan streaming data - Event Hubs untuk instans.
Aliran reaktif dapat membantu mengatasi keterbatasan ini dengan mengalirkan elemen dari sumber ke pelanggan. Saat pelanggan meminta data dari sumber, sumber mengirimkan sejumlah hasil kembali. Tidak perlu mengirimnya sekaligus. Transfer terjadi selama periode waktu tertentu, setiap kali sumber memiliki data untuk dikirim.
Dalam model ini, pelanggan mendaftarkan penanganan aktivitas untuk memproses data saat tiba. Interaksi berbasis push ini memberi tahu pelanggan melalui sinyal yang berbeda:
- Panggilan
onSubscribe()
menunjukkan bahwa transfer data akan segera dimulai. - Panggilan
onError()
menunjukkan ada kesalahan, yang juga menandai akhir transfer data. - Panggilan
onComplete()
menunjukkan keberhasilan penyelesaian transfer data.
Tidak seperti Java Streams, aliran reaktif memperlakukan kesalahan sebagai peristiwa kelas satu. Aliran reaktif memiliki saluran khusus bagi sumber untuk mengkomunikasikan kesalahan apa pun kepada pelanggan. Selain itu, aliran reaktif memungkinkan pelanggan menegosiasikan kecepatan transfer data untuk mengubah aliran ini menjadi model push-pull.
Spesifikasi Aliran Reaktif memberikan standar tentang bagaimana transfer data harus terjadi. Pada tingkat tinggi, spesifikasi mendefinisikan empat antarmuka berikut dan menentukan aturan tentang bagaimana antarmuka ini harus diimplementasikan.
- Penerbit adalah sumber aliran data.
- Pelanggan adalah konsumen dari aliran data.
- Langganan mengelola status transfer data antara penerbit dan pelanggan.
- Prosesor adalah penerbit dan pelanggan.
Ada beberapa perpustakaan Java terkenal yang menyediakan implementasi spesifikasi ini, seperti RxJava, Akka Aliran, Vert.x, dan Project Reactor.
SDK Azure untuk Java mengadopsi Project Reactor untuk menawarkan API asinkronnya. Faktor utama yang mendorong keputusan ini adalah untuk memberikan integrasi yang lancar dengan Spring Webflux, yang juga menggunakan Project Reactor. Faktor lain yang berkontribusi untuk memilih Reaktor Project daripada RxJava adalah bahwa Project Reactor menggunakan Java 8 tetapi RxJava, pada saat itu, masih berada di Java 7. Project Reactor juga menawarkan satu set operator yang kaya yang dapat dikomposisi dan memungkinkan Anda menulis kode deklaratif untuk membuat alur pemrosesan data. Hal lain yang menyenangkan tentang Project Reactor adalah memiliki adaptor untuk mengubah jenis Project Reactor ke jenis implementasi populer lainnya.
Kami membahas klien sinkron dan opsi untuk klien asinkron. Tabel di bawah ini merangkum seperti apa API yang dirancang menggunakan opsi ini:
Jenis API | Tidak ada nilai | Nilai tunggal | Beberapa nilai |
---|---|---|---|
Java Standar - API Sinkron | void |
T |
Iterable<T> |
Java Standar - API Asinkron | CompletableFuture<Void> |
CompletableFuture<T> |
CompletableFuture<List<T>> |
Antarmuka Aliran Reaktif | Publisher<Void> |
Publisher<T> |
Publisher<T> |
Implementasi Project Reactor Aliran Reaktif | Mono<Void> |
Mono<T> |
Flux<T> |
Demi kelengkapan, perlu disebutkan bahwa Java 9 memperkenalkan kelas Alur yang mencakup empat antarmuka aliran reaktif. Namun, kelas ini tidak termasuk implementasi apa pun.
Spesifikasi aliran reaktif tidak membedakan antara jenis penerbit. Dalam spesifikasi aliran reaktif, penerbit hanya menghasilkan nol atau lebih elemen data. Dalam banyak kasus, ada perbedaan yang berguna antara penerbit yang menghasilkan paling banyak satu elemen data versus yang menghasilkan nol atau lebih. Di API berbasis cloud, perbedaan ini menunjukkan apakah permintaan mengembalikan respons bernilai tunggal atau koleksi. Project Reactor menyediakan dua jenis untuk membuat perbedaan ini - Mono dan Flux. API yang menampilkan Mono
akan berisi respons yang memiliki paling banyak satu nilai, dan API yang menampilkan Flux
akan berisi respons yang memiliki nilai nol atau lebih.
Misalnya, Anda menggunakan ConfigurationAsyncClient untuk mengambil konfigurasi yang disimpan menggunakan layanan Azure App Configuration. (Untuk informasi selengkapnya, lihat Apa itu Azure App Configuration?.)
Jika Anda membuat ConfigurationAsyncClient
dan memanggil getConfigurationSetting()
di klien, Mono
ditampilkan, yang menunjukkan bahwa respons berisi satu nilai. Namun, menyebut metode ini saja tidak melakukan apa-apa. Klien belum mengajukan permintaan ke layanan Azure App Configuration. Pada tahap ini, Mono<ConfigurationSetting>
yang dikembalikan oleh API ini hanyalah "perakitan" alur pemrosesan data. Apa artinya ini adalah bahwa pengaturan yang diperlukan untuk menggunakan data selesai. Untuk benar-benar memicu transfer data (yaitu, untuk membuat permintaan ke layanan dan mendapatkan respons), Anda harus berlangganan Mono
yang dikembalikan. Jadi, saat berhadapan dengan aliran reaktif ini, Anda harus ingat untuk menelepon subscribe()
karena tidak ada yang terjadi sampai Anda melakukannya.
Contoh berikut menunjukkan cara berlangganan Mono
dan mencetak nilai konfigurasi ke konsol.
ConfigurationAsyncClient asyncClient = new ConfigurationClientBuilder()
.connectionString("<your connection string>")
.buildAsyncClient();
asyncClient.getConfigurationSetting("<your config key>", "<your config value>").subscribe(
config -> System.out.println("Config value: " + config.getValue()),
ex -> System.out.println("Error getting configuration: " + ex.getMessage()),
() -> System.out.println("Successfully retrieved configuration setting"));
System.out.println("Done");
Perhatikan bahwa setelah memanggil getConfigurationSetting()
di klien, kode contoh berlangganan hasilnya dan menyediakan tiga lambda terpisah. Lambda pertama menggunakan data yang diterima dari layanan, yang dipicu setelah respons berhasil. Lambda kedua dipicu jika ada kesalahan saat mengambil konfigurasi. Lambda ketiga dipanggil ketika aliran data selesai, yang berarti tidak ada lagi elemen data yang diharapkan dari aliran ini.
Catatan
Seperti semua pemrograman asinkron, setelah langganan dibuat, eksekusi berlangsung seperti biasa. Jika tidak ada yang membuat program tetap aktif dan eksekusi, program tersebut dapat dihentikan sebelum operasi asinkron selesai. Utas utama yang memanggil subscribe()
tidak akan menunggu sampai Anda melakukan panggilan jaringan ke Azure App Configuration dan menerima respons. Dalam sistem produksi, Anda mungkin terus memproses sesuatu yang lain, tetapi dalam contoh ini Anda dapat menambahkan penundaan kecil dengan menelepon Thread.sleep()
atau menggunakan CountDownLatch
untuk memberikan operasi asinkron kesempatan untuk menyelesaikan.
Seperti yang ditunjukkan dalam contoh berikut, API yang menampilkan Flux
juga mengikuti pola yang sama. Perbedaannya adalah bahwa panggilan balik pertama yang diberikan ke metode subscribe()
ini dipanggil beberapa kali untuk setiap elemen data dalam respons. Kesalahan atau panggilan balik penyelesaian dipanggil tepat sekali dan dianggap sebagai sinyal terminal. Tidak ada panggilan balik lain yang dipanggil jika salah satu dari sinyal ini diterima dari penerbit.
EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
.connectionString("<your connection string>")
.consumerGroup("<your consumer group>")
.buildAsyncConsumerClient();
asyncClient.receive().subscribe(
event -> System.out.println("Sequence number of received event: " + event.getData().getSequenceNumber()),
ex -> System.out.println("Error receiving events: " + ex.getMessage()),
() -> System.out.println("Successfully completed receiving all events"));
Apa yang terjadi ketika sumber menghasilkan data pada tingkat yang lebih cepat daripada yang dapat ditangani pelanggan? Pelanggan dapat kewalahan dengan data, yang dapat menyebabkan kesalahan di luar memori. Pelanggan membutuhkan cara untuk berkomunikasi kembali ke penerbit untuk memperlambat saat tidak dapat mengikuti. Secara default, saat Anda memanggil subscribe()
pada Flux
seperti yang ditunjukkan pada contoh di atas, pelanggan meminta aliran data tanpa batas, menunjukkan kepada penerbit untuk mengirim data secepat mungkin. Perilaku ini tidak selalu diinginkan, dan pelanggan mungkin harus mengontrol tingkat penerbitan melalui "backpressure". Backpressure memungkinkan pelanggan mengendalikan aliran elemen data. Pelanggan akan meminta sejumlah elemen data yang dapat ditangani. Setelah pelanggan selesai memproses elemen ini, pelanggan dapat meminta lebih banyak. Dengan menggunakan backpressure, Anda dapat mengubah model push untuk transfer data menjadi model push-pull.
Contoh berikut menunjukkan cara Anda dapat mengontrol laju saat peristiwa diterima oleh konsumen Event Hubs:
EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
.connectionString("<your connection string>")
.consumerGroup("<your consumer group>")
.buildAsyncConsumerClient();
asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1); // request 1 data element to begin with
}
@Override
public void onNext(PartitionEvent partitionEvent) {
System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
this.subscription.request(1); // request another event when the subscriber is ready
}
@Override
public void onError(Throwable throwable) {
System.out.println("Error receiving events: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Successfully completed receiving all events")
}
});
Saat pelanggan pertama kali "terhubung" ke penerbit, penerbit menyerahkan satu instans Subscription
kepada pelanggan, yang mengelola status transfer data. Subscription
ini adalah media di mana pelanggan dapat menerapkan backpressure dengan menelepon request()
untuk menentukan berapa banyak lagi elemen data yang dapat ditanganinya.
Jika pelanggan meminta lebih dari satu elemen data setiap kali menelepon onNext()
, request(10)
misalnya, penerbit akan segera mengirim 10 elemen berikutnya jika tersedia atau kapan tersedia. Elemen ini terakumulasi dalam buffer di akhir pelanggan, dan karena setiap panggilan onNext()
akan meminta 10 lagi, backlog terus tumbuh sampai penerbit tidak memiliki elemen data lagi untuk dikirim, atau buffer pelanggan meluap, mengakibatkan kesalahan di luar memori.
Langganan mengelola status transfer data antara penerbit dan pelanggan. Langganan aktif sampai penerbit selesai mentransfer semua data ke pelanggan atau pelanggan tidak lagi tertarik untuk menerima data. Ada beberapa cara Anda dapat membatalkan langganan seperti yang ditunjukkan di bawah ini.
Contoh berikut membatalkan langganan dengan membuang pelanggan:
EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
.connectionString("<your connection string>")
.consumerGroup("<your consumer group>")
.buildAsyncConsumerClient();
Disposable disposable = asyncClient.receive().subscribe(
partitionEvent -> {
Long num = partitionEvent.getData().getSequenceNumber()
System.out.println("Sequence number of received event: " + num);
},
ex -> System.out.println("Error receiving events: " + ex.getMessage()),
() -> System.out.println("Successfully completed receiving all events"));
// much later on in your code, when you are ready to cancel the subscription,
// you can call the dispose method, as such:
disposable.dispose();
Contoh berikut membatalkan langganan dengan memanggil metode cancel()
di Subscription
:
EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
.connectionString("<your connection string>")
.consumerGroup("<your consumer group>")
.buildAsyncConsumerClient();
asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1); // request 1 data element to begin with
}
@Override
public void onNext(PartitionEvent partitionEvent) {
System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
this.subscription.cancel(); // Cancels the subscription. No further event is received.
}
@Override
public void onError(Throwable throwable) {
System.out.println("Error receiving events: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Successfully completed receiving all events")
}
});
Percakapan adalah sumber daya mahal yang tidak boleh Anda sia-siakan untuk menunggu tanggapan dari panggilan layanan jarak jauh. Saat adopsi arsitektur layanan mikro meningkat, kebutuhan untuk menskalakan dan menggunakan sumber daya secara efisien menjadi penting. API asinkron menguntungkan saat ada operasi terikat jaringan. SDK Azure untuk Java menawarkan sekumpulan API yang kaya untuk operasi asinkron untuk membantu memaksimalkan sumber daya sistem Anda. Kami sangat mendorong Anda untuk mencoba klien asinkron kami.
Untuk informasi selengkapnya tentang operator yang paling sesuai dengan tugas khusus Anda, lihat Operator mana yang saya butuhkan? di Panduan Referensi Reaktor 3.
Sekarang setelah Anda lebih memahami berbagai konsep pemrograman asinkron, penting untuk mempelajari cara iterasi atas hasilnya. Untuk informasi selengkapnya tentang strategi iterasi terbaik, dan detail cara kerja paginasi, lihat Paginasi dan iterasi di SDK Azure untuk Java.