Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Spring Integration Extension for Azure menyediakan adaptor Integrasi Spring untuk berbagai layanan yang disediakan oleh Azure SDK untuk Java. Kami menyediakan dukungan Integrasi Spring untuk layanan Azure ini: Event Hubs, Service Bus, Storage Queue. Berikut ini adalah daftar adaptor yang didukung:
-
spring-cloud-azure-starter-integration-eventhubs- untuk informasi selengkapnya, lihat integrasi Spring dengan Azure Event Hubs -
spring-cloud-azure-starter-integration-servicebus- untuk informasi selengkapnya, lihat integrasi Spring dengan Azure Service Bus -
spring-cloud-azure-starter-integration-storage-queue- untuk informasi selengkapnya, lihat integrasi Spring dengan Azure Storage Queue
Integrasi Spring dengan Azure Event Hubs
Konsep utama
Azure Event Hubs adalah platform streaming big data dan layanan penyerapan peristiwa. Ini dapat menerima dan memproses jutaan peristiwa per detik. Data yang dikirim ke pusat aktivitas dapat diubah dan disimpan dengan menggunakan penyedia analitik real-time atau adaptor batching/penyimpanan.
Integrasi Spring memungkinkan olahpesan ringan dalam aplikasi berbasis Spring dan mendukung integrasi dengan sistem eksternal melalui adaptor deklaratif. Adaptor tersebut memberikan tingkat abstraksi yang lebih tinggi atas dukungan Spring untuk jarak jauh, olahpesan, dan penjadwalan. Proyek ekstensi
Nota
API dukungan RxJava dihilangkan dari versi 4.0.0. Lihat Javadoc untuk detailnya.
Grup konsumen
Azure Event Hubs memberikan dukungan serupa dari grup konsumen sebagai Apache Kafka, tetapi dengan sedikit logika yang berbeda. Meskipun Kafka menyimpan semua offset yang diterapkan di broker, Anda harus menyimpan offset pesan Azure Event Hubs yang diproses secara manual. Azure Event Hubs SDK menyediakan fungsi untuk menyimpan offset tersebut di dalam Azure Storage.
Dukungan partisi
Event Hubs menyediakan konsep partisi fisik yang serupa dengan Kafka. Tetapi tidak seperti penyeimbangan ulang otomatis Kafka antara konsumen dan partisi, Azure Event Hubs menyediakan semacam mode preemptive. Akun penyimpanan bertindak sebagai sewa untuk menentukan partisi mana yang dimiliki oleh konsumen mana. Ketika konsumen baru dimulai, ia akan mencoba mencuri beberapa partisi dari sebagian besar konsumen yang dimuat berat untuk mencapai keseimbangan beban kerja.
Untuk menentukan strategi penyeimbangan beban, pengembang dapat menggunakan EventHubsContainerProperties untuk konfigurasi. Lihat bagian berikut untuk contoh cara mengonfigurasi EventHubsContainerProperties.
Dukungan konsumen batch
EventHubsInboundChannelAdapter mendukung mode penggunaan batch. Untuk mengaktifkannya, pengguna dapat menentukan mode pendengar sebagai ListenerMode.BATCH saat membuat instans EventHubsInboundChannelAdapter.
Saat diaktifkan, Pesan
Nota
Header titik pemeriksaan hanya ada ketika mode titik pemeriksaan MANUAL digunakan.
Titik pemeriksaan konsumen batch mendukung dua mode: BATCH dan MANUAL. mode BATCH adalah mode titik pemeriksaan otomatis untuk memeriksa seluruh batch peristiwa bersama-sama setelah diterima. mode MANUAL adalah memeriksa peristiwa oleh pengguna. Saat digunakan,
Kebijakan penggunaan batch dapat ditentukan oleh properti max-size dan max-wait-time, di mana max-size adalah properti yang diperlukan sementara max-wait-time bersifat opsional.
Untuk menentukan strategi penggunaan batch, pengembang dapat menggunakan EventHubsContainerProperties untuk konfigurasi. Lihat bagian berikut untuk contoh cara mengonfigurasi EventHubsContainerProperties.
Penyiapan dependensi
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>
Konfigurasi
Starter ini menyediakan 3 bagian opsi konfigurasi berikut:
Properti Konfigurasi Koneksi
Bagian ini berisi opsi konfigurasi yang digunakan untuk menyambungkan ke Azure Event Hubs.
Nota
Jika Anda memilih untuk menggunakan prinsip keamanan untuk mengautentikasi dan mengotorisasi dengan ID Microsoft Entra untuk mengakses sumber daya Azure, lihat Mengotorisasi akses dengan ID Microsoft Entra untuk memastikan prinsip keamanan telah diberikan izin yang memadai untuk mengakses sumber daya Azure.
Properti koneksi yang dapat dikonfigurasi dari spring-cloud-azure-starter-integration-eventhubs:
| Harta benda | Jenis | Deskripsi |
|---|---|---|
| spring.cloud.azure.eventhubs.enabled | Boolean | Apakah Azure Event Hubs diaktifkan. |
| spring.cloud.azure.eventhubs.connection-string | Tali | Nilai string koneksi Namespace Layanan Event Hubs. |
| spring.cloud.azure.eventhubs.namespace | Tali | Nilai Namespace Layanan Azure Event Hubs, yang merupakan awalan FQDN. FQDN harus terdiri dari NamespaceName.DomainName |
| musim semi.cloud.azure.eventhubs.nama-domain | Tali | Nama domain nilai Namespace Azure Event Hubs. |
| spring.cloud.azure.eventhubs.custom-endpoint-address | Tali | Alamat Titik Akhir Kustom. |
| musim semi.cloud.azure.eventhubs.shared-connection | Boolean (tipe data yang hanya memiliki dua nilai: true atau false) | Apakah EventProcessorClient dan EventHubProducerAsyncClient yang mendasarinya menggunakan koneksi yang sama. Secara default, koneksi baru dibuat dan digunakan untuk setiap klien Event Hub yang dibuat. |
Properti Konfigurasi Titik Pemeriksaan
Bagian ini berisi opsi konfigurasi untuk layanan Storage Blobs, yang digunakan untuk mempertahankan kepemilikan partisi dan informasi titik pemeriksaan.
Nota
Dari versi 4.0.0, saat properti spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists tidak diaktifkan secara manual, tidak ada kontainer Storage yang akan dibuat secara otomatis.
Titik pemeriksaan properti yang dapat dikonfigurasi dari spring-cloud-azure-starter-integration-eventhubs:
| Harta benda | Jenis | Deskripsi |
|---|---|---|
| spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists | Boolean (tipe data yang hanya memiliki dua nilai: true atau false) | Apakah akan mengizinkan pembuatan kontainer jika tidak ada. |
| musim semi.cloud.azure.eventhubs.processor.checkpoint-store.account-name | Tali | Nama untuk akun penyimpanan. |
| spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key | Tali | Kunci akses akun penyimpanan. |
| spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name | Tali | Nama kontainer penyimpanan. |
Opsi konfigurasi Azure Service SDK umum juga dapat dikonfigurasi untuk penyimpanan titik pemeriksaan Blob Penyimpanan. Opsi konfigurasi yang didukung diperkenalkan dalam konfigurasi Spring Cloud Azure, dan dapat dikonfigurasi dengan awalan terpadu spring.cloud.azure. atau awalan spring.cloud.azure.eventhubs.processor.checkpoint-store.
Properti konfigurasi prosesor Azure Event Hub
EventHubsInboundChannelAdapter menggunakan EventProcessorClient untuk menggunakan pesan dari hub peristiwa, untuk mengonfigurasi properti keseluruhan EventProcessorClient, pengembang dapat menggunakan EventHubsContainerProperties untuk konfigurasi. Lihat bagian berikut tentang cara bekerja dengan EventHubsInboundChannelAdapter.
Penggunaan dasar
Mengirim pesan ke Azure Event Hubs
Isi opsi konfigurasi kredensial.
Untuk kredensial sebagai string koneksi, konfigurasikan properti berikut dalam file application.yml Anda:
spring: cloud: azure: eventhubs: connection-string: ${AZURE_EVENT_HUBS_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT-CONTAINER} account-name: ${CHECKPOINT-STORAGE-ACCOUNT} account-key: ${CHECKPOINT-ACCESS-KEY}Nota
Microsoft merekomendasikan penggunaan alur autentikasi paling aman yang tersedia. Alur autentikasi yang dijelaskan dalam prosedur ini, seperti untuk database, cache, olahpesan, atau layanan AI, memerlukan tingkat kepercayaan yang sangat tinggi dalam aplikasi dan membawa risiko yang tidak ada dalam alur lain. Gunakan alur ini hanya ketika opsi yang lebih aman, seperti identitas terkelola untuk koneksi tanpa kata sandi atau tanpa kunci, tidak layak. Untuk operasi komputer lokal, lebih suka identitas pengguna untuk koneksi tanpa kata sandi atau tanpa kunci.
Untuk kredensial sebagai identitas terkelola, konfigurasikan properti berikut dalam file application.yml Anda:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} eventhubs: namespace: ${AZURE_EVENT_HUBS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}Untuk kredensial sebagai perwakilan layanan, konfigurasikan properti berikut dalam file application.yml Anda:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${AZURE_EVENT_HUBS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
Nota
Nilai yang diizinkan untuk tenant-id adalah: common, organizations, consumers, atau ID penyewa. Untuk informasi selengkapnya tentang nilai-nilai ini, lihat bagian Menggunakan titik akhir yang salah (akun pribadi dan organisasi)Kesalahan AADSTS50020 - Akun pengguna dari penyedia identitas tidak ada di penyewa. Untuk informasi tentang mengonversi aplikasi penyewa tunggal Anda, lihat Mengonversi aplikasi penyewa tunggal ke multipenyewa di ID Microsoft Entra.
Buat
DefaultMessageHandlerdengan kacangEventHubsTemplateuntuk mengirim pesan ke Azure Event Hubs.class Demo { private static final String OUTPUT_CHANNEL = "output"; private static final String EVENTHUB_NAME = "eh1"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.error("There was an error sending the message.", ex); } }); return handler; } }Buat pengikatan gateway pesan dengan handler pesan di atas melalui saluran pesan.
class Demo { @Autowired EventHubOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface EventHubOutboundGateway { void send(String text); } }Mengirim pesan menggunakan gateway.
class Demo { public void demo() { this.messagingGateway.send(message); } }
Menerima pesan dari Azure Event Hubs
Isi opsi konfigurasi kredensial.
Buat biji saluran pesan sebagai saluran input.
@Configuration class Demo { @Bean public MessageChannel input() { return new DirectChannel(); } }Buat
EventHubsInboundChannelAdapterdengan kacangEventHubsMessageListenerContaineruntuk menerima pesan dari Azure Event Hubs.@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; private static final String EVENTHUB_NAME = "eh1"; private static final String CONSUMER_GROUP = "$Default"; @Bean public EventHubsInboundChannelAdapter messageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, EventHubsMessageListenerContainer listenerContainer) { EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer); adapter.setOutputChannel(inputChannel); return adapter; } @Bean public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) { EventHubsContainerProperties containerProperties = new EventHubsContainerProperties(); containerProperties.setEventHubName(EVENTHUB_NAME); containerProperties.setConsumerGroup(CONSUMER_GROUP); containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL)); return new EventHubsMessageListenerContainer(processorFactory, containerProperties); } }Buat pengikatan penerima pesan dengan EventHubsInboundChannelAdapter melalui saluran pesan yang dibuat sebelumnya.
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
Mengonfigurasi EventHubsMessageConverter untuk menyesuaikan objectMapper
EventHubsMessageConverter dibuat sebagai kacang yang dapat dikonfigurasi untuk memungkinkan pengguna menyesuaikan ObjectMapper.
Dukungan konsumen batch
Untuk menggunakan pesan dari Azure Event Hubs dalam batch mirip dengan sampel di atas, selain itu pengguna harus mengatur opsi konfigurasi terkait yang menggunakan batch untuk EventHubsInboundChannelAdapter.
Saat membuat EventHubsInboundChannelAdapter, mode pendengar harus diatur sebagai BATCH. Saat membuat biji EventHubsMessageListenerContainer, atur mode titik pemeriksaan sebagai MANUAL atau BATCH, dan opsi batch dapat dikonfigurasi sesuai kebutuhan.
@Configuration
class Demo {
private static final String INPUT_CHANNEL = "input";
private static final String EVENTHUB_NAME = "eh1";
private static final String CONSUMER_GROUP = "$Default";
@Bean
public EventHubsInboundChannelAdapter messageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
EventHubsMessageListenerContainer listenerContainer) {
EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
adapter.setOutputChannel(inputChannel);
return adapter;
}
@Bean
public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
containerProperties.setEventHubName(EVENTHUB_NAME);
containerProperties.setConsumerGroup(CONSUMER_GROUP);
containerProperties.getBatch().setMaxSize(100);
containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
}
}
Header pesan Azure Event Hubs
Tabel berikut ini menggambarkan bagaimana properti pesan Azure Event Hubs dipetakan ke header pesan Spring. Untuk Azure Event Hubs, pesan disebut sebagai event.
Pemetaan antara Pesan Event Hubs / Properti Peristiwa dan Header Pesan Spring dalam Mode Pendengar Rekaman:
| Properti Peristiwa Azure Event Hubs | Konstanta Header Pesan Spring | Jenis | Deskripsi |
|---|---|---|---|
| Waktu antrean | EventHubsHeaders#ENQUEUED_TIME | Instan | Instan, di UTC, saat peristiwa diantrekan di partisi Event Hub. |
| Offset | EventHubsHeaders#OFFSET | Panjang | Offset peristiwa saat diterima dari partisi Event Hub terkait. |
| Kunci partisi | AzureHeaders#PARTITION_KEY | Tali | Kunci hash partisi jika diatur saat awalnya menerbitkan peristiwa. |
| ID Partisi | AzureHeaders#RAW_PARTITION_ID | Tali | ID partisi Pusat Aktivitas. |
| Nomor urut | EventHubsHeaders#SEQUENCE_NUMBER | Panjang | Nomor urutan yang ditetapkan ke peristiwa saat diantrekan di partisi Event Hub terkait. |
| Properti peristiwa antrean terakhir | EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES | LastEnqueuedEventProperties | Properti peristiwa antrean terakhir dalam partisi ini. |
| NA | AzureHeaders#CHECKPOINTER | Titik pemeriksaan | Header untuk titik pemeriksaan pesan tertentu. |
Pengguna dapat mengurai header pesan untuk informasi terkait dari setiap peristiwa. Untuk mengatur header pesan untuk peristiwa, semua header yang dikustomisasi akan dimasukkan sebagai properti aplikasi dari suatu peristiwa, di mana header diatur sebagai kunci properti. Ketika peristiwa diterima dari Azure Event Hubs, semua properti aplikasi akan dikonversi ke header pesan.
Nota
Header pesan kunci partisi, waktu antrean, offset, dan nomor urut tidak didukung untuk diatur secara manual.
Saat mode konsumen batch diaktifkan, header tertentu dari pesan batch dicantumkan sebagai berikut, yang berisi daftar nilai dari setiap peristiwa Event Hubs tunggal.
Pemetaan antara Pesan Event Hubs / Properti Peristiwa dan Header Pesan Spring dalam Mode Pendengar Batch:
| Properti Peristiwa Azure Event Hubs | Konstanta Header Pesan Spring Batch | Jenis | Deskripsi |
|---|---|---|---|
| Waktu antrean | EventHubsHeaders#ENQUEUED_TIME | Daftar Instan | Daftar instan, di UTC, saat setiap peristiwa diantrekan di partisi Event Hub. |
| Offset | EventHubsHeaders#OFFSET | Daftar Panjang | Daftar offset setiap peristiwa saat diterima dari partisi Event Hub terkait. |
| Kunci partisi | AzureHeaders#PARTITION_KEY | Daftar String | Daftar kunci hashing partisi jika ditetapkan saat awalnya menerbitkan setiap peristiwa. |
| Nomor urut | EventHubsHeaders#SEQUENCE_NUMBER | Daftar Panjang | Daftar nomor urut yang ditetapkan untuk setiap peristiwa saat diantrekan di partisi Event Hub terkait. |
| Properti sistem | EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES | Daftar Peta | Daftar properti sistem dari setiap peristiwa. |
| Properti aplikasi | EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES | Daftar Peta | Daftar properti aplikasi dari setiap peristiwa, tempat semua header pesan atau properti peristiwa yang dikustomisasi ditempatkan. |
Nota
Saat menerbitkan pesan, semua header batch di atas akan dihapus dari pesan jika ada.
Sampel
Untuk informasi selengkapnya, lihat repositori azure-spring-boot-samples di GitHub.
Integrasi Spring dengan Azure Service Bus
Konsep utama
Integrasi Spring memungkinkan olahpesan ringan dalam aplikasi berbasis Spring dan mendukung integrasi dengan sistem eksternal melalui adaptor deklaratif.
Proyek ekstensi Spring Integration for Azure Service Bus menyediakan adaptor saluran masuk dan keluar untuk Azure Service Bus.
Nota
API dukungan CompletableFuture telah tidak digunakan lagi dari versi 2.10.0, dan digantikan oleh Reactor Core dari versi 4.0.0. Lihat Javadoc untuk detailnya.
Penyiapan dependensi
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>
Konfigurasi
Starter ini menyediakan 2 bagian opsi konfigurasi berikut:
Properti konfigurasi koneksi
Bagian ini berisi opsi konfigurasi yang digunakan untuk menyambungkan ke Azure Service Bus.
Nota
Jika Anda memilih untuk menggunakan prinsip keamanan untuk mengautentikasi dan mengotorisasi dengan ID Microsoft Entra untuk mengakses sumber daya Azure, lihat Mengotorisasi akses dengan ID Microsoft Entra untuk memastikan prinsip keamanan telah diberikan izin yang memadai untuk mengakses sumber daya Azure.
Properti koneksi yang dapat dikonfigurasi dari spring-cloud-azure-starter-integration-servicebus:
| Harta benda | Jenis | Deskripsi |
|---|---|---|
| spring.cloud.azure.servicebus.enabled | Boolean | Apakah Azure Service Bus diaktifkan. |
| spring.cloud.azure.servicebus.connection-string | Tali | Nilai string koneksi Namespace Service Bus. |
| spring.cloud.azure.servicebus.custom-endpoint-address | Tali | Alamat titik akhir kustom yang akan digunakan saat menyambungkan ke Azure Service Bus. |
| spring.cloud.azure.servicebus.namespace | Tali | Nilai Namespace Service Bus, yang merupakan awalan FQDN. FQDN harus terdiri dari NamespaceName.DomainName |
| musim semi.cloud.azure.servicebus.domain-name | Tali | Nama domain nilai Namespace Azure Service Bus. |
Properti konfigurasi prosesor Azure Service Bus
ServiceBusInboundChannelAdapter menggunakan ServiceBusProcessorClient untuk menggunakan pesan, untuk mengonfigurasi properti keseluruhan ServiceBusProcessorClient, pengembang dapat menggunakan ServiceBusContainerProperties untuk konfigurasi. Lihat bagian berikut tentang cara bekerja dengan ServiceBusInboundChannelAdapter.
Penggunaan dasar
Mengirim pesan ke Azure Service Bus
Isi opsi konfigurasi kredensial.
Untuk kredensial sebagai string koneksi, konfigurasikan properti berikut dalam file application.yml Anda:
spring: cloud: azure: servicebus: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}Nota
Microsoft merekomendasikan penggunaan alur autentikasi paling aman yang tersedia. Alur autentikasi yang dijelaskan dalam prosedur ini, seperti untuk database, cache, olahpesan, atau layanan AI, memerlukan tingkat kepercayaan yang sangat tinggi dalam aplikasi dan membawa risiko yang tidak ada dalam alur lain. Gunakan alur ini hanya ketika opsi yang lebih aman, seperti identitas terkelola untuk koneksi tanpa kata sandi atau tanpa kunci, tidak layak. Untuk operasi komputer lokal, lebih suka identitas pengguna untuk koneksi tanpa kata sandi atau tanpa kunci.
Untuk kredensial sebagai identitas terkelola, konfigurasikan properti berikut dalam file application.yml Anda:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Nota
Nilai yang diizinkan untuk tenant-id adalah: common, organizations, consumers, atau ID penyewa. Untuk informasi selengkapnya tentang nilai-nilai ini, lihat bagian Menggunakan titik akhir yang salah (akun pribadi dan organisasi)Kesalahan AADSTS50020 - Akun pengguna dari penyedia identitas tidak ada di penyewa. Untuk informasi tentang mengonversi aplikasi penyewa tunggal Anda, lihat Mengonversi aplikasi penyewa tunggal ke multipenyewa di ID Microsoft Entra.
Untuk kredensial sebagai perwakilan layanan, konfigurasikan properti berikut dalam file application.yml Anda:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Nota
Nilai yang diizinkan untuk tenant-id adalah: common, organizations, consumers, atau ID penyewa. Untuk informasi selengkapnya tentang nilai-nilai ini, lihat bagian Menggunakan titik akhir yang salah (akun pribadi dan organisasi)Kesalahan AADSTS50020 - Akun pengguna dari penyedia identitas tidak ada di penyewa. Untuk informasi tentang mengonversi aplikasi penyewa tunggal Anda, lihat Mengonversi aplikasi penyewa tunggal ke multipenyewa di ID Microsoft Entra.
Buat
DefaultMessageHandlerdengan kacangServiceBusTemplateuntuk mengirim pesan ke Service Bus, atur jenis entitas untuk ServiceBusTemplate. Sampel ini mengambil Antrean Bus Layanan sebagai contoh.class Demo { private static final String OUTPUT_CHANNEL = "queue.output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) { serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE); DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }Buat pengikatan gateway pesan dengan handler pesan di atas melalui saluran pesan.
class Demo { @Autowired QueueOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface QueueOutboundGateway { void send(String text); } }Mengirim pesan menggunakan gateway.
class Demo { public void demo() { this.messagingGateway.send(message); } }
Menerima pesan dari Azure Service Bus
Isi opsi konfigurasi kredensial.
Buat biji saluran pesan sebagai saluran input.
@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }Buat
ServiceBusInboundChannelAdapterdengan kacangServiceBusMessageListenerContaineruntuk menerima pesan ke Azure Service Bus. Sampel ini mengambil Antrean Bus Layanan sebagai contoh.@Configuration class Demo { private static final String QUEUE_NAME = "queue1"; @Bean public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) { ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties(); containerProperties.setEntityName(QUEUE_NAME); containerProperties.setAutoComplete(false); return new ServiceBusMessageListenerContainer(processorFactory, containerProperties); } @Bean public ServiceBusInboundChannelAdapter queueMessageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, ServiceBusMessageListenerContainer listenerContainer) { ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer); adapter.setOutputChannel(inputChannel); return adapter; } }Buat pengikatan penerima pesan dengan
ServiceBusInboundChannelAdaptermelalui saluran pesan yang kami buat sebelumnya.class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
Mengonfigurasi ServiceBusMessageConverter untuk menyesuaikan objectMapper
ServiceBusMessageConverter dibuat sebagai kacang yang dapat dikonfigurasi untuk memungkinkan pengguna menyesuaikan ObjectMapper.
Header pesan Azure Service Bus
Untuk beberapa header Azure Service Bus yang dapat dipetakan ke beberapa konstanta header Spring, prioritas header Spring yang berbeda dicantumkan.
Pemetaan antara Header Bus Layanan dan Header Spring:
| Header dan properti pesan Azure Service Bus | Konstanta header pesan Spring | Jenis | Dikonfigurasi | Deskripsi |
|---|---|---|---|---|
| Tipe isi | MessageHeaders#CONTENT_TYPE |
Tali | Ya | Pendeskripsi Tipe Konten RFC2045 pesan. |
| ID Korelasi | ServiceBusMessageHeaders#CORRELATION_ID |
Tali | Ya | ID korelasi pesan |
| ID Pesan | ServiceBusMessageHeaders#MESSAGE_ID |
Tali | Ya | ID pesan pesan, header ini memiliki prioritas yang lebih tinggi daripada MessageHeaders#ID. |
| ID Pesan | MessageHeaders#ID |
UUID (Pengidentifikasi Unik Universal) | Ya | ID pesan pesan, header ini memiliki prioritas yang lebih rendah daripada ServiceBusMessageHeaders#MESSAGE_ID. |
| Kunci partisi | ServiceBusMessageHeaders#PARTITION_KEY |
Tali | Ya | Kunci partisi untuk mengirim pesan ke entitas yang dipartisi. |
| Balas ke | MessageHeaders#REPLY_CHANNEL |
Tali | Ya | Alamat entitas yang akan dikirimi balasan. |
| Balas ke ID sesi | ServiceBusMessageHeaders#REPLY_TO_SESSION_ID |
Tali | Ya | Nilai properti ReplyToGroupId pesan. |
| Waktu antrean terjadwal utc | ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME |
OffsetDateTime | Ya | Tanggalwaktu di mana pesan harus diantrekan di Azure Service Bus, header ini memiliki prioritas yang lebih tinggi daripada AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE. |
| Waktu antrean terjadwal utc | AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE |
Integer | Ya | Tanggalwaktu di mana pesan harus diantrekan di Azure Service Bus, header ini memiliki prioritas yang lebih rendah daripada ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME. |
| ID Sesi | ServiceBusMessageHeaders#SESSION_ID |
Tali | Ya | IDentifier sesi untuk entitas yang sadar sesi. |
| Waktu hidup | ServiceBusMessageHeaders#TIME_TO_LIVE |
Durasi | Ya | Durasi waktu sebelum pesan ini kedaluwarsa. |
| Ke | ServiceBusMessageHeaders#TO |
Tali | Ya | Alamat "ke" pesan, dicadangkan untuk digunakan di masa mendatang dalam skenario perutean dan saat ini diabaikan oleh broker itu sendiri. |
| Subyek | ServiceBusMessageHeaders#SUBJECT |
Tali | Ya | Subjek untuk pesan. |
| Deskripsi kesalahan surat mati | ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION |
Tali | Tidak | Deskripsi untuk pesan yang telah di-dead-letter. |
| Alasan surat mati | ServiceBusMessageHeaders#DEAD_LETTER_REASON |
Tali | Tidak | Alasan pesan itu surat mati. |
| Sumber surat mati | ServiceBusMessageHeaders#DEAD_LETTER_SOURCE |
Tali | Tidak | Entitas tempat pesan di-dead-letter. |
| Jumlah pengiriman | ServiceBusMessageHeaders#DELIVERY_COUNT |
panjang | Tidak | Berapa kali pesan ini dikirimkan kepada klien. |
| Nomor urutan yang diantrekan | ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER |
panjang | Tidak | Nomor urutan antrean yang ditetapkan ke pesan oleh Azure Service Bus. |
| Waktu antrean | ServiceBusMessageHeaders#ENQUEUED_TIME |
OffsetDateTime | Tidak | Tanggalwaktu saat pesan ini diantrekan di Azure Service Bus. |
| Kedaluwarsa pada | ServiceBusMessageHeaders#EXPIRES_AT |
OffsetDateTime | Tidak | Tanggalwaktu saat pesan ini akan kedaluwarsa. |
| Kunci token | ServiceBusMessageHeaders#LOCK_TOKEN |
Tali | Tidak | Token kunci untuk pesan saat ini. |
| Terkunci hingga | ServiceBusMessageHeaders#LOCKED_UNTIL |
OffsetDateTime | Tidak | Tanggalwaktu saat kunci pesan ini kedaluwarsa. |
| Nomor urut | ServiceBusMessageHeaders#SEQUENCE_NUMBER |
panjang | Tidak | Nomor unik yang ditetapkan ke pesan oleh Azure Service Bus. |
| Negara | ServiceBusMessageHeaders#STATE |
ServiceBusMessageState | Tidak | Status pesan, yang dapat Aktif, Ditangguhkan, atau Dijadwalkan. |
Dukungan kunci partisi
Starter ini mendukung partisi Service Bus dengan mengizinkan pengaturan kunci partisi dan ID sesi di header pesan. Bagian ini memperkenalkan cara mengatur kunci partisi untuk pesan.
Disarankan: Gunakan ServiceBusMessageHeaders.PARTITION_KEY sebagai kunci header.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Tidak disarankan tetapi saat ini didukung: AzureHeaders.PARTITION_KEY sebagai kunci header.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Nota
Ketika ServiceBusMessageHeaders.PARTITION_KEY dan AzureHeaders.PARTITION_KEY diatur di header pesan, ServiceBusMessageHeaders.PARTITION_KEY lebih disukai.
Dukungan sesi
Contoh ini menunjukkan cara mengatur ID sesi pesan secara manual dalam aplikasi.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Nota
Ketika ServiceBusMessageHeaders.SESSION_ID diatur di header pesan, dan header ServiceBusMessageHeaders.PARTITION_KEY yang berbeda juga diatur, nilai ID sesi akhirnya akan digunakan untuk menimpa nilai kunci partisi.
Menyesuaikan properti klien Azure Service Bus
Pengembang dapat menggunakan AzureServiceClientBuilderCustomizer untuk menyesuaikan properti Klien Bus Layanan. Contoh berikut mengkustomisasi properti sessionIdleTimeout di ServiceBusClientBuilder:
@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}
Sampel
Untuk informasi selengkapnya, lihat repositori azure-spring-boot-samples di GitHub.
Integrasi Spring dengan Antrean Azure Storage
Konsep utama
Azure Queue Storage adalah layanan untuk menyimpan pesan dalam jumlah besar. Anda mengakses pesan dari mana saja di dunia melalui panggilan terautentikasi menggunakan HTTP atau HTTPS. Pesan antrean dapat berukuran hingga 64 KB. Antrean mungkin berisi jutaan pesan, hingga batas kapasitas total akun penyimpanan. Antrean umumnya digunakan untuk membuat backlog pekerjaan untuk diproses secara asinkron.
Penyiapan dependensi
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>
Konfigurasi
Starter ini menyediakan opsi konfigurasi berikut:
Properti konfigurasi koneksi
Bagian ini berisi opsi konfigurasi yang digunakan untuk menyambungkan ke Antrean Azure Storage.
Nota
Jika Anda memilih untuk menggunakan prinsip keamanan untuk mengautentikasi dan mengotorisasi dengan ID Microsoft Entra untuk mengakses sumber daya Azure, lihat Mengotorisasi akses dengan ID Microsoft Entra untuk memastikan prinsip keamanan telah diberikan izin yang memadai untuk mengakses sumber daya Azure.
Properti koneksi yang dapat dikonfigurasi dari spring-cloud-azure-starter-integration-storage-queue:
| Harta benda | Jenis | Deskripsi |
|---|---|---|
| musim semi.cloud.azure.storage.queue.enabled | Boolean | Apakah Antrean Azure Storage diaktifkan. |
| musim semi.cloud.azure.storage.queue.connection-string | Tali | Nilai string koneksi Ruang Nama Antrean Penyimpanan. |
| musim semi.cloud.azure.storage.queue.accountName | Tali | Nama akun Antrean Penyimpanan. |
| spring.cloud.azure.storage.queue.accountKey | Tali | Kunci akun Antrean Penyimpanan. |
| spring.cloud.azure.storage.queue.endpoint | Tali | Titik akhir layanan Antrean Penyimpanan. |
| spring.cloud.azure.storage.queue.sasToken | Tali | Kredensial token Sas |
| musim semi.cloud.azure.storage.queue.serviceVersion | Versi Layanan Antrean | QueueServiceVersion yang digunakan saat membuat permintaan API. |
| musim semi.cloud.azure.storage.queue.messageEncoding | Tali | Pengodean pesan antrean. |
Penggunaan dasar
Mengirim pesan ke Antrean Azure Storage
Isi opsi konfigurasi kredensial.
Untuk kredensial sebagai string koneksi, konfigurasikan properti berikut dalam file application.yml Anda:
spring: cloud: azure: storage: queue: connection-string: ${AZURE_STORAGE_QUEUE_CONNECTION_STRING}Nota
Microsoft merekomendasikan penggunaan alur autentikasi paling aman yang tersedia. Alur autentikasi yang dijelaskan dalam prosedur ini, seperti untuk database, cache, olahpesan, atau layanan AI, memerlukan tingkat kepercayaan yang sangat tinggi dalam aplikasi dan membawa risiko yang tidak ada dalam alur lain. Gunakan alur ini hanya ketika opsi yang lebih aman, seperti identitas terkelola untuk koneksi tanpa kata sandi atau tanpa kunci, tidak layak. Untuk operasi komputer lokal, lebih suka identitas pengguna untuk koneksi tanpa kata sandi atau tanpa kunci.
Untuk kredensial sebagai identitas terkelola, konfigurasikan properti berikut dalam file application.yml Anda:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> storage: queue: account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
Nota
Nilai yang diizinkan untuk tenant-id adalah: common, organizations, consumers, atau ID penyewa. Untuk informasi selengkapnya tentang nilai-nilai ini, lihat bagian Menggunakan titik akhir yang salah (akun pribadi dan organisasi)Kesalahan AADSTS50020 - Akun pengguna dari penyedia identitas tidak ada di penyewa. Untuk informasi tentang mengonversi aplikasi penyewa tunggal Anda, lihat Mengonversi aplikasi penyewa tunggal ke multipenyewa di ID Microsoft Entra.
Untuk kredensial sebagai perwakilan layanan, konfigurasikan properti berikut dalam file application.yml Anda:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> storage: queue: account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
Nota
Nilai yang diizinkan untuk tenant-id adalah: common, organizations, consumers, atau ID penyewa. Untuk informasi selengkapnya tentang nilai-nilai ini, lihat bagian Menggunakan titik akhir yang salah (akun pribadi dan organisasi)Kesalahan AADSTS50020 - Akun pengguna dari penyedia identitas tidak ada di penyewa. Untuk informasi tentang mengonversi aplikasi penyewa tunggal Anda, lihat Mengonversi aplikasi penyewa tunggal ke multipenyewa di ID Microsoft Entra.
Buat
DefaultMessageHandlerdengan kacangStorageQueueTemplateuntuk mengirim pesan ke Antrean Penyimpanan.class Demo { private static final String STORAGE_QUEUE_NAME = "example"; private static final String OUTPUT_CHANNEL = "output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }Buat pengikatan gateway Pesan dengan handler pesan di atas melalui saluran pesan.
class Demo { @Autowired StorageQueueOutboundGateway storageQueueOutboundGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface StorageQueueOutboundGateway { void send(String text); } }Mengirim pesan menggunakan gateway.
class Demo { public void demo() { this.storageQueueOutboundGateway.send(message); } }
Menerima pesan dari Antrean Azure Storage
Isi opsi konfigurasi kredensial.
Buat biji saluran pesan sebagai saluran input.
class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }Buat
StorageQueueMessageSourcedengan kacangStorageQueueTemplateuntuk menerima pesan ke Antrean Penyimpanan.class Demo { private static final String STORAGE_QUEUE_NAME = "example"; @Bean @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000")) public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) { return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate); } }Buat pengikatan penerima pesan dengan StorageQueueMessageSource yang dibuat di langkah terakhir melalui saluran pesan yang kami buat sebelumnya.
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnError(Throwable::printStackTrace) .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message)) .block(); } }
Sampel
Untuk informasi selengkapnya, lihat repositori azure-spring-boot-samples di GitHub.