Dukungan Spring Cloud Azure untuk Integrasi Spring
Artikel ini berlaku untuk: ✔️ Versi 4.14.0 ✔️ Versi 5.8.0
Spring Integration Extension untuk Azure menyediakan adaptor Integrasi Spring untuk berbagai layanan yang disediakan oleh Azure SDK untuk Java. Kami menyediakan dukungan Integrasi Spring untuk layanan Azure ini: Event Hubs, Bus Layanan, Storage Queue. Berikut ini adalah daftar adaptor yang didukung:
spring-cloud-azure-starter-integration-eventhubs
- untuk informasi selengkapnya, lihat Integrasi Spring dengan Azure Event Hubsspring-cloud-azure-starter-integration-servicebus
- untuk informasi selengkapnya, lihat Integrasi Spring dengan Azure Bus Layananspring-cloud-azure-starter-integration-storage-queue
- untuk informasi selengkapnya, lihat Integrasi Spring dengan Antrean Azure Storage
Integrasi Spring dengan Azure Event Hubs
Konsep kunci
Azure Event Hubs adalah platform streaming data besar dan layanan penyerapan peristiwa. Layanan ini dapat menerima dan memproses jutaan peristiwa per detik. Data yang dikirim ke hub peristiwa dapat ditransformasikan dan disimpan menggunakan penyedia analitik real-time atau adapter batching/penyimpanan.
Integrasi Spring memungkinkan olahpesan ringan dalam aplikasi berbasis Spring dan mendukung integrasi dengan sistem eksternal melalui adaptor deklaratif. Adaptor tersebut memberikan tingkat abstraksi yang lebih tinggi atas dukungan Spring untuk jarak jauh, olahpesan, dan penjadwalan. Proyek ekstensi Spring Integration for Event Hubs menyediakan adaptor dan gateway saluran masuk dan keluar untuk Azure Event Hubs.
Catatan
API dukungan RxJava dihilangkan dari versi 4.0.0. Lihat Javadoc untuk detailnya.
Grup konsumen
Azure Event Hubs memberikan dukungan serupa dari grup konsumen sebagai Apache Kafka, tetapi dengan sedikit logika yang berbeda. Meskipun Kafka menyimpan semua offset yang diterapkan di broker, Anda harus menyimpan offset pesan Azure Event Hubs yang diproses secara manual. Azure Event Hubs SDK menyediakan fungsi untuk menyimpan offset tersebut di dalam Azure Storage.
Dukungan partisi
Event Hubs menyediakan konsep partisi fisik yang serupa dengan Kafka. Tetapi tidak seperti penyeimbangan ulang otomatis Kafka antara konsumen dan partisi, Azure Event Hubs menyediakan semacam mode preemptive. Akun penyimpanan bertindak sebagai sewa untuk menentukan partisi mana yang dimiliki oleh konsumen mana. Ketika konsumen baru dimulai, ia akan mencoba mencuri beberapa partisi dari sebagian besar konsumen yang dimuat berat untuk mencapai keseimbangan beban kerja.
Untuk menentukan strategi penyeimbangan beban, pengembang dapat menggunakan EventHubsContainerProperties
untuk konfigurasi. Lihat bagian berikut untuk contoh cara mengonfigurasi EventHubsContainerProperties
.
Dukungan konsumen batch
Mendukung EventHubsInboundChannelAdapter
mode penggunaan batch. Untuk mengaktifkannya, pengguna dapat menentukan mode pendengar seperti ListenerMode.BATCH
saat membuat EventHubsInboundChannelAdapter
instans.
Saat diaktifkan, Pesan yang payload-nya adalah daftar peristiwa batch akan diterima dan diteruskan ke saluran hilir. Setiap header pesan juga dikonversi sebagai daftar, yang kontennya adalah nilai header terkait yang diurai dari setiap peristiwa. Untuk header komunal ID partisi, checkpointer, dan properti antrean terakhir, mereka disajikan sebagai nilai tunggal untuk seluruh batch peristiwa berbagi yang sama. Untuk informasi selengkapnya, lihat bagian Header Pesan Azure Event Hubs .
Catatan
Header titik pemeriksaan hanya ada ketika mode titik pemeriksaan MANUAL digunakan.
Titik pemeriksaan konsumen batch mendukung dua mode: BATCH
dan MANUAL
. BATCH
mode adalah mode titik pemeriksaan otomatis untuk memeriksa seluruh batch peristiwa bersama-sama setelah diterima. MANUAL
mode adalah untuk memeriksa peristiwa oleh pengguna. Saat digunakan, Checkpointer akan diteruskan ke header pesan, dan pengguna dapat menggunakannya untuk melakukan titik pemeriksaan.
Kebijakan penggunaan batch dapat ditentukan oleh properti max-size
dan max-wait-time
, di mana max-size
merupakan properti yang diperlukan sementara max-wait-time
bersifat opsional.
Untuk menentukan strategi penggunaan batch, pengembang dapat menggunakan EventHubsContainerProperties
untuk konfigurasi. Lihat bagian berikut untuk contoh cara mengonfigurasi EventHubsContainerProperties
.
Penyiapan dependensi
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>
Konfigurasi
Starter ini menyediakan 3 bagian opsi konfigurasi berikut:
Properti Konfigurasi Koneksi ion
Bagian ini berisi opsi konfigurasi yang digunakan untuk menyambungkan ke Azure Event Hubs.
Catatan
Jika Anda memilih untuk menggunakan prinsip keamanan untuk mengautentikasi dan mengotorisasi dengan ID Microsoft Entra untuk mengakses sumber daya Azure, lihat Mengotorisasi akses dengan ID Microsoft Entra untuk memastikan prinsip keamanan telah diberikan izin yang memadai untuk mengakses sumber daya Azure.
Koneksi ion properti spring-cloud-azure-starter-integration-eventhubs yang dapat dikonfigurasi:
Properti | Tipe | Deskripsi |
---|---|---|
spring.cloud.azure.eventhubs.enabled | Boolean | Apakah Azure Event Hubs diaktifkan. |
spring.cloud.azure.eventhubs.connection-string | String | Nilai string koneksi Ruang Nama Azure Event Hubs. |
spring.cloud.azure.eventhubs.namespace | String | Nilai Namespace Layanan Azure Event Hubs, yang merupakan awalan FQDN. FQDN harus terdiri dari NamespaceName.DomainName |
spring.cloud.azure.eventhubs.domain-name | String | Nama domain nilai Namespace Azure Event Hubs. |
spring.cloud.azure.eventhubs.custom-endpoint-address | String | Alamat Titik Akhir Kustom. |
spring.cloud.azure.eventhubs.shared-connection | Boolean | Apakah EventProcessorClient dan EventHubProducerAsyncClient yang mendasarinya menggunakan koneksi yang sama. Secara default, koneksi baru dibuat dan digunakan untuk setiap klien Event Hub yang dibuat. |
Properti Konfigurasi Titik Pemeriksaan
Bagian ini berisi opsi konfigurasi untuk layanan Storage Blobs, yang digunakan untuk mempertahankan kepemilikan partisi dan informasi titik pemeriksaan.
Catatan
Dari versi 4.0.0, ketika properti spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists tidak diaktifkan secara manual, tidak ada kontainer Storage yang akan dibuat secara otomatis.
Titik pemeriksaan properti yang dapat dikonfigurasi dari spring-cloud-azure-starter-integration-eventhubs:
Properti | Tipe | Deskripsi |
---|---|---|
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists | Boolean | Apakah akan mengizinkan pembuatan kontainer jika tidak ada. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name | String | Nama untuk akun penyimpanan. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key | String | Kunci akses akun penyimpanan. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name | String | Nama kontainer penyimpanan. |
Opsi konfigurasi Azure Service SDK umum juga dapat dikonfigurasi untuk penyimpanan titik pemeriksaan Blob Penyimpanan. Opsi konfigurasi yang didukung diperkenalkan dalam konfigurasi Spring Cloud Azure, dan dapat dikonfigurasi dengan awalan spring.cloud.azure.
terpadu atau awalan spring.cloud.azure.eventhubs.processor.checkpoint-store
.
Properti konfigurasi prosesor Azure Event Hub
EventHubsInboundChannelAdapter
menggunakan EventProcessorClient
untuk menggunakan pesan dari pusat aktivitas, untuk mengonfigurasi properti keseluruhan , EventProcessorClient
pengembang dapat menggunakan EventHubsContainerProperties
untuk konfigurasi. Lihat bagian berikut tentang cara bekerja dengan EventHubsInboundChannelAdapter
.
Penggunaan dasar
Mengirim pesan ke Azure Event Hubs
Isi opsi konfigurasi kredensial.
Untuk kredensial sebagai string koneksi, konfigurasikan properti berikut dalam file application.yml Anda:
spring: cloud: azure: eventhubs: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT-CONTAINER} account-name: ${CHECKPOINT-STORAGE-ACCOUNT} account-key: ${CHECKPOINT-ACCESS-KEY}
Untuk kredensial sebagai identitas terkelola, konfigurasikan properti berikut dalam file application.yml Anda:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} eventhubs: namespace: ${AZURE_SERVICE_BUS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
Untuk kredensial sebagai perwakilan layanan, konfigurasikan properti berikut dalam file application.yml Anda:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${AZURE_SERVICE_BUS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
Catatan
Nilai yang diizinkan untuk tenant-id
adalah: common
, organizations
, consumers
, atau ID penyewa. Untuk informasi selengkapnya tentang nilai-nilai ini, lihat bagian Menggunakan titik akhir (akun pribadi dan organisasi) yang salah dari Kesalahan AADSTS50020 - Akun pengguna dari penyedia identitas tidak ada di penyewa. Untuk informasi tentang mengonversi aplikasi penyewa tunggal Anda, lihat Mengonversi aplikasi penyewa tunggal ke multipenyewa di ID Microsoft Entra.
Buat
DefaultMessageHandler
dengan bijiEventHubsTemplate
untuk mengirim pesan ke Azure Event Hubs.class Demo { private static final String OUTPUT_CHANNEL = "output"; private static final String EVENTHUB_NAME = "eh1"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.error("There was an error sending the message.", ex); } }); return handler; } }
Buat pengikatan gateway pesan dengan handler pesan di atas melalui saluran pesan.
class Demo { @Autowired EventHubOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface EventHubOutboundGateway { void send(String text); } }
Mengirim pesan menggunakan gateway.
class Demo { public void demo() { this.messagingGateway.send(message); } }
Menerima pesan dari Azure Event Hubs
Isi opsi konfigurasi kredensial.
Buat biji saluran pesan sebagai saluran input.
@Configuration class Demo { @Bean public MessageChannel input() { return new DirectChannel(); } }
Buat
EventHubsInboundChannelAdapter
dengan bijiEventHubsMessageListenerContainer
untuk menerima pesan dari Azure Event Hubs.@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; private static final String EVENTHUB_NAME = "eh1"; private static final String CONSUMER_GROUP = "$Default"; @Bean public EventHubsInboundChannelAdapter messageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, EventHubsMessageListenerContainer listenerContainer) { EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer); adapter.setOutputChannel(inputChannel); return adapter; } @Bean public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) { EventHubsContainerProperties containerProperties = new EventHubsContainerProperties(); containerProperties.setEventHubName(EVENTHUB_NAME); containerProperties.setConsumerGroup(CONSUMER_GROUP); containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL)); return new EventHubsMessageListenerContainer(processorFactory, containerProperties); } }
Buat pengikatan penerima pesan dengan EventHubsInboundChannelAdapter melalui saluran pesan yang dibuat sebelumnya.
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
Mengonfigurasi EventHubsMessageConverter untuk menyesuaikan objectMapper
EventHubsMessageConverter
dibuat sebagai kacang yang dapat dikonfigurasi untuk memungkinkan pengguna menyesuaikan ObjectMapper.
Dukungan konsumen batch
Untuk menggunakan pesan dari Azure Event Hubs dalam batch mirip dengan sampel di atas, selain itu pengguna harus mengatur opsi konfigurasi terkait yang menggunakan batch untuk EventHubsInboundChannelAdapter
.
Saat membuat EventHubsInboundChannelAdapter
, mode pendengar harus diatur sebagai BATCH
. Saat membuat biji EventHubsMessageListenerContainer
, atur mode titik pemeriksaan sebagai MANUAL
atau BATCH
, dan opsi batch dapat dikonfigurasi sesuai kebutuhan.
@Configuration
class Demo {
private static final String INPUT_CHANNEL = "input";
private static final String EVENTHUB_NAME = "eh1";
private static final String CONSUMER_GROUP = "$Default";
@Bean
public EventHubsInboundChannelAdapter messageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
EventHubsMessageListenerContainer listenerContainer) {
EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
adapter.setOutputChannel(inputChannel);
return adapter;
}
@Bean
public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
containerProperties.setEventHubName(EVENTHUB_NAME);
containerProperties.setConsumerGroup(CONSUMER_GROUP);
containerProperties.getBatch().setMaxSize(100);
containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
}
}
Header pesan Azure Event Hubs
Tabel berikut ini menggambarkan bagaimana properti pesan Azure Event Hubs dipetakan ke header pesan Spring. Untuk Azure Event Hubs, pesan disebut sebagai event
.
Pemetaan antara Pesan Event Hubs / Properti Peristiwa dan Header Pesan Spring dalam Mode Pendengar Rekaman:
Properti Peristiwa Azure Event Hubs | Konstanta Header Pesan Spring | Jenis | Deskripsi |
---|---|---|---|
Waktu antrean | EventHubsHeaders#ENQUEUED_TIME | Instan | Instan, di UTC, saat peristiwa diantrekan di partisi Event Hub. |
Offset | EventHubsHeaders#OFFSET | Panjang | Offset peristiwa saat diterima dari partisi Event Hub terkait. |
Kunci partisi | AzureHeaders#PARTITION_KEY | String | Kunci hash partisi jika diatur saat awalnya menerbitkan peristiwa. |
ID partisi | AzureHeaders#RAW_PARTITION_ID | String | ID partisi Pusat Aktivitas. |
Nomor urut | EventHubsHeaders#SEQUENCE_NUMBER | Panjang | Nomor urutan yang ditetapkan ke peristiwa saat diantrekan di partisi Event Hub terkait. |
Properti peristiwa antrean terakhir | EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES | LastEnqueuedEventProperties | Properti peristiwa antrean terakhir dalam partisi ini. |
NA | AzureHeaders#CHECKPOINTER | Titik pemeriksaan | Header untuk titik pemeriksaan pesan tertentu. |
Pengguna dapat mengurai header pesan untuk informasi terkait dari setiap peristiwa. Untuk mengatur header pesan untuk peristiwa, semua header yang dikustomisasi akan dimasukkan sebagai properti aplikasi dari suatu peristiwa, di mana header diatur sebagai kunci properti. Ketika peristiwa diterima dari Azure Event Hubs, semua properti aplikasi akan dikonversi ke header pesan.
Catatan
Header pesan kunci partisi, waktu antrean, offset, dan nomor urut tidak didukung untuk diatur secara manual.
Saat mode konsumen batch diaktifkan, header tertentu dari pesan batch dicantumkan sebagai berikut, yang berisi daftar nilai dari setiap peristiwa Event Hubs tunggal.
Pemetaan antara Pesan Event Hubs / Properti Peristiwa dan Header Pesan Spring dalam Mode Pendengar Batch:
Properti Peristiwa Azure Event Hubs | Konstanta Header Pesan Spring Batch | Jenis | Deskripsi |
---|---|---|---|
Waktu antrean | EventHubsHeaders#ENQUEUED_TIME | Daftar Instan | Daftar instan, di UTC, saat setiap peristiwa diantrekan di partisi Event Hub. |
Offset | EventHubsHeaders#OFFSET | Daftar Panjang | Daftar offset setiap peristiwa saat diterima dari partisi Event Hub terkait. |
Kunci partisi | AzureHeaders#PARTITION_KEY | Daftar String | Daftar kunci hashing partisi jika ditetapkan saat awalnya menerbitkan setiap peristiwa. |
Nomor urut | EventHubsHeaders#SEQUENCE_NUMBER | Daftar Panjang | Daftar nomor urut yang ditetapkan untuk setiap peristiwa saat diantrekan di partisi Event Hub terkait. |
Properti sistem | EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES | Daftar Peta | Daftar properti sistem dari setiap peristiwa. |
Properti aplikasi | EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES | Daftar Peta | Daftar properti aplikasi dari setiap peristiwa, tempat semua header pesan atau properti peristiwa yang dikustomisasi ditempatkan. |
Catatan
Saat menerbitkan pesan, semua header batch di atas akan dihapus dari pesan jika ada.
Sampel
Untuk informasi selengkapnya, lihat repositori azure-spring-boot-samples di GitHub.
Integrasi Spring dengan Azure Service Bus
Konsep kunci
Integrasi Spring memungkinkan olahpesan ringan dalam aplikasi berbasis Spring dan mendukung integrasi dengan sistem eksternal melalui adaptor deklaratif.
Proyek ekstensi Spring Integration for Azure Bus Layanan menyediakan adaptor saluran masuk dan keluar untuk Azure Bus Layanan.
Catatan
API dukungan CompletableFuture telah tidak digunakan lagi dari versi 2.10.0, dan digantikan oleh Reactor Core dari versi 4.0.0. Lihat Javadoc untuk detailnya.
Penyiapan dependensi
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>
Konfigurasi
Starter ini menyediakan 2 bagian opsi konfigurasi berikut:
properti konfigurasi Koneksi ion
Bagian ini berisi opsi konfigurasi yang digunakan untuk menyambungkan ke Azure Bus Layanan.
Catatan
Jika Anda memilih untuk menggunakan prinsip keamanan untuk mengautentikasi dan mengotorisasi dengan ID Microsoft Entra untuk mengakses sumber daya Azure, lihat Mengotorisasi akses dengan ID Microsoft Entra untuk memastikan prinsip keamanan telah diberikan izin yang memadai untuk mengakses sumber daya Azure.
Koneksi si properti yang dapat dikonfigurasi dari spring-cloud-azure-starter-integration-servicebus:
Properti | Tipe | Deskripsi |
---|---|---|
spring.cloud.azure.servicebus.enabled | Boolean | Apakah Azure Bus Layanan diaktifkan. |
spring.cloud.azure.servicebus.connection-string | String | Bus Layanan nilai string koneksi Namespace. |
spring.cloud.azure.servicebus.namespace | String | Bus Layanan nilai Namespace, yang merupakan awalan FQDN. FQDN harus terdiri dari NamespaceName.DomainName |
spring.cloud.azure.servicebus.domain-name | String | Nama domain nilai Azure Bus Layanan Namespace. |
Bus Layanan properti konfigurasi prosesor
ServiceBusInboundChannelAdapter
menggunakan ServiceBusProcessorClient
untuk menggunakan pesan, untuk mengonfigurasi properti keseluruhan , ServiceBusProcessorClient
pengembang dapat menggunakan ServiceBusContainerProperties
untuk konfigurasi. Lihat bagian berikut tentang cara bekerja dengan ServiceBusInboundChannelAdapter
.
Penggunaan dasar
Mengirim pesan ke Azure Bus Layanan
Isi opsi konfigurasi kredensial.
Untuk kredensial sebagai string koneksi, konfigurasikan properti berikut dalam file application.yml Anda:
spring: cloud: azure: servicebus: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
Untuk kredensial sebagai identitas terkelola, konfigurasikan properti berikut dalam file application.yml Anda:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Catatan
Nilai yang diizinkan untuk tenant-id
adalah: common
, organizations
, consumers
, atau ID penyewa. Untuk informasi selengkapnya tentang nilai-nilai ini, lihat bagian Menggunakan titik akhir (akun pribadi dan organisasi) yang salah dari Kesalahan AADSTS50020 - Akun pengguna dari penyedia identitas tidak ada di penyewa. Untuk informasi tentang mengonversi aplikasi penyewa tunggal Anda, lihat Mengonversi aplikasi penyewa tunggal ke multipenyewa di ID Microsoft Entra.
Untuk kredensial sebagai perwakilan layanan, konfigurasikan properti berikut dalam file application.yml Anda:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Catatan
Nilai yang diizinkan untuk tenant-id
adalah: common
, organizations
, consumers
, atau ID penyewa. Untuk informasi selengkapnya tentang nilai-nilai ini, lihat bagian Menggunakan titik akhir (akun pribadi dan organisasi) yang salah dari Kesalahan AADSTS50020 - Akun pengguna dari penyedia identitas tidak ada di penyewa. Untuk informasi tentang mengonversi aplikasi penyewa tunggal Anda, lihat Mengonversi aplikasi penyewa tunggal ke multipenyewa di ID Microsoft Entra.
Buat
DefaultMessageHandler
denganServiceBusTemplate
biji untuk mengirim pesan ke Bus Layanan, atur jenis entitas untuk ServiceBusTemplate. Sampel ini mengambil antrean Bus Layanan sebagai contoh.class Demo { private static final String OUTPUT_CHANNEL = "queue.output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) { serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE); DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
Buat pengikatan gateway pesan dengan handler pesan di atas melalui saluran pesan.
class Demo { @Autowired QueueOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface QueueOutboundGateway { void send(String text); } }
Mengirim pesan menggunakan gateway.
class Demo { public void demo() { this.messagingGateway.send(message); } }
Menerima pesan dari Azure Bus Layanan
Isi opsi konfigurasi kredensial.
Buat biji saluran pesan sebagai saluran input.
@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
Buat
ServiceBusInboundChannelAdapter
dengan bijiServiceBusMessageListenerContainer
untuk menerima pesan ke Bus Layanan. Sampel ini mengambil antrean Bus Layanan sebagai contoh.@Configuration class Demo { private static final String QUEUE_NAME = "queue1"; @Bean public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) { ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties(); containerProperties.setEntityName(QUEUE_NAME); containerProperties.setAutoComplete(false); return new ServiceBusMessageListenerContainer(processorFactory, containerProperties); } @Bean public ServiceBusInboundChannelAdapter queueMessageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, ServiceBusMessageListenerContainer listenerContainer) { ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer); adapter.setOutputChannel(inputChannel); return adapter; } }
Buat pengikatan penerima pesan dengan
ServiceBusInboundChannelAdapter
melalui saluran pesan yang kami buat sebelumnya.class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
Mengonfigurasi ServiceBusMessageConverter untuk menyesuaikan objectMapper
ServiceBusMessageConverter
dibuat sebagai kacang yang dapat dikonfigurasi untuk memungkinkan pengguna menyesuaikan ObjectMapper
.
Bus Layanan header pesan
Untuk beberapa header Bus Layanan yang dapat dipetakan ke beberapa konstanta header Spring, prioritas header Spring yang berbeda tercantum.
Pemetaan antara Header Bus Layanan dan Header Spring:
Bus Layanan header dan properti pesan | Konstanta header pesan Spring | Jenis | Dapat dikonfigurasi | Deskripsi |
---|---|---|---|---|
Jenis konten | MessageHeaders#CONTENT_TYPE |
String | Ya | Pendeskripsi Tipe Konten RFC2045 pesan. |
ID Korelasi | ServiceBusMessageHeaders#CORRELATION_ID |
String | Ya | ID korelasi pesan |
ID Pesan | ServiceBusMessageHeaders#MESSAGE_ID |
String | Ya | ID pesan pesan, header ini memiliki prioritas yang lebih tinggi daripada MessageHeaders#ID . |
ID Pesan | MessageHeaders#ID |
UUID | Ya | ID pesan pesan, header ini memiliki prioritas yang lebih rendah daripada ServiceBusMessageHeaders#MESSAGE_ID . |
Kunci partisi | ServiceBusMessageHeaders#PARTITION_KEY |
String | Ya | Kunci partisi untuk mengirim pesan ke entitas yang dipartisi. |
Balas ke | MessageHeaders#REPLY_CHANNEL |
String | Ya | Alamat entitas yang akan dikirimi balasan. |
Balas ke ID sesi | ServiceBusMessageHeaders#REPLY_TO_SESSION_ID |
String | Ya | Nilai properti ReplyToGroupId pesan. |
Waktu antrean terjadwal utc | ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME |
OffsetDateTime | Ya | Tanggalwaktu di mana pesan harus diantrekan dalam Bus Layanan, header ini memiliki prioritas yang lebih tinggi daripada AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE . |
Waktu antrean terjadwal utc | AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE |
Bilangan bulat | Ya | Tanggalwaktu di mana pesan harus diantrekan dalam Bus Layanan, header ini memiliki prioritas yang lebih rendah daripada ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME . |
ID Sesi | ServiceBusMessageHeaders#SESSION_ID |
String | Ya | IDentifier sesi untuk entitas yang sadar sesi. |
Waktu untuk aktif | ServiceBusMessageHeaders#TIME_TO_LIVE |
Durasi | Ya | Durasi waktu sebelum pesan ini kedaluwarsa. |
Untuk | ServiceBusMessageHeaders#TO |
String | Ya | Alamat "ke" pesan, dicadangkan untuk digunakan di masa mendatang dalam skenario perutean dan saat ini diabaikan oleh broker itu sendiri. |
Subjek | ServiceBusMessageHeaders#SUBJECT |
String | Ya | Subjek untuk pesan. |
Deskripsi kesalahan surat mati | ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION |
String | No | Deskripsi untuk pesan yang telah di-dead-letter. |
Alasan surat mati | ServiceBusMessageHeaders#DEAD_LETTER_REASON |
String | No | Alasan pesan itu surat mati. |
Sumber surat mati | ServiceBusMessageHeaders#DEAD_LETTER_SOURCE |
String | No | Entitas tempat pesan di-dead-letter. |
Jumlah pengiriman | ServiceBusMessageHeaders#DELIVERY_COUNT |
long | No | Berapa kali pesan ini dikirimkan kepada klien. |
Nomor urutan yang diantrekan | ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER |
long | No | Nomor urutan antrean yang ditetapkan ke pesan dengan Bus Layanan. |
Waktu antrean | ServiceBusMessageHeaders#ENQUEUED_TIME |
OffsetDateTime | No | Tanggalwaktu di mana pesan ini diantrekan dalam Bus Layanan. |
Kedaluwarsa pada | ServiceBusMessageHeaders#EXPIRES_AT |
OffsetDateTime | No | Tanggalwaktu saat pesan ini akan kedaluwarsa. |
Kunci token | ServiceBusMessageHeaders#LOCK_TOKEN |
String | No | Token kunci untuk pesan saat ini. |
Terkunci hingga | ServiceBusMessageHeaders#LOCKED_UNTIL |
OffsetDateTime | No | Tanggalwaktu saat kunci pesan ini kedaluwarsa. |
Nomor urut | ServiceBusMessageHeaders#SEQUENCE_NUMBER |
long | No | Nomor unik yang ditetapkan ke pesan menurut Bus Layanan. |
Provinsi | ServiceBusMessageHeaders#STATE |
ServiceBusMessageState | No | Status pesan, yang dapat Aktif, Ditangguhkan, atau Dijadwalkan. |
Dukungan kunci partisi
Pemula ini mendukung partisi Bus Layanan dengan mengizinkan pengaturan kunci partisi dan ID sesi di header pesan. Bagian ini memperkenalkan cara mengatur kunci partisi untuk pesan.
Disarankan: Gunakan ServiceBusMessageHeaders.PARTITION_KEY
sebagai kunci header.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Tidak disarankan tetapi saat ini didukung:AzureHeaders.PARTITION_KEY
sebagai kunci header.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Catatan
Ketika keduanya ServiceBusMessageHeaders.PARTITION_KEY
dan AzureHeaders.PARTITION_KEY
diatur di header pesan, ServiceBusMessageHeaders.PARTITION_KEY
lebih disukai.
Dukungan sesi
Contoh ini menunjukkan cara mengatur ID sesi pesan secara manual dalam aplikasi.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Catatan
ServiceBusMessageHeaders.SESSION_ID
Ketika diatur di header pesan, dan header yang berbeda ServiceBusMessageHeaders.PARTITION_KEY
juga diatur, nilai ID sesi akhirnya akan digunakan untuk menimpa nilai kunci partisi.
Sampel
Untuk informasi selengkapnya, lihat repositori azure-spring-boot-samples di GitHub.
Integrasi Spring dengan Antrean Azure Storage
Konsep kunci
Azure Queue Storage adalah layanan untuk menyimpan pesan dalam jumlah besar. Anda bisa mengakses pesan dari mana saja di dunia melalui panggilan terautentikasi menggunakan HTTP atau HTTPS. Pesan antrean dapat berukuran hingga 64 KB. Antrean dapat berisi jutaan pesan, hingga mencapai batas kapasitas total dari akun penyimpanan. Antrean umumnya digunakan untuk membuat backlog pekerjaan untuk diproses secara asinkron.
Penyiapan dependensi
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>
Konfigurasi
Starter ini menyediakan opsi konfigurasi berikut:
properti konfigurasi Koneksi ion
Bagian ini berisi opsi konfigurasi yang digunakan untuk menyambungkan ke Antrean Azure Storage.
Catatan
Jika Anda memilih untuk menggunakan prinsip keamanan untuk mengautentikasi dan mengotorisasi dengan ID Microsoft Entra untuk mengakses sumber daya Azure, lihat Mengotorisasi akses dengan ID Microsoft Entra untuk memastikan prinsip keamanan telah diberikan izin yang memadai untuk mengakses sumber daya Azure.
Koneksi ion properti yang dapat dikonfigurasi dari spring-cloud-azure-starter-integration-storage-queue:
Properti | Tipe | Deskripsi |
---|---|---|
spring.cloud.azure.storage.queue.enabled | Boolean | Apakah Antrean Azure Storage diaktifkan. |
spring.cloud.azure.storage.queue.connection-string | String | Nilai string koneksi Ruang Nama Antrean Penyimpanan. |
spring.cloud.azure.storage.queue.accountName | String | Nama akun Antrean Penyimpanan. |
spring.cloud.azure.storage.queue.accountKey | String | Kunci akun Antrean Penyimpanan. |
spring.cloud.azure.storage.queue.endpoint | String | Titik akhir layanan Antrean Penyimpanan. |
spring.cloud.azure.storage.queue.sasToken | String | Kredensial token Sas |
spring.cloud.azure.storage.queue.serviceVersion | QueueServiceVersion | QueueServiceVersion yang digunakan saat membuat permintaan API. |
spring.cloud.azure.storage.queue.messageEncoding | String | Pengodean pesan antrean. |
Penggunaan dasar
Mengirim pesan ke Antrean Azure Storage
Isi opsi konfigurasi kredensial.
Untuk kredensial sebagai string koneksi, konfigurasikan properti berikut dalam file application.yml Anda:
spring: cloud: azure: storage: queue: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
Untuk kredensial sebagai identitas terkelola, konfigurasikan properti berikut dalam file application.yml Anda:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> storage: queue: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Catatan
Nilai yang diizinkan untuk tenant-id
adalah: common
, organizations
, consumers
, atau ID penyewa. Untuk informasi selengkapnya tentang nilai-nilai ini, lihat bagian Menggunakan titik akhir (akun pribadi dan organisasi) yang salah dari Kesalahan AADSTS50020 - Akun pengguna dari penyedia identitas tidak ada di penyewa. Untuk informasi tentang mengonversi aplikasi penyewa tunggal Anda, lihat Mengonversi aplikasi penyewa tunggal ke multipenyewa di ID Microsoft Entra.
Untuk kredensial sebagai perwakilan layanan, konfigurasikan properti berikut dalam file application.yml Anda:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> storage: queue: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Catatan
Nilai yang diizinkan untuk tenant-id
adalah: common
, organizations
, consumers
, atau ID penyewa. Untuk informasi selengkapnya tentang nilai-nilai ini, lihat bagian Menggunakan titik akhir (akun pribadi dan organisasi) yang salah dari Kesalahan AADSTS50020 - Akun pengguna dari penyedia identitas tidak ada di penyewa. Untuk informasi tentang mengonversi aplikasi penyewa tunggal Anda, lihat Mengonversi aplikasi penyewa tunggal ke multipenyewa di ID Microsoft Entra.
Buat
DefaultMessageHandler
dengan bijiStorageQueueTemplate
untuk mengirim pesan ke Antrean Penyimpanan.class Demo { private static final String STORAGE_QUEUE_NAME = "example"; private static final String OUTPUT_CHANNEL = "output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
Buat pengikatan gateway Pesan dengan handler pesan di atas melalui saluran pesan.
class Demo { @Autowired StorageQueueOutboundGateway storageQueueOutboundGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface StorageQueueOutboundGateway { void send(String text); } }
Mengirim pesan menggunakan gateway.
class Demo { public void demo() { this.storageQueueOutboundGateway.send(message); } }
Menerima pesan dari Antrean Azure Storage
Isi opsi konfigurasi kredensial.
Buat biji saluran pesan sebagai saluran input.
class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
Buat
StorageQueueMessageSource
dengan bijiStorageQueueTemplate
untuk menerima pesan ke Antrean Penyimpanan.class Demo { private static final String STORAGE_QUEUE_NAME = "example"; @Bean @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000")) public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) { return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate); } }
Buat pengikatan penerima pesan dengan StorageQueueMessageSource yang dibuat di langkah terakhir melalui saluran pesan yang kami buat sebelumnya.
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnError(Throwable::printStackTrace) .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message)) .block(); } }
Sampel
Untuk informasi selengkapnya, lihat repositori azure-spring-boot-samples di GitHub.
Saran dan Komentar
https://aka.ms/ContentUserFeedback.
Segera hadir: Sepanjang tahun 2024 kami akan menghentikan penggunaan GitHub Issues sebagai mekanisme umpan balik untuk konten dan menggantinya dengan sistem umpan balik baru. Untuk mengetahui informasi selengkapnya, lihat:Kirim dan lihat umpan balik untuk