Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Spring Cloud Stream adalah kerangka kerja untuk membangun layanan mikro berbasis peristiwa yang sangat dapat diskalakan yang terhubung dengan sistem olahpesan bersama.
Kerangka kerja ini menyediakan model pemrograman fleksibel yang dibangun di atas idiom Spring yang sudah mapan dan akrab serta praktik terbaik. Praktik terbaik ini termasuk dukungan untuk semantik pub/sub persisten, grup konsumen, dan partisi stateful.
Implementasi binder saat ini meliputi:
-
spring-cloud-azure-stream-binder-eventhubs- untuk informasi selengkapnya, lihat Spring Cloud Stream Binder untuk Azure Event Hubs -
spring-cloud-azure-stream-binder-servicebus- untuk informasi selengkapnya, lihat Spring Cloud Stream Binder untuk Azure Service Bus
Spring Cloud Stream Binder untuk Azure Event Hubs
Konsep utama
Spring Cloud Stream Binder untuk Azure Event Hubs menyediakan implementasi pengikatan untuk kerangka kerja Spring Cloud Stream. Implementasi ini menggunakan Adaptor Saluran Spring Integration Event Hubs pada dasarnya. Dari perspektif desain, Event Hubs mirip dengan Kafka. Selain itu, Azure Event Hubs dapat diakses melalui Kafka API. Jika proyek Anda memiliki dependensi yang ketat pada Kafka API, Anda dapat mencoba Events Hub dengan Sampel Api Kafka
Grup konsumen
Azure Event Hubs memberikan dukungan serupa dari grup konsumen sebagai Apache Kafka, tetapi dengan sedikit logika yang berbeda. Meskipun Kafka menyimpan semua offset yang diterapkan di broker, Anda harus menyimpan offset pesan Azure Event Hubs yang diproses secara manual. Azure Event Hubs SDK menyediakan fungsi untuk menyimpan offset tersebut di dalam Azure Storage.
Dukungan partisi
Event Hubs menyediakan konsep partisi fisik yang serupa dengan Kafka. Tetapi tidak seperti penyeimbangan ulang otomatis Kafka antara konsumen dan partisi, Azure Event Hubs menyediakan semacam mode preemptive. Akun penyimpanan bertindak sebagai sewa untuk menentukan konsumen mana yang memiliki partisi mana. Ketika konsumen baru dimulai, ia mencoba mencuri beberapa partisi dari konsumen yang paling banyak dimuat untuk mencapai keseimbangan beban kerja.
Untuk menentukan strategi penyeimbangan beban, properti spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.* disediakan. Untuk informasi selengkapnya, lihat bagian properti konsumen
Dukungan konsumen batch
Pengikat Spring Cloud Azure Stream Event Hubs mendukung fitur Spring Cloud Stream Batch Consumer.
Untuk bekerja dengan mode batch-consumer, atur properti spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode ke true. Saat diaktifkan, pesan dengan payload daftar peristiwa batch diterima dan diteruskan ke fungsi Consumer. Setiap header pesan juga dikonversi ke daftar, yang kontennya adalah nilai header terkait yang diurai dari setiap peristiwa. Header komunal ID partisi, checkpointer, dan properti antrean terakhir disajikan sebagai nilai tunggal karena seluruh batch peristiwa berbagi nilai yang sama. Untuk informasi selengkapnya, lihat bagian header pesan Azure Event Hubs
Nota
Header titik pemeriksaan hanya ada ketika mode titik pemeriksaan MANUAL digunakan.
Titik pemeriksaan konsumen batch mendukung dua mode: BATCH dan MANUAL. mode BATCH adalah mode titik pemeriksaan otomatis untuk memeriksa seluruh batch peristiwa bersama-sama setelah pengikat menerimanya. mode MANUAL adalah memeriksa peristiwa oleh pengguna. Saat digunakan, Checkpointer diteruskan ke header pesan, dan pengguna dapat menggunakannya untuk melakukan titik pemeriksaan.
Anda dapat menentukan ukuran batch dengan mengatur properti max-size dan max-wait-time yang memiliki awalan spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.. Properti max-size diperlukan dan properti max-wait-time bersifat opsional. Untuk informasi selengkapnya, lihat bagian properti konsumen
Penyiapan dependensi
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>
Atau, Anda juga dapat menggunakan Spring Cloud Azure Stream Event Hubs Starter, seperti yang ditunjukkan dalam contoh berikut untuk Maven:
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>
Konfigurasi
Pengikat menyediakan tiga bagian opsi konfigurasi berikut:
Properti konfigurasi koneksi
Bagian ini berisi opsi konfigurasi yang digunakan untuk menyambungkan ke Azure Event Hubs.
Nota
Jika Anda memilih untuk menggunakan prinsip keamanan untuk mengautentikasi dan mengotorisasi dengan ID Microsoft Entra untuk mengakses sumber daya Azure, lihat Mengotorisasi akses dengan ID Microsoft Entra untuk memastikan prinsip keamanan telah diberikan izin yang memadai untuk mengakses sumber daya Azure.
Properti koneksi yang dapat dikonfigurasi dari spring-cloud-azure-stream-binder-eventhubs:
| Harta benda | Jenis | Deskripsi |
|---|---|---|
| spring.cloud.azure.eventhubs.enabled | Boolean | Apakah Azure Event Hubs diaktifkan. |
| spring.cloud.azure.eventhubs.connection-string | Tali | Nilai string koneksi Namespace Layanan Event Hubs. |
| spring.cloud.azure.eventhubs.namespace | Tali | Nilai Namespace Layanan Azure Event Hubs, yang merupakan awalan FQDN. FQDN harus terdiri dari NamespaceName.DomainName |
| musim semi.cloud.azure.eventhubs.nama-domain | Tali | Nama domain nilai Namespace Azure Event Hubs. |
| spring.cloud.azure.eventhubs.custom-endpoint-address | Tali | Alamat Titik Akhir Kustom. |
Ujung
Opsi konfigurasi Azure Service SDK umum juga dapat dikonfigurasi untuk pengikat Spring Cloud Azure Stream Event Hubs. Opsi konfigurasi yang didukung diperkenalkan dalam konfigurasi Spring Cloud Azure, dan dapat dikonfigurasi dengan awalan terpadu spring.cloud.azure. atau awalan spring.cloud.azure.eventhubs..
Pengikat juga mendukung Spring Could Azure Resource Manager secara default. Untuk mempelajari tentang cara mengambil string koneksi dengan prinsip keamanan yang tidak diberikan dengan peran terkait
Properti konfigurasi titik pemeriksaan
Bagian ini berisi opsi konfigurasi untuk layanan Storage Blobs, yang digunakan untuk mempertahankan kepemilikan partisi dan informasi titik pemeriksaan.
Nota
Dari versi 4.0.0, saat properti spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists tidak diaktifkan secara manual, tidak ada kontainer Storage yang akan dibuat secara otomatis dengan nama dari spring.cloud.stream.bindings.binding-name.destination.
Titik pemeriksaan properti yang dapat dikonfigurasi dari spring-cloud-azure-stream-binder-eventhubs:
| Harta benda | Jenis | Deskripsi |
|---|---|---|
| spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists | Boolean | Apakah akan mengizinkan pembuatan kontainer jika tidak ada. |
| musim semi.cloud.azure.eventhubs.processor.checkpoint-store.account-name | Tali | Nama untuk akun penyimpanan. |
| spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key | Tali | Kunci akses akun penyimpanan. |
| spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name | Tali | Nama kontainer penyimpanan. |
Ujung
Opsi konfigurasi Azure Service SDK umum juga dapat dikonfigurasi untuk penyimpanan titik pemeriksaan Blob Penyimpanan. Opsi konfigurasi yang didukung diperkenalkan dalam konfigurasi Spring Cloud Azure, dan dapat dikonfigurasi dengan awalan terpadu spring.cloud.azure. atau awalan spring.cloud.azure.eventhubs.processor.checkpoint-store.
Properti konfigurasi Pengikatan Azure Event Hubs
Opsi berikut dibagi menjadi empat bagian: Properti Konsumen, Konfigurasi Konsumen Tingkat Lanjut, Properti Produsen, dan Konfigurasi Produsen Tingkat Lanjut.
Properti konsumen
Properti ini diekspos melalui EventHubsConsumerProperties.
Nota
Untuk menghindari pengulangan, karena versi 4.17.0 dan 5.11.0, Spring Cloud Azure Stream Binder Event Hubs mendukung pengaturan nilai untuk semua saluran, dalam format spring.cloud.stream.eventhubs.default.consumer.<property>=<value>.
Properti yang dapat dikonfigurasi konsumen dari spring-cloud-azure-stream-binder-eventhubs:
| Harta benda | Jenis | Deskripsi |
|---|---|---|
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode | Mode Pos Pemeriksaan | Mode titik pemeriksaan yang digunakan saat konsumen memutuskan cara mengirim pesan titik pemeriksaan |
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count | Integer | Memutuskan jumlah pesan untuk setiap partisi untuk melakukan satu titik pemeriksaan. Akan berlaku hanya ketika mode titik pemeriksaan PARTITION_COUNT digunakan. |
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval | Durasi | Memutuskan interval waktu untuk melakukan satu titik pemeriksaan. Akan berlaku hanya ketika mode titik pemeriksaan TIME digunakan. |
| spring.cloud.stream.eventhubs.bindings.<ukuran.batch.max binding-name.consumer | Integer | Jumlah maksimum peristiwa dalam batch. Diperlukan untuk mode konsumen batch. |
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time | Durasi | Durasi waktu maksimum untuk mengkonsumsi batch. Akan berlaku hanya ketika mode konsumen batch diaktifkan dan bersifat opsional. |
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.update-interval | Durasi | Durasi waktu interval untuk memperbarui. |
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy | Strategi Penyeimbangan Beban | Strategi penyeimbangan beban. |
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval | Durasi | Durasi waktu setelah kepemilikan partisi kedaluwarsa. |
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties | Boolean | Apakah prosesor peristiwa harus meminta informasi tentang peristiwa antrean terakhir pada partisi terkait, dan melacak informasi tersebut saat peristiwa diterima. |
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count | Integer | Jumlah yang digunakan oleh konsumen untuk mengontrol jumlah peristiwa yang akan diterima dan antrean konsumen Event Hub secara aktif secara lokal. |
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position | Petakan dengan kunci sebagai ID partisi, dan nilai StartPositionProperties |
Peta yang berisi posisi peristiwa yang akan digunakan untuk setiap partisi jika titik pemeriksaan untuk partisi tidak ada di penyimpanan titik pemeriksaan. Peta ini di-key off dari ID partisi. |
Nota
Konfigurasi initial-partition-event-position menerima map untuk menentukan posisi awal untuk setiap hub peristiwa. Dengan demikian, kuncinya adalah ID partisi, dan nilainya adalah StartPositionProperties, yang mencakup properti offset, nomor urutan, waktu tanggal antrean dan apakah inklusif. Misalnya, Anda dapat mengaturnya sebagai
spring:
cloud:
stream:
eventhubs:
bindings:
<binding-name>:
consumer:
initial-partition-event-position:
0:
offset: earliest
1:
sequence-number: 100
2:
enqueued-date-time: 2022-01-12T13:32:47.650005Z
4:
inclusive: false
Konfigurasi konsumen tingkat lanjut
koneksi
Properti produsen
Properti ini diekspos melalui EventHubsProducerProperties.
Nota
Untuk menghindari pengulangan, karena versi 4.17.0 dan 5.11.0, Spring Cloud Azure Stream Binder Event Hubs mendukung pengaturan nilai untuk semua saluran, dalam format spring.cloud.stream.eventhubs.default.producer.<property>=<value>.
Properti yang dapat dikonfigurasi produsen dari spring-cloud-azure-stream-binder-eventhubs:
| Harta benda | Jenis | Deskripsi |
|---|---|---|
| spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync | Boolean | Bendera sakelar untuk sinkronisasi produsen. Jika true, produser akan menunggu respons setelah operasi pengiriman. |
| spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout | panjang | Jumlah waktu untuk menunggu respons setelah operasi pengiriman. Akan berlaku hanya ketika produsen sinkronisasi diaktifkan. |
Konfigurasi produsen tingkat lanjut
koneksi
Penggunaan dasar
Mengirim dan menerima pesan dari/ke Azure Event Hubs
Isi opsi konfigurasi dengan informasi kredensial.
Untuk kredensial sebagai string koneksi, konfigurasikan properti berikut dalam file application.yml Anda:
spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUALNota
Microsoft merekomendasikan penggunaan alur autentikasi paling aman yang tersedia. Alur autentikasi yang dijelaskan dalam prosedur ini, seperti untuk database, cache, olahpesan, atau layanan AI, memerlukan tingkat kepercayaan yang sangat tinggi dalam aplikasi dan membawa risiko yang tidak ada dalam alur lain. Gunakan alur ini hanya ketika opsi yang lebih aman, seperti identitas terkelola untuk koneksi tanpa kata sandi atau tanpa kunci, tidak layak. Untuk operasi komputer lokal, lebih suka identitas pengguna untuk koneksi tanpa kata sandi atau tanpa kunci.
Untuk kredensial sebagai perwakilan layanan, konfigurasikan properti berikut dalam file application.yml Anda:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Nota
Nilai yang diizinkan untuk tenant-id adalah: common, organizations, consumers, atau ID penyewa. Untuk informasi selengkapnya tentang nilai-nilai ini, lihat bagian Menggunakan titik akhir yang salah (akun pribadi dan organisasi)Kesalahan AADSTS50020 - Akun pengguna dari penyedia identitas tidak ada di penyewa. Untuk informasi tentang mengonversi aplikasi penyewa tunggal Anda, lihat Mengonversi aplikasi penyewa tunggal ke multipenyewa di ID Microsoft Entra.
Untuk kredensial sebagai identitas terkelola, konfigurasikan properti berikut dalam file application.yml Anda:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Tentukan pemasok dan konsumen.
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload(), message.getHeaders().get(EventHubsHeaders.PARTITION_KEY), message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER), message.getHeaders().get(EventHubsHeaders.OFFSET), message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME) ); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
Dukungan partisi
PartitionSupplier dengan informasi partisi yang disediakan pengguna dibuat untuk mengonfigurasi informasi partisi tentang pesan yang akan dikirim. Diagram alur berikut menunjukkan proses mendapatkan prioritas yang berbeda untuk ID dan kunci partisi:
Dukungan konsumen batch
Berikan opsi konfigurasi batch, seperti yang ditunjukkan dalam contoh berikut:
spring: cloud: function: definition: consume stream: bindings: consume-in-0: destination: ${AZURE_EVENTHUB_NAME} group: ${AZURE_EVENTHUB_CONSUMER_GROUP} consumer: batch-mode: true eventhubs: bindings: consume-in-0: consumer: batch: max-batch-size: 10 # Required for batch-consumer mode max-wait-time: 1m # Optional, the default value is null checkpoint: mode: BATCH # or MANUAL as neededTentukan pemasok dan konsumen.
Untuk mode titik pemeriksaan sebagai
BATCH, Anda dapat menggunakan kode berikut untuk mengirim pesan dan menggunakan dalam batch.@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }Untuk mode titik pemeriksaan sebagai
MANUAL, Anda dapat menggunakan kode berikut untuk mengirim pesan dan menggunakan/titik pemeriksaan dalam batch.@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }
Nota
Dalam mode penggunaan batch, jenis konten default pengikat Spring Cloud Stream application/json, jadi pastikan payload pesan selaras dengan jenis konten. Misalnya, saat menggunakan jenis konten default application/json untuk menerima pesan dengan payload String, payload harus JSON String, dikelilingi dengan tanda kutip ganda untuk teks String asli. Sementara untuk jenis konten text/plain, ini bisa menjadi objek String secara langsung. Untuk informasi selengkapnya, lihat Negosiasi Jenis Konten Spring Cloud Stream.
Menangani pesan kesalahan
Menangani pesan kesalahan pengikatan keluar
Secara default, Integrasi Spring membuat saluran kesalahan global yang disebut
errorChannel. Konfigurasikan titik akhir pesan berikut untuk menangani pesan kesalahan pengikatan keluar.@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }Menangani pesan kesalahan pengikatan masuk
Spring Cloud Stream Event Hubs Binder mendukung satu solusi untuk menangani kesalahan untuk pengikatan pesan masuk: penanganan kesalahan.
Penanganan Kesalahan
: Spring Cloud Stream memaparkan mekanisme bagi Anda untuk menyediakan handler kesalahan kustom dengan menambahkan
Consumeryang menerima instansErrorMessage. Untuk informasi selengkapnya, lihat Menangani Pesan Kesalahan dalam dokumentasi Spring Cloud Stream.Handler kesalahan pengikatan-default
Konfigurasikan satu biji
Consumeruntuk mengonsumsi semua pesan kesalahan pengikatan masuk. Fungsi default berikut berlangganan ke setiap saluran kesalahan pengikatan masuk:@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }Anda juga perlu mengatur properti
spring.cloud.stream.default.error-handler-definitionke nama fungsi.Penangan kesalahan khusus pengikatan
Konfigurasikan biji
Consumeruntuk menggunakan pesan kesalahan pengikatan masuk tertentu. Fungsi berikut berlangganan saluran kesalahan pengikatan masuk tertentu dan memiliki prioritas yang lebih tinggi daripada handler kesalahan pengikatan-default:@Bean public Consumer<ErrorMessage> myErrorHandler() { return message -> { // consume the error message }; }Anda juga perlu mengatur properti
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definitionke nama fungsi.
Header pesan Azure Event Hubs
Untuk header pesan dasar yang didukung, lihat bagian header pesan Azure Event Hubs dukungan Spring Cloud Azure untuk Spring Integration.
Dukungan beberapa pengikat
Koneksi ke beberapa namespace Layanan Azure Event Hubs juga didukung dengan menggunakan beberapa pengikat. Sampel ini mengambil string koneksi sebagai contoh. Kredensial perwakilan layanan dan identitas terkelola juga didukung. Anda dapat mengatur properti terkait di pengaturan lingkungan setiap binder.
Untuk menggunakan beberapa pengikat dengan Azure Event Hubs, konfigurasikan properti berikut dalam file application.yml Anda:
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${EVENTHUB_NAME_01} group: ${CONSUMER_GROUP_01} supply1-out-0: destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE} consume2-in-0: binder: eventhub-2 destination: ${EVENTHUB_NAME_02} group: ${CONSUMER_GROUP_02} supply2-out-0: binder: eventhub-2 destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE} binders: eventhub-1: type: eventhubs default-candidate: true environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_01} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhub-2: type: eventhubs default-candidate: false environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_02} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhubs: bindings: consume1-in-0: consumer: checkpoint: mode: MANUAL consume2-in-0: consumer: checkpoint: mode: MANUAL poller: initial-delay: 0 fixed-delay: 1000Nota
File aplikasi sebelumnya menunjukkan cara mengonfigurasi satu poller default untuk aplikasi ke semua pengikatan. Jika Anda ingin mengonfigurasi poller untuk pengikatan tertentu, Anda dapat menggunakan konfigurasi seperti
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.Nota
Microsoft merekomendasikan penggunaan alur autentikasi paling aman yang tersedia. Alur autentikasi yang dijelaskan dalam prosedur ini, seperti untuk database, cache, olahpesan, atau layanan AI, memerlukan tingkat kepercayaan yang sangat tinggi dalam aplikasi dan membawa risiko yang tidak ada dalam alur lain. Gunakan alur ini hanya ketika opsi yang lebih aman, seperti identitas terkelola untuk koneksi tanpa kata sandi atau tanpa kunci, tidak layak. Untuk operasi komputer lokal, lebih suka identitas pengguna untuk koneksi tanpa kata sandi atau tanpa kunci.
Kita perlu mendefinisikan dua pemasok dan dua konsumen:
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; }
Provisi sumber daya
Binder Azure Event Hubs mendukung provisi pusat aktivitas dan grup konsumen, pengguna dapat menggunakan properti berikut untuk mengaktifkan provisi.
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
eventhubs:
resource:
resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}
Nota
Nilai yang diizinkan untuk tenant-id adalah: common, organizations, consumers, atau ID penyewa. Untuk informasi selengkapnya tentang nilai-nilai ini, lihat bagian Menggunakan titik akhir yang salah (akun pribadi dan organisasi)Kesalahan AADSTS50020 - Akun pengguna dari penyedia identitas tidak ada di penyewa. Untuk informasi tentang mengonversi aplikasi penyewa tunggal Anda, lihat Mengonversi aplikasi penyewa tunggal ke multipenyewa di ID Microsoft Entra.
Sampel
Untuk informasi selengkapnya, lihat repositori azure-spring-boot-samples di GitHub.
Spring Cloud Stream Binder untuk Azure Service Bus
Konsep utama
Spring Cloud Stream Binder untuk Azure Service Bus menyediakan implementasi pengikatan untuk Spring Cloud Stream Framework. Implementasi ini menggunakan Adaptor Saluran Bus Layanan Integrasi Spring pada dasarnya.
Pesan terjadwal
Pengikat ini mendukung pengiriman pesan ke topik untuk pemrosesan tertunda. Pengguna dapat mengirim pesan terjadwal dengan header x-delay mengekspresikan dalam milidetik waktu penundaan untuk pesan tersebut. Pesan akan dikirimkan ke topik masing-masing setelah x-delay milidetik.
Grup konsumen
Topik Bus Layanan memberikan dukungan serupa dari grup konsumen sebagai Apache Kafka, tetapi dengan sedikit logika yang berbeda.
Pengikat ini bergantung pada Subscription topik untuk bertindak sebagai grup konsumen.
Penyiapan dependensi
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>
Atau, Anda juga dapat menggunakan Spring Cloud Azure Stream Service Bus Starter, seperti yang ditunjukkan dalam contoh berikut untuk Maven:
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>
Konfigurasi
Pengikat menyediakan dua bagian opsi konfigurasi berikut:
Properti konfigurasi koneksi
Bagian ini berisi opsi konfigurasi yang digunakan untuk menyambungkan ke Azure Service Bus.
Nota
Jika Anda memilih untuk menggunakan prinsip keamanan untuk mengautentikasi dan mengotorisasi dengan ID Microsoft Entra untuk mengakses sumber daya Azure, lihat Mengotorisasi akses dengan ID Microsoft Entra untuk memastikan prinsip keamanan telah diberikan izin yang memadai untuk mengakses sumber daya Azure.
Properti koneksi yang dapat dikonfigurasi dari spring-cloud-azure-stream-binder-servicebus:
| Harta benda | Jenis | Deskripsi |
|---|---|---|
| spring.cloud.azure.servicebus.enabled | Boolean | Apakah Azure Service Bus diaktifkan. |
| spring.cloud.azure.servicebus.connection-string | Tali | Nilai string koneksi Namespace Service Bus. |
| spring.cloud.azure.servicebus.custom-endpoint-address | Tali | Alamat titik akhir kustom yang akan digunakan saat menyambungkan ke Azure Service Bus. |
| spring.cloud.azure.servicebus.namespace | Tali | Nilai Namespace Service Bus, yang merupakan awalan FQDN. FQDN harus terdiri dari NamespaceName.DomainName |
| musim semi.cloud.azure.servicebus.domain-name | Tali | Nama domain nilai Namespace Azure Service Bus. |
Nota
Opsi konfigurasi Azure Service SDK umum juga dapat dikonfigurasi untuk binder Azure Stream Service Bus Spring Cloud. Opsi konfigurasi yang didukung diperkenalkan dalam konfigurasi Spring Cloud Azure, dan dapat dikonfigurasi dengan awalan terpadu spring.cloud.azure. atau awalan spring.cloud.azure.servicebus..
Pengikat juga mendukung Spring Could Azure Resource Manager secara default. Untuk mempelajari tentang cara mengambil string koneksi dengan prinsip keamanan yang tidak diberikan dengan peran terkait
Properti konfigurasi pengikatan Azure Service Bus
Opsi berikut dibagi menjadi empat bagian: Properti Konsumen, Konfigurasi Konsumen Tingkat Lanjut, Properti Produsen, dan Konfigurasi Produsen Tingkat Lanjut.
Properti konsumen
Properti ini diekspos melalui ServiceBusConsumerProperties.
Nota
Untuk menghindari pengulangan, karena versi 4.17.0 dan 5.11.0, Spring Cloud Azure Stream Binder Service Bus mendukung pengaturan nilai untuk semua saluran, dalam format spring.cloud.stream.servicebus.default.consumer.<property>=<value>.
Properti yang dapat dikonfigurasi konsumen dari spring-cloud-azure-stream-binder-servicebus:
| Harta benda | Jenis | Bawaan | Deskripsi |
|---|---|---|---|
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected | Boolean | palsu | Jika pesan yang gagal dirutekan ke DLQ. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls | Integer | 1 | Pesan bersamaan maksimum yang harus diproses oleh klien prosesor Azure Service Bus. Saat sesi diaktifkan, sesi berlaku untuk setiap sesi. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-sessions | Integer | nol | Jumlah maksimum sesi bersamaan untuk diproses pada waktu tertentu. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled | Boolean | nol | Apakah sesi diaktifkan. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-idle-timeout | Durasi | nol | Mengatur jumlah waktu maksimum (Durasi) untuk menunggu pesan diterima untuk sesi yang saat ini aktif. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count | Integer | 0 | Jumlah prefetch klien prosesor Azure Service Bus. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-antrean | Sub-Antrean | tidak | Jenis sub antrean yang akan disambungkan. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration | Durasi | 5m | Jumlah waktu untuk melanjutkan perpanjangan kunci secara otomatis. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode | ServiceBusReceiveMode | peek_lock | Mode penerima klien prosesor Bus Layanan. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete | Boolean | benar | Apakah akan menyelesaikan pesan secara otomatis. Jika diatur sebagai false, header pesan Checkpointer akan ditambahkan untuk memungkinkan pengembang menyelesaikan pesan secara manual. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-size-in-megabyte | Panjang | 1024 | Ukuran maksimum antrean/topik dalam megabyte, yang merupakan ukuran memori yang dialokasikan untuk antrean/topik. |
| musim semi.cloud.stream.servicebus.bindings.binding-name.consumer.default-message-time-to-live | Durasi | P10675199DT2H48M5.4775807D. (10675199 hari, 2 jam, 48 menit, 5 detik, dan 477 milidetik) | Durasi setelah pesan kedaluwarsa, dimulai dari saat pesan dikirim ke Azure Service Bus. |
Penting
Saat menggunakan Azure Resource Manager (ARM)
Konfigurasi konsumen tingkat lanjut
koneksi
Properti produsen
Properti ini diekspos melalui ServiceBusProducerProperties.
Nota
Untuk menghindari pengulangan, karena versi 4.17.0 dan 5.11.0, Spring Cloud Azure Stream Binder Service Bus mendukung pengaturan nilai untuk semua saluran, dalam format spring.cloud.stream.servicebus.default.producer.<property>=<value>.
Properti yang dapat dikonfigurasi produsen dari spring-cloud-azure-stream-binder-servicebus:
| Harta benda | Jenis | Bawaan | Deskripsi |
|---|---|---|---|
| spring.cloud.stream.servicebus.bindings.binding-name.producer.sync | Boolean | palsu | Beralih bendera untuk sinkronisasi produsen. |
| spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout | panjang | 10.000 | Nilai waktu habis untuk pengiriman produsen. |
| spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type | ServiceBusEntityType | nol | Jenis entitas Service Bus dari produsen, diperlukan untuk produsen pengikatan. |
| spring.cloud.stream.servicebus.bindings.binding-name.producer.max-size-in-megabyte | Panjang | 1024 | Ukuran maksimum antrean/topik dalam megabyte, yang merupakan ukuran memori yang dialokasikan untuk antrean/topik. |
| spring.cloud.stream.servicebus.bindings.binding-name.producer.default-message-time-to-live | Durasi | P10675199DT2H48M5.4775807D. (10675199 hari, 2 jam, 48 menit, 5 detik, dan 477 milidetik) | Durasi setelah pesan kedaluwarsa, dimulai dari saat pesan dikirim ke Azure Service Bus. |
Penting
Saat menggunakan produsen pengikatan, properti spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type diperlukan untuk dikonfigurasi.
Konfigurasi produsen tingkat lanjut
koneksi
Penggunaan dasar
Mengirim dan menerima pesan dari/ke Azure Service Bus
Isi opsi konfigurasi dengan informasi kredensial.
Untuk kredensial sebagai string koneksi, konfigurasikan properti berikut dalam file application.yml Anda:
spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus TopicNota
Microsoft merekomendasikan penggunaan alur autentikasi paling aman yang tersedia. Alur autentikasi yang dijelaskan dalam prosedur ini, seperti untuk database, cache, olahpesan, atau layanan AI, memerlukan tingkat kepercayaan yang sangat tinggi dalam aplikasi dan membawa risiko yang tidak ada dalam alur lain. Gunakan alur ini hanya ketika opsi yang lebih aman, seperti identitas terkelola untuk koneksi tanpa kata sandi atau tanpa kunci, tidak layak. Untuk operasi komputer lokal, lebih suka identitas pengguna untuk koneksi tanpa kata sandi atau tanpa kunci.
Untuk kredensial sebagai perwakilan layanan, konfigurasikan properti berikut dalam file application.yml Anda:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Nota
Nilai yang diizinkan untuk tenant-id adalah: common, organizations, consumers, atau ID penyewa. Untuk informasi selengkapnya tentang nilai-nilai ini, lihat bagian Menggunakan titik akhir yang salah (akun pribadi dan organisasi)Kesalahan AADSTS50020 - Akun pengguna dari penyedia identitas tidak ada di penyewa. Untuk informasi tentang mengonversi aplikasi penyewa tunggal Anda, lihat Mengonversi aplikasi penyewa tunggal ke multipenyewa di ID Microsoft Entra.
Untuk kredensial sebagai identitas terkelola, konfigurasikan properti berikut dalam file application.yml Anda:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Tentukan pemasok dan konsumen.
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}'", message.getPayload()); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
Dukungan kunci partisi
Pengikat mendukung partisi Azure Service Bus
Spring Cloud Stream menyediakan properti ekspresi SpEL kunci partisi spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression. Misalnya, mengatur properti ini sebagai "'partitionKey-' + headers[<message-header-key>]" dan menambahkan header yang disebut message-header-key. Spring Cloud Stream menggunakan nilai untuk header ini saat mengevaluasi ekspresi untuk menetapkan kunci partisi. Kode berikut menyediakan contoh produsen:
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader("<message-header-key>", value.length() % 4)
.build();
};
}
Dukungan sesi
Pengikat mendukung sesi pesan Service Bus. ID sesi pesan dapat diatur melalui header pesan.
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build();
};
}
Nota
Menurut partisi Service Bus, ID sesi memiliki prioritas yang lebih tinggi daripada kunci partisi. Jadi ketika header ServiceBusMessageHeaders#SESSION_ID dan ServiceBusMessageHeaders#PARTITION_KEY diatur, nilai ID sesi akhirnya digunakan untuk menimpa nilai kunci partisi.
Menangani pesan kesalahan
Menangani pesan kesalahan pengikatan keluar
Secara default, Integrasi Spring membuat saluran kesalahan global yang disebut
errorChannel. Konfigurasikan titik akhir pesan berikut untuk menangani pesan kesalahan pengikatan keluar.@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }Menangani pesan kesalahan pengikatan masuk
Spring Cloud Stream Service Bus Binder mendukung dua solusi untuk menangani kesalahan untuk pengikatan pesan masuk: handler kesalahan pengikat dan handler.
handler kesalahan
Binder : Handler kesalahan binder default menangani pengikatan masuk. Anda menggunakan handler ini untuk mengirim pesan yang gagal ke antrean surat mati ketika
spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejecteddiaktifkan. Jika tidak, pesan yang gagal akan ditinggalkan. Penangan kesalahan pengikat saling eksklusif dengan penangan kesalahan lain yang disediakan.handler kesalahan
: Spring Cloud Stream memaparkan mekanisme bagi Anda untuk menyediakan handler kesalahan kustom dengan menambahkan
Consumeryang menerima instansErrorMessage. Untuk informasi selengkapnya, lihat Menangani Pesan Kesalahan dalam dokumentasi Spring Cloud Stream.Handler kesalahan pengikatan-default
Konfigurasikan satu biji
Consumeruntuk mengonsumsi semua pesan kesalahan pengikatan masuk. Fungsi default berikut berlangganan ke setiap saluran kesalahan pengikatan masuk:@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }Anda juga perlu mengatur properti
spring.cloud.stream.default.error-handler-definitionke nama fungsi.Penangan kesalahan khusus pengikatan
Konfigurasikan biji
Consumeruntuk menggunakan pesan kesalahan pengikatan masuk tertentu. Fungsi berikut berlangganan saluran kesalahan pengikatan masuk tertentu dengan prioritas yang lebih tinggi daripada handler kesalahan pengikatan-default.@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }Anda juga perlu mengatur properti
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definitionke nama fungsi.
Header pesan Azure Service Bus
Untuk header pesan dasar yang didukung, lihat bagian header pesan Azure Service Bus dukungan Spring Cloud Azure untuk Spring Integration.
Nota
Saat mengatur kunci partisi, prioritas header pesan lebih tinggi dari properti Spring Cloud Stream. Jadi spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression hanya berlaku ketika tidak ada header ServiceBusMessageHeaders#SESSION_ID dan ServiceBusMessageHeaders#PARTITION_KEY yang dikonfigurasi.
Dukungan beberapa pengikat
Koneksi ke beberapa namespace Bus Layanan juga didukung dengan menggunakan beberapa pengikat. Sampel ini mengambil string koneksi sebagai contoh. Kredensial perwakilan layanan dan identitas terkelola juga didukung, pengguna dapat mengatur properti terkait di setiap pengaturan lingkungan binder.
Untuk menggunakan beberapa pengikat ServiceBus, konfigurasikan properti berikut dalam file application.yml Anda:
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${SERVICEBUS_TOPIC_NAME} group: ${SUBSCRIPTION_NAME} supply1-out-0: destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE} consume2-in-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME} supply2-out-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE} binders: servicebus-1: type: servicebus default-candidate: true environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING} servicebus-2: type: servicebus default-candidate: false environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING} servicebus: bindings: consume1-in-0: consumer: auto-complete: false supply1-out-0: producer: entity-type: topic consume2-in-0: consumer: auto-complete: false supply2-out-0: producer: entity-type: queue poller: initial-delay: 0 fixed-delay: 1000Nota
File aplikasi sebelumnya menunjukkan cara mengonfigurasi satu poller default untuk aplikasi ke semua pengikatan. Jika Anda ingin mengonfigurasi poller untuk pengikatan tertentu, Anda dapat menggunakan konfigurasi seperti
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.Nota
Microsoft merekomendasikan penggunaan alur autentikasi paling aman yang tersedia. Alur autentikasi yang dijelaskan dalam prosedur ini, seperti untuk database, cache, olahpesan, atau layanan AI, memerlukan tingkat kepercayaan yang sangat tinggi dalam aplikasi dan membawa risiko yang tidak ada dalam alur lain. Gunakan alur ini hanya ketika opsi yang lebih aman, seperti identitas terkelola untuk koneksi tanpa kata sandi atau tanpa kunci, tidak layak. Untuk operasi komputer lokal, lebih suka identitas pengguna untuk koneksi tanpa kata sandi atau tanpa kunci.
kita perlu mendefinisikan dua pemasok dan dua konsumen
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; }
Provisi sumber daya
Pengikat bus layanan mendukung provisi antrean, topik, dan langganan, pengguna dapat menggunakan properti berikut untuk mengaktifkan provisi.
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
servicebus:
resource:
resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
stream:
servicebus:
bindings:
<binding-name>:
consumer:
entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}
Nota
Nilai yang diizinkan untuk tenant-id adalah: common, organizations, consumers, atau ID penyewa. Untuk informasi selengkapnya tentang nilai-nilai ini, lihat bagian Menggunakan titik akhir yang salah (akun pribadi dan organisasi)Kesalahan AADSTS50020 - Akun pengguna dari penyedia identitas tidak ada di penyewa. Untuk informasi tentang mengonversi aplikasi penyewa tunggal Anda, lihat Mengonversi aplikasi penyewa tunggal ke multipenyewa di ID Microsoft Entra.
Menyesuaikan properti klien Azure Service Bus
Pengembang dapat menggunakan AzureServiceClientBuilderCustomizer untuk menyesuaikan properti Klien Bus Layanan. Contoh berikut mengkustomisasi properti sessionIdleTimeout di ServiceBusClientBuilder:
@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}
Sampel
Untuk informasi selengkapnya, lihat repositori azure-spring-boot-samples di GitHub.