Bagikan melalui


Dukungan Spring Cloud Azure untuk Spring Cloud Stream

Spring Cloud Stream adalah kerangka kerja untuk membangun layanan mikro berbasis peristiwa yang sangat dapat diskalakan yang terhubung dengan sistem olahpesan bersama.

Kerangka kerja ini menyediakan model pemrograman fleksibel yang dibangun di atas idiom Spring yang sudah mapan dan akrab serta praktik terbaik. Praktik terbaik ini termasuk dukungan untuk semantik pub/sub persisten, grup konsumen, dan partisi stateful.

Implementasi binder saat ini meliputi:

Spring Cloud Stream Binder untuk Azure Event Hubs

Konsep utama

Spring Cloud Stream Binder untuk Azure Event Hubs menyediakan implementasi pengikatan untuk kerangka kerja Spring Cloud Stream. Implementasi ini menggunakan Adaptor Saluran Spring Integration Event Hubs pada dasarnya. Dari perspektif desain, Event Hubs mirip dengan Kafka. Selain itu, Azure Event Hubs dapat diakses melalui Kafka API. Jika proyek Anda memiliki dependensi yang ketat pada Kafka API, Anda dapat mencoba Events Hub dengan Sampel Api Kafka

Grup konsumen

Azure Event Hubs memberikan dukungan serupa dari grup konsumen sebagai Apache Kafka, tetapi dengan sedikit logika yang berbeda. Meskipun Kafka menyimpan semua offset yang diterapkan di broker, Anda harus menyimpan offset pesan Azure Event Hubs yang diproses secara manual. Azure Event Hubs SDK menyediakan fungsi untuk menyimpan offset tersebut di dalam Azure Storage.

Dukungan partisi

Event Hubs menyediakan konsep partisi fisik yang serupa dengan Kafka. Tetapi tidak seperti penyeimbangan ulang otomatis Kafka antara konsumen dan partisi, Azure Event Hubs menyediakan semacam mode preemptive. Akun penyimpanan bertindak sebagai sewa untuk menentukan konsumen mana yang memiliki partisi mana. Ketika konsumen baru dimulai, ia mencoba mencuri beberapa partisi dari konsumen yang paling banyak dimuat untuk mencapai keseimbangan beban kerja.

Untuk menentukan strategi penyeimbangan beban, properti spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.* disediakan. Untuk informasi selengkapnya, lihat bagian properti konsumen .

Dukungan konsumen batch

Pengikat Spring Cloud Azure Stream Event Hubs mendukung fitur Spring Cloud Stream Batch Consumer.

Untuk bekerja dengan mode batch-consumer, atur properti spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode ke true. Saat diaktifkan, pesan dengan payload daftar peristiwa batch diterima dan diteruskan ke fungsi Consumer. Setiap header pesan juga dikonversi ke daftar, yang kontennya adalah nilai header terkait yang diurai dari setiap peristiwa. Header komunal ID partisi, checkpointer, dan properti antrean terakhir disajikan sebagai nilai tunggal karena seluruh batch peristiwa berbagi nilai yang sama. Untuk informasi selengkapnya, lihat bagian header pesan Azure Event Hubs dukungan Spring Cloud Azure untuk Spring Integration.

Nota

Header titik pemeriksaan hanya ada ketika mode titik pemeriksaan MANUAL digunakan.

Titik pemeriksaan konsumen batch mendukung dua mode: BATCH dan MANUAL. mode BATCH adalah mode titik pemeriksaan otomatis untuk memeriksa seluruh batch peristiwa bersama-sama setelah pengikat menerimanya. mode MANUAL adalah memeriksa peristiwa oleh pengguna. Saat digunakan, Checkpointer diteruskan ke header pesan, dan pengguna dapat menggunakannya untuk melakukan titik pemeriksaan.

Anda dapat menentukan ukuran batch dengan mengatur properti max-size dan max-wait-time yang memiliki awalan spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.. Properti max-size diperlukan dan properti max-wait-time bersifat opsional. Untuk informasi selengkapnya, lihat bagian properti konsumen .

Penyiapan dependensi

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>

Atau, Anda juga dapat menggunakan Spring Cloud Azure Stream Event Hubs Starter, seperti yang ditunjukkan dalam contoh berikut untuk Maven:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>

Konfigurasi

Pengikat menyediakan tiga bagian opsi konfigurasi berikut:

Properti konfigurasi koneksi

Bagian ini berisi opsi konfigurasi yang digunakan untuk menyambungkan ke Azure Event Hubs.

Nota

Jika Anda memilih untuk menggunakan prinsip keamanan untuk mengautentikasi dan mengotorisasi dengan ID Microsoft Entra untuk mengakses sumber daya Azure, lihat Mengotorisasi akses dengan ID Microsoft Entra untuk memastikan prinsip keamanan telah diberikan izin yang memadai untuk mengakses sumber daya Azure.

Properti koneksi yang dapat dikonfigurasi dari spring-cloud-azure-stream-binder-eventhubs:

Harta benda Jenis Deskripsi
spring.cloud.azure.eventhubs.enabled Boolean Apakah Azure Event Hubs diaktifkan.
spring.cloud.azure.eventhubs.connection-string Tali Nilai string koneksi Namespace Layanan Event Hubs.
spring.cloud.azure.eventhubs.namespace Tali Nilai Namespace Layanan Azure Event Hubs, yang merupakan awalan FQDN. FQDN harus terdiri dari NamespaceName.DomainName
musim semi.cloud.azure.eventhubs.nama-domain Tali Nama domain nilai Namespace Azure Event Hubs.
spring.cloud.azure.eventhubs.custom-endpoint-address Tali Alamat Titik Akhir Kustom.

Ujung

Opsi konfigurasi Azure Service SDK umum juga dapat dikonfigurasi untuk pengikat Spring Cloud Azure Stream Event Hubs. Opsi konfigurasi yang didukung diperkenalkan dalam konfigurasi Spring Cloud Azure, dan dapat dikonfigurasi dengan awalan terpadu spring.cloud.azure. atau awalan spring.cloud.azure.eventhubs..

Pengikat juga mendukung Spring Could Azure Resource Manager secara default. Untuk mempelajari tentang cara mengambil string koneksi dengan prinsip keamanan yang tidak diberikan dengan peran terkait , lihat bagian penggunaan Dasar Spring Could Azure Resource Manager.

Properti konfigurasi titik pemeriksaan

Bagian ini berisi opsi konfigurasi untuk layanan Storage Blobs, yang digunakan untuk mempertahankan kepemilikan partisi dan informasi titik pemeriksaan.

Nota

Dari versi 4.0.0, saat properti spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists tidak diaktifkan secara manual, tidak ada kontainer Storage yang akan dibuat secara otomatis dengan nama dari spring.cloud.stream.bindings.binding-name.destination.

Titik pemeriksaan properti yang dapat dikonfigurasi dari spring-cloud-azure-stream-binder-eventhubs:

Harta benda Jenis Deskripsi
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists Boolean Apakah akan mengizinkan pembuatan kontainer jika tidak ada.
musim semi.cloud.azure.eventhubs.processor.checkpoint-store.account-name Tali Nama untuk akun penyimpanan.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key Tali Kunci akses akun penyimpanan.
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name Tali Nama kontainer penyimpanan.

Ujung

Opsi konfigurasi Azure Service SDK umum juga dapat dikonfigurasi untuk penyimpanan titik pemeriksaan Blob Penyimpanan. Opsi konfigurasi yang didukung diperkenalkan dalam konfigurasi Spring Cloud Azure, dan dapat dikonfigurasi dengan awalan terpadu spring.cloud.azure. atau awalan spring.cloud.azure.eventhubs.processor.checkpoint-store.

Properti konfigurasi Pengikatan Azure Event Hubs

Opsi berikut dibagi menjadi empat bagian: Properti Konsumen, Konfigurasi Konsumen Tingkat Lanjut, Properti Produsen, dan Konfigurasi Produsen Tingkat Lanjut.

Properti konsumen

Properti ini diekspos melalui EventHubsConsumerProperties.

Nota

Untuk menghindari pengulangan, karena versi 4.17.0 dan 5.11.0, Spring Cloud Azure Stream Binder Event Hubs mendukung pengaturan nilai untuk semua saluran, dalam format spring.cloud.stream.eventhubs.default.consumer.<property>=<value>.

Properti yang dapat dikonfigurasi konsumen dari spring-cloud-azure-stream-binder-eventhubs:

Harta benda Jenis Deskripsi
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode Mode Pos Pemeriksaan Mode titik pemeriksaan yang digunakan saat konsumen memutuskan cara mengirim pesan titik pemeriksaan
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count Integer Memutuskan jumlah pesan untuk setiap partisi untuk melakukan satu titik pemeriksaan. Akan berlaku hanya ketika mode titik pemeriksaan PARTITION_COUNT digunakan.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval Durasi Memutuskan interval waktu untuk melakukan satu titik pemeriksaan. Akan berlaku hanya ketika mode titik pemeriksaan TIME digunakan.
spring.cloud.stream.eventhubs.bindings.<ukuran.batch.max binding-name.consumer Integer Jumlah maksimum peristiwa dalam batch. Diperlukan untuk mode konsumen batch.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time Durasi Durasi waktu maksimum untuk mengkonsumsi batch. Akan berlaku hanya ketika mode konsumen batch diaktifkan dan bersifat opsional.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.update-interval Durasi Durasi waktu interval untuk memperbarui.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy Strategi Penyeimbangan Beban Strategi penyeimbangan beban.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval Durasi Durasi waktu setelah kepemilikan partisi kedaluwarsa.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties Boolean Apakah prosesor peristiwa harus meminta informasi tentang peristiwa antrean terakhir pada partisi terkait, dan melacak informasi tersebut saat peristiwa diterima.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count Integer Jumlah yang digunakan oleh konsumen untuk mengontrol jumlah peristiwa yang akan diterima dan antrean konsumen Event Hub secara aktif secara lokal.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position Petakan dengan kunci sebagai ID partisi, dan nilai StartPositionProperties Peta yang berisi posisi peristiwa yang akan digunakan untuk setiap partisi jika titik pemeriksaan untuk partisi tidak ada di penyimpanan titik pemeriksaan. Peta ini di-key off dari ID partisi.

Nota

Konfigurasi initial-partition-event-position menerima map untuk menentukan posisi awal untuk setiap hub peristiwa. Dengan demikian, kuncinya adalah ID partisi, dan nilainya adalah StartPositionProperties, yang mencakup properti offset, nomor urutan, waktu tanggal antrean dan apakah inklusif. Misalnya, Anda dapat mengaturnya sebagai

spring:
  cloud:
    stream:
      eventhubs:
        bindings:
          <binding-name>:
            consumer:
              initial-partition-event-position:
                0:
                  offset: earliest
                1:
                  sequence-number: 100
                2:
                  enqueued-date-time: 2022-01-12T13:32:47.650005Z
                4:
                  inclusive: false
Konfigurasi konsumen tingkat lanjut

koneksi di atas, titik pemeriksaan , dan kustomisasi dukungan konfigurasi klien Azure SDK umum untuk setiap konsumen binder, yang dapat Anda konfigurasi dengan awalan .

Properti produsen

Properti ini diekspos melalui EventHubsProducerProperties.

Nota

Untuk menghindari pengulangan, karena versi 4.17.0 dan 5.11.0, Spring Cloud Azure Stream Binder Event Hubs mendukung pengaturan nilai untuk semua saluran, dalam format spring.cloud.stream.eventhubs.default.producer.<property>=<value>.

Properti yang dapat dikonfigurasi produsen dari spring-cloud-azure-stream-binder-eventhubs:

Harta benda Jenis Deskripsi
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync Boolean Bendera sakelar untuk sinkronisasi produsen. Jika true, produser akan menunggu respons setelah operasi pengiriman.
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout panjang Jumlah waktu untuk menunggu respons setelah operasi pengiriman. Akan berlaku hanya ketika produsen sinkronisasi diaktifkan.
Konfigurasi produsen tingkat lanjut

koneksi di atas dan konfigurasi klien Azure SDK umum mendukung penyesuaian untuk setiap produsen binder, yang dapat Anda konfigurasi dengan awalan .

Penggunaan dasar

Mengirim dan menerima pesan dari/ke Azure Event Hubs

  1. Isi opsi konfigurasi dengan informasi kredensial.

    • Untuk kredensial sebagai string koneksi, konfigurasikan properti berikut dalam file application.yml Anda:

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT_CONTAINER}
                  account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                  account-key: ${CHECKPOINT_ACCESS_KEY}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      

      Nota

      Microsoft merekomendasikan penggunaan alur autentikasi paling aman yang tersedia. Alur autentikasi yang dijelaskan dalam prosedur ini, seperti untuk database, cache, olahpesan, atau layanan AI, memerlukan tingkat kepercayaan yang sangat tinggi dalam aplikasi dan membawa risiko yang tidak ada dalam alur lain. Gunakan alur ini hanya ketika opsi yang lebih aman, seperti identitas terkelola untuk koneksi tanpa kata sandi atau tanpa kunci, tidak layak. Untuk operasi komputer lokal, lebih suka identitas pengguna untuk koneksi tanpa kata sandi atau tanpa kunci.

    • Untuk kredensial sebagai perwakilan layanan, konfigurasikan properti berikut dalam file application.yml Anda:

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${EVENTHUB_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      

Nota

Nilai yang diizinkan untuk tenant-id adalah: common, organizations, consumers, atau ID penyewa. Untuk informasi selengkapnya tentang nilai-nilai ini, lihat bagian Menggunakan titik akhir yang salah (akun pribadi dan organisasi)Kesalahan AADSTS50020 - Akun pengguna dari penyedia identitas tidak ada di penyewa. Untuk informasi tentang mengonversi aplikasi penyewa tunggal Anda, lihat Mengonversi aplikasi penyewa tunggal ke multipenyewa di ID Microsoft Entra.

  • Untuk kredensial sebagai identitas terkelola, konfigurasikan properti berikut dalam file application.yml Anda:

    spring:
      cloud:
        azure:
          credential:
            managed-identity-enabled: true
            client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
          eventhubs:
            namespace: ${EVENTHUB_NAMESPACE}
            processor:
              checkpoint-store:
                container-name: ${CONTAINER_NAME}
                account-name: ${ACCOUNT_NAME}
        function:
          definition: consume;supply
        stream:
          bindings:
            consume-in-0:
              destination: ${EVENTHUB_NAME}
              group: ${CONSUMER_GROUP}
            supply-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
    
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
    
  1. Tentukan pemasok dan konsumen.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload(),
                    message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                    message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                    message.getHeaders().get(EventHubsHeaders.OFFSET),
                    message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
            );
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Dukungan partisi

PartitionSupplier dengan informasi partisi yang disediakan pengguna dibuat untuk mengonfigurasi informasi partisi tentang pesan yang akan dikirim. Diagram alur berikut menunjukkan proses mendapatkan prioritas yang berbeda untuk ID dan kunci partisi:

Diagram memperlihatkan diagram alur proses dukungan partisi.

Dukungan konsumen batch

  1. Berikan opsi konfigurasi batch, seperti yang ditunjukkan dalam contoh berikut:

    spring:
      cloud:
        function:
          definition: consume
        stream:
          bindings:
            consume-in-0:
              destination: ${AZURE_EVENTHUB_NAME}
              group: ${AZURE_EVENTHUB_CONSUMER_GROUP}
              consumer:
                batch-mode: true
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  batch:
                    max-batch-size: 10 # Required for batch-consumer mode
                    max-wait-time: 1m # Optional, the default value is null
                  checkpoint:
                    mode: BATCH # or MANUAL as needed
    
  2. Tentukan pemasok dan konsumen.

    Untuk mode titik pemeriksaan sebagai BATCH, Anda dapat menggunakan kode berikut untuk mengirim pesan dan menggunakan dalam batch.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                        message.getPayload().get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

    Untuk mode titik pemeriksaan sebagai MANUAL, Anda dapat menggunakan kode berikut untuk mengirim pesan dan menggunakan/titik pemeriksaan dalam batch.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload().get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
    
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            checkpointer.success()
                        .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                        .doOnError(error -> LOGGER.error("Exception found", error))
                        .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

Nota

Dalam mode penggunaan batch, jenis konten default pengikat Spring Cloud Stream application/json, jadi pastikan payload pesan selaras dengan jenis konten. Misalnya, saat menggunakan jenis konten default application/json untuk menerima pesan dengan payload String, payload harus JSON String, dikelilingi dengan tanda kutip ganda untuk teks String asli. Sementara untuk jenis konten text/plain, ini bisa menjadi objek String secara langsung. Untuk informasi selengkapnya, lihat Negosiasi Jenis Konten Spring Cloud Stream.

Menangani pesan kesalahan

  • Menangani pesan kesalahan pengikatan keluar

    Secara default, Integrasi Spring membuat saluran kesalahan global yang disebut errorChannel. Konfigurasikan titik akhir pesan berikut untuk menangani pesan kesalahan pengikatan keluar.

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Menangani pesan kesalahan pengikatan masuk

    Spring Cloud Stream Event Hubs Binder mendukung satu solusi untuk menangani kesalahan untuk pengikatan pesan masuk: penanganan kesalahan.

    Penanganan Kesalahan :

    Spring Cloud Stream memaparkan mekanisme bagi Anda untuk menyediakan handler kesalahan kustom dengan menambahkan Consumer yang menerima instans ErrorMessage. Untuk informasi selengkapnya, lihat Menangani Pesan Kesalahan dalam dokumentasi Spring Cloud Stream.

    • Handler kesalahan pengikatan-default

      Konfigurasikan satu biji Consumer untuk mengonsumsi semua pesan kesalahan pengikatan masuk. Fungsi default berikut berlangganan ke setiap saluran kesalahan pengikatan masuk:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Anda juga perlu mengatur properti spring.cloud.stream.default.error-handler-definition ke nama fungsi.

    • Penangan kesalahan khusus pengikatan

      Konfigurasikan biji Consumer untuk menggunakan pesan kesalahan pengikatan masuk tertentu. Fungsi berikut berlangganan saluran kesalahan pengikatan masuk tertentu dan memiliki prioritas yang lebih tinggi daripada handler kesalahan pengikatan-default:

      @Bean
      public Consumer<ErrorMessage> myErrorHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Anda juga perlu mengatur properti spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition ke nama fungsi.

Header pesan Azure Event Hubs

Untuk header pesan dasar yang didukung, lihat bagian header pesan Azure Event Hubs dukungan Spring Cloud Azure untuk Spring Integration.

Dukungan beberapa pengikat

Koneksi ke beberapa namespace Layanan Azure Event Hubs juga didukung dengan menggunakan beberapa pengikat. Sampel ini mengambil string koneksi sebagai contoh. Kredensial perwakilan layanan dan identitas terkelola juga didukung. Anda dapat mengatur properti terkait di pengaturan lingkungan setiap binder.

  1. Untuk menggunakan beberapa pengikat dengan Azure Event Hubs, konfigurasikan properti berikut dalam file application.yml Anda:

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${EVENTHUB_NAME_01}
              group: ${CONSUMER_GROUP_01}
            supply1-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE}
            consume2-in-0:
              binder: eventhub-2
              destination: ${EVENTHUB_NAME_02}
              group: ${CONSUMER_GROUP_02}
            supply2-out-0:
              binder: eventhub-2
              destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE}
          binders:
            eventhub-1:
              type: eventhubs
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_01}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
            eventhub-2:
              type: eventhubs
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_02}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
          eventhubs:
            bindings:
              consume1-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
              consume2-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Nota

    File aplikasi sebelumnya menunjukkan cara mengonfigurasi satu poller default untuk aplikasi ke semua pengikatan. Jika Anda ingin mengonfigurasi poller untuk pengikatan tertentu, Anda dapat menggunakan konfigurasi seperti spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.

    Nota

    Microsoft merekomendasikan penggunaan alur autentikasi paling aman yang tersedia. Alur autentikasi yang dijelaskan dalam prosedur ini, seperti untuk database, cache, olahpesan, atau layanan AI, memerlukan tingkat kepercayaan yang sangat tinggi dalam aplikasi dan membawa risiko yang tidak ada dalam alur lain. Gunakan alur ini hanya ketika opsi yang lebih aman, seperti identitas terkelola untuk koneksi tanpa kata sandi atau tanpa kunci, tidak layak. Untuk operasi komputer lokal, lebih suka identitas pengguna untuk koneksi tanpa kata sandi atau tanpa kunci.

  2. Kita perlu mendefinisikan dua pemasok dan dua konsumen:

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    

Provisi sumber daya

Binder Azure Event Hubs mendukung provisi pusat aktivitas dan grup konsumen, pengguna dapat menggunakan properti berikut untuk mengaktifkan provisi.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      eventhubs:
        resource:
          resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}

Nota

Nilai yang diizinkan untuk tenant-id adalah: common, organizations, consumers, atau ID penyewa. Untuk informasi selengkapnya tentang nilai-nilai ini, lihat bagian Menggunakan titik akhir yang salah (akun pribadi dan organisasi)Kesalahan AADSTS50020 - Akun pengguna dari penyedia identitas tidak ada di penyewa. Untuk informasi tentang mengonversi aplikasi penyewa tunggal Anda, lihat Mengonversi aplikasi penyewa tunggal ke multipenyewa di ID Microsoft Entra.

Sampel

Untuk informasi selengkapnya, lihat repositori azure-spring-boot-samples di GitHub.

Spring Cloud Stream Binder untuk Azure Service Bus

Konsep utama

Spring Cloud Stream Binder untuk Azure Service Bus menyediakan implementasi pengikatan untuk Spring Cloud Stream Framework. Implementasi ini menggunakan Adaptor Saluran Bus Layanan Integrasi Spring pada dasarnya.

Pesan terjadwal

Pengikat ini mendukung pengiriman pesan ke topik untuk pemrosesan tertunda. Pengguna dapat mengirim pesan terjadwal dengan header x-delay mengekspresikan dalam milidetik waktu penundaan untuk pesan tersebut. Pesan akan dikirimkan ke topik masing-masing setelah x-delay milidetik.

Grup konsumen

Topik Bus Layanan memberikan dukungan serupa dari grup konsumen sebagai Apache Kafka, tetapi dengan sedikit logika yang berbeda. Pengikat ini bergantung pada Subscription topik untuk bertindak sebagai grup konsumen.

Penyiapan dependensi

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>

Atau, Anda juga dapat menggunakan Spring Cloud Azure Stream Service Bus Starter, seperti yang ditunjukkan dalam contoh berikut untuk Maven:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>

Konfigurasi

Pengikat menyediakan dua bagian opsi konfigurasi berikut:

Properti konfigurasi koneksi

Bagian ini berisi opsi konfigurasi yang digunakan untuk menyambungkan ke Azure Service Bus.

Nota

Jika Anda memilih untuk menggunakan prinsip keamanan untuk mengautentikasi dan mengotorisasi dengan ID Microsoft Entra untuk mengakses sumber daya Azure, lihat Mengotorisasi akses dengan ID Microsoft Entra untuk memastikan prinsip keamanan telah diberikan izin yang memadai untuk mengakses sumber daya Azure.

Properti koneksi yang dapat dikonfigurasi dari spring-cloud-azure-stream-binder-servicebus:

Harta benda Jenis Deskripsi
spring.cloud.azure.servicebus.enabled Boolean Apakah Azure Service Bus diaktifkan.
spring.cloud.azure.servicebus.connection-string Tali Nilai string koneksi Namespace Service Bus.
spring.cloud.azure.servicebus.custom-endpoint-address Tali Alamat titik akhir kustom yang akan digunakan saat menyambungkan ke Azure Service Bus.
spring.cloud.azure.servicebus.namespace Tali Nilai Namespace Service Bus, yang merupakan awalan FQDN. FQDN harus terdiri dari NamespaceName.DomainName
musim semi.cloud.azure.servicebus.domain-name Tali Nama domain nilai Namespace Azure Service Bus.

Nota

Opsi konfigurasi Azure Service SDK umum juga dapat dikonfigurasi untuk binder Azure Stream Service Bus Spring Cloud. Opsi konfigurasi yang didukung diperkenalkan dalam konfigurasi Spring Cloud Azure, dan dapat dikonfigurasi dengan awalan terpadu spring.cloud.azure. atau awalan spring.cloud.azure.servicebus..

Pengikat juga mendukung Spring Could Azure Resource Manager secara default. Untuk mempelajari tentang cara mengambil string koneksi dengan prinsip keamanan yang tidak diberikan dengan peran terkait , lihat bagian penggunaan Dasar Spring Could Azure Resource Manager.

Properti konfigurasi pengikatan Azure Service Bus

Opsi berikut dibagi menjadi empat bagian: Properti Konsumen, Konfigurasi Konsumen Tingkat Lanjut, Properti Produsen, dan Konfigurasi Produsen Tingkat Lanjut.

Properti konsumen

Properti ini diekspos melalui ServiceBusConsumerProperties.

Nota

Untuk menghindari pengulangan, karena versi 4.17.0 dan 5.11.0, Spring Cloud Azure Stream Binder Service Bus mendukung pengaturan nilai untuk semua saluran, dalam format spring.cloud.stream.servicebus.default.consumer.<property>=<value>.

Properti yang dapat dikonfigurasi konsumen dari spring-cloud-azure-stream-binder-servicebus:

Harta benda Jenis Bawaan Deskripsi
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected Boolean palsu Jika pesan yang gagal dirutekan ke DLQ.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls Integer 1 Pesan bersamaan maksimum yang harus diproses oleh klien prosesor Azure Service Bus. Saat sesi diaktifkan, sesi berlaku untuk setiap sesi.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-sessions Integer nol Jumlah maksimum sesi bersamaan untuk diproses pada waktu tertentu.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled Boolean nol Apakah sesi diaktifkan.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-idle-timeout Durasi nol Mengatur jumlah waktu maksimum (Durasi) untuk menunggu pesan diterima untuk sesi yang saat ini aktif.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count Integer 0 Jumlah prefetch klien prosesor Azure Service Bus.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-antrean Sub-Antrean tidak Jenis sub antrean yang akan disambungkan.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration Durasi 5m Jumlah waktu untuk melanjutkan perpanjangan kunci secara otomatis.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode ServiceBusReceiveMode peek_lock Mode penerima klien prosesor Bus Layanan.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete Boolean benar Apakah akan menyelesaikan pesan secara otomatis. Jika diatur sebagai false, header pesan Checkpointer akan ditambahkan untuk memungkinkan pengembang menyelesaikan pesan secara manual.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-size-in-megabyte Panjang 1024 Ukuran maksimum antrean/topik dalam megabyte, yang merupakan ukuran memori yang dialokasikan untuk antrean/topik.
musim semi.cloud.stream.servicebus.bindings.binding-name.consumer.default-message-time-to-live Durasi P10675199DT2H48M5.4775807D. (10675199 hari, 2 jam, 48 menit, 5 detik, dan 477 milidetik) Durasi setelah pesan kedaluwarsa, dimulai dari saat pesan dikirim ke Azure Service Bus.

Penting

Saat menggunakan Azure Resource Manager (ARM) , Anda harus mengonfigurasi properti . Untuk informasi selengkapnya, lihat sampel servicebus-queue-binder-arm di GitHub.

Konfigurasi konsumen tingkat lanjut

koneksi di atas dan konfigurasi klien Azure SDK umum mendukung penyesuaian untuk setiap konsumen pengikat, yang dapat Anda konfigurasi dengan awalan .

Properti produsen

Properti ini diekspos melalui ServiceBusProducerProperties.

Nota

Untuk menghindari pengulangan, karena versi 4.17.0 dan 5.11.0, Spring Cloud Azure Stream Binder Service Bus mendukung pengaturan nilai untuk semua saluran, dalam format spring.cloud.stream.servicebus.default.producer.<property>=<value>.

Properti yang dapat dikonfigurasi produsen dari spring-cloud-azure-stream-binder-servicebus:

Harta benda Jenis Bawaan Deskripsi
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync Boolean palsu Beralih bendera untuk sinkronisasi produsen.
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout panjang 10.000 Nilai waktu habis untuk pengiriman produsen.
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type ServiceBusEntityType nol Jenis entitas Service Bus dari produsen, diperlukan untuk produsen pengikatan.
spring.cloud.stream.servicebus.bindings.binding-name.producer.max-size-in-megabyte Panjang 1024 Ukuran maksimum antrean/topik dalam megabyte, yang merupakan ukuran memori yang dialokasikan untuk antrean/topik.
spring.cloud.stream.servicebus.bindings.binding-name.producer.default-message-time-to-live Durasi P10675199DT2H48M5.4775807D. (10675199 hari, 2 jam, 48 menit, 5 detik, dan 477 milidetik) Durasi setelah pesan kedaluwarsa, dimulai dari saat pesan dikirim ke Azure Service Bus.

Penting

Saat menggunakan produsen pengikatan, properti spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type diperlukan untuk dikonfigurasi.

Konfigurasi produsen tingkat lanjut

koneksi di atas dan konfigurasi klien Azure SDK umum mendukung penyesuaian untuk setiap produsen binder, yang dapat Anda konfigurasi dengan awalan .

Penggunaan dasar

Mengirim dan menerima pesan dari/ke Azure Service Bus

  1. Isi opsi konfigurasi dengan informasi kredensial.

    • Untuk kredensial sebagai string koneksi, konfigurasikan properti berikut dalam file application.yml Anda:

          spring:
            cloud:
              azure:
                servicebus:
                  connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      

      Nota

      Microsoft merekomendasikan penggunaan alur autentikasi paling aman yang tersedia. Alur autentikasi yang dijelaskan dalam prosedur ini, seperti untuk database, cache, olahpesan, atau layanan AI, memerlukan tingkat kepercayaan yang sangat tinggi dalam aplikasi dan membawa risiko yang tidak ada dalam alur lain. Gunakan alur ini hanya ketika opsi yang lebih aman, seperti identitas terkelola untuk koneksi tanpa kata sandi atau tanpa kunci, tidak layak. Untuk operasi komputer lokal, lebih suka identitas pengguna untuk koneksi tanpa kata sandi atau tanpa kunci.

    • Untuk kredensial sebagai perwakilan layanan, konfigurasikan properti berikut dalam file application.yml Anda:

          spring:
            cloud:
              azure:
                credential:
                  client-id: ${AZURE_CLIENT_ID}
                  client-secret: ${AZURE_CLIENT_SECRET}
                profile:
                  tenant-id: <tenant>
                servicebus:
                  namespace: ${SERVICEBUS_NAMESPACE}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      

Nota

Nilai yang diizinkan untuk tenant-id adalah: common, organizations, consumers, atau ID penyewa. Untuk informasi selengkapnya tentang nilai-nilai ini, lihat bagian Menggunakan titik akhir yang salah (akun pribadi dan organisasi)Kesalahan AADSTS50020 - Akun pengguna dari penyedia identitas tidak ada di penyewa. Untuk informasi tentang mengonversi aplikasi penyewa tunggal Anda, lihat Mengonversi aplikasi penyewa tunggal ke multipenyewa di ID Microsoft Entra.

  • Untuk kredensial sebagai identitas terkelola, konfigurasikan properti berikut dalam file application.yml Anda:

        spring:
          cloud:
            azure:
              credential:
                managed-identity-enabled: true
                client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
              servicebus:
                namespace: ${SERVICEBUS_NAMESPACE}
            function:
              definition: consume;supply
            stream:
              bindings:
                consume-in-0:
                  destination: ${SERVICEBUS_ENTITY_NAME}
                  # If you use Service Bus Topic, add the following configuration
                  # group: ${SUBSCRIPTION_NAME}
                supply-out-0:
                  destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
              servicebus:
                bindings:
                  consume-in-0:
                    consumer:
                      auto-complete: false
                  supply-out-0:
                    producer:
                      entity-type: queue # set as "topic" if you use Service Bus Topic
    
  1. Tentukan pemasok dan konsumen.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}'", message.getPayload());
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Dukungan kunci partisi

Pengikat mendukung partisi Azure Service Bus dengan mengizinkan pengaturan kunci partisi dan ID sesi di header pesan. Bagian ini memperkenalkan cara mengatur kunci partisi untuk pesan.

Spring Cloud Stream menyediakan properti ekspresi SpEL kunci partisi spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression. Misalnya, mengatur properti ini sebagai "'partitionKey-' + headers[<message-header-key>]" dan menambahkan header yang disebut message-header-key. Spring Cloud Stream menggunakan nilai untuk header ini saat mengevaluasi ekspresi untuk menetapkan kunci partisi. Kode berikut menyediakan contoh produsen:

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader("<message-header-key>", value.length() % 4)
            .build();
    };
}

Dukungan sesi

Pengikat mendukung sesi pesan Service Bus. ID sesi pesan dapat diatur melalui header pesan.

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
            .build();
    };
}

Nota

Menurut partisi Service Bus, ID sesi memiliki prioritas yang lebih tinggi daripada kunci partisi. Jadi ketika header ServiceBusMessageHeaders#SESSION_ID dan ServiceBusMessageHeaders#PARTITION_KEY diatur, nilai ID sesi akhirnya digunakan untuk menimpa nilai kunci partisi.

Menangani pesan kesalahan

  • Menangani pesan kesalahan pengikatan keluar

    Secara default, Integrasi Spring membuat saluran kesalahan global yang disebut errorChannel. Konfigurasikan titik akhir pesan berikut untuk menangani pesan kesalahan pengikatan keluar.

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Menangani pesan kesalahan pengikatan masuk

    Spring Cloud Stream Service Bus Binder mendukung dua solusi untuk menangani kesalahan untuk pengikatan pesan masuk: handler kesalahan pengikat dan handler.

    handler kesalahan Binder :

    Handler kesalahan binder default menangani pengikatan masuk. Anda menggunakan handler ini untuk mengirim pesan yang gagal ke antrean surat mati ketika spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected diaktifkan. Jika tidak, pesan yang gagal akan ditinggalkan. Penangan kesalahan pengikat saling eksklusif dengan penangan kesalahan lain yang disediakan.

    handler kesalahan :

    Spring Cloud Stream memaparkan mekanisme bagi Anda untuk menyediakan handler kesalahan kustom dengan menambahkan Consumer yang menerima instans ErrorMessage. Untuk informasi selengkapnya, lihat Menangani Pesan Kesalahan dalam dokumentasi Spring Cloud Stream.

    • Handler kesalahan pengikatan-default

      Konfigurasikan satu biji Consumer untuk mengonsumsi semua pesan kesalahan pengikatan masuk. Fungsi default berikut berlangganan ke setiap saluran kesalahan pengikatan masuk:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Anda juga perlu mengatur properti spring.cloud.stream.default.error-handler-definition ke nama fungsi.

    • Penangan kesalahan khusus pengikatan

      Konfigurasikan biji Consumer untuk menggunakan pesan kesalahan pengikatan masuk tertentu. Fungsi berikut berlangganan saluran kesalahan pengikatan masuk tertentu dengan prioritas yang lebih tinggi daripada handler kesalahan pengikatan-default.

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Anda juga perlu mengatur properti spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition ke nama fungsi.

Header pesan Azure Service Bus

Untuk header pesan dasar yang didukung, lihat bagian header pesan Azure Service Bus dukungan Spring Cloud Azure untuk Spring Integration.

Nota

Saat mengatur kunci partisi, prioritas header pesan lebih tinggi dari properti Spring Cloud Stream. Jadi spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression hanya berlaku ketika tidak ada header ServiceBusMessageHeaders#SESSION_ID dan ServiceBusMessageHeaders#PARTITION_KEY yang dikonfigurasi.

Dukungan beberapa pengikat

Koneksi ke beberapa namespace Bus Layanan juga didukung dengan menggunakan beberapa pengikat. Sampel ini mengambil string koneksi sebagai contoh. Kredensial perwakilan layanan dan identitas terkelola juga didukung, pengguna dapat mengatur properti terkait di setiap pengaturan lingkungan binder.

  1. Untuk menggunakan beberapa pengikat ServiceBus, konfigurasikan properti berikut dalam file application.yml Anda:

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${SERVICEBUS_TOPIC_NAME}
              group: ${SUBSCRIPTION_NAME}
            supply1-out-0:
              destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE}
            consume2-in-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME}
            supply2-out-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE}
          binders:
            servicebus-1:
              type: servicebus
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING}
            servicebus-2:
              type: servicebus
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING}
          servicebus:
            bindings:
              consume1-in-0:
                consumer:
                  auto-complete: false
              supply1-out-0:
                producer:
                  entity-type: topic
              consume2-in-0:
                consumer:
                  auto-complete: false
              supply2-out-0:
                producer:
                  entity-type: queue
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Nota

    File aplikasi sebelumnya menunjukkan cara mengonfigurasi satu poller default untuk aplikasi ke semua pengikatan. Jika Anda ingin mengonfigurasi poller untuk pengikatan tertentu, Anda dapat menggunakan konfigurasi seperti spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.

    Nota

    Microsoft merekomendasikan penggunaan alur autentikasi paling aman yang tersedia. Alur autentikasi yang dijelaskan dalam prosedur ini, seperti untuk database, cache, olahpesan, atau layanan AI, memerlukan tingkat kepercayaan yang sangat tinggi dalam aplikasi dan membawa risiko yang tidak ada dalam alur lain. Gunakan alur ini hanya ketika opsi yang lebih aman, seperti identitas terkelola untuk koneksi tanpa kata sandi atau tanpa kunci, tidak layak. Untuk operasi komputer lokal, lebih suka identitas pengguna untuk koneksi tanpa kata sandi atau tanpa kunci.

  2. kita perlu mendefinisikan dua pemasok dan dua konsumen

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    
    }
    

Provisi sumber daya

Pengikat bus layanan mendukung provisi antrean, topik, dan langganan, pengguna dapat menggunakan properti berikut untuk mengaktifkan provisi.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      servicebus:
        resource:
          resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
    stream:
      servicebus:
        bindings:
          <binding-name>:
            consumer:
              entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}

Nota

Nilai yang diizinkan untuk tenant-id adalah: common, organizations, consumers, atau ID penyewa. Untuk informasi selengkapnya tentang nilai-nilai ini, lihat bagian Menggunakan titik akhir yang salah (akun pribadi dan organisasi)Kesalahan AADSTS50020 - Akun pengguna dari penyedia identitas tidak ada di penyewa. Untuk informasi tentang mengonversi aplikasi penyewa tunggal Anda, lihat Mengonversi aplikasi penyewa tunggal ke multipenyewa di ID Microsoft Entra.

Menyesuaikan properti klien Azure Service Bus

Pengembang dapat menggunakan AzureServiceClientBuilderCustomizer untuk menyesuaikan properti Klien Bus Layanan. Contoh berikut mengkustomisasi properti sessionIdleTimeout di ServiceBusClientBuilder:

@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
    return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}

Sampel

Untuk informasi selengkapnya, lihat repositori azure-spring-boot-samples di GitHub.