Bagikan melalui


Dukungan Spring Cloud Azure untuk Spring Cloud Stream

Artikel ini berlaku untuk: ✔️ Versi 4.14.0 ✔️ Versi 5.8.0

Spring Cloud Stream adalah kerangka kerja untuk membangun layanan mikro berbasis peristiwa yang sangat dapat diskalakan yang terhubung dengan sistem olahpesan bersama.

Kerangka kerja ini menyediakan model pemrograman fleksibel yang dibangun di atas idiom Spring yang sudah mapan dan akrab serta praktik terbaik. Praktik terbaik ini termasuk dukungan untuk semantik pub/sub persisten, grup konsumen, dan partisi stateful.

Implementasi binder saat ini meliputi:

Spring Cloud Stream Binder untuk Azure Event Hubs

Konsep kunci

Spring Cloud Stream Binder untuk Azure Event Hubs menyediakan implementasi pengikatan untuk kerangka kerja Spring Cloud Stream. Implementasi ini menggunakan Adaptor Saluran Spring Integration Event Hubs pada dasarnya. Dari perspektif desain, Event Hubs mirip dengan Kafka. Selain itu, Azure Event Hubs dapat diakses melalui Kafka API. Jika proyek Anda memiliki dependensi yang ketat pada Kafka API, Anda dapat mencoba Events Hub dengan Sampel API Kafka

Grup konsumen

Azure Event Hubs memberikan dukungan serupa dari grup konsumen sebagai Apache Kafka, tetapi dengan sedikit logika yang berbeda. Meskipun Kafka menyimpan semua offset yang diterapkan di broker, Anda harus menyimpan offset pesan Azure Event Hubs yang diproses secara manual. Azure Event Hubs SDK menyediakan fungsi untuk menyimpan offset tersebut di dalam Azure Storage.

Dukungan partisi

Event Hubs menyediakan konsep partisi fisik yang serupa dengan Kafka. Tetapi tidak seperti penyeimbangan ulang otomatis Kafka antara konsumen dan partisi, Azure Event Hubs menyediakan semacam mode preemptive. Akun penyimpanan bertindak sebagai sewa untuk menentukan konsumen mana yang memiliki partisi mana. Ketika konsumen baru dimulai, ia mencoba mencuri beberapa partisi dari konsumen yang paling banyak dimuat untuk mencapai keseimbangan beban kerja.

Untuk menentukan strategi penyeimbangan beban, properti spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.* disediakan. Untuk informasi selengkapnya, lihat bagian Properti konsumen .

Dukungan konsumen batch

Pengikat Spring Cloud Azure Stream Event Hubs mendukung fitur Spring Cloud Stream Batch Consumer.

Untuk bekerja dengan mode batch-consumer, atur spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode properti ke true. Saat diaktifkan, pesan dengan payload daftar peristiwa batch diterima dan diteruskan ke Consumer fungsi . Setiap header pesan juga dikonversi ke daftar, yang kontennya adalah nilai header terkait yang diurai dari setiap peristiwa. Header komunal ID partisi, checkpointer, dan properti antrean terakhir disajikan sebagai nilai tunggal karena seluruh batch peristiwa berbagi nilai yang sama. Untuk informasi selengkapnya, lihat bagian header pesan Event Hubs dari dukungan Spring Cloud Azure untuk Integrasi Spring.

Catatan

Header titik pemeriksaan hanya ada ketika MANUAL mode titik pemeriksaan digunakan.

Titik pemeriksaan konsumen batch mendukung dua mode: BATCH dan MANUAL. BATCH mode adalah mode titik pemeriksaan otomatis untuk memeriksa seluruh batch peristiwa bersama-sama setelah pengikat menerimanya. MANUAL mode adalah untuk memeriksa peristiwa oleh pengguna. Saat digunakan, Checkpointer diteruskan ke header pesan, dan pengguna dapat menggunakannya untuk melakukan titik pemeriksaan.

Anda dapat menentukan ukuran batch dengan mengatur max-size properti dan max-wait-time yang memiliki awalan spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.. Properti max-size diperlukan dan max-wait-time properti bersifat opsional. Untuk informasi selengkapnya, lihat bagian Properti konsumen .

Penyiapan dependensi

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>

Atau, Anda juga dapat menggunakan Spring Cloud Azure Stream Event Hubs Starter, seperti yang ditunjukkan dalam contoh berikut untuk Maven:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>

Konfigurasi

Pengikat menyediakan tiga bagian opsi konfigurasi berikut:

properti konfigurasi Koneksi ion

Bagian ini berisi opsi konfigurasi yang digunakan untuk menyambungkan ke Azure Event Hubs.

Catatan

Jika Anda memilih untuk menggunakan prinsip keamanan untuk mengautentikasi dan mengotorisasi dengan ID Microsoft Entra untuk mengakses sumber daya Azure, lihat Mengotorisasi akses dengan ID Microsoft Entra untuk memastikan prinsip keamanan telah diberikan izin yang memadai untuk mengakses sumber daya Azure.

Koneksi ion properti spring-cloud-azure-stream-binder-eventhubs yang dapat dikonfigurasi:

Properti Tipe Deskripsi
spring.cloud.azure.eventhubs.enabled Boolean Apakah Azure Event Hubs diaktifkan.
spring.cloud.azure.eventhubs.connection-string String Nilai string koneksi Ruang Nama Azure Event Hubs.
spring.cloud.azure.eventhubs.namespace String Nilai Namespace Layanan Azure Event Hubs, yang merupakan awalan FQDN. FQDN harus terdiri dari NamespaceName.DomainName
spring.cloud.azure.eventhubs.domain-name String Nama domain nilai Namespace Azure Event Hubs.
spring.cloud.azure.eventhubs.custom-endpoint-address String Alamat Titik Akhir Kustom.

Tip

Opsi konfigurasi Azure Service SDK umum juga dapat dikonfigurasi untuk pengikat Spring Cloud Azure Stream Event Hubs. Opsi konfigurasi yang didukung diperkenalkan dalam konfigurasi Spring Cloud Azure, dan dapat dikonfigurasi dengan awalan spring.cloud.azure. terpadu atau awalan spring.cloud.azure.eventhubs..

Pengikat juga mendukung Spring Could Azure Resource Manager secara default. Untuk mempelajari tentang cara mengambil string koneksi dengan prinsip keamanan yang tidak diberikan dengan Data peran terkait, lihat bagian Penggunaan dasar Spring Could Azure Resource Manager.

Properti konfigurasi titik pemeriksaan

Bagian ini berisi opsi konfigurasi untuk layanan Storage Blobs, yang digunakan untuk mempertahankan kepemilikan partisi dan informasi titik pemeriksaan.

Catatan

Dari versi 4.0.0, ketika properti spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists tidak diaktifkan secara manual, tidak ada kontainer Storage yang akan dibuat secara otomatis dengan nama dari spring.cloud.stream.bindings.binding-name.destination.

Titik pemeriksaan properti yang dapat dikonfigurasi dari spring-cloud-azure-stream-binder-eventhubs:

Properti Tipe Deskripsi
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists Boolean Apakah akan mengizinkan pembuatan kontainer jika tidak ada.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name String Nama untuk akun penyimpanan.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key String Kunci akses akun penyimpanan.
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name String Nama kontainer penyimpanan.

Tip

Opsi konfigurasi Azure Service SDK umum juga dapat dikonfigurasi untuk penyimpanan titik pemeriksaan Blob Penyimpanan. Opsi konfigurasi yang didukung diperkenalkan dalam konfigurasi Spring Cloud Azure, dan dapat dikonfigurasi dengan awalan spring.cloud.azure. terpadu atau awalan spring.cloud.azure.eventhubs.processor.checkpoint-store.

Properti konfigurasi Pengikatan Azure Event Hubs

Opsi berikut dibagi menjadi empat bagian: Properti Konsumen, Konfigurasi Konsumen Tingkat Lanjut, Properti Produsen, dan Konfigurasi Produsen Tingkat Lanjut.

Properti konsumen

Properti ini diekspos melalui EventHubsConsumerProperties.

Properti yang dapat dikonfigurasi konsumen dari spring-cloud-azure-stream-binder-eventhubs:

Properti Tipe Deskripsi
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode CheckpointMode Mode titik pemeriksaan yang digunakan saat konsumen memutuskan cara mengirim pesan titik pemeriksaan
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count Bilangan bulat Memutuskan jumlah pesan untuk setiap partisi untuk melakukan satu titik pemeriksaan. Akan berlaku hanya ketika PARTITION_COUNT mode titik pemeriksaan digunakan.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval Durasi Memutuskan interval waktu untuk melakukan satu titik pemeriksaan. Akan berlaku hanya ketika TIME mode titik pemeriksaan digunakan.
spring.cloud.stream.eventhubs.bindings.<binding-name.consumer.batch.max-size Bilangan bulat Jumlah maksimum peristiwa dalam batch. Diperlukan untuk mode konsumen batch.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time Durasi Durasi waktu maksimum untuk mengkonsumsi batch. Akan berlaku hanya ketika mode konsumen batch diaktifkan dan bersifat opsional.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.update-interval Durasi Durasi waktu interval untuk memperbarui.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy LoadBalancingStrategy Strategi penyeimbangan beban.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval Durasi Durasi waktu setelah kepemilikan partisi kedaluwarsa.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties Boolean Apakah prosesor peristiwa harus meminta informasi tentang peristiwa antrean terakhir pada partisi terkait, dan melacak informasi tersebut saat peristiwa diterima.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count Bilangan bulat Jumlah yang digunakan oleh konsumen untuk mengontrol jumlah peristiwa yang akan diterima dan antrean konsumen Event Hub secara aktif secara lokal.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position Memetakan dengan kunci sebagai ID partisi, dan nilai StartPositionProperties Peta yang berisi posisi peristiwa yang akan digunakan untuk setiap partisi jika titik pemeriksaan untuk partisi tidak ada di penyimpanan titik pemeriksaan. Peta ini di-key off dari ID partisi.

Catatan

Konfigurasi initial-partition-event-position menerima map untuk menentukan posisi awal untuk setiap hub peristiwa. Dengan demikian, kuncinya adalah ID partisi, dan nilainya termasuk StartPositionProperties properti offset, nomor urut, waktu tanggal antrean dan apakah inklusif. Misalnya, Anda dapat mengaturnya sebagai

spring:
  cloud:
    stream:
      eventhubs:
        bindings:
          <binding-name>:
            consumer:
              initial-partition-event-position:
                0:
                  offset: earliest
                1:
                  sequence-number: 100
                2:
                  enqueued-date-time: 2022-01-12T13:32:47.650005Z
                4:
                  inclusive: false
Konfigurasi konsumen tingkat lanjut

Koneksi, titik pemeriksaan, dan konfigurasi klien Azure SDK umum di atas mendukung penyesuaian untuk setiap konsumen binder, yang dapat Anda konfigurasi dengan awalan spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer..

Properti produsen

Properti ini diekspos melalui EventHubsProducerProperties.

Properti yang dapat dikonfigurasi produsen dari spring-cloud-azure-stream-binder-eventhubs:

Properti Tipe Deskripsi
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync Boolean Bendera sakelar untuk sinkronisasi produsen. Jika true, produser akan menunggu respons setelah operasi pengiriman.
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout long Jumlah waktu untuk menunggu respons setelah operasi pengiriman. Akan berlaku hanya ketika produsen sinkronisasi diaktifkan.
Konfigurasi produsen tingkat lanjut

Koneksi di atas dan konfigurasi klien Azure SDK umum mendukung kustomisasi untuk setiap produsen binder, yang dapat Anda konfigurasi dengan awalan spring.cloud.stream.eventhubs.bindings.<binding-name>.producer..

Penggunaan dasar

Mengirim dan menerima pesan dari/ke Azure Event Hubs

  1. Isi opsi konfigurasi dengan informasi kredensial.

    • Untuk kredensial sebagai string koneksi, konfigurasikan properti berikut dalam file application.yml Anda:

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT_CONTAINER}
                  account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                  account-key: ${CHECKPOINT_ACCESS_KEY}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      
    • Untuk kredensial sebagai perwakilan layanan, konfigurasikan properti berikut dalam file application.yml Anda:

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${EVENTHUB_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      

Catatan

Nilai yang diizinkan untuk tenant-id adalah: common, organizations, consumers, atau ID penyewa. Untuk informasi selengkapnya tentang nilai-nilai ini, lihat bagian Menggunakan titik akhir (akun pribadi dan organisasi) yang salah dari Kesalahan AADSTS50020 - Akun pengguna dari penyedia identitas tidak ada di penyewa. Untuk informasi tentang mengonversi aplikasi penyewa tunggal Anda, lihat Mengonversi aplikasi penyewa tunggal ke multipenyewa di ID Microsoft Entra.

  • Untuk kredensial sebagai identitas terkelola, konfigurasikan properti berikut dalam file application.yml Anda:

    spring:
      cloud:
        azure:
          credential:
            managed-identity-enabled: true
            client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
          eventhubs:
            namespace: ${EVENTHUB_NAMESPACE}
            processor:
              checkpoint-store:
                container-name: ${CONTAINER_NAME}
                account-name: ${ACCOUNT_NAME}
        function:
          definition: consume;supply
        stream:
          bindings:
            consume-in-0:
              destination: ${EVENTHUB_NAME}
              group: ${CONSUMER_GROUP}
            supply-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
    
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
    
  1. Tentukan pemasok dan konsumen.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload(),
                    message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                    message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                    message.getHeaders().get(EventHubsHeaders.OFFSET),
                    message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
            );
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Dukungan partisi

Dengan PartitionSupplier informasi partisi yang disediakan pengguna dibuat untuk mengonfigurasi informasi partisi tentang pesan yang akan dikirim. Diagram alur berikut menunjukkan proses mendapatkan prioritas yang berbeda untuk ID dan kunci partisi:

Diagram showing a flowchart of the partitioning support process.

Dukungan konsumen batch

  1. Berikan opsi konfigurasi batch, seperti yang ditunjukkan dalam contoh berikut:

    spring:
      cloud:
        function:
          definition: consume
        stream:
          bindings:
            consume-in-0:
              destination: ${AZURE_EVENTHUB_NAME}
              group: ${AZURE_EVENTHUB_CONSUMER_GROUP}
              consumer:
                batch-mode: true
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  batch:
                    max-batch-size: 10 # Required for batch-consumer mode
                    max-wait-time: 1m # Optional, the default value is null
                  checkpoint:
                    mode: BATCH # or MANUAL as needed
    
  2. Tentukan pemasok dan konsumen.

    Untuk mode titik pemeriksaan sebagai BATCH, Anda dapat menggunakan kode berikut untuk mengirim pesan dan menggunakan dalam batch.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                        message.getPayload().get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

    Untuk mode titik pemeriksaan sebagai MANUAL, Anda dapat menggunakan kode berikut untuk mengirim pesan dan menggunakan/titik pemeriksaan dalam batch.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload().get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
    
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            checkpointer.success()
                        .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                        .doOnError(error -> LOGGER.error("Exception found", error))
                        .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

Catatan

Dalam mode penggunaan batch, jenis konten default pengikat Spring Cloud Stream adalah application/json, jadi pastikan payload pesan selaras dengan jenis konten. Misalnya, saat menggunakan jenis application/json konten default untuk menerima pesan dengan String payload, payload harus , dikelilingi JSON Stringdengan tanda kutip ganda untuk teks asli String . Sementara untuk text/plain jenis konten, itu bisa menjadi objek secara String langsung. Untuk informasi selengkapnya, lihat Negosiasi Jenis Konten Spring Cloud Stream.

Menangani pesan kesalahan

  • Menangani pesan kesalahan pengikatan keluar

    Secara default, Integrasi Spring membuat saluran kesalahan global yang disebut errorChannel. Konfigurasikan titik akhir pesan berikut untuk menangani pesan kesalahan pengikatan keluar:

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Menangani pesan kesalahan pengikatan masuk

    Spring Cloud Stream Event Hubs Binder mendukung dua solusi untuk menangani kesalahan untuk pengikatan pesan masuk: saluran kesalahan kustom dan handler.

    Saluran kesalahan:

    Spring Cloud Stream menyediakan saluran kesalahan untuk setiap pengikatan masuk. Dikirim ErrorMessage ke saluran kesalahan. Untuk informasi selengkapnya, lihat Menangani Kesalahan dalam dokumentasi Spring Cloud Stream.

    • Saluran kesalahan default

      Anda dapat menggunakan saluran kesalahan global bernama errorChannel untuk menggunakan semua pesan kesalahan pengikatan masuk. Untuk menangani pesan ini, konfigurasikan titik akhir pesan berikut:

      @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
      public void handleError(ErrorMessage message) {
          LOGGER.error("Handling inbound binding error: " + message);
      }
      
    • Saluran kesalahan khusus pengikatan

      Anda dapat menggunakan saluran kesalahan tertentu untuk menggunakan pesan kesalahan pengikatan masuk tertentu dengan prioritas yang lebih tinggi daripada saluran kesalahan default. Untuk menangani pesan ini, konfigurasikan titik akhir pesan berikut:

      // Replace destination with spring.cloud.stream.bindings.<input-binding-name>.destination
      // Replace group with spring.cloud.stream.bindings.<input-binding-name>.group
      @ServiceActivator(inputChannel = "{destination}.{group}.errors")
      public void handleError(ErrorMessage message) {
          LOGGER.error("Handling inbound binding error: " + message);
      }
      

      Catatan

      Saluran kesalahan khusus pengikatan saling eksklusif dengan penanganan kesalahan dan saluran lain yang disediakan.

    Penanganan Kesalahan:

    Spring Cloud Stream memaparkan mekanisme bagi Anda untuk menyediakan penangan kesalahan kustom dengan menambahkan Consumer yang menerima instans ErrorMessage . Untuk informasi selengkapnya, lihat Penanganan Kesalahan dalam dokumentasi Spring Cloud Stream.

    Catatan

    Ketika penangan kesalahan pengikatan dikonfigurasi, ini dapat berfungsi dengan saluran kesalahan default.

    • Handler kesalahan pengikatan-default

      Konfigurasikan satu Consumer biji untuk mengonsumsi semua pesan kesalahan pengikatan masuk. Fungsi default berikut berlangganan ke setiap saluran kesalahan pengikatan masuk:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Anda juga perlu mengatur properti ke spring.cloud.stream.default.error-handler-definition nama fungsi.

    • Penangan kesalahan khusus pengikatan

      Konfigurasikan Consumer biji untuk mengonsumsi pesan kesalahan pengikatan masuk tertentu. Fungsi berikut berlangganan saluran kesalahan pengikatan masuk tertentu dan memiliki prioritas yang lebih tinggi daripada handler kesalahan pengikatan-default:

      @Bean
      public Consumer<ErrorMessage> myErrorHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Anda juga perlu mengatur properti ke spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition nama fungsi.

Header pesan Azure Event Hubs

Untuk header pesan dasar yang didukung, lihat bagian header pesan Event Hubs dari dukungan Spring Cloud Azure untuk Integrasi Spring.

Dukungan beberapa pengikat

Koneksi ke beberapa namespace Azure Event Hubs juga didukung dengan menggunakan beberapa pengikat. Sampel ini mengambil string koneksi sebagai contoh. Kredensial perwakilan layanan dan identitas terkelola juga didukung. Anda dapat mengatur properti terkait di pengaturan lingkungan setiap binder.

  1. Untuk menggunakan beberapa pengikat dengan Azure Event Hubs, konfigurasikan properti berikut dalam file application.yml Anda:

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${EVENTHUB_NAME_01}
              group: ${CONSUMER_GROUP_01}
            supply1-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE}
            consume2-in-0:
              binder: eventhub-2
              destination: ${EVENTHUB_NAME_02}
              group: ${CONSUMER_GROUP_02}
            supply2-out-0:
              binder: eventhub-2
              destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE}
          binders:
            eventhub-1:
              type: eventhubs
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_01}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
            eventhub-2:
              type: eventhubs
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_02}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
          eventhubs:
            bindings:
              consume1-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
              consume2-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Catatan

    File aplikasi sebelumnya menunjukkan cara mengonfigurasi satu poller default untuk aplikasi ke semua pengikatan. Jika Anda ingin mengonfigurasi poller untuk pengikatan tertentu, Anda dapat menggunakan konfigurasi seperti spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.

  2. Kita perlu mendefinisikan dua pemasok dan dua konsumen:

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    

Provisi sumber daya

Binder Azure Event Hubs mendukung provisi pusat aktivitas dan grup konsumen, pengguna dapat menggunakan properti berikut untuk mengaktifkan provisi.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      eventhubs:
        resource:
          resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}

Catatan

Nilai yang diizinkan untuk tenant-id adalah: common, organizations, consumers, atau ID penyewa. Untuk informasi selengkapnya tentang nilai-nilai ini, lihat bagian Menggunakan titik akhir (akun pribadi dan organisasi) yang salah dari Kesalahan AADSTS50020 - Akun pengguna dari penyedia identitas tidak ada di penyewa. Untuk informasi tentang mengonversi aplikasi penyewa tunggal Anda, lihat Mengonversi aplikasi penyewa tunggal ke multipenyewa di ID Microsoft Entra.

Sampel

Untuk informasi selengkapnya, lihat repositori azure-spring-boot-samples di GitHub.

Pengikat Stream Cloud Spring untuk Azure Service Bus

Konsep kunci

Spring Cloud Stream Binder untuk Azure Bus Layanan menyediakan implementasi pengikatan untuk Spring Cloud Stream Framework. Implementasi ini menggunakan Spring Integration Bus Layanan Channel Adapters pada dasarnya.

Pesan terjadwal

Pengikat ini mendukung pengiriman pesan ke topik untuk pemrosesan tertunda. Pengguna dapat mengirim pesan terjadwal dengan header x-delay yang mengekspresikan dalam milidetik waktu penundaan untuk pesan tersebut. Pesan akan dikirimkan ke topik masing-masing setelah x-delay milidetik.

Grup konsumen

Bus Layanan Topic memberikan dukungan serupa dari kelompok konsumen sebagai Apache Kafka, tetapi dengan logika yang sedikit berbeda. Pengikat ini bergantung pada Subscription topik untuk bertindak sebagai grup konsumen.

Penyiapan dependensi

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>

Atau, Anda juga dapat menggunakan Spring Cloud Azure Stream Bus Layanan Starter, seperti yang ditunjukkan dalam contoh berikut untuk Maven:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>

Konfigurasi

Pengikat menyediakan dua bagian opsi konfigurasi berikut:

properti konfigurasi Koneksi ion

Bagian ini berisi opsi konfigurasi yang digunakan untuk menyambungkan ke Azure Bus Layanan.

Catatan

Jika Anda memilih untuk menggunakan prinsip keamanan untuk mengautentikasi dan mengotorisasi dengan ID Microsoft Entra untuk mengakses sumber daya Azure, lihat Mengotorisasi akses dengan ID Microsoft Entra untuk memastikan prinsip keamanan telah diberikan izin yang memadai untuk mengakses sumber daya Azure.

Koneksi ion properti spring-cloud-azure-stream-binder-servicebus yang dapat dikonfigurasi:

Properti Tipe Deskripsi
spring.cloud.azure.servicebus.enabled Boolean Apakah Azure Bus Layanan diaktifkan.
spring.cloud.azure.servicebus.connection-string String Bus Layanan nilai string koneksi Namespace.
spring.cloud.azure.servicebus.namespace String Bus Layanan nilai Namespace, yang merupakan awalan FQDN. FQDN harus terdiri dari NamespaceName.DomainName
spring.cloud.azure.servicebus.domain-name String Nama domain nilai Azure Bus Layanan Namespace.

Catatan

Opsi konfigurasi Azure Service SDK umum juga dapat dikonfigurasi untuk pengikat Bus Layanan Spring Cloud Azure Stream. Opsi konfigurasi yang didukung diperkenalkan dalam konfigurasi Spring Cloud Azure, dan dapat dikonfigurasi dengan awalan spring.cloud.azure. terpadu atau awalan spring.cloud.azure.servicebus..

Pengikat juga mendukung Spring Could Azure Resource Manager secara default. Untuk mempelajari tentang cara mengambil string koneksi dengan prinsip keamanan yang tidak diberikan dengan Data peran terkait, lihat bagian Penggunaan dasar Spring Could Azure Resource Manager.

Properti konfigurasi pengikatan Azure Bus Layanan

Opsi berikut dibagi menjadi empat bagian: Properti Konsumen, Konfigurasi Konsumen Tingkat Lanjut, Properti Produsen, dan Konfigurasi Produsen Tingkat Lanjut.

Properti konsumen

Properti ini diekspos melalui ServiceBusConsumerProperties.

Properti yang dapat dikonfigurasi konsumen dari spring-cloud-azure-stream-binder-servicebus:

Properti Jenis Default Deskripsi
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected Boolean salah Jika pesan yang gagal dirutekan ke DLQ.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls Bilangan bulat 1 Pesan bersamaan maksimum yang harus diproses oleh klien prosesor Bus Layanan.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-sessions Bilangan bulat nihil Jumlah maksimum sesi bersamaan untuk diproses pada waktu tertentu.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled Boolean nihil Apakah sesi diaktifkan.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count Bilangan bulat 0 Jumlah prefetch klien prosesor Bus Layanan.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue Sub-Antrean tidak ada Jenis sub antrean yang akan disambungkan.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration Durasi 5m Jumlah waktu untuk melanjutkan perpanjangan kunci secara otomatis.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode ServiceBusReceiveMode peek_lock Mode penerima klien prosesor Bus Layanan.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete Boolean benar Apakah akan menyelesaikan pesan secara otomatis. Jika diatur sebagai false, header Checkpointer pesan akan ditambahkan untuk memungkinkan pengembang menyelesaikan pesan secara manual.
Konfigurasi konsumen tingkat lanjut

Koneksi di atas dan konfigurasi klien Azure SDK umum mendukung kustomisasi untuk setiap konsumen binder, yang dapat Anda konfigurasi dengan awalan spring.cloud.stream.servicebus.bindings.<binding-name>.consumer..

Properti produsen

Properti ini diekspos melalui ServiceBusProducerProperties.

Properti yang dapat dikonfigurasi produsen dari spring-cloud-azure-stream-binder-servicebus:

Properti Jenis Default Deskripsi
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync Boolean salah Beralih bendera untuk sinkronisasi produsen.
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout long 10000 Nilai waktu habis untuk pengiriman produsen.
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type ServiceBusEntityType nihil Bus Layanan jenis entitas produsen, diperlukan untuk produsen pengikatan.

Penting

Saat menggunakan produsen pengikatan, properti spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type harus dikonfigurasi.

Konfigurasi produsen tingkat lanjut

Koneksi di atas dan konfigurasi klien Azure SDK umum mendukung kustomisasi untuk setiap produsen binder, yang dapat Anda konfigurasi dengan awalan spring.cloud.stream.servicebus.bindings.<binding-name>.producer..

Penggunaan dasar

Mengirim dan menerima pesan dari/ke Bus Layanan

  1. Isi opsi konfigurasi dengan informasi kredensial.

    • Untuk kredensial sebagai string koneksi, konfigurasikan properti berikut dalam file application.yml Anda:

          spring:
            cloud:
              azure:
                servicebus:
                  connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      
    • Untuk kredensial sebagai perwakilan layanan, konfigurasikan properti berikut dalam file application.yml Anda:

          spring:
            cloud:
              azure:
                credential:
                  client-id: ${AZURE_CLIENT_ID}
                  client-secret: ${AZURE_CLIENT_SECRET}
                profile:
                  tenant-id: <tenant>
                servicebus:
                  namespace: ${SERVICEBUS_NAMESPACE}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      

Catatan

Nilai yang diizinkan untuk tenant-id adalah: common, organizations, consumers, atau ID penyewa. Untuk informasi selengkapnya tentang nilai-nilai ini, lihat bagian Menggunakan titik akhir (akun pribadi dan organisasi) yang salah dari Kesalahan AADSTS50020 - Akun pengguna dari penyedia identitas tidak ada di penyewa. Untuk informasi tentang mengonversi aplikasi penyewa tunggal Anda, lihat Mengonversi aplikasi penyewa tunggal ke multipenyewa di ID Microsoft Entra.

  • Untuk kredensial sebagai identitas terkelola, konfigurasikan properti berikut dalam file application.yml Anda:

        spring:
          cloud:
            azure:
              credential:
                managed-identity-enabled: true
                client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
              servicebus:
                namespace: ${SERVICEBUS_NAMESPACE}
            function:
              definition: consume;supply
            stream:
              bindings:
                consume-in-0:
                  destination: ${SERVICEBUS_ENTITY_NAME}
                  # If you use Service Bus Topic, add the following configuration
                  # group: ${SUBSCRIPTION_NAME}
                supply-out-0:
                  destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
              servicebus:
                bindings:
                  consume-in-0:
                    consumer:
                      auto-complete: false
                  supply-out-0:
                    producer:
                      entity-type: queue # set as "topic" if you use Service Bus Topic
    
  1. Tentukan pemasok dan konsumen.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}'", message.getPayload());
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Dukungan kunci partisi

Pengikat mendukung partisi Bus Layanan dengan mengizinkan pengaturan kunci partisi dan ID sesi di header pesan. Bagian ini memperkenalkan cara mengatur kunci partisi untuk pesan.

Spring Cloud Stream menyediakan properti spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expressionekspresi SpEL kunci partisi . Misalnya, mengatur properti ini sebagai "'partitionKey-' + headers[<message-header-key>]" dan menambahkan header yang disebut message-header-key. Spring Cloud Stream menggunakan nilai untuk header ini saat mengevaluasi ekspresi untuk menetapkan kunci partisi. Kode berikut menyediakan contoh produsen:

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader("<message-header-key>", value.length() % 4)
            .build();
    };
}

Dukungan sesi

Pengikat mendukung sesi pesan Bus Layanan. ID sesi pesan dapat diatur melalui header pesan.

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
            .build();
    };
}

Catatan

Menurut pemartisian Bus Layanan, ID sesi memiliki prioritas yang lebih tinggi daripada kunci partisi. Jadi ketika kedua ServiceBusMessageHeaders#SESSION_ID header dan ServiceBusMessageHeaders#PARTITION_KEY diatur, nilai ID sesi akhirnya digunakan untuk menimpa nilai kunci partisi.

Menangani pesan kesalahan

  • Menangani pesan kesalahan pengikatan keluar

    Secara default, Integrasi Spring membuat saluran kesalahan global yang disebut errorChannel. Konfigurasikan titik akhir pesan berikut untuk menangani pesan kesalahan pengikatan keluar.

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Menangani pesan kesalahan pengikatan masuk

    Spring Cloud Stream Bus Layanan Binder mendukung tiga solusi untuk menangani kesalahan untuk pengikatan pesan masuk: penangan kesalahan pengikat, saluran kesalahan kustom, dan handler.

    Penanganan kesalahan pengikat:

    Handler kesalahan binder default menangani pengikatan masuk. Anda menggunakan handler ini untuk mengirim pesan gagal ke antrean dead-letter ketika spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected diaktifkan. Jika tidak, pesan yang gagal akan ditinggalkan. Kecuali untuk mengonfigurasi saluran kesalahan khusus pengikatan, handler kesalahan pengikat selalu berlaku terlepas dari apakah ada penangan atau saluran kesalahan kustom lainnya.

    Saluran kesalahan:

    Spring Cloud Stream menyediakan saluran kesalahan untuk setiap pengikatan masuk. Dikirim ErrorMessage ke saluran kesalahan. Untuk informasi selengkapnya, lihat Menangani Kesalahan dalam dokumentasi Spring Cloud Stream.

    • Saluran kesalahan default

      Anda dapat menggunakan saluran kesalahan global bernama errorChannel untuk menggunakan semua pesan kesalahan pengikatan masuk. Untuk menangani pesan ini, konfigurasikan titik akhir pesan berikut:

      @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
      public void handleError(ErrorMessage message) {
          LOGGER.error("Handling inbound binding error: " + message);
      }
      
    • Saluran kesalahan khusus pengikatan

      Anda dapat menggunakan saluran kesalahan tertentu untuk menggunakan pesan kesalahan pengikatan masuk tertentu dengan prioritas yang lebih tinggi daripada saluran kesalahan default. Untuk menangani pesan ini, konfigurasikan titik akhir pesan berikut:

      // Replace destination with spring.cloud.stream.bindings.<input-binding-name>.destination
      // Replace group with spring.cloud.stream.bindings.<input-binding-name>.group
      @ServiceActivator(inputChannel = "{destination}.{group}.errors")
      public void handleError(ErrorMessage message) {
          LOGGER.error("Handling inbound binding error: " + message);
      }
      

      Catatan

      Saluran kesalahan khusus pengikatan saling eksklusif dengan penanganan kesalahan dan saluran lain yang disediakan.

    Penanganan kesalahan:

    Spring Cloud Stream memaparkan mekanisme bagi Anda untuk menyediakan penangan kesalahan kustom dengan menambahkan Consumer yang menerima instans ErrorMessage . Untuk informasi selengkapnya, lihat Penanganan Kesalahan dalam dokumentasi Spring Cloud Stream.

    Catatan

    Ketika penangan kesalahan pengikatan dikonfigurasi, ini dapat berfungsi dengan saluran kesalahan default dan penangan kesalahan pengikat.

    • Handler kesalahan pengikatan-default

      Konfigurasikan satu Consumer biji untuk mengonsumsi semua pesan kesalahan pengikatan masuk. Fungsi default berikut berlangganan ke setiap saluran kesalahan pengikatan masuk:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Anda juga perlu mengatur properti ke spring.cloud.stream.default.error-handler-definition nama fungsi.

    • Penangan kesalahan khusus pengikatan

      Konfigurasikan Consumer biji untuk mengonsumsi pesan kesalahan pengikatan masuk tertentu. Fungsi berikut berlangganan saluran kesalahan pengikatan masuk tertentu dengan prioritas yang lebih tinggi daripada handler kesalahan pengikatan-default.

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Anda juga perlu mengatur properti ke spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition nama fungsi.

Bus Layanan header pesan

Untuk header pesan dasar yang didukung, lihat bagian Bus Layanan header pesan dari dukungan Spring Cloud Azure untuk Integrasi Spring.

Catatan

Saat mengatur kunci partisi, prioritas header pesan lebih tinggi dari properti Spring Cloud Stream. Jadi spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression berlaku hanya ketika tidak ada header dan ServiceBusMessageHeaders#PARTITION_KEY yang dikonfigurasiServiceBusMessageHeaders#SESSION_ID.

Dukungan beberapa pengikat

Koneksi ke beberapa namespace Bus Layanan juga didukung dengan menggunakan beberapa pengikat. Sampel ini mengambil string koneksi sebagai contoh. Kredensial perwakilan layanan dan identitas terkelola juga didukung, pengguna dapat mengatur properti terkait di setiap pengaturan lingkungan binder.

  1. Untuk menggunakan beberapa pengikat ServiceBus, konfigurasikan properti berikut dalam file application.yml Anda:

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${SERVICEBUS_TOPIC_NAME}
              group: ${SUBSCRIPTION_NAME}
            supply1-out-0:
              destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE}
            consume2-in-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME}
            supply2-out-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE}
          binders:
            servicebus-1:
              type: servicebus
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING}
            servicebus-2:
              type: servicebus
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING}
          servicebus:
            bindings:
              consume1-in-0:
                consumer:
                  auto-complete: false
              supply1-out-0:
                producer:
                  entity-type: topic
              consume2-in-0:
                consumer:
                  auto-complete: false
              supply2-out-0:
                producer:
                  entity-type: queue
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Catatan

    File aplikasi sebelumnya menunjukkan cara mengonfigurasi satu poller default untuk aplikasi ke semua pengikatan. Jika Anda ingin mengonfigurasi poller untuk pengikatan tertentu, Anda dapat menggunakan konfigurasi seperti spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.

  2. kita perlu mendefinisikan dua pemasok dan dua konsumen

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    
    }
    

Provisi sumber daya

Pengikat bus layanan mendukung provisi antrean, topik, dan langganan, pengguna dapat menggunakan properti berikut untuk mengaktifkan provisi.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      servicebus:
        resource:
          resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
    stream:
      servicebus:
        bindings:
          <binding-name>:
            consumer:
              entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}

Catatan

Nilai yang diizinkan untuk tenant-id adalah: common, organizations, consumers, atau ID penyewa. Untuk informasi selengkapnya tentang nilai-nilai ini, lihat bagian Menggunakan titik akhir (akun pribadi dan organisasi) yang salah dari Kesalahan AADSTS50020 - Akun pengguna dari penyedia identitas tidak ada di penyewa. Untuk informasi tentang mengonversi aplikasi penyewa tunggal Anda, lihat Mengonversi aplikasi penyewa tunggal ke multipenyewa di ID Microsoft Entra.

Sampel

Untuk informasi selengkapnya, lihat repositori azure-spring-boot-samples di GitHub.