Bagikan melalui


Dukungan Spring Cloud Azure untuk Integrasi Spring

Spring Integration Extension for Azure menyediakan adaptor Integrasi Spring untuk berbagai layanan yang disediakan oleh Azure SDK untuk Java. Kami menyediakan dukungan Integrasi Spring untuk layanan Azure ini: Event Hubs, Service Bus, Storage Queue. Berikut ini adalah daftar adaptor yang didukung:

Integrasi Spring dengan Azure Event Hubs

Konsep utama

Azure Event Hubs adalah platform streaming big data dan layanan penyerapan peristiwa. Ini dapat menerima dan memproses jutaan peristiwa per detik. Data yang dikirim ke pusat aktivitas dapat diubah dan disimpan dengan menggunakan penyedia analitik real-time atau adaptor batching/penyimpanan.

Integrasi Spring memungkinkan olahpesan ringan dalam aplikasi berbasis Spring dan mendukung integrasi dengan sistem eksternal melalui adaptor deklaratif. Adaptor tersebut memberikan tingkat abstraksi yang lebih tinggi atas dukungan Spring untuk jarak jauh, olahpesan, dan penjadwalan. Proyek ekstensi Spring Integration for Event Hubs menyediakan adaptor dan gateway saluran masuk dan keluar untuk Azure Event Hubs.

Nota

API dukungan RxJava dihilangkan dari versi 4.0.0. Lihat Javadoc untuk detailnya.

Grup konsumen

Azure Event Hubs memberikan dukungan serupa dari grup konsumen sebagai Apache Kafka, tetapi dengan sedikit logika yang berbeda. Meskipun Kafka menyimpan semua offset yang diterapkan di broker, Anda harus menyimpan offset pesan Azure Event Hubs yang diproses secara manual. Azure Event Hubs SDK menyediakan fungsi untuk menyimpan offset tersebut di dalam Azure Storage.

Dukungan partisi

Event Hubs menyediakan konsep partisi fisik yang serupa dengan Kafka. Tetapi tidak seperti penyeimbangan ulang otomatis Kafka antara konsumen dan partisi, Azure Event Hubs menyediakan semacam mode preemptive. Akun penyimpanan bertindak sebagai sewa untuk menentukan partisi mana yang dimiliki oleh konsumen mana. Ketika konsumen baru dimulai, ia akan mencoba mencuri beberapa partisi dari sebagian besar konsumen yang dimuat berat untuk mencapai keseimbangan beban kerja.

Untuk menentukan strategi penyeimbangan beban, pengembang dapat menggunakan EventHubsContainerProperties untuk konfigurasi. Lihat bagian berikut untuk contoh cara mengonfigurasi EventHubsContainerProperties.

Dukungan konsumen batch

EventHubsInboundChannelAdapter mendukung mode penggunaan batch. Untuk mengaktifkannya, pengguna dapat menentukan mode pendengar sebagai ListenerMode.BATCH saat membuat instans EventHubsInboundChannelAdapter. Saat diaktifkan, Pesan di mana payload adalah daftar peristiwa batch akan diterima dan diteruskan ke saluran hilir. Setiap header pesan juga dikonversi sebagai daftar, yang kontennya adalah nilai header terkait yang diurai dari setiap peristiwa. Untuk header komunal ID partisi, checkpointer, dan properti antrean terakhir, mereka disajikan sebagai nilai tunggal untuk seluruh batch peristiwa berbagi yang sama. Untuk informasi selengkapnya, lihat bagian Header Pesan Azure Event Hubs .

Nota

Header titik pemeriksaan hanya ada ketika mode titik pemeriksaan MANUAL digunakan.

Titik pemeriksaan konsumen batch mendukung dua mode: BATCH dan MANUAL. mode BATCH adalah mode titik pemeriksaan otomatis untuk memeriksa seluruh batch peristiwa bersama-sama setelah diterima. mode MANUAL adalah memeriksa peristiwa oleh pengguna. Saat digunakan, Checkpointer akan diteruskan ke header pesan, dan pengguna dapat menggunakannya untuk melakukan titik pemeriksaan.

Kebijakan penggunaan batch dapat ditentukan oleh properti max-size dan max-wait-time, di mana max-size adalah properti yang diperlukan sementara max-wait-time bersifat opsional. Untuk menentukan strategi penggunaan batch, pengembang dapat menggunakan EventHubsContainerProperties untuk konfigurasi. Lihat bagian berikut untuk contoh cara mengonfigurasi EventHubsContainerProperties.

Penyiapan dependensi

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>

Konfigurasi

Starter ini menyediakan 3 bagian opsi konfigurasi berikut:

Properti Konfigurasi Koneksi

Bagian ini berisi opsi konfigurasi yang digunakan untuk menyambungkan ke Azure Event Hubs.

Nota

Jika Anda memilih untuk menggunakan prinsip keamanan untuk mengautentikasi dan mengotorisasi dengan ID Microsoft Entra untuk mengakses sumber daya Azure, lihat Mengotorisasi akses dengan ID Microsoft Entra untuk memastikan prinsip keamanan telah diberikan izin yang memadai untuk mengakses sumber daya Azure.

Properti koneksi yang dapat dikonfigurasi dari spring-cloud-azure-starter-integration-eventhubs:

Harta benda Jenis Deskripsi
spring.cloud.azure.eventhubs.enabled Boolean Apakah Azure Event Hubs diaktifkan.
spring.cloud.azure.eventhubs.connection-string Tali Nilai string koneksi Namespace Layanan Event Hubs.
spring.cloud.azure.eventhubs.namespace Tali Nilai Namespace Layanan Azure Event Hubs, yang merupakan awalan FQDN. FQDN harus terdiri dari NamespaceName.DomainName
musim semi.cloud.azure.eventhubs.nama-domain Tali Nama domain nilai Namespace Azure Event Hubs.
spring.cloud.azure.eventhubs.custom-endpoint-address Tali Alamat Titik Akhir Kustom.
musim semi.cloud.azure.eventhubs.shared-connection Boolean (tipe data yang hanya memiliki dua nilai: true atau false) Apakah EventProcessorClient dan EventHubProducerAsyncClient yang mendasarinya menggunakan koneksi yang sama. Secara default, koneksi baru dibuat dan digunakan untuk setiap klien Event Hub yang dibuat.

Properti Konfigurasi Titik Pemeriksaan

Bagian ini berisi opsi konfigurasi untuk layanan Storage Blobs, yang digunakan untuk mempertahankan kepemilikan partisi dan informasi titik pemeriksaan.

Nota

Dari versi 4.0.0, saat properti spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists tidak diaktifkan secara manual, tidak ada kontainer Storage yang akan dibuat secara otomatis.

Titik pemeriksaan properti yang dapat dikonfigurasi dari spring-cloud-azure-starter-integration-eventhubs:

Harta benda Jenis Deskripsi
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists Boolean (tipe data yang hanya memiliki dua nilai: true atau false) Apakah akan mengizinkan pembuatan kontainer jika tidak ada.
musim semi.cloud.azure.eventhubs.processor.checkpoint-store.account-name Tali Nama untuk akun penyimpanan.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key Tali Kunci akses akun penyimpanan.
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name Tali Nama kontainer penyimpanan.

Opsi konfigurasi Azure Service SDK umum juga dapat dikonfigurasi untuk penyimpanan titik pemeriksaan Blob Penyimpanan. Opsi konfigurasi yang didukung diperkenalkan dalam konfigurasi Spring Cloud Azure, dan dapat dikonfigurasi dengan awalan terpadu spring.cloud.azure. atau awalan spring.cloud.azure.eventhubs.processor.checkpoint-store.

Properti konfigurasi prosesor Azure Event Hub

EventHubsInboundChannelAdapter menggunakan EventProcessorClient untuk menggunakan pesan dari hub peristiwa, untuk mengonfigurasi properti keseluruhan EventProcessorClient, pengembang dapat menggunakan EventHubsContainerProperties untuk konfigurasi. Lihat bagian berikut tentang cara bekerja dengan EventHubsInboundChannelAdapter.

Penggunaan dasar

Mengirim pesan ke Azure Event Hubs

  1. Isi opsi konfigurasi kredensial.

    • Untuk kredensial sebagai string koneksi, konfigurasikan properti berikut dalam file application.yml Anda:

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${AZURE_EVENT_HUBS_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT-CONTAINER}
                  account-name: ${CHECKPOINT-STORAGE-ACCOUNT}
                  account-key: ${CHECKPOINT-ACCESS-KEY}
      

      Nota

      Microsoft merekomendasikan penggunaan alur autentikasi paling aman yang tersedia. Alur autentikasi yang dijelaskan dalam prosedur ini, seperti untuk database, cache, olahpesan, atau layanan AI, memerlukan tingkat kepercayaan yang sangat tinggi dalam aplikasi dan membawa risiko yang tidak ada dalam alur lain. Gunakan alur ini hanya ketika opsi yang lebih aman, seperti identitas terkelola untuk koneksi tanpa kata sandi atau tanpa kunci, tidak layak. Untuk operasi komputer lokal, lebih suka identitas pengguna untuk koneksi tanpa kata sandi atau tanpa kunci.

    • Untuk kredensial sebagai identitas terkelola, konfigurasikan properti berikut dalam file application.yml Anda:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            eventhubs:
              namespace: ${AZURE_EVENT_HUBS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      
    • Untuk kredensial sebagai perwakilan layanan, konfigurasikan properti berikut dalam file application.yml Anda:

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${AZURE_EVENT_HUBS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      

Nota

Nilai yang diizinkan untuk tenant-id adalah: common, organizations, consumers, atau ID penyewa. Untuk informasi selengkapnya tentang nilai-nilai ini, lihat bagian Menggunakan titik akhir yang salah (akun pribadi dan organisasi)Kesalahan AADSTS50020 - Akun pengguna dari penyedia identitas tidak ada di penyewa. Untuk informasi tentang mengonversi aplikasi penyewa tunggal Anda, lihat Mengonversi aplikasi penyewa tunggal ke multipenyewa di ID Microsoft Entra.

  1. Buat DefaultMessageHandler dengan kacang EventHubsTemplate untuk mengirim pesan ke Azure Event Hubs.

    class Demo {
        private static final String OUTPUT_CHANNEL = "output";
        private static final String EVENTHUB_NAME = "eh1";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.error("There was an error sending the message.", ex);
                }
            });
            return handler;
        }
    }
    
  2. Buat pengikatan gateway pesan dengan handler pesan di atas melalui saluran pesan.

    class Demo {
        @Autowired
        EventHubOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface EventHubOutboundGateway {
            void send(String text);
        }
    }
    
  3. Mengirim pesan menggunakan gateway.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Menerima pesan dari Azure Event Hubs

  1. Isi opsi konfigurasi kredensial.

  2. Buat biji saluran pesan sebagai saluran input.

    @Configuration
    class Demo {
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Buat EventHubsInboundChannelAdapter dengan kacang EventHubsMessageListenerContainer untuk menerima pesan dari Azure Event Hubs.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
        private static final String EVENTHUB_NAME = "eh1";
        private static final String CONSUMER_GROUP = "$Default";
    
        @Bean
        public EventHubsInboundChannelAdapter messageChannelAdapter(
                @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
                EventHubsMessageListenerContainer listenerContainer) {
            EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    
        @Bean
        public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
            EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
            containerProperties.setEventHubName(EVENTHUB_NAME);
            containerProperties.setConsumerGroup(CONSUMER_GROUP);
            containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
            return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
        }
    }
    
  4. Buat pengikatan penerima pesan dengan EventHubsInboundChannelAdapter melalui saluran pesan yang dibuat sebelumnya.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

Mengonfigurasi EventHubsMessageConverter untuk menyesuaikan objectMapper

EventHubsMessageConverter dibuat sebagai kacang yang dapat dikonfigurasi untuk memungkinkan pengguna menyesuaikan ObjectMapper.

Dukungan konsumen batch

Untuk menggunakan pesan dari Azure Event Hubs dalam batch mirip dengan sampel di atas, selain itu pengguna harus mengatur opsi konfigurasi terkait yang menggunakan batch untuk EventHubsInboundChannelAdapter.

Saat membuat EventHubsInboundChannelAdapter, mode pendengar harus diatur sebagai BATCH. Saat membuat biji EventHubsMessageListenerContainer, atur mode titik pemeriksaan sebagai MANUAL atau BATCH, dan opsi batch dapat dikonfigurasi sesuai kebutuhan.

@Configuration
class Demo {
    private static final String INPUT_CHANNEL = "input";
    private static final String EVENTHUB_NAME = "eh1";
    private static final String CONSUMER_GROUP = "$Default";

    @Bean
    public EventHubsInboundChannelAdapter messageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            EventHubsMessageListenerContainer listenerContainer) {
        EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
        adapter.setOutputChannel(inputChannel);
        return adapter;
    }

    @Bean
    public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
        EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
        containerProperties.setEventHubName(EVENTHUB_NAME);
        containerProperties.setConsumerGroup(CONSUMER_GROUP);
        containerProperties.getBatch().setMaxSize(100);
        containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
        return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
    }
}

Header pesan Azure Event Hubs

Tabel berikut ini menggambarkan bagaimana properti pesan Azure Event Hubs dipetakan ke header pesan Spring. Untuk Azure Event Hubs, pesan disebut sebagai event.

Pemetaan antara Pesan Event Hubs / Properti Peristiwa dan Header Pesan Spring dalam Mode Pendengar Rekaman:

Properti Peristiwa Azure Event Hubs Konstanta Header Pesan Spring Jenis Deskripsi
Waktu antrean EventHubsHeaders#ENQUEUED_TIME Instan Instan, di UTC, saat peristiwa diantrekan di partisi Event Hub.
Offset EventHubsHeaders#OFFSET Panjang Offset peristiwa saat diterima dari partisi Event Hub terkait.
Kunci partisi AzureHeaders#PARTITION_KEY Tali Kunci hash partisi jika diatur saat awalnya menerbitkan peristiwa.
ID Partisi AzureHeaders#RAW_PARTITION_ID Tali ID partisi Pusat Aktivitas.
Nomor urut EventHubsHeaders#SEQUENCE_NUMBER Panjang Nomor urutan yang ditetapkan ke peristiwa saat diantrekan di partisi Event Hub terkait.
Properti peristiwa antrean terakhir EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES LastEnqueuedEventProperties Properti peristiwa antrean terakhir dalam partisi ini.
NA AzureHeaders#CHECKPOINTER Titik pemeriksaan Header untuk titik pemeriksaan pesan tertentu.

Pengguna dapat mengurai header pesan untuk informasi terkait dari setiap peristiwa. Untuk mengatur header pesan untuk peristiwa, semua header yang dikustomisasi akan dimasukkan sebagai properti aplikasi dari suatu peristiwa, di mana header diatur sebagai kunci properti. Ketika peristiwa diterima dari Azure Event Hubs, semua properti aplikasi akan dikonversi ke header pesan.

Nota

Header pesan kunci partisi, waktu antrean, offset, dan nomor urut tidak didukung untuk diatur secara manual.

Saat mode konsumen batch diaktifkan, header tertentu dari pesan batch dicantumkan sebagai berikut, yang berisi daftar nilai dari setiap peristiwa Event Hubs tunggal.

Pemetaan antara Pesan Event Hubs / Properti Peristiwa dan Header Pesan Spring dalam Mode Pendengar Batch:

Properti Peristiwa Azure Event Hubs Konstanta Header Pesan Spring Batch Jenis Deskripsi
Waktu antrean EventHubsHeaders#ENQUEUED_TIME Daftar Instan Daftar instan, di UTC, saat setiap peristiwa diantrekan di partisi Event Hub.
Offset EventHubsHeaders#OFFSET Daftar Panjang Daftar offset setiap peristiwa saat diterima dari partisi Event Hub terkait.
Kunci partisi AzureHeaders#PARTITION_KEY Daftar String Daftar kunci hashing partisi jika ditetapkan saat awalnya menerbitkan setiap peristiwa.
Nomor urut EventHubsHeaders#SEQUENCE_NUMBER Daftar Panjang Daftar nomor urut yang ditetapkan untuk setiap peristiwa saat diantrekan di partisi Event Hub terkait.
Properti sistem EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES Daftar Peta Daftar properti sistem dari setiap peristiwa.
Properti aplikasi EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES Daftar Peta Daftar properti aplikasi dari setiap peristiwa, tempat semua header pesan atau properti peristiwa yang dikustomisasi ditempatkan.

Nota

Saat menerbitkan pesan, semua header batch di atas akan dihapus dari pesan jika ada.

Sampel

Untuk informasi selengkapnya, lihat repositori azure-spring-boot-samples di GitHub.

Integrasi Spring dengan Azure Service Bus

Konsep utama

Integrasi Spring memungkinkan olahpesan ringan dalam aplikasi berbasis Spring dan mendukung integrasi dengan sistem eksternal melalui adaptor deklaratif.

Proyek ekstensi Spring Integration for Azure Service Bus menyediakan adaptor saluran masuk dan keluar untuk Azure Service Bus.

Nota

API dukungan CompletableFuture telah tidak digunakan lagi dari versi 2.10.0, dan digantikan oleh Reactor Core dari versi 4.0.0. Lihat Javadoc untuk detailnya.

Penyiapan dependensi

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>

Konfigurasi

Starter ini menyediakan 2 bagian opsi konfigurasi berikut:

Properti konfigurasi koneksi

Bagian ini berisi opsi konfigurasi yang digunakan untuk menyambungkan ke Azure Service Bus.

Nota

Jika Anda memilih untuk menggunakan prinsip keamanan untuk mengautentikasi dan mengotorisasi dengan ID Microsoft Entra untuk mengakses sumber daya Azure, lihat Mengotorisasi akses dengan ID Microsoft Entra untuk memastikan prinsip keamanan telah diberikan izin yang memadai untuk mengakses sumber daya Azure.

Properti koneksi yang dapat dikonfigurasi dari spring-cloud-azure-starter-integration-servicebus:

Harta benda Jenis Deskripsi
spring.cloud.azure.servicebus.enabled Boolean Apakah Azure Service Bus diaktifkan.
spring.cloud.azure.servicebus.connection-string Tali Nilai string koneksi Namespace Service Bus.
spring.cloud.azure.servicebus.custom-endpoint-address Tali Alamat titik akhir kustom yang akan digunakan saat menyambungkan ke Azure Service Bus.
spring.cloud.azure.servicebus.namespace Tali Nilai Namespace Service Bus, yang merupakan awalan FQDN. FQDN harus terdiri dari NamespaceName.DomainName
musim semi.cloud.azure.servicebus.domain-name Tali Nama domain nilai Namespace Azure Service Bus.

Properti konfigurasi prosesor Azure Service Bus

ServiceBusInboundChannelAdapter menggunakan ServiceBusProcessorClient untuk menggunakan pesan, untuk mengonfigurasi properti keseluruhan ServiceBusProcessorClient, pengembang dapat menggunakan ServiceBusContainerProperties untuk konfigurasi. Lihat bagian berikut tentang cara bekerja dengan ServiceBusInboundChannelAdapter.

Penggunaan dasar

Mengirim pesan ke Azure Service Bus

  1. Isi opsi konfigurasi kredensial.

    • Untuk kredensial sebagai string koneksi, konfigurasikan properti berikut dalam file application.yml Anda:

      spring:
        cloud:
          azure:
            servicebus:
              connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
      

      Nota

      Microsoft merekomendasikan penggunaan alur autentikasi paling aman yang tersedia. Alur autentikasi yang dijelaskan dalam prosedur ini, seperti untuk database, cache, olahpesan, atau layanan AI, memerlukan tingkat kepercayaan yang sangat tinggi dalam aplikasi dan membawa risiko yang tidak ada dalam alur lain. Gunakan alur ini hanya ketika opsi yang lebih aman, seperti identitas terkelola untuk koneksi tanpa kata sandi atau tanpa kunci, tidak layak. Untuk operasi komputer lokal, lebih suka identitas pengguna untuk koneksi tanpa kata sandi atau tanpa kunci.

    • Untuk kredensial sebagai identitas terkelola, konfigurasikan properti berikut dalam file application.yml Anda:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            servicebus:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
      

Nota

Nilai yang diizinkan untuk tenant-id adalah: common, organizations, consumers, atau ID penyewa. Untuk informasi selengkapnya tentang nilai-nilai ini, lihat bagian Menggunakan titik akhir yang salah (akun pribadi dan organisasi)Kesalahan AADSTS50020 - Akun pengguna dari penyedia identitas tidak ada di penyewa. Untuk informasi tentang mengonversi aplikasi penyewa tunggal Anda, lihat Mengonversi aplikasi penyewa tunggal ke multipenyewa di ID Microsoft Entra.

  • Untuk kredensial sebagai perwakilan layanan, konfigurasikan properti berikut dalam file application.yml Anda:

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          servicebus:
            namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
    

Nota

Nilai yang diizinkan untuk tenant-id adalah: common, organizations, consumers, atau ID penyewa. Untuk informasi selengkapnya tentang nilai-nilai ini, lihat bagian Menggunakan titik akhir yang salah (akun pribadi dan organisasi)Kesalahan AADSTS50020 - Akun pengguna dari penyedia identitas tidak ada di penyewa. Untuk informasi tentang mengonversi aplikasi penyewa tunggal Anda, lihat Mengonversi aplikasi penyewa tunggal ke multipenyewa di ID Microsoft Entra.

  1. Buat DefaultMessageHandler dengan kacang ServiceBusTemplate untuk mengirim pesan ke Service Bus, atur jenis entitas untuk ServiceBusTemplate. Sampel ini mengambil Antrean Bus Layanan sebagai contoh.

    class Demo {
        private static final String OUTPUT_CHANNEL = "queue.output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) {
            serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE);
            DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
    
            return handler;
        }
    }
    
  2. Buat pengikatan gateway pesan dengan handler pesan di atas melalui saluran pesan.

    class Demo {
        @Autowired
        QueueOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface QueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Mengirim pesan menggunakan gateway.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Menerima pesan dari Azure Service Bus

  1. Isi opsi konfigurasi kredensial.

  2. Buat biji saluran pesan sebagai saluran input.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Buat ServiceBusInboundChannelAdapter dengan kacang ServiceBusMessageListenerContainer untuk menerima pesan ke Azure Service Bus. Sampel ini mengambil Antrean Bus Layanan sebagai contoh.

    @Configuration
    class Demo {
        private static final String QUEUE_NAME = "queue1";
    
        @Bean
        public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) {
            ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
            containerProperties.setEntityName(QUEUE_NAME);
            containerProperties.setAutoComplete(false);
            return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
        }
    
        @Bean
        public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            ServiceBusMessageListenerContainer listenerContainer) {
            ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    }
    
  4. Buat pengikatan penerima pesan dengan ServiceBusInboundChannelAdapter melalui saluran pesan yang kami buat sebelumnya.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

Mengonfigurasi ServiceBusMessageConverter untuk menyesuaikan objectMapper

ServiceBusMessageConverter dibuat sebagai kacang yang dapat dikonfigurasi untuk memungkinkan pengguna menyesuaikan ObjectMapper.

Header pesan Azure Service Bus

Untuk beberapa header Azure Service Bus yang dapat dipetakan ke beberapa konstanta header Spring, prioritas header Spring yang berbeda dicantumkan.

Pemetaan antara Header Bus Layanan dan Header Spring:

Header dan properti pesan Azure Service Bus Konstanta header pesan Spring Jenis Dikonfigurasi Deskripsi
Tipe isi MessageHeaders#CONTENT_TYPE Tali Ya Pendeskripsi Tipe Konten RFC2045 pesan.
ID Korelasi ServiceBusMessageHeaders#CORRELATION_ID Tali Ya ID korelasi pesan
ID Pesan ServiceBusMessageHeaders#MESSAGE_ID Tali Ya ID pesan pesan, header ini memiliki prioritas yang lebih tinggi daripada MessageHeaders#ID.
ID Pesan MessageHeaders#ID UUID (Pengidentifikasi Unik Universal) Ya ID pesan pesan, header ini memiliki prioritas yang lebih rendah daripada ServiceBusMessageHeaders#MESSAGE_ID.
Kunci partisi ServiceBusMessageHeaders#PARTITION_KEY Tali Ya Kunci partisi untuk mengirim pesan ke entitas yang dipartisi.
Balas ke MessageHeaders#REPLY_CHANNEL Tali Ya Alamat entitas yang akan dikirimi balasan.
Balas ke ID sesi ServiceBusMessageHeaders#REPLY_TO_SESSION_ID Tali Ya Nilai properti ReplyToGroupId pesan.
Waktu antrean terjadwal utc ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME OffsetDateTime Ya Tanggalwaktu di mana pesan harus diantrekan di Azure Service Bus, header ini memiliki prioritas yang lebih tinggi daripada AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE.
Waktu antrean terjadwal utc AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE Integer Ya Tanggalwaktu di mana pesan harus diantrekan di Azure Service Bus, header ini memiliki prioritas yang lebih rendah daripada ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME.
ID Sesi ServiceBusMessageHeaders#SESSION_ID Tali Ya IDentifier sesi untuk entitas yang sadar sesi.
Waktu hidup ServiceBusMessageHeaders#TIME_TO_LIVE Durasi Ya Durasi waktu sebelum pesan ini kedaluwarsa.
Ke ServiceBusMessageHeaders#TO Tali Ya Alamat "ke" pesan, dicadangkan untuk digunakan di masa mendatang dalam skenario perutean dan saat ini diabaikan oleh broker itu sendiri.
Subyek ServiceBusMessageHeaders#SUBJECT Tali Ya Subjek untuk pesan.
Deskripsi kesalahan surat mati ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION Tali Tidak Deskripsi untuk pesan yang telah di-dead-letter.
Alasan surat mati ServiceBusMessageHeaders#DEAD_LETTER_REASON Tali Tidak Alasan pesan itu surat mati.
Sumber surat mati ServiceBusMessageHeaders#DEAD_LETTER_SOURCE Tali Tidak Entitas tempat pesan di-dead-letter.
Jumlah pengiriman ServiceBusMessageHeaders#DELIVERY_COUNT panjang Tidak Berapa kali pesan ini dikirimkan kepada klien.
Nomor urutan yang diantrekan ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER panjang Tidak Nomor urutan antrean yang ditetapkan ke pesan oleh Azure Service Bus.
Waktu antrean ServiceBusMessageHeaders#ENQUEUED_TIME OffsetDateTime Tidak Tanggalwaktu saat pesan ini diantrekan di Azure Service Bus.
Kedaluwarsa pada ServiceBusMessageHeaders#EXPIRES_AT OffsetDateTime Tidak Tanggalwaktu saat pesan ini akan kedaluwarsa.
Kunci token ServiceBusMessageHeaders#LOCK_TOKEN Tali Tidak Token kunci untuk pesan saat ini.
Terkunci hingga ServiceBusMessageHeaders#LOCKED_UNTIL OffsetDateTime Tidak Tanggalwaktu saat kunci pesan ini kedaluwarsa.
Nomor urut ServiceBusMessageHeaders#SEQUENCE_NUMBER panjang Tidak Nomor unik yang ditetapkan ke pesan oleh Azure Service Bus.
Negara ServiceBusMessageHeaders#STATE ServiceBusMessageState Tidak Status pesan, yang dapat Aktif, Ditangguhkan, atau Dijadwalkan.

Dukungan kunci partisi

Starter ini mendukung partisi Service Bus dengan mengizinkan pengaturan kunci partisi dan ID sesi di header pesan. Bagian ini memperkenalkan cara mengatur kunci partisi untuk pesan.

Disarankan: Gunakan ServiceBusMessageHeaders.PARTITION_KEY sebagai kunci header.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Tidak disarankan tetapi saat ini didukung: AzureHeaders.PARTITION_KEY sebagai kunci header.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Nota

Ketika ServiceBusMessageHeaders.PARTITION_KEY dan AzureHeaders.PARTITION_KEY diatur di header pesan, ServiceBusMessageHeaders.PARTITION_KEY lebih disukai.

Dukungan sesi

Contoh ini menunjukkan cara mengatur ID sesi pesan secara manual dalam aplikasi.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Nota

Ketika ServiceBusMessageHeaders.SESSION_ID diatur di header pesan, dan header ServiceBusMessageHeaders.PARTITION_KEY yang berbeda juga diatur, nilai ID sesi akhirnya akan digunakan untuk menimpa nilai kunci partisi.

Menyesuaikan properti klien Azure Service Bus

Pengembang dapat menggunakan AzureServiceClientBuilderCustomizer untuk menyesuaikan properti Klien Bus Layanan. Contoh berikut mengkustomisasi properti sessionIdleTimeout di ServiceBusClientBuilder:

@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
    return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}

Sampel

Untuk informasi selengkapnya, lihat repositori azure-spring-boot-samples di GitHub.

Integrasi Spring dengan Antrean Azure Storage

Konsep utama

Azure Queue Storage adalah layanan untuk menyimpan pesan dalam jumlah besar. Anda mengakses pesan dari mana saja di dunia melalui panggilan terautentikasi menggunakan HTTP atau HTTPS. Pesan antrean dapat berukuran hingga 64 KB. Antrean mungkin berisi jutaan pesan, hingga batas kapasitas total akun penyimpanan. Antrean umumnya digunakan untuk membuat backlog pekerjaan untuk diproses secara asinkron.

Penyiapan dependensi

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>

Konfigurasi

Starter ini menyediakan opsi konfigurasi berikut:

Properti konfigurasi koneksi

Bagian ini berisi opsi konfigurasi yang digunakan untuk menyambungkan ke Antrean Azure Storage.

Nota

Jika Anda memilih untuk menggunakan prinsip keamanan untuk mengautentikasi dan mengotorisasi dengan ID Microsoft Entra untuk mengakses sumber daya Azure, lihat Mengotorisasi akses dengan ID Microsoft Entra untuk memastikan prinsip keamanan telah diberikan izin yang memadai untuk mengakses sumber daya Azure.

Properti koneksi yang dapat dikonfigurasi dari spring-cloud-azure-starter-integration-storage-queue:

Harta benda Jenis Deskripsi
musim semi.cloud.azure.storage.queue.enabled Boolean Apakah Antrean Azure Storage diaktifkan.
musim semi.cloud.azure.storage.queue.connection-string Tali Nilai string koneksi Ruang Nama Antrean Penyimpanan.
musim semi.cloud.azure.storage.queue.accountName Tali Nama akun Antrean Penyimpanan.
spring.cloud.azure.storage.queue.accountKey Tali Kunci akun Antrean Penyimpanan.
spring.cloud.azure.storage.queue.endpoint Tali Titik akhir layanan Antrean Penyimpanan.
spring.cloud.azure.storage.queue.sasToken Tali Kredensial token Sas
musim semi.cloud.azure.storage.queue.serviceVersion Versi Layanan Antrean QueueServiceVersion yang digunakan saat membuat permintaan API.
musim semi.cloud.azure.storage.queue.messageEncoding Tali Pengodean pesan antrean.

Penggunaan dasar

Mengirim pesan ke Antrean Azure Storage

  1. Isi opsi konfigurasi kredensial.

    • Untuk kredensial sebagai string koneksi, konfigurasikan properti berikut dalam file application.yml Anda:

      spring:
        cloud:
          azure:
            storage:
              queue:
                connection-string: ${AZURE_STORAGE_QUEUE_CONNECTION_STRING}
      

      Nota

      Microsoft merekomendasikan penggunaan alur autentikasi paling aman yang tersedia. Alur autentikasi yang dijelaskan dalam prosedur ini, seperti untuk database, cache, olahpesan, atau layanan AI, memerlukan tingkat kepercayaan yang sangat tinggi dalam aplikasi dan membawa risiko yang tidak ada dalam alur lain. Gunakan alur ini hanya ketika opsi yang lebih aman, seperti identitas terkelola untuk koneksi tanpa kata sandi atau tanpa kunci, tidak layak. Untuk operasi komputer lokal, lebih suka identitas pengguna untuk koneksi tanpa kata sandi atau tanpa kunci.

    • Untuk kredensial sebagai identitas terkelola, konfigurasikan properti berikut dalam file application.yml Anda:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            storage:
              queue:
                account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
      

Nota

Nilai yang diizinkan untuk tenant-id adalah: common, organizations, consumers, atau ID penyewa. Untuk informasi selengkapnya tentang nilai-nilai ini, lihat bagian Menggunakan titik akhir yang salah (akun pribadi dan organisasi)Kesalahan AADSTS50020 - Akun pengguna dari penyedia identitas tidak ada di penyewa. Untuk informasi tentang mengonversi aplikasi penyewa tunggal Anda, lihat Mengonversi aplikasi penyewa tunggal ke multipenyewa di ID Microsoft Entra.

  • Untuk kredensial sebagai perwakilan layanan, konfigurasikan properti berikut dalam file application.yml Anda:

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          storage:
            queue:
              account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
    

Nota

Nilai yang diizinkan untuk tenant-id adalah: common, organizations, consumers, atau ID penyewa. Untuk informasi selengkapnya tentang nilai-nilai ini, lihat bagian Menggunakan titik akhir yang salah (akun pribadi dan organisasi)Kesalahan AADSTS50020 - Akun pengguna dari penyedia identitas tidak ada di penyewa. Untuk informasi tentang mengonversi aplikasi penyewa tunggal Anda, lihat Mengonversi aplikasi penyewa tunggal ke multipenyewa di ID Microsoft Entra.

  1. Buat DefaultMessageHandler dengan kacang StorageQueueTemplate untuk mengirim pesan ke Antrean Penyimpanan.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
        private static final String OUTPUT_CHANNEL = "output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
            return handler;
        }
    }
    
  2. Buat pengikatan gateway Pesan dengan handler pesan di atas melalui saluran pesan.

    class Demo {
        @Autowired
        StorageQueueOutboundGateway storageQueueOutboundGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface StorageQueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Mengirim pesan menggunakan gateway.

    class Demo {
        public void demo() {
            this.storageQueueOutboundGateway.send(message);
        }
    }
    

Menerima pesan dari Antrean Azure Storage

  1. Isi opsi konfigurasi kredensial.

  2. Buat biji saluran pesan sebagai saluran input.

    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Buat StorageQueueMessageSource dengan kacang StorageQueueTemplate untuk menerima pesan ke Antrean Penyimpanan.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
    
        @Bean
        @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000"))
        public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) {
            return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate);
        }
    }
    
  4. Buat pengikatan penerima pesan dengan StorageQueueMessageSource yang dibuat di langkah terakhir melalui saluran pesan yang kami buat sebelumnya.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                .doOnError(Throwable::printStackTrace)
                .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message))
                .block();
        }
    }
    

Sampel

Untuk informasi selengkapnya, lihat repositori azure-spring-boot-samples di GitHub.