Dukungan Spring Cloud Azure untuk Spring Cloud Stream
Artikel ini berlaku untuk: ✔️ Versi 4.14.0 ✔️ Versi 5.8.0
Spring Cloud Stream adalah kerangka kerja untuk membangun layanan mikro berbasis peristiwa yang sangat dapat diskalakan yang terhubung dengan sistem olahpesan bersama.
Kerangka kerja ini menyediakan model pemrograman fleksibel yang dibangun di atas idiom Spring yang sudah mapan dan akrab serta praktik terbaik. Praktik terbaik ini termasuk dukungan untuk semantik pub/sub persisten, grup konsumen, dan partisi stateful.
Implementasi binder saat ini meliputi:
spring-cloud-azure-stream-binder-eventhubs
- untuk informasi selengkapnya, lihat Spring Cloud Stream Binder untuk Azure Event Hubsspring-cloud-azure-stream-binder-servicebus
- untuk informasi selengkapnya, lihat Spring Cloud Stream Binder untuk Azure Bus Layanan
Spring Cloud Stream Binder untuk Azure Event Hubs
Konsep kunci
Spring Cloud Stream Binder untuk Azure Event Hubs menyediakan implementasi pengikatan untuk kerangka kerja Spring Cloud Stream. Implementasi ini menggunakan Adaptor Saluran Spring Integration Event Hubs pada dasarnya. Dari perspektif desain, Event Hubs mirip dengan Kafka. Selain itu, Azure Event Hubs dapat diakses melalui Kafka API. Jika proyek Anda memiliki dependensi yang ketat pada Kafka API, Anda dapat mencoba Events Hub dengan Sampel API Kafka
Grup konsumen
Azure Event Hubs memberikan dukungan serupa dari grup konsumen sebagai Apache Kafka, tetapi dengan sedikit logika yang berbeda. Meskipun Kafka menyimpan semua offset yang diterapkan di broker, Anda harus menyimpan offset pesan Azure Event Hubs yang diproses secara manual. Azure Event Hubs SDK menyediakan fungsi untuk menyimpan offset tersebut di dalam Azure Storage.
Dukungan partisi
Event Hubs menyediakan konsep partisi fisik yang serupa dengan Kafka. Tetapi tidak seperti penyeimbangan ulang otomatis Kafka antara konsumen dan partisi, Azure Event Hubs menyediakan semacam mode preemptive. Akun penyimpanan bertindak sebagai sewa untuk menentukan konsumen mana yang memiliki partisi mana. Ketika konsumen baru dimulai, ia mencoba mencuri beberapa partisi dari konsumen yang paling banyak dimuat untuk mencapai keseimbangan beban kerja.
Untuk menentukan strategi penyeimbangan beban, properti spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.*
disediakan. Untuk informasi selengkapnya, lihat bagian Properti konsumen .
Dukungan konsumen batch
Pengikat Spring Cloud Azure Stream Event Hubs mendukung fitur Spring Cloud Stream Batch Consumer.
Untuk bekerja dengan mode batch-consumer, atur spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode
properti ke true
. Saat diaktifkan, pesan dengan payload daftar peristiwa batch diterima dan diteruskan ke Consumer
fungsi . Setiap header pesan juga dikonversi ke daftar, yang kontennya adalah nilai header terkait yang diurai dari setiap peristiwa. Header komunal ID partisi, checkpointer, dan properti antrean terakhir disajikan sebagai nilai tunggal karena seluruh batch peristiwa berbagi nilai yang sama. Untuk informasi selengkapnya, lihat bagian header pesan Event Hubs dari dukungan Spring Cloud Azure untuk Integrasi Spring.
Catatan
Header titik pemeriksaan hanya ada ketika MANUAL
mode titik pemeriksaan digunakan.
Titik pemeriksaan konsumen batch mendukung dua mode: BATCH
dan MANUAL
. BATCH
mode adalah mode titik pemeriksaan otomatis untuk memeriksa seluruh batch peristiwa bersama-sama setelah pengikat menerimanya. MANUAL
mode adalah untuk memeriksa peristiwa oleh pengguna. Saat digunakan, Checkpointer
diteruskan ke header pesan, dan pengguna dapat menggunakannya untuk melakukan titik pemeriksaan.
Anda dapat menentukan ukuran batch dengan mengatur max-size
properti dan max-wait-time
yang memiliki awalan spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.
. Properti max-size
diperlukan dan max-wait-time
properti bersifat opsional. Untuk informasi selengkapnya, lihat bagian Properti konsumen .
Penyiapan dependensi
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>
Atau, Anda juga dapat menggunakan Spring Cloud Azure Stream Event Hubs Starter, seperti yang ditunjukkan dalam contoh berikut untuk Maven:
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>
Konfigurasi
Pengikat menyediakan tiga bagian opsi konfigurasi berikut:
properti konfigurasi Koneksi ion
Bagian ini berisi opsi konfigurasi yang digunakan untuk menyambungkan ke Azure Event Hubs.
Catatan
Jika Anda memilih untuk menggunakan prinsip keamanan untuk mengautentikasi dan mengotorisasi dengan ID Microsoft Entra untuk mengakses sumber daya Azure, lihat Mengotorisasi akses dengan ID Microsoft Entra untuk memastikan prinsip keamanan telah diberikan izin yang memadai untuk mengakses sumber daya Azure.
Koneksi ion properti spring-cloud-azure-stream-binder-eventhubs yang dapat dikonfigurasi:
Properti | Tipe | Deskripsi |
---|---|---|
spring.cloud.azure.eventhubs.enabled | Boolean | Apakah Azure Event Hubs diaktifkan. |
spring.cloud.azure.eventhubs.connection-string | String | Nilai string koneksi Ruang Nama Azure Event Hubs. |
spring.cloud.azure.eventhubs.namespace | String | Nilai Namespace Layanan Azure Event Hubs, yang merupakan awalan FQDN. FQDN harus terdiri dari NamespaceName.DomainName |
spring.cloud.azure.eventhubs.domain-name | String | Nama domain nilai Namespace Azure Event Hubs. |
spring.cloud.azure.eventhubs.custom-endpoint-address | String | Alamat Titik Akhir Kustom. |
Tip
Opsi konfigurasi Azure Service SDK umum juga dapat dikonfigurasi untuk pengikat Spring Cloud Azure Stream Event Hubs. Opsi konfigurasi yang didukung diperkenalkan dalam konfigurasi Spring Cloud Azure, dan dapat dikonfigurasi dengan awalan spring.cloud.azure.
terpadu atau awalan spring.cloud.azure.eventhubs.
.
Pengikat juga mendukung Spring Could Azure Resource Manager secara default. Untuk mempelajari tentang cara mengambil string koneksi dengan prinsip keamanan yang tidak diberikan dengan Data
peran terkait, lihat bagian Penggunaan dasar Spring Could Azure Resource Manager.
Properti konfigurasi titik pemeriksaan
Bagian ini berisi opsi konfigurasi untuk layanan Storage Blobs, yang digunakan untuk mempertahankan kepemilikan partisi dan informasi titik pemeriksaan.
Catatan
Dari versi 4.0.0, ketika properti spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists tidak diaktifkan secara manual, tidak ada kontainer Storage yang akan dibuat secara otomatis dengan nama dari spring.cloud.stream.bindings.binding-name.destination.
Titik pemeriksaan properti yang dapat dikonfigurasi dari spring-cloud-azure-stream-binder-eventhubs:
Properti | Tipe | Deskripsi |
---|---|---|
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists | Boolean | Apakah akan mengizinkan pembuatan kontainer jika tidak ada. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name | String | Nama untuk akun penyimpanan. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key | String | Kunci akses akun penyimpanan. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name | String | Nama kontainer penyimpanan. |
Tip
Opsi konfigurasi Azure Service SDK umum juga dapat dikonfigurasi untuk penyimpanan titik pemeriksaan Blob Penyimpanan. Opsi konfigurasi yang didukung diperkenalkan dalam konfigurasi Spring Cloud Azure, dan dapat dikonfigurasi dengan awalan spring.cloud.azure.
terpadu atau awalan spring.cloud.azure.eventhubs.processor.checkpoint-store
.
Properti konfigurasi Pengikatan Azure Event Hubs
Opsi berikut dibagi menjadi empat bagian: Properti Konsumen, Konfigurasi Konsumen Tingkat Lanjut, Properti Produsen, dan Konfigurasi Produsen Tingkat Lanjut.
Properti konsumen
Properti ini diekspos melalui EventHubsConsumerProperties
.
Properti yang dapat dikonfigurasi konsumen dari spring-cloud-azure-stream-binder-eventhubs:
Properti | Tipe | Deskripsi |
---|---|---|
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode | CheckpointMode | Mode titik pemeriksaan yang digunakan saat konsumen memutuskan cara mengirim pesan titik pemeriksaan |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count | Bilangan bulat | Memutuskan jumlah pesan untuk setiap partisi untuk melakukan satu titik pemeriksaan. Akan berlaku hanya ketika PARTITION_COUNT mode titik pemeriksaan digunakan. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval | Durasi | Memutuskan interval waktu untuk melakukan satu titik pemeriksaan. Akan berlaku hanya ketika TIME mode titik pemeriksaan digunakan. |
spring.cloud.stream.eventhubs.bindings.<binding-name.consumer.batch.max-size | Bilangan bulat | Jumlah maksimum peristiwa dalam batch. Diperlukan untuk mode konsumen batch. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time | Durasi | Durasi waktu maksimum untuk mengkonsumsi batch. Akan berlaku hanya ketika mode konsumen batch diaktifkan dan bersifat opsional. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.update-interval | Durasi | Durasi waktu interval untuk memperbarui. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy | LoadBalancingStrategy | Strategi penyeimbangan beban. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval | Durasi | Durasi waktu setelah kepemilikan partisi kedaluwarsa. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties | Boolean | Apakah prosesor peristiwa harus meminta informasi tentang peristiwa antrean terakhir pada partisi terkait, dan melacak informasi tersebut saat peristiwa diterima. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count | Bilangan bulat | Jumlah yang digunakan oleh konsumen untuk mengontrol jumlah peristiwa yang akan diterima dan antrean konsumen Event Hub secara aktif secara lokal. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position | Memetakan dengan kunci sebagai ID partisi, dan nilai StartPositionProperties |
Peta yang berisi posisi peristiwa yang akan digunakan untuk setiap partisi jika titik pemeriksaan untuk partisi tidak ada di penyimpanan titik pemeriksaan. Peta ini di-key off dari ID partisi. |
Catatan
Konfigurasi initial-partition-event-position
menerima map
untuk menentukan posisi awal untuk setiap hub peristiwa. Dengan demikian, kuncinya adalah ID partisi, dan nilainya termasuk StartPositionProperties
properti offset, nomor urut, waktu tanggal antrean dan apakah inklusif. Misalnya, Anda dapat mengaturnya sebagai
spring:
cloud:
stream:
eventhubs:
bindings:
<binding-name>:
consumer:
initial-partition-event-position:
0:
offset: earliest
1:
sequence-number: 100
2:
enqueued-date-time: 2022-01-12T13:32:47.650005Z
4:
inclusive: false
Konfigurasi konsumen tingkat lanjut
Koneksi, titik pemeriksaan, dan konfigurasi klien Azure SDK umum di atas mendukung penyesuaian untuk setiap konsumen binder, yang dapat Anda konfigurasi dengan awalan spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.
.
Properti produsen
Properti ini diekspos melalui EventHubsProducerProperties
.
Properti yang dapat dikonfigurasi produsen dari spring-cloud-azure-stream-binder-eventhubs:
Properti | Tipe | Deskripsi |
---|---|---|
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync | Boolean | Bendera sakelar untuk sinkronisasi produsen. Jika true, produser akan menunggu respons setelah operasi pengiriman. |
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout | long | Jumlah waktu untuk menunggu respons setelah operasi pengiriman. Akan berlaku hanya ketika produsen sinkronisasi diaktifkan. |
Konfigurasi produsen tingkat lanjut
Koneksi di atas dan konfigurasi klien Azure SDK umum mendukung kustomisasi untuk setiap produsen binder, yang dapat Anda konfigurasi dengan awalan spring.cloud.stream.eventhubs.bindings.<binding-name>.producer.
.
Penggunaan dasar
Mengirim dan menerima pesan dari/ke Azure Event Hubs
Isi opsi konfigurasi dengan informasi kredensial.
Untuk kredensial sebagai string koneksi, konfigurasikan properti berikut dalam file application.yml Anda:
spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Untuk kredensial sebagai perwakilan layanan, konfigurasikan properti berikut dalam file application.yml Anda:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Catatan
Nilai yang diizinkan untuk tenant-id
adalah: common
, organizations
, consumers
, atau ID penyewa. Untuk informasi selengkapnya tentang nilai-nilai ini, lihat bagian Menggunakan titik akhir (akun pribadi dan organisasi) yang salah dari Kesalahan AADSTS50020 - Akun pengguna dari penyedia identitas tidak ada di penyewa. Untuk informasi tentang mengonversi aplikasi penyewa tunggal Anda, lihat Mengonversi aplikasi penyewa tunggal ke multipenyewa di ID Microsoft Entra.
Untuk kredensial sebagai identitas terkelola, konfigurasikan properti berikut dalam file application.yml Anda:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Tentukan pemasok dan konsumen.
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload(), message.getHeaders().get(EventHubsHeaders.PARTITION_KEY), message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER), message.getHeaders().get(EventHubsHeaders.OFFSET), message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME) ); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
Dukungan partisi
Dengan PartitionSupplier
informasi partisi yang disediakan pengguna dibuat untuk mengonfigurasi informasi partisi tentang pesan yang akan dikirim. Diagram alur berikut menunjukkan proses mendapatkan prioritas yang berbeda untuk ID dan kunci partisi:
Dukungan konsumen batch
Berikan opsi konfigurasi batch, seperti yang ditunjukkan dalam contoh berikut:
spring: cloud: function: definition: consume stream: bindings: consume-in-0: destination: ${AZURE_EVENTHUB_NAME} group: ${AZURE_EVENTHUB_CONSUMER_GROUP} consumer: batch-mode: true eventhubs: bindings: consume-in-0: consumer: batch: max-batch-size: 10 # Required for batch-consumer mode max-wait-time: 1m # Optional, the default value is null checkpoint: mode: BATCH # or MANUAL as needed
Tentukan pemasok dan konsumen.
Untuk mode titik pemeriksaan sebagai
BATCH
, Anda dapat menggunakan kode berikut untuk mengirim pesan dan menggunakan dalam batch.@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }
Untuk mode titik pemeriksaan sebagai
MANUAL
, Anda dapat menggunakan kode berikut untuk mengirim pesan dan menggunakan/titik pemeriksaan dalam batch.@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }
Catatan
Dalam mode penggunaan batch, jenis konten default pengikat Spring Cloud Stream adalah application/json
, jadi pastikan payload pesan selaras dengan jenis konten. Misalnya, saat menggunakan jenis application/json
konten default untuk menerima pesan dengan String
payload, payload harus , dikelilingi JSON String
dengan tanda kutip ganda untuk teks asli String
. Sementara untuk text/plain
jenis konten, itu bisa menjadi objek secara String
langsung. Untuk informasi selengkapnya, lihat Negosiasi Jenis Konten Spring Cloud Stream.
Menangani pesan kesalahan
Menangani pesan kesalahan pengikatan keluar
Secara default, Integrasi Spring membuat saluran kesalahan global yang disebut
errorChannel
. Konfigurasikan titik akhir pesan berikut untuk menangani pesan kesalahan pengikatan keluar:@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }
Menangani pesan kesalahan pengikatan masuk
Spring Cloud Stream Event Hubs Binder mendukung dua solusi untuk menangani kesalahan untuk pengikatan pesan masuk: saluran kesalahan kustom dan handler.
Saluran kesalahan:
Spring Cloud Stream menyediakan saluran kesalahan untuk setiap pengikatan masuk. Dikirim
ErrorMessage
ke saluran kesalahan. Untuk informasi selengkapnya, lihat Menangani Kesalahan dalam dokumentasi Spring Cloud Stream.Saluran kesalahan default
Anda dapat menggunakan saluran kesalahan global bernama
errorChannel
untuk menggunakan semua pesan kesalahan pengikatan masuk. Untuk menangani pesan ini, konfigurasikan titik akhir pesan berikut:@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling inbound binding error: " + message); }
Saluran kesalahan khusus pengikatan
Anda dapat menggunakan saluran kesalahan tertentu untuk menggunakan pesan kesalahan pengikatan masuk tertentu dengan prioritas yang lebih tinggi daripada saluran kesalahan default. Untuk menangani pesan ini, konfigurasikan titik akhir pesan berikut:
// Replace destination with spring.cloud.stream.bindings.<input-binding-name>.destination // Replace group with spring.cloud.stream.bindings.<input-binding-name>.group @ServiceActivator(inputChannel = "{destination}.{group}.errors") public void handleError(ErrorMessage message) { LOGGER.error("Handling inbound binding error: " + message); }
Catatan
Saluran kesalahan khusus pengikatan saling eksklusif dengan penanganan kesalahan dan saluran lain yang disediakan.
Penanganan Kesalahan:
Spring Cloud Stream memaparkan mekanisme bagi Anda untuk menyediakan penangan kesalahan kustom dengan menambahkan
Consumer
yang menerima instansErrorMessage
. Untuk informasi selengkapnya, lihat Penanganan Kesalahan dalam dokumentasi Spring Cloud Stream.Catatan
Ketika penangan kesalahan pengikatan dikonfigurasi, ini dapat berfungsi dengan saluran kesalahan default.
Handler kesalahan pengikatan-default
Konfigurasikan satu
Consumer
biji untuk mengonsumsi semua pesan kesalahan pengikatan masuk. Fungsi default berikut berlangganan ke setiap saluran kesalahan pengikatan masuk:@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
Anda juga perlu mengatur properti ke
spring.cloud.stream.default.error-handler-definition
nama fungsi.Penangan kesalahan khusus pengikatan
Konfigurasikan
Consumer
biji untuk mengonsumsi pesan kesalahan pengikatan masuk tertentu. Fungsi berikut berlangganan saluran kesalahan pengikatan masuk tertentu dan memiliki prioritas yang lebih tinggi daripada handler kesalahan pengikatan-default:@Bean public Consumer<ErrorMessage> myErrorHandler() { return message -> { // consume the error message }; }
Anda juga perlu mengatur properti ke
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition
nama fungsi.
Header pesan Azure Event Hubs
Untuk header pesan dasar yang didukung, lihat bagian header pesan Event Hubs dari dukungan Spring Cloud Azure untuk Integrasi Spring.
Dukungan beberapa pengikat
Koneksi ke beberapa namespace Azure Event Hubs juga didukung dengan menggunakan beberapa pengikat. Sampel ini mengambil string koneksi sebagai contoh. Kredensial perwakilan layanan dan identitas terkelola juga didukung. Anda dapat mengatur properti terkait di pengaturan lingkungan setiap binder.
Untuk menggunakan beberapa pengikat dengan Azure Event Hubs, konfigurasikan properti berikut dalam file application.yml Anda:
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${EVENTHUB_NAME_01} group: ${CONSUMER_GROUP_01} supply1-out-0: destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE} consume2-in-0: binder: eventhub-2 destination: ${EVENTHUB_NAME_02} group: ${CONSUMER_GROUP_02} supply2-out-0: binder: eventhub-2 destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE} binders: eventhub-1: type: eventhubs default-candidate: true environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_01} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhub-2: type: eventhubs default-candidate: false environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_02} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhubs: bindings: consume1-in-0: consumer: checkpoint: mode: MANUAL consume2-in-0: consumer: checkpoint: mode: MANUAL poller: initial-delay: 0 fixed-delay: 1000
Catatan
File aplikasi sebelumnya menunjukkan cara mengonfigurasi satu poller default untuk aplikasi ke semua pengikatan. Jika Anda ingin mengonfigurasi poller untuk pengikatan tertentu, Anda dapat menggunakan konfigurasi seperti
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000
.Kita perlu mendefinisikan dua pemasok dan dua konsumen:
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; }
Provisi sumber daya
Binder Azure Event Hubs mendukung provisi pusat aktivitas dan grup konsumen, pengguna dapat menggunakan properti berikut untuk mengaktifkan provisi.
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
eventhubs:
resource:
resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}
Catatan
Nilai yang diizinkan untuk tenant-id
adalah: common
, organizations
, consumers
, atau ID penyewa. Untuk informasi selengkapnya tentang nilai-nilai ini, lihat bagian Menggunakan titik akhir (akun pribadi dan organisasi) yang salah dari Kesalahan AADSTS50020 - Akun pengguna dari penyedia identitas tidak ada di penyewa. Untuk informasi tentang mengonversi aplikasi penyewa tunggal Anda, lihat Mengonversi aplikasi penyewa tunggal ke multipenyewa di ID Microsoft Entra.
Sampel
Untuk informasi selengkapnya, lihat repositori azure-spring-boot-samples di GitHub.
Pengikat Stream Cloud Spring untuk Azure Service Bus
Konsep kunci
Spring Cloud Stream Binder untuk Azure Bus Layanan menyediakan implementasi pengikatan untuk Spring Cloud Stream Framework. Implementasi ini menggunakan Spring Integration Bus Layanan Channel Adapters pada dasarnya.
Pesan terjadwal
Pengikat ini mendukung pengiriman pesan ke topik untuk pemrosesan tertunda. Pengguna dapat mengirim pesan terjadwal dengan header x-delay
yang mengekspresikan dalam milidetik waktu penundaan untuk pesan tersebut. Pesan akan dikirimkan ke topik masing-masing setelah x-delay
milidetik.
Grup konsumen
Bus Layanan Topic memberikan dukungan serupa dari kelompok konsumen sebagai Apache Kafka, tetapi dengan logika yang sedikit berbeda.
Pengikat ini bergantung pada Subscription
topik untuk bertindak sebagai grup konsumen.
Penyiapan dependensi
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>
Atau, Anda juga dapat menggunakan Spring Cloud Azure Stream Bus Layanan Starter, seperti yang ditunjukkan dalam contoh berikut untuk Maven:
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>
Konfigurasi
Pengikat menyediakan dua bagian opsi konfigurasi berikut:
properti konfigurasi Koneksi ion
Bagian ini berisi opsi konfigurasi yang digunakan untuk menyambungkan ke Azure Bus Layanan.
Catatan
Jika Anda memilih untuk menggunakan prinsip keamanan untuk mengautentikasi dan mengotorisasi dengan ID Microsoft Entra untuk mengakses sumber daya Azure, lihat Mengotorisasi akses dengan ID Microsoft Entra untuk memastikan prinsip keamanan telah diberikan izin yang memadai untuk mengakses sumber daya Azure.
Koneksi ion properti spring-cloud-azure-stream-binder-servicebus yang dapat dikonfigurasi:
Properti | Tipe | Deskripsi |
---|---|---|
spring.cloud.azure.servicebus.enabled | Boolean | Apakah Azure Bus Layanan diaktifkan. |
spring.cloud.azure.servicebus.connection-string | String | Bus Layanan nilai string koneksi Namespace. |
spring.cloud.azure.servicebus.namespace | String | Bus Layanan nilai Namespace, yang merupakan awalan FQDN. FQDN harus terdiri dari NamespaceName.DomainName |
spring.cloud.azure.servicebus.domain-name | String | Nama domain nilai Azure Bus Layanan Namespace. |
Catatan
Opsi konfigurasi Azure Service SDK umum juga dapat dikonfigurasi untuk pengikat Bus Layanan Spring Cloud Azure Stream. Opsi konfigurasi yang didukung diperkenalkan dalam konfigurasi Spring Cloud Azure, dan dapat dikonfigurasi dengan awalan spring.cloud.azure.
terpadu atau awalan spring.cloud.azure.servicebus.
.
Pengikat juga mendukung Spring Could Azure Resource Manager secara default. Untuk mempelajari tentang cara mengambil string koneksi dengan prinsip keamanan yang tidak diberikan dengan Data
peran terkait, lihat bagian Penggunaan dasar Spring Could Azure Resource Manager.
Properti konfigurasi pengikatan Azure Bus Layanan
Opsi berikut dibagi menjadi empat bagian: Properti Konsumen, Konfigurasi Konsumen Tingkat Lanjut, Properti Produsen, dan Konfigurasi Produsen Tingkat Lanjut.
Properti konsumen
Properti ini diekspos melalui ServiceBusConsumerProperties
.
Properti yang dapat dikonfigurasi konsumen dari spring-cloud-azure-stream-binder-servicebus:
Properti | Jenis | Default | Deskripsi |
---|---|---|---|
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected | Boolean | salah | Jika pesan yang gagal dirutekan ke DLQ. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls | Bilangan bulat | 1 | Pesan bersamaan maksimum yang harus diproses oleh klien prosesor Bus Layanan. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-sessions | Bilangan bulat | nihil | Jumlah maksimum sesi bersamaan untuk diproses pada waktu tertentu. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled | Boolean | nihil | Apakah sesi diaktifkan. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count | Bilangan bulat | 0 | Jumlah prefetch klien prosesor Bus Layanan. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue | Sub-Antrean | tidak ada | Jenis sub antrean yang akan disambungkan. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration | Durasi | 5m | Jumlah waktu untuk melanjutkan perpanjangan kunci secara otomatis. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode | ServiceBusReceiveMode | peek_lock | Mode penerima klien prosesor Bus Layanan. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete | Boolean | benar | Apakah akan menyelesaikan pesan secara otomatis. Jika diatur sebagai false, header Checkpointer pesan akan ditambahkan untuk memungkinkan pengembang menyelesaikan pesan secara manual. |
Konfigurasi konsumen tingkat lanjut
Koneksi di atas dan konfigurasi klien Azure SDK umum mendukung kustomisasi untuk setiap konsumen binder, yang dapat Anda konfigurasi dengan awalan spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.
.
Properti produsen
Properti ini diekspos melalui ServiceBusProducerProperties
.
Properti yang dapat dikonfigurasi produsen dari spring-cloud-azure-stream-binder-servicebus:
Properti | Jenis | Default | Deskripsi |
---|---|---|---|
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync | Boolean | salah | Beralih bendera untuk sinkronisasi produsen. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout | long | 10000 | Nilai waktu habis untuk pengiriman produsen. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type | ServiceBusEntityType | nihil | Bus Layanan jenis entitas produsen, diperlukan untuk produsen pengikatan. |
Penting
Saat menggunakan produsen pengikatan, properti spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type
harus dikonfigurasi.
Konfigurasi produsen tingkat lanjut
Koneksi di atas dan konfigurasi klien Azure SDK umum mendukung kustomisasi untuk setiap produsen binder, yang dapat Anda konfigurasi dengan awalan spring.cloud.stream.servicebus.bindings.<binding-name>.producer.
.
Penggunaan dasar
Mengirim dan menerima pesan dari/ke Bus Layanan
Isi opsi konfigurasi dengan informasi kredensial.
Untuk kredensial sebagai string koneksi, konfigurasikan properti berikut dalam file application.yml Anda:
spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Untuk kredensial sebagai perwakilan layanan, konfigurasikan properti berikut dalam file application.yml Anda:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Catatan
Nilai yang diizinkan untuk tenant-id
adalah: common
, organizations
, consumers
, atau ID penyewa. Untuk informasi selengkapnya tentang nilai-nilai ini, lihat bagian Menggunakan titik akhir (akun pribadi dan organisasi) yang salah dari Kesalahan AADSTS50020 - Akun pengguna dari penyedia identitas tidak ada di penyewa. Untuk informasi tentang mengonversi aplikasi penyewa tunggal Anda, lihat Mengonversi aplikasi penyewa tunggal ke multipenyewa di ID Microsoft Entra.
Untuk kredensial sebagai identitas terkelola, konfigurasikan properti berikut dalam file application.yml Anda:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Tentukan pemasok dan konsumen.
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}'", message.getPayload()); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
Dukungan kunci partisi
Pengikat mendukung partisi Bus Layanan dengan mengizinkan pengaturan kunci partisi dan ID sesi di header pesan. Bagian ini memperkenalkan cara mengatur kunci partisi untuk pesan.
Spring Cloud Stream menyediakan properti spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression
ekspresi SpEL kunci partisi . Misalnya, mengatur properti ini sebagai "'partitionKey-' + headers[<message-header-key>]"
dan menambahkan header yang disebut message-header-key. Spring Cloud Stream menggunakan nilai untuk header ini saat mengevaluasi ekspresi untuk menetapkan kunci partisi. Kode berikut menyediakan contoh produsen:
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader("<message-header-key>", value.length() % 4)
.build();
};
}
Dukungan sesi
Pengikat mendukung sesi pesan Bus Layanan. ID sesi pesan dapat diatur melalui header pesan.
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build();
};
}
Catatan
Menurut pemartisian Bus Layanan, ID sesi memiliki prioritas yang lebih tinggi daripada kunci partisi. Jadi ketika kedua ServiceBusMessageHeaders#SESSION_ID
header dan ServiceBusMessageHeaders#PARTITION_KEY
diatur, nilai ID sesi akhirnya digunakan untuk menimpa nilai kunci partisi.
Menangani pesan kesalahan
Menangani pesan kesalahan pengikatan keluar
Secara default, Integrasi Spring membuat saluran kesalahan global yang disebut
errorChannel
. Konfigurasikan titik akhir pesan berikut untuk menangani pesan kesalahan pengikatan keluar.@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }
Menangani pesan kesalahan pengikatan masuk
Spring Cloud Stream Bus Layanan Binder mendukung tiga solusi untuk menangani kesalahan untuk pengikatan pesan masuk: penangan kesalahan pengikat, saluran kesalahan kustom, dan handler.
Penanganan kesalahan pengikat:
Handler kesalahan binder default menangani pengikatan masuk. Anda menggunakan handler ini untuk mengirim pesan gagal ke antrean dead-letter ketika
spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected
diaktifkan. Jika tidak, pesan yang gagal akan ditinggalkan. Kecuali untuk mengonfigurasi saluran kesalahan khusus pengikatan, handler kesalahan pengikat selalu berlaku terlepas dari apakah ada penangan atau saluran kesalahan kustom lainnya.Saluran kesalahan:
Spring Cloud Stream menyediakan saluran kesalahan untuk setiap pengikatan masuk. Dikirim
ErrorMessage
ke saluran kesalahan. Untuk informasi selengkapnya, lihat Menangani Kesalahan dalam dokumentasi Spring Cloud Stream.Saluran kesalahan default
Anda dapat menggunakan saluran kesalahan global bernama
errorChannel
untuk menggunakan semua pesan kesalahan pengikatan masuk. Untuk menangani pesan ini, konfigurasikan titik akhir pesan berikut:@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling inbound binding error: " + message); }
Saluran kesalahan khusus pengikatan
Anda dapat menggunakan saluran kesalahan tertentu untuk menggunakan pesan kesalahan pengikatan masuk tertentu dengan prioritas yang lebih tinggi daripada saluran kesalahan default. Untuk menangani pesan ini, konfigurasikan titik akhir pesan berikut:
// Replace destination with spring.cloud.stream.bindings.<input-binding-name>.destination // Replace group with spring.cloud.stream.bindings.<input-binding-name>.group @ServiceActivator(inputChannel = "{destination}.{group}.errors") public void handleError(ErrorMessage message) { LOGGER.error("Handling inbound binding error: " + message); }
Catatan
Saluran kesalahan khusus pengikatan saling eksklusif dengan penanganan kesalahan dan saluran lain yang disediakan.
Penanganan kesalahan:
Spring Cloud Stream memaparkan mekanisme bagi Anda untuk menyediakan penangan kesalahan kustom dengan menambahkan
Consumer
yang menerima instansErrorMessage
. Untuk informasi selengkapnya, lihat Penanganan Kesalahan dalam dokumentasi Spring Cloud Stream.Catatan
Ketika penangan kesalahan pengikatan dikonfigurasi, ini dapat berfungsi dengan saluran kesalahan default dan penangan kesalahan pengikat.
Handler kesalahan pengikatan-default
Konfigurasikan satu
Consumer
biji untuk mengonsumsi semua pesan kesalahan pengikatan masuk. Fungsi default berikut berlangganan ke setiap saluran kesalahan pengikatan masuk:@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
Anda juga perlu mengatur properti ke
spring.cloud.stream.default.error-handler-definition
nama fungsi.Penangan kesalahan khusus pengikatan
Konfigurasikan
Consumer
biji untuk mengonsumsi pesan kesalahan pengikatan masuk tertentu. Fungsi berikut berlangganan saluran kesalahan pengikatan masuk tertentu dengan prioritas yang lebih tinggi daripada handler kesalahan pengikatan-default.@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
Anda juga perlu mengatur properti ke
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition
nama fungsi.
Bus Layanan header pesan
Untuk header pesan dasar yang didukung, lihat bagian Bus Layanan header pesan dari dukungan Spring Cloud Azure untuk Integrasi Spring.
Catatan
Saat mengatur kunci partisi, prioritas header pesan lebih tinggi dari properti Spring Cloud Stream. Jadi spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression
berlaku hanya ketika tidak ada header dan ServiceBusMessageHeaders#PARTITION_KEY
yang dikonfigurasiServiceBusMessageHeaders#SESSION_ID
.
Dukungan beberapa pengikat
Koneksi ke beberapa namespace Bus Layanan juga didukung dengan menggunakan beberapa pengikat. Sampel ini mengambil string koneksi sebagai contoh. Kredensial perwakilan layanan dan identitas terkelola juga didukung, pengguna dapat mengatur properti terkait di setiap pengaturan lingkungan binder.
Untuk menggunakan beberapa pengikat ServiceBus, konfigurasikan properti berikut dalam file application.yml Anda:
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${SERVICEBUS_TOPIC_NAME} group: ${SUBSCRIPTION_NAME} supply1-out-0: destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE} consume2-in-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME} supply2-out-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE} binders: servicebus-1: type: servicebus default-candidate: true environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING} servicebus-2: type: servicebus default-candidate: false environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING} servicebus: bindings: consume1-in-0: consumer: auto-complete: false supply1-out-0: producer: entity-type: topic consume2-in-0: consumer: auto-complete: false supply2-out-0: producer: entity-type: queue poller: initial-delay: 0 fixed-delay: 1000
Catatan
File aplikasi sebelumnya menunjukkan cara mengonfigurasi satu poller default untuk aplikasi ke semua pengikatan. Jika Anda ingin mengonfigurasi poller untuk pengikatan tertentu, Anda dapat menggunakan konfigurasi seperti
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000
.kita perlu mendefinisikan dua pemasok dan dua konsumen
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; }
Provisi sumber daya
Pengikat bus layanan mendukung provisi antrean, topik, dan langganan, pengguna dapat menggunakan properti berikut untuk mengaktifkan provisi.
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
servicebus:
resource:
resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
stream:
servicebus:
bindings:
<binding-name>:
consumer:
entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}
Catatan
Nilai yang diizinkan untuk tenant-id
adalah: common
, organizations
, consumers
, atau ID penyewa. Untuk informasi selengkapnya tentang nilai-nilai ini, lihat bagian Menggunakan titik akhir (akun pribadi dan organisasi) yang salah dari Kesalahan AADSTS50020 - Akun pengguna dari penyedia identitas tidak ada di penyewa. Untuk informasi tentang mengonversi aplikasi penyewa tunggal Anda, lihat Mengonversi aplikasi penyewa tunggal ke multipenyewa di ID Microsoft Entra.
Sampel
Untuk informasi selengkapnya, lihat repositori azure-spring-boot-samples di GitHub.
Saran dan Komentar
https://aka.ms/ContentUserFeedback.
Segera hadir: Sepanjang tahun 2024 kami akan menghentikan penggunaan GitHub Issues sebagai mekanisme umpan balik untuk konten dan menggantinya dengan sistem umpan balik baru. Untuk mengetahui informasi selengkapnya, lihat:Kirim dan lihat umpan balik untuk