Bagikan melalui


Dukungan Spring Cloud Azure untuk Integrasi Spring

Artikel ini berlaku untuk: ✔️ Versi 4.14.0 ✔️ Versi 5.8.0

Spring Integration Extension untuk Azure menyediakan adaptor Integrasi Spring untuk berbagai layanan yang disediakan oleh Azure SDK untuk Java. Kami menyediakan dukungan Integrasi Spring untuk layanan Azure ini: Event Hubs, Bus Layanan, Storage Queue. Berikut ini adalah daftar adaptor yang didukung:

Integrasi Spring dengan Azure Event Hubs

Konsep kunci

Azure Event Hubs adalah platform streaming data besar dan layanan penyerapan peristiwa. Layanan ini dapat menerima dan memproses jutaan peristiwa per detik. Data yang dikirim ke hub peristiwa dapat ditransformasikan dan disimpan menggunakan penyedia analitik real-time atau adapter batching/penyimpanan.

Integrasi Spring memungkinkan olahpesan ringan dalam aplikasi berbasis Spring dan mendukung integrasi dengan sistem eksternal melalui adaptor deklaratif. Adaptor tersebut memberikan tingkat abstraksi yang lebih tinggi atas dukungan Spring untuk jarak jauh, olahpesan, dan penjadwalan. Proyek ekstensi Spring Integration for Event Hubs menyediakan adaptor dan gateway saluran masuk dan keluar untuk Azure Event Hubs.

Catatan

API dukungan RxJava dihilangkan dari versi 4.0.0. Lihat Javadoc untuk detailnya.

Grup konsumen

Azure Event Hubs memberikan dukungan serupa dari grup konsumen sebagai Apache Kafka, tetapi dengan sedikit logika yang berbeda. Meskipun Kafka menyimpan semua offset yang diterapkan di broker, Anda harus menyimpan offset pesan Azure Event Hubs yang diproses secara manual. Azure Event Hubs SDK menyediakan fungsi untuk menyimpan offset tersebut di dalam Azure Storage.

Dukungan partisi

Event Hubs menyediakan konsep partisi fisik yang serupa dengan Kafka. Tetapi tidak seperti penyeimbangan ulang otomatis Kafka antara konsumen dan partisi, Azure Event Hubs menyediakan semacam mode preemptive. Akun penyimpanan bertindak sebagai sewa untuk menentukan partisi mana yang dimiliki oleh konsumen mana. Ketika konsumen baru dimulai, ia akan mencoba mencuri beberapa partisi dari sebagian besar konsumen yang dimuat berat untuk mencapai keseimbangan beban kerja.

Untuk menentukan strategi penyeimbangan beban, pengembang dapat menggunakan EventHubsContainerProperties untuk konfigurasi. Lihat bagian berikut untuk contoh cara mengonfigurasi EventHubsContainerProperties.

Dukungan konsumen batch

Mendukung EventHubsInboundChannelAdapter mode penggunaan batch. Untuk mengaktifkannya, pengguna dapat menentukan mode pendengar seperti ListenerMode.BATCH saat membuat EventHubsInboundChannelAdapter instans. Saat diaktifkan, Pesan yang payload-nya adalah daftar peristiwa batch akan diterima dan diteruskan ke saluran hilir. Setiap header pesan juga dikonversi sebagai daftar, yang kontennya adalah nilai header terkait yang diurai dari setiap peristiwa. Untuk header komunal ID partisi, checkpointer, dan properti antrean terakhir, mereka disajikan sebagai nilai tunggal untuk seluruh batch peristiwa berbagi yang sama. Untuk informasi selengkapnya, lihat bagian Header Pesan Azure Event Hubs .

Catatan

Header titik pemeriksaan hanya ada ketika mode titik pemeriksaan MANUAL digunakan.

Titik pemeriksaan konsumen batch mendukung dua mode: BATCH dan MANUAL. BATCH mode adalah mode titik pemeriksaan otomatis untuk memeriksa seluruh batch peristiwa bersama-sama setelah diterima. MANUAL mode adalah untuk memeriksa peristiwa oleh pengguna. Saat digunakan, Checkpointer akan diteruskan ke header pesan, dan pengguna dapat menggunakannya untuk melakukan titik pemeriksaan.

Kebijakan penggunaan batch dapat ditentukan oleh properti max-size dan max-wait-time, di mana max-size merupakan properti yang diperlukan sementara max-wait-time bersifat opsional. Untuk menentukan strategi penggunaan batch, pengembang dapat menggunakan EventHubsContainerProperties untuk konfigurasi. Lihat bagian berikut untuk contoh cara mengonfigurasi EventHubsContainerProperties.

Penyiapan dependensi

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>

Konfigurasi

Starter ini menyediakan 3 bagian opsi konfigurasi berikut:

Properti Konfigurasi Koneksi ion

Bagian ini berisi opsi konfigurasi yang digunakan untuk menyambungkan ke Azure Event Hubs.

Catatan

Jika Anda memilih untuk menggunakan prinsip keamanan untuk mengautentikasi dan mengotorisasi dengan ID Microsoft Entra untuk mengakses sumber daya Azure, lihat Mengotorisasi akses dengan ID Microsoft Entra untuk memastikan prinsip keamanan telah diberikan izin yang memadai untuk mengakses sumber daya Azure.

Koneksi ion properti spring-cloud-azure-starter-integration-eventhubs yang dapat dikonfigurasi:

Properti Tipe Deskripsi
spring.cloud.azure.eventhubs.enabled Boolean Apakah Azure Event Hubs diaktifkan.
spring.cloud.azure.eventhubs.connection-string String Nilai string koneksi Ruang Nama Azure Event Hubs.
spring.cloud.azure.eventhubs.namespace String Nilai Namespace Layanan Azure Event Hubs, yang merupakan awalan FQDN. FQDN harus terdiri dari NamespaceName.DomainName
spring.cloud.azure.eventhubs.domain-name String Nama domain nilai Namespace Azure Event Hubs.
spring.cloud.azure.eventhubs.custom-endpoint-address String Alamat Titik Akhir Kustom.
spring.cloud.azure.eventhubs.shared-connection Boolean Apakah EventProcessorClient dan EventHubProducerAsyncClient yang mendasarinya menggunakan koneksi yang sama. Secara default, koneksi baru dibuat dan digunakan untuk setiap klien Event Hub yang dibuat.

Properti Konfigurasi Titik Pemeriksaan

Bagian ini berisi opsi konfigurasi untuk layanan Storage Blobs, yang digunakan untuk mempertahankan kepemilikan partisi dan informasi titik pemeriksaan.

Catatan

Dari versi 4.0.0, ketika properti spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists tidak diaktifkan secara manual, tidak ada kontainer Storage yang akan dibuat secara otomatis.

Titik pemeriksaan properti yang dapat dikonfigurasi dari spring-cloud-azure-starter-integration-eventhubs:

Properti Tipe Deskripsi
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists Boolean Apakah akan mengizinkan pembuatan kontainer jika tidak ada.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name String Nama untuk akun penyimpanan.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key String Kunci akses akun penyimpanan.
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name String Nama kontainer penyimpanan.

Opsi konfigurasi Azure Service SDK umum juga dapat dikonfigurasi untuk penyimpanan titik pemeriksaan Blob Penyimpanan. Opsi konfigurasi yang didukung diperkenalkan dalam konfigurasi Spring Cloud Azure, dan dapat dikonfigurasi dengan awalan spring.cloud.azure. terpadu atau awalan spring.cloud.azure.eventhubs.processor.checkpoint-store.

Properti konfigurasi prosesor Azure Event Hub

EventHubsInboundChannelAdapter menggunakan EventProcessorClient untuk menggunakan pesan dari pusat aktivitas, untuk mengonfigurasi properti keseluruhan , EventProcessorClientpengembang dapat menggunakan EventHubsContainerProperties untuk konfigurasi. Lihat bagian berikut tentang cara bekerja dengan EventHubsInboundChannelAdapter.

Penggunaan dasar

Mengirim pesan ke Azure Event Hubs

  1. Isi opsi konfigurasi kredensial.

    • Untuk kredensial sebagai string koneksi, konfigurasikan properti berikut dalam file application.yml Anda:

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT-CONTAINER}
                  account-name: ${CHECKPOINT-STORAGE-ACCOUNT}
                  account-key: ${CHECKPOINT-ACCESS-KEY}
      
    • Untuk kredensial sebagai identitas terkelola, konfigurasikan properti berikut dalam file application.yml Anda:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            eventhubs:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      
    • Untuk kredensial sebagai perwakilan layanan, konfigurasikan properti berikut dalam file application.yml Anda:

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      

Catatan

Nilai yang diizinkan untuk tenant-id adalah: common, organizations, consumers, atau ID penyewa. Untuk informasi selengkapnya tentang nilai-nilai ini, lihat bagian Menggunakan titik akhir (akun pribadi dan organisasi) yang salah dari Kesalahan AADSTS50020 - Akun pengguna dari penyedia identitas tidak ada di penyewa. Untuk informasi tentang mengonversi aplikasi penyewa tunggal Anda, lihat Mengonversi aplikasi penyewa tunggal ke multipenyewa di ID Microsoft Entra.

  1. Buat DefaultMessageHandler dengan biji EventHubsTemplate untuk mengirim pesan ke Azure Event Hubs.

    class Demo {
        private static final String OUTPUT_CHANNEL = "output";
        private static final String EVENTHUB_NAME = "eh1";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.error("There was an error sending the message.", ex);
                }
            });
            return handler;
        }
    }
    
  2. Buat pengikatan gateway pesan dengan handler pesan di atas melalui saluran pesan.

    class Demo {
        @Autowired
        EventHubOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface EventHubOutboundGateway {
            void send(String text);
        }
    }
    
  3. Mengirim pesan menggunakan gateway.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Menerima pesan dari Azure Event Hubs

  1. Isi opsi konfigurasi kredensial.

  2. Buat biji saluran pesan sebagai saluran input.

    @Configuration
    class Demo {
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Buat EventHubsInboundChannelAdapter dengan biji EventHubsMessageListenerContainer untuk menerima pesan dari Azure Event Hubs.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
        private static final String EVENTHUB_NAME = "eh1";
        private static final String CONSUMER_GROUP = "$Default";
    
        @Bean
        public EventHubsInboundChannelAdapter messageChannelAdapter(
                @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
                EventHubsMessageListenerContainer listenerContainer) {
            EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    
        @Bean
        public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
            EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
            containerProperties.setEventHubName(EVENTHUB_NAME);
            containerProperties.setConsumerGroup(CONSUMER_GROUP);
            containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
            return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
        }
    }
    
  4. Buat pengikatan penerima pesan dengan EventHubsInboundChannelAdapter melalui saluran pesan yang dibuat sebelumnya.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

Mengonfigurasi EventHubsMessageConverter untuk menyesuaikan objectMapper

EventHubsMessageConverter dibuat sebagai kacang yang dapat dikonfigurasi untuk memungkinkan pengguna menyesuaikan ObjectMapper.

Dukungan konsumen batch

Untuk menggunakan pesan dari Azure Event Hubs dalam batch mirip dengan sampel di atas, selain itu pengguna harus mengatur opsi konfigurasi terkait yang menggunakan batch untuk EventHubsInboundChannelAdapter.

Saat membuat EventHubsInboundChannelAdapter, mode pendengar harus diatur sebagai BATCH. Saat membuat biji EventHubsMessageListenerContainer, atur mode titik pemeriksaan sebagai MANUAL atau BATCH, dan opsi batch dapat dikonfigurasi sesuai kebutuhan.

@Configuration
class Demo {
    private static final String INPUT_CHANNEL = "input";
    private static final String EVENTHUB_NAME = "eh1";
    private static final String CONSUMER_GROUP = "$Default";

    @Bean
    public EventHubsInboundChannelAdapter messageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            EventHubsMessageListenerContainer listenerContainer) {
        EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
        adapter.setOutputChannel(inputChannel);
        return adapter;
    }

    @Bean
    public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
        EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
        containerProperties.setEventHubName(EVENTHUB_NAME);
        containerProperties.setConsumerGroup(CONSUMER_GROUP);
        containerProperties.getBatch().setMaxSize(100);
        containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
        return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
    }
}

Header pesan Azure Event Hubs

Tabel berikut ini menggambarkan bagaimana properti pesan Azure Event Hubs dipetakan ke header pesan Spring. Untuk Azure Event Hubs, pesan disebut sebagai event.

Pemetaan antara Pesan Event Hubs / Properti Peristiwa dan Header Pesan Spring dalam Mode Pendengar Rekaman:

Properti Peristiwa Azure Event Hubs Konstanta Header Pesan Spring Jenis Deskripsi
Waktu antrean EventHubsHeaders#ENQUEUED_TIME Instan Instan, di UTC, saat peristiwa diantrekan di partisi Event Hub.
Offset EventHubsHeaders#OFFSET Panjang Offset peristiwa saat diterima dari partisi Event Hub terkait.
Kunci partisi AzureHeaders#PARTITION_KEY String Kunci hash partisi jika diatur saat awalnya menerbitkan peristiwa.
ID partisi AzureHeaders#RAW_PARTITION_ID String ID partisi Pusat Aktivitas.
Nomor urut EventHubsHeaders#SEQUENCE_NUMBER Panjang Nomor urutan yang ditetapkan ke peristiwa saat diantrekan di partisi Event Hub terkait.
Properti peristiwa antrean terakhir EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES LastEnqueuedEventProperties Properti peristiwa antrean terakhir dalam partisi ini.
NA AzureHeaders#CHECKPOINTER Titik pemeriksaan Header untuk titik pemeriksaan pesan tertentu.

Pengguna dapat mengurai header pesan untuk informasi terkait dari setiap peristiwa. Untuk mengatur header pesan untuk peristiwa, semua header yang dikustomisasi akan dimasukkan sebagai properti aplikasi dari suatu peristiwa, di mana header diatur sebagai kunci properti. Ketika peristiwa diterima dari Azure Event Hubs, semua properti aplikasi akan dikonversi ke header pesan.

Catatan

Header pesan kunci partisi, waktu antrean, offset, dan nomor urut tidak didukung untuk diatur secara manual.

Saat mode konsumen batch diaktifkan, header tertentu dari pesan batch dicantumkan sebagai berikut, yang berisi daftar nilai dari setiap peristiwa Event Hubs tunggal.

Pemetaan antara Pesan Event Hubs / Properti Peristiwa dan Header Pesan Spring dalam Mode Pendengar Batch:

Properti Peristiwa Azure Event Hubs Konstanta Header Pesan Spring Batch Jenis Deskripsi
Waktu antrean EventHubsHeaders#ENQUEUED_TIME Daftar Instan Daftar instan, di UTC, saat setiap peristiwa diantrekan di partisi Event Hub.
Offset EventHubsHeaders#OFFSET Daftar Panjang Daftar offset setiap peristiwa saat diterima dari partisi Event Hub terkait.
Kunci partisi AzureHeaders#PARTITION_KEY Daftar String Daftar kunci hashing partisi jika ditetapkan saat awalnya menerbitkan setiap peristiwa.
Nomor urut EventHubsHeaders#SEQUENCE_NUMBER Daftar Panjang Daftar nomor urut yang ditetapkan untuk setiap peristiwa saat diantrekan di partisi Event Hub terkait.
Properti sistem EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES Daftar Peta Daftar properti sistem dari setiap peristiwa.
Properti aplikasi EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES Daftar Peta Daftar properti aplikasi dari setiap peristiwa, tempat semua header pesan atau properti peristiwa yang dikustomisasi ditempatkan.

Catatan

Saat menerbitkan pesan, semua header batch di atas akan dihapus dari pesan jika ada.

Sampel

Untuk informasi selengkapnya, lihat repositori azure-spring-boot-samples di GitHub.

Integrasi Spring dengan Azure Service Bus

Konsep kunci

Integrasi Spring memungkinkan olahpesan ringan dalam aplikasi berbasis Spring dan mendukung integrasi dengan sistem eksternal melalui adaptor deklaratif.

Proyek ekstensi Spring Integration for Azure Bus Layanan menyediakan adaptor saluran masuk dan keluar untuk Azure Bus Layanan.

Catatan

API dukungan CompletableFuture telah tidak digunakan lagi dari versi 2.10.0, dan digantikan oleh Reactor Core dari versi 4.0.0. Lihat Javadoc untuk detailnya.

Penyiapan dependensi

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>

Konfigurasi

Starter ini menyediakan 2 bagian opsi konfigurasi berikut:

properti konfigurasi Koneksi ion

Bagian ini berisi opsi konfigurasi yang digunakan untuk menyambungkan ke Azure Bus Layanan.

Catatan

Jika Anda memilih untuk menggunakan prinsip keamanan untuk mengautentikasi dan mengotorisasi dengan ID Microsoft Entra untuk mengakses sumber daya Azure, lihat Mengotorisasi akses dengan ID Microsoft Entra untuk memastikan prinsip keamanan telah diberikan izin yang memadai untuk mengakses sumber daya Azure.

Koneksi si properti yang dapat dikonfigurasi dari spring-cloud-azure-starter-integration-servicebus:

Properti Tipe Deskripsi
spring.cloud.azure.servicebus.enabled Boolean Apakah Azure Bus Layanan diaktifkan.
spring.cloud.azure.servicebus.connection-string String Bus Layanan nilai string koneksi Namespace.
spring.cloud.azure.servicebus.namespace String Bus Layanan nilai Namespace, yang merupakan awalan FQDN. FQDN harus terdiri dari NamespaceName.DomainName
spring.cloud.azure.servicebus.domain-name String Nama domain nilai Azure Bus Layanan Namespace.

Bus Layanan properti konfigurasi prosesor

ServiceBusInboundChannelAdapter menggunakan ServiceBusProcessorClient untuk menggunakan pesan, untuk mengonfigurasi properti keseluruhan , ServiceBusProcessorClientpengembang dapat menggunakan ServiceBusContainerProperties untuk konfigurasi. Lihat bagian berikut tentang cara bekerja dengan ServiceBusInboundChannelAdapter.

Penggunaan dasar

Mengirim pesan ke Azure Bus Layanan

  1. Isi opsi konfigurasi kredensial.

    • Untuk kredensial sebagai string koneksi, konfigurasikan properti berikut dalam file application.yml Anda:

      spring:
        cloud:
          azure:
            servicebus:
              connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
      
    • Untuk kredensial sebagai identitas terkelola, konfigurasikan properti berikut dalam file application.yml Anda:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            servicebus:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
      

Catatan

Nilai yang diizinkan untuk tenant-id adalah: common, organizations, consumers, atau ID penyewa. Untuk informasi selengkapnya tentang nilai-nilai ini, lihat bagian Menggunakan titik akhir (akun pribadi dan organisasi) yang salah dari Kesalahan AADSTS50020 - Akun pengguna dari penyedia identitas tidak ada di penyewa. Untuk informasi tentang mengonversi aplikasi penyewa tunggal Anda, lihat Mengonversi aplikasi penyewa tunggal ke multipenyewa di ID Microsoft Entra.

  • Untuk kredensial sebagai perwakilan layanan, konfigurasikan properti berikut dalam file application.yml Anda:

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          servicebus:
            namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
    

Catatan

Nilai yang diizinkan untuk tenant-id adalah: common, organizations, consumers, atau ID penyewa. Untuk informasi selengkapnya tentang nilai-nilai ini, lihat bagian Menggunakan titik akhir (akun pribadi dan organisasi) yang salah dari Kesalahan AADSTS50020 - Akun pengguna dari penyedia identitas tidak ada di penyewa. Untuk informasi tentang mengonversi aplikasi penyewa tunggal Anda, lihat Mengonversi aplikasi penyewa tunggal ke multipenyewa di ID Microsoft Entra.

  1. Buat DefaultMessageHandler dengan ServiceBusTemplate biji untuk mengirim pesan ke Bus Layanan, atur jenis entitas untuk ServiceBusTemplate. Sampel ini mengambil antrean Bus Layanan sebagai contoh.

    class Demo {
        private static final String OUTPUT_CHANNEL = "queue.output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) {
            serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE);
            DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
    
            return handler;
        }
    }
    
  2. Buat pengikatan gateway pesan dengan handler pesan di atas melalui saluran pesan.

    class Demo {
        @Autowired
        QueueOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface QueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Mengirim pesan menggunakan gateway.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Menerima pesan dari Azure Bus Layanan

  1. Isi opsi konfigurasi kredensial.

  2. Buat biji saluran pesan sebagai saluran input.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Buat ServiceBusInboundChannelAdapter dengan biji ServiceBusMessageListenerContainer untuk menerima pesan ke Bus Layanan. Sampel ini mengambil antrean Bus Layanan sebagai contoh.

    @Configuration
    class Demo {
        private static final String QUEUE_NAME = "queue1";
    
        @Bean
        public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) {
            ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
            containerProperties.setEntityName(QUEUE_NAME);
            containerProperties.setAutoComplete(false);
            return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
        }
    
        @Bean
        public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            ServiceBusMessageListenerContainer listenerContainer) {
            ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    }
    
  4. Buat pengikatan penerima pesan dengan ServiceBusInboundChannelAdapter melalui saluran pesan yang kami buat sebelumnya.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

Mengonfigurasi ServiceBusMessageConverter untuk menyesuaikan objectMapper

ServiceBusMessageConverter dibuat sebagai kacang yang dapat dikonfigurasi untuk memungkinkan pengguna menyesuaikan ObjectMapper.

Bus Layanan header pesan

Untuk beberapa header Bus Layanan yang dapat dipetakan ke beberapa konstanta header Spring, prioritas header Spring yang berbeda tercantum.

Pemetaan antara Header Bus Layanan dan Header Spring:

Bus Layanan header dan properti pesan Konstanta header pesan Spring Jenis Dapat dikonfigurasi Deskripsi
Jenis konten MessageHeaders#CONTENT_TYPE String Ya Pendeskripsi Tipe Konten RFC2045 pesan.
ID Korelasi ServiceBusMessageHeaders#CORRELATION_ID String Ya ID korelasi pesan
ID Pesan ServiceBusMessageHeaders#MESSAGE_ID String Ya ID pesan pesan, header ini memiliki prioritas yang lebih tinggi daripada MessageHeaders#ID.
ID Pesan MessageHeaders#ID UUID Ya ID pesan pesan, header ini memiliki prioritas yang lebih rendah daripada ServiceBusMessageHeaders#MESSAGE_ID.
Kunci partisi ServiceBusMessageHeaders#PARTITION_KEY String Ya Kunci partisi untuk mengirim pesan ke entitas yang dipartisi.
Balas ke MessageHeaders#REPLY_CHANNEL String Ya Alamat entitas yang akan dikirimi balasan.
Balas ke ID sesi ServiceBusMessageHeaders#REPLY_TO_SESSION_ID String Ya Nilai properti ReplyToGroupId pesan.
Waktu antrean terjadwal utc ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME OffsetDateTime Ya Tanggalwaktu di mana pesan harus diantrekan dalam Bus Layanan, header ini memiliki prioritas yang lebih tinggi daripada AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE.
Waktu antrean terjadwal utc AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE Bilangan bulat Ya Tanggalwaktu di mana pesan harus diantrekan dalam Bus Layanan, header ini memiliki prioritas yang lebih rendah daripada ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME.
ID Sesi ServiceBusMessageHeaders#SESSION_ID String Ya IDentifier sesi untuk entitas yang sadar sesi.
Waktu untuk aktif ServiceBusMessageHeaders#TIME_TO_LIVE Durasi Ya Durasi waktu sebelum pesan ini kedaluwarsa.
Untuk ServiceBusMessageHeaders#TO String Ya Alamat "ke" pesan, dicadangkan untuk digunakan di masa mendatang dalam skenario perutean dan saat ini diabaikan oleh broker itu sendiri.
Subjek ServiceBusMessageHeaders#SUBJECT String Ya Subjek untuk pesan.
Deskripsi kesalahan surat mati ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION String No Deskripsi untuk pesan yang telah di-dead-letter.
Alasan surat mati ServiceBusMessageHeaders#DEAD_LETTER_REASON String No Alasan pesan itu surat mati.
Sumber surat mati ServiceBusMessageHeaders#DEAD_LETTER_SOURCE String No Entitas tempat pesan di-dead-letter.
Jumlah pengiriman ServiceBusMessageHeaders#DELIVERY_COUNT long No Berapa kali pesan ini dikirimkan kepada klien.
Nomor urutan yang diantrekan ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER long No Nomor urutan antrean yang ditetapkan ke pesan dengan Bus Layanan.
Waktu antrean ServiceBusMessageHeaders#ENQUEUED_TIME OffsetDateTime No Tanggalwaktu di mana pesan ini diantrekan dalam Bus Layanan.
Kedaluwarsa pada ServiceBusMessageHeaders#EXPIRES_AT OffsetDateTime No Tanggalwaktu saat pesan ini akan kedaluwarsa.
Kunci token ServiceBusMessageHeaders#LOCK_TOKEN String No Token kunci untuk pesan saat ini.
Terkunci hingga ServiceBusMessageHeaders#LOCKED_UNTIL OffsetDateTime No Tanggalwaktu saat kunci pesan ini kedaluwarsa.
Nomor urut ServiceBusMessageHeaders#SEQUENCE_NUMBER long No Nomor unik yang ditetapkan ke pesan menurut Bus Layanan.
Provinsi ServiceBusMessageHeaders#STATE ServiceBusMessageState No Status pesan, yang dapat Aktif, Ditangguhkan, atau Dijadwalkan.

Dukungan kunci partisi

Pemula ini mendukung partisi Bus Layanan dengan mengizinkan pengaturan kunci partisi dan ID sesi di header pesan. Bagian ini memperkenalkan cara mengatur kunci partisi untuk pesan.

Disarankan: Gunakan ServiceBusMessageHeaders.PARTITION_KEY sebagai kunci header.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Tidak disarankan tetapi saat ini didukung:AzureHeaders.PARTITION_KEY sebagai kunci header.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Catatan

Ketika keduanya ServiceBusMessageHeaders.PARTITION_KEY dan AzureHeaders.PARTITION_KEY diatur di header pesan, ServiceBusMessageHeaders.PARTITION_KEY lebih disukai.

Dukungan sesi

Contoh ini menunjukkan cara mengatur ID sesi pesan secara manual dalam aplikasi.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Catatan

ServiceBusMessageHeaders.SESSION_ID Ketika diatur di header pesan, dan header yang berbeda ServiceBusMessageHeaders.PARTITION_KEY juga diatur, nilai ID sesi akhirnya akan digunakan untuk menimpa nilai kunci partisi.

Sampel

Untuk informasi selengkapnya, lihat repositori azure-spring-boot-samples di GitHub.

Integrasi Spring dengan Antrean Azure Storage

Konsep kunci

Azure Queue Storage adalah layanan untuk menyimpan pesan dalam jumlah besar. Anda bisa mengakses pesan dari mana saja di dunia melalui panggilan terautentikasi menggunakan HTTP atau HTTPS. Pesan antrean dapat berukuran hingga 64 KB. Antrean dapat berisi jutaan pesan, hingga mencapai batas kapasitas total dari akun penyimpanan. Antrean umumnya digunakan untuk membuat backlog pekerjaan untuk diproses secara asinkron.

Penyiapan dependensi

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>

Konfigurasi

Starter ini menyediakan opsi konfigurasi berikut:

properti konfigurasi Koneksi ion

Bagian ini berisi opsi konfigurasi yang digunakan untuk menyambungkan ke Antrean Azure Storage.

Catatan

Jika Anda memilih untuk menggunakan prinsip keamanan untuk mengautentikasi dan mengotorisasi dengan ID Microsoft Entra untuk mengakses sumber daya Azure, lihat Mengotorisasi akses dengan ID Microsoft Entra untuk memastikan prinsip keamanan telah diberikan izin yang memadai untuk mengakses sumber daya Azure.

Koneksi ion properti yang dapat dikonfigurasi dari spring-cloud-azure-starter-integration-storage-queue:

Properti Tipe Deskripsi
spring.cloud.azure.storage.queue.enabled Boolean Apakah Antrean Azure Storage diaktifkan.
spring.cloud.azure.storage.queue.connection-string String Nilai string koneksi Ruang Nama Antrean Penyimpanan.
spring.cloud.azure.storage.queue.accountName String Nama akun Antrean Penyimpanan.
spring.cloud.azure.storage.queue.accountKey String Kunci akun Antrean Penyimpanan.
spring.cloud.azure.storage.queue.endpoint String Titik akhir layanan Antrean Penyimpanan.
spring.cloud.azure.storage.queue.sasToken String Kredensial token Sas
spring.cloud.azure.storage.queue.serviceVersion QueueServiceVersion QueueServiceVersion yang digunakan saat membuat permintaan API.
spring.cloud.azure.storage.queue.messageEncoding String Pengodean pesan antrean.

Penggunaan dasar

Mengirim pesan ke Antrean Azure Storage

  1. Isi opsi konfigurasi kredensial.

    • Untuk kredensial sebagai string koneksi, konfigurasikan properti berikut dalam file application.yml Anda:

      spring:
        cloud:
          azure:
            storage:
              queue:
                connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
      
    • Untuk kredensial sebagai identitas terkelola, konfigurasikan properti berikut dalam file application.yml Anda:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            storage:
              queue:
                namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
      

Catatan

Nilai yang diizinkan untuk tenant-id adalah: common, organizations, consumers, atau ID penyewa. Untuk informasi selengkapnya tentang nilai-nilai ini, lihat bagian Menggunakan titik akhir (akun pribadi dan organisasi) yang salah dari Kesalahan AADSTS50020 - Akun pengguna dari penyedia identitas tidak ada di penyewa. Untuk informasi tentang mengonversi aplikasi penyewa tunggal Anda, lihat Mengonversi aplikasi penyewa tunggal ke multipenyewa di ID Microsoft Entra.

  • Untuk kredensial sebagai perwakilan layanan, konfigurasikan properti berikut dalam file application.yml Anda:

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          storage:
            queue:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
    

Catatan

Nilai yang diizinkan untuk tenant-id adalah: common, organizations, consumers, atau ID penyewa. Untuk informasi selengkapnya tentang nilai-nilai ini, lihat bagian Menggunakan titik akhir (akun pribadi dan organisasi) yang salah dari Kesalahan AADSTS50020 - Akun pengguna dari penyedia identitas tidak ada di penyewa. Untuk informasi tentang mengonversi aplikasi penyewa tunggal Anda, lihat Mengonversi aplikasi penyewa tunggal ke multipenyewa di ID Microsoft Entra.

  1. Buat DefaultMessageHandler dengan biji StorageQueueTemplate untuk mengirim pesan ke Antrean Penyimpanan.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
        private static final String OUTPUT_CHANNEL = "output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
            return handler;
        }
    }
    
  2. Buat pengikatan gateway Pesan dengan handler pesan di atas melalui saluran pesan.

    class Demo {
        @Autowired
        StorageQueueOutboundGateway storageQueueOutboundGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface StorageQueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Mengirim pesan menggunakan gateway.

    class Demo {
        public void demo() {
            this.storageQueueOutboundGateway.send(message);
        }
    }
    

Menerima pesan dari Antrean Azure Storage

  1. Isi opsi konfigurasi kredensial.

  2. Buat biji saluran pesan sebagai saluran input.

    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Buat StorageQueueMessageSource dengan biji StorageQueueTemplate untuk menerima pesan ke Antrean Penyimpanan.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
    
        @Bean
        @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000"))
        public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) {
            return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate);
        }
    }
    
  4. Buat pengikatan penerima pesan dengan StorageQueueMessageSource yang dibuat di langkah terakhir melalui saluran pesan yang kami buat sebelumnya.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                .doOnError(Throwable::printStackTrace)
                .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message))
                .block();
        }
    }
    

Sampel

Untuk informasi selengkapnya, lihat repositori azure-spring-boot-samples di GitHub.