Aracılığıyla paylaş


Spring Cloud Stream için Spring Cloud Azure desteği

Spring Cloud Stream, paylaşılan mesajlaşma sistemleriyle bağlantılı yüksek oranda ölçeklenebilir olay odaklı mikro hizmetler oluşturmaya yönelik bir çerçevedir.

Çerçeve, önceden oluşturulmuş ve tanıdık Spring deyimleri ve en iyi uygulamalar üzerine oluşturulmuş esnek bir programlama modeli sağlar. Bu en iyi yöntemler kalıcı pub/sub semantiği, tüketici grupları ve durum bilgisi olan bölümler için destek içerir.

Geçerli bağlayıcı uygulamaları şunlardır:

  • - Daha fazla bilgi için bkz. Azure Event Hubs için Spring Cloud Stream Binder
  • - Daha fazla bilgi için bkz. Azure Service Bus için Spring Cloud Stream Binder

Azure Event Hubs için Spring Cloud Stream Binder

Temel kavramlar

Azure Event Hubs için Spring Cloud Stream Binder, Spring Cloud Stream çerçevesi için bağlama uygulamasını sağlar. Bu uygulama, temelinde Spring Integration Event Hubs Kanal Bağdaştırıcılarını kullanır. Tasarım açısından bakıldığında Event Hubs, Kafka ile benzerdir. Ayrıca Event Hubs'a Kafka API aracılığıyla erişilebilir. Projenizin Kafka API'sinde sıkı bağımlılığı varsa, Kafka API Örneği ile Olay Hub'ını deneyebilirsiniz

Tüketici grubu

Event Hubs, tüketici grubu için Apache Kafka ile benzer destek sağlar ancak biraz farklı mantık sunar. Kafka, işlenen tüm uzaklıkları aracıda depolasa da, el ile işlenen Event Hubs iletilerinin uzaklıklarını depolamanız gerekir. Event Hubs SDK'sı, bu tür uzaklıkları Azure Depolama'da depolama işlevi sağlar.

Bölümleme desteği

Event Hubs, Kafka ile benzer bir fiziksel bölüm kavramı sağlar. Ancak Kafka'nın tüketiciler ve bölümler arasında otomatik olarak yeniden dengelenmesinden farklı olarak, Event Hubs bir tür önleyici mod sağlar. Depolama hesabı, hangi tüketicinin hangi bölüme sahip olduğunu belirlemek için kira görevi görür. Yeni bir tüketici başladığında, iş yükü bakiyesini elde etmek için en yoğun yüke sahip tüketicilerden bazı bölümleri çalmaya çalışır.

Yük dengeleme stratejisini belirtmek için spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.* özellikleri sağlanır. Daha fazla bilgi için Tüketici özellikleri bölümüne bakın.

Batch tüketici desteği

Spring Cloud Azure Stream Event Hubs bağlayıcısıSpring Cloud Stream Batch Consumer özelliğini destekler.

Batch-consumer moduyla çalışmak için spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode özelliğini trueolarak ayarlayın. Etkinleştirildiğinde, toplu olaylar listesinin yükünü içeren bir ileti alınır ve Consumer işlevine geçirilir. Her ileti üst bilgisi, içeriği her olaydan ayrıştırılan ilişkili üst bilgi değeri olan bir listeye de dönüştürülür. Tüm olay grubu aynı değeri paylaştığından bölüm kimliği, denetim noktası ve son sıralanan özelliklerin ortak üst bilgileri tek bir değer olarak sunulur. Daha fazla bilgi için Spring Integrationiçin Spring Cloud Azure desteğinin Event Hubs ileti üst bilgileri bölümüne bakın.

Not

Denetim noktası üst bilgisi yalnızca MANUAL denetim noktası modu kullanıldığında bulunur.

Toplu tüketici denetim noktası oluşturma iki modu destekler: BATCH ve MANUAL. BATCH modu, bağlayıcı bunları aldıktan sonra olayların tamamını birlikte kontrol etmek için otomatik bir denetim noktası modudur. MANUAL modu, kullanıcılara göre olayları kontrol etmektir. Kullanıldığında, Checkpointer ileti üst bilgisine geçirilir ve kullanıcılar bunu denetim noktası oluşturma amacıyla kullanabilir.

max-sizeön ekini içeren max-wait-time ve spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch. özelliklerini ayarlayarak toplu iş boyutunu belirtebilirsiniz. max-size özelliği gereklidir ve max-wait-time özelliği isteğe bağlıdır. Daha fazla bilgi için Tüketici özellikleri bölümüne bakın.

Bağımlılık kurulumu

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>

Alternatif olarak, aşağıdaki Maven örneğinde gösterildiği gibi Spring Cloud Azure Stream Event Hubs Starter'ı da kullanabilirsiniz:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>

Konfigürasyon

Bağlayıcı, yapılandırma seçeneklerinin aşağıdaki üç bölümünü sağlar:

Bağlantı yapılandırma özellikleri

Bu bölüm, Azure Event Hubs'a bağlanmak için kullanılan yapılandırma seçeneklerini içerir.

Not

Azure kaynağına erişim için Microsoft Entra Id ile kimlik doğrulaması yapmak ve yetkilendirmek için bir güvenlik sorumlusu kullanmayı seçerseniz güvenlik sorumlusuna Azure kaynağına erişmek için yeterli izin verildiğinden emin olmak için microsoft entra id erişimi yetkilendirme bölümüne bakın.

spring-cloud-azure-stream-binder-eventhubs'ın bağlantı yapılandırılabilir özellikleri:

Mülk Tür Açıklama
spring.cloud.azure.eventhubs.enabled Boolean Azure Event Hubs'ın etkinleştirilip etkinleştirilmediği.
spring.cloud.azure.eventhubs.connection-string Dizgi Event Hubs Ad Alanı bağlantı dizesi değeri.
spring.cloud.azure.eventhubs.namespace Dizgi FQDN'nin ön eki olan Event Hubs Ad Alanı değeri. FQDN, NamespaceName.DomainName dosyasından oluşmalıdır
spring.cloud.azure.eventhubs.domain-name Dizgi Azure Event Hubs Ad Alanı değerinin etki alanı adı.
spring.cloud.azure.eventhubs.custom-endpoint-address Dizgi Özel Uç Nokta adresi.

Bahşiş

Yaygın Azure Hizmeti SDK yapılandırma seçenekleri Spring Cloud Azure Stream Event Hubs bağlayıcısı için de yapılandırılabilir. Desteklenen yapılandırma seçenekleri Spring Cloud Azure yapılandırmasunulmuştur ve birleştirilmiş ön ek spring.cloud.azure. veya spring.cloud.azure.eventhubs.ön eki ile yapılandırılabilir.

Bağlayıcı ayrıca Spring Azure Resource Manager'ın varsayılan olarak destekler. İlgili rolle verilmeyen güvenlik sorumlularıyla bağlantı dizesini alma hakkında bilgi edinmek için spring could Azure Resource ManagerTemel kullanım bölümüne bakın.

Denetim noktası yapılandırma özellikleri

Bu bölüm, bölüm sahipliğini ve denetim noktası bilgilerini kalıcı hale etmek için kullanılan Depolama Blobları hizmetinin yapılandırma seçeneklerini içerir.

Not

Sürüm 4.0.0'dan itibaren, spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists özelliği el ile etkinleştirilmediğinde, spring.cloud.stream.bindings.binding-name.destinationadlı depolama kapsayıcısı otomatik olarak oluşturulmaz.

spring-cloud-azure-stream-binder-eventhubs'ın yapılandırılabilir özelliklerini denetleme:

Mülk Tür Açıklama
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists Boolean (Boole Mantığı) Mevcut değilse kapsayıcı oluşturmaya izin verilip verilmeyeceği.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name Dizgi Depolama hesabının adı.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key Dizgi Depolama hesabı erişim anahtarı.
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name Dizgi Depolama kapsayıcısı adı.

Bahşiş

Yaygın Azure Hizmeti SDK yapılandırma seçenekleri, Depolama Blobu denetim noktası deposu için de yapılandırılabilir. Desteklenen yapılandırma seçenekleri Spring Cloud Azure yapılandırmasunulmuştur ve birleştirilmiş ön ek spring.cloud.azure. veya spring.cloud.azure.eventhubs.processor.checkpoint-storeön eki ile yapılandırılabilir.

Azure Event Hubs Bağlama yapılandırma özellikleri

Aşağıdaki seçenekler dört bölüme ayrılmıştır: Tüketici Özellikleri, Gelişmiş Tüketici Yapılandırmaları, Üretici Özellikleri ve Gelişmiş Üretici Yapılandırmaları.

Tüketici özellikleri

Bu özellikler EventHubsConsumerPropertiesaracılığıyla kullanıma sunulur.

Not

Sürüm 4.17.0 ve 5.11.0'dan bu yana yinelemeyi önlemek için Spring Cloud Azure Stream Binder Event Hubs, spring.cloud.stream.eventhubs.default.consumer.<property>=<value>biçiminde tüm kanallar için değerlerin ayarlanmasını destekler.

Spring-cloud-azure-stream-binder-eventhubs'ın tüketici tarafından yapılandırılabilir özellikleri:

Mülk Tür Açıklama
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode Denetim Noktası Modu Tüketici denetim noktası iletisinin nasıl denetleneceğine karar verince kullanılan denetim noktası modu
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count Tam sayı Her bölümün tek bir denetim noktası yapması için ileti miktarına karar verir. Yalnızca PARTITION_COUNT denetim noktası modu kullanıldığında geçerlilik kazanır.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval Süre Bir denetim noktası yapmak için zaman aralığına karar verir. Yalnızca TIME denetim noktası modu kullanıldığında geçerlilik kazanır.
spring.cloud.stream.eventhubs.bindings .<binding-name.consumer.batch.max-size Tam sayı Toplu iş içindeki en fazla olay sayısı. Toplu tüketici modu için gereklidir.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time Süre Toplu işlem için en uzun süre. Yalnızca toplu iş tüketici modu etkinleştirildiğinde ve isteğe bağlı olduğunda geçerlilik kazanır.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.update-interval Süre Güncelleştirme için aralık süresi.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy Yük Dengeleme Stratejisi Yük dengeleme stratejisi.
spring.cloud.stream.eventhubs.bindings.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval Süre Bölümün sahipliğinin sona erdiği süre.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties Boolean (Boole Mantığı) Olay işlemcisinin ilişkili bölümünde en son sıraya alınan olayla ilgili bilgi isteyip istememesi ve olaylar alınırken bu bilgileri izlemesi gerekip gerekmediği.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count Tam sayı Olay Hub'ı tüketicisinin etkin olarak alıp yerel olarak kuyruğa alacağı olay sayısını denetlemek için tüketici tarafından kullanılan sayı.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position Bölüm kimliği ve StartPositionProperties değerleri olarak anahtarla eşleme Denetim noktası deposunda bölüm için bir denetim noktası yoksa, her bölüm için kullanılacak olay konumunu içeren eşleme. Bu eşleme bölüm kimliğinin dışında anahtarlanır.

Not

initial-partition-event-position yapılandırması, her olay hub'ı için başlangıç konumunu belirtmek üzere bir map kabul eder. Bu nedenle, anahtarı bölüm kimliğidir ve değer, uzaklık, sıra numarası, sıralanan tarih saati ve dahil olup olmadığını içeren StartPositionPropertiesdeğeridir. Örneğin, bunu olarak ayarlayabilirsiniz

spring:
  cloud:
    stream:
      eventhubs:
        bindings:
          <binding-name>:
            consumer:
              initial-partition-event-position:
                0:
                  offset: earliest
                1:
                  sequence-number: 100
                2:
                  enqueued-date-time: 2022-01-12T13:32:47.650005Z
                4:
                  inclusive: false
Gelişmiş tüketici yapılandırması

Yukarıdaki bağlantıdenetim noktasıve ortak Azure SDK istemcisi yapılandırma desteği her bağlayıcı tüketicisi için özelleştirmeyi destekler ve bu özelleştirmeyi spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.ön ekiyle yapılandırabilirsiniz.

Üretici özellikleri

Bu özellikler EventHubsProducerPropertiesaracılığıyla kullanıma sunulur.

Not

Sürüm 4.17.0 ve 5.11.0'dan bu yana yinelemeyi önlemek için Spring Cloud Azure Stream Binder Event Hubs, spring.cloud.stream.eventhubs.default.producer.<property>=<value>biçiminde tüm kanallar için değerlerin ayarlanmasını destekler.

spring-cloud-azure-stream-binder-eventhubs üretici tarafından yapılandırılabilir özellikler:

Mülk Tür Açıklama
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync Boolean Üretici eşitlemesi için anahtar bayrağı. True ise, üretici bir gönderme işleminden sonra yanıt bekler.
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout uzun Gönderme işleminden sonra yanıt bekleme süresi. Yalnızca eşitleme üreticisi etkinleştirildiğinde geçerlilik kazanır.
Gelişmiş üretici yapılandırması

Yukarıdaki bağlantı ve ortak Azure SDK istemcisi yapılandırması, her bağlayıcı üreticisi için özelleştirmeyi destekler ve bu özelleştirmeyi spring.cloud.stream.eventhubs.bindings.<binding-name>.producer.ön ekiyle yapılandırabilirsiniz.

Temel kullanım

Event Hubs'dan/Event Hubs'a ileti gönderme ve alma

  1. Yapılandırma seçeneklerini kimlik bilgileriyle doldurun.

    • Bağlantı dizesi olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT_CONTAINER}
                  account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                  account-key: ${CHECKPOINT_ACCESS_KEY}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      

      Not

      Microsoft, kullanılabilir en güvenli kimlik doğrulama akışının kullanılmasını önerir. Veritabanları, önbellekler, mesajlaşma veya yapay zeka hizmetleri gibi bu yordamda açıklanan kimlik doğrulama akışı, uygulamaya çok yüksek düzeyde güven gerektirir ve diğer akışlarda mevcut olmayan riskler taşır. Bu akışı yalnızca parolasız veya anahtarsız bağlantılar için yönetilen kimlikler gibi daha güvenli seçenekler uygun olmadığında kullanın. Yerel makine işlemleri için parolasız veya anahtarsız bağlantılar için kullanıcı kimliklerini tercih edin.

    • Hizmet sorumlusu olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${EVENTHUB_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      

Not

tenant-id için izin verilen değerler şunlardır: common, organizations, consumersveya kiracı kimliği. Bu değerler hakkında daha fazla bilgi için, Hata AADSTS50020 - Kimlik sağlayıcısından kullanıcı hesabı kiracımevcut değil bölümünün Yanlış uç nokta (kişisel ve kuruluş hesapları) kullanıldı bölümüne bakın. Tek kiracılı uygulamanızı dönüştürme hakkında bilgi için bkz. Tek kiracılı uygulamayı Microsoft Entra IDüzerinde çok kiracılıya dönüştürme.

  • Yönetilen kimlik olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:

    spring:
      cloud:
        azure:
          credential:
            managed-identity-enabled: true
            client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
          eventhubs:
            namespace: ${EVENTHUB_NAMESPACE}
            processor:
              checkpoint-store:
                container-name: ${CONTAINER_NAME}
                account-name: ${ACCOUNT_NAME}
        function:
          definition: consume;supply
        stream:
          bindings:
            consume-in-0:
              destination: ${EVENTHUB_NAME}
              group: ${CONSUMER_GROUP}
            supply-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
    
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
    
  1. Sağlayıcıyı ve tüketiciyi tanımlayın.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload(),
                    message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                    message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                    message.getHeaders().get(EventHubsHeaders.OFFSET),
                    message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
            );
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Bölümleme desteği

Gönderilecek iletiyle ilgili bölüm bilgilerini yapılandırmak için kullanıcı tarafından sağlanan bölüm bilgilerini içeren bir PartitionSupplier oluşturulur. Aşağıdaki akış çizelgesinde bölüm kimliği ve anahtarı için farklı öncelikleri alma işlemi gösterilmektedir:

bölümleme destek işleminin akış çizelgesini gösteren Diyagramı.

Batch tüketici desteği

  1. Aşağıdaki örnekte gösterildiği gibi toplu iş yapılandırma seçeneklerini belirtin:

    spring:
      cloud:
        function:
          definition: consume
        stream:
          bindings:
            consume-in-0:
              destination: ${AZURE_EVENTHUB_NAME}
              group: ${AZURE_EVENTHUB_CONSUMER_GROUP}
              consumer:
                batch-mode: true
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  batch:
                    max-batch-size: 10 # Required for batch-consumer mode
                    max-wait-time: 1m # Optional, the default value is null
                  checkpoint:
                    mode: BATCH # or MANUAL as needed
    
  2. Sağlayıcıyı ve tüketiciyi tanımlayın.

    BATCHolarak denetim noktası oluşturma modu için, iletileri göndermek ve toplu olarak kullanmak için aşağıdaki kodu kullanabilirsiniz.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                        message.getPayload().get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

    MANUALolarak denetim noktası oluşturma modu için, iletileri göndermek ve toplu olarak kullanmak/kontrol noktası yapmak için aşağıdaki kodu kullanabilirsiniz.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload().get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
    
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            checkpointer.success()
                        .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                        .doOnError(error -> LOGGER.error("Exception found", error))
                        .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

Not

Toplu işlem modunda Spring Cloud Stream bağlayıcısının varsayılan içerik türü application/jsonolduğundan ileti yükünün içerik türüyle hizalandığından emin olun. Örneğin, application/json yüküne sahip iletileri almak için varsayılan String içerik türünü kullanırken yük, özgün JSON String metni için çift tırnak içine alınan Stringolmalıdır. text/plain içerik türü için ise doğrudan bir String nesnesi olabilir. Daha fazla bilgi için bkz. Spring Cloud Stream İçerik Türü Anlaşması.

Hata iletilerini işleme

  • Giden bağlama hata iletilerini işleme

    Varsayılan olarak Spring Integration, errorChanneladlı bir genel hata kanalı oluşturur. Giden bağlama hata iletilerini işlemek için aşağıdaki ileti uç noktasını yapılandırın.

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Gelen bağlama hata iletilerini işleme

    Spring Cloud Stream Event Hubs Binder, gelen ileti bağlamalarındaki hataları işlemek için tek bir çözümü destekler: hata işleyicileri.

    Hata İşleyicisi:

    Spring Cloud Stream, Consumer örnekleri kabul eden bir ErrorMessage ekleyerek özel hata işleyicisi sağlamanız için bir mekanizma sunar. Daha fazla bilgi için Spring Cloud Stream belgelerinde Hata İletilerini İşleme bakın.

    • Bağlama varsayılan hata işleyicisi

      Tüm gelen bağlama hata iletilerini kullanmak için tek bir Consumer çekirdeği yapılandırın. Aşağıdaki varsayılan işlev her gelen bağlama hata kanalına abone olur:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      ayrıca spring.cloud.stream.default.error-handler-definition özelliğini işlev adına ayarlamanız gerekir.

    • Bağlamaya özgü hata işleyicisi

      Belirli gelen bağlama hata iletilerini kullanmak için bir Consumer çekirdeği yapılandırın. Aşağıdaki işlev, belirli bir gelen bağlama hata kanalına abone olur ve bağlama varsayılan hata işleyicisinden daha yüksek önceliğe sahiptir:

      @Bean
      public Consumer<ErrorMessage> myErrorHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      ayrıca spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition özelliğini işlev adına ayarlamanız gerekir.

Event Hubs ileti üst bilgileri

Desteklenen temel ileti üst bilgileri için Spring Integrationiçin Spring Cloud Azure desteğinin Event Hubs ileti üst bilgileri bölümüne bakın.

Birden çok bağlayıcı desteği

Birden çok Event Hubs ad alanına bağlantı, birden çok bağlayıcı kullanılarak da desteklenir. Bu örnek, örnek olarak bir bağlantı dizesi alır. Hizmet sorumlularının ve yönetilen kimliklerin kimlik bilgileri de desteklenir. Her bağlayıcının ortam ayarlarında ilgili özellikleri ayarlayabilirsiniz.

  1. Event Hubs ile birden çok bağlayıcı kullanmak için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${EVENTHUB_NAME_01}
              group: ${CONSUMER_GROUP_01}
            supply1-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE}
            consume2-in-0:
              binder: eventhub-2
              destination: ${EVENTHUB_NAME_02}
              group: ${CONSUMER_GROUP_02}
            supply2-out-0:
              binder: eventhub-2
              destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE}
          binders:
            eventhub-1:
              type: eventhubs
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_01}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
            eventhub-2:
              type: eventhubs
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_02}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
          eventhubs:
            bindings:
              consume1-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
              consume2-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Not

    Önceki uygulama dosyası, tüm bağlamalara uygulama için tek bir varsayılan poller yapılandırmayı gösterir. Poller'ı belirli bir bağlama için yapılandırmak istiyorsanız, spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000gibi bir yapılandırma kullanabilirsiniz.

    Not

    Microsoft, kullanılabilir en güvenli kimlik doğrulama akışının kullanılmasını önerir. Veritabanları, önbellekler, mesajlaşma veya yapay zeka hizmetleri gibi bu yordamda açıklanan kimlik doğrulama akışı, uygulamaya çok yüksek düzeyde güven gerektirir ve diğer akışlarda mevcut olmayan riskler taşır. Bu akışı yalnızca parolasız veya anahtarsız bağlantılar için yönetilen kimlikler gibi daha güvenli seçenekler uygun olmadığında kullanın. Yerel makine işlemleri için parolasız veya anahtarsız bağlantılar için kullanıcı kimliklerini tercih edin.

  2. İki tedarikçi ve iki tüketici tanımlamamız gerekir:

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    

Kaynak sağlama

Event Hubs bağlayıcısı olay hub'ının ve tüketici grubunun sağlanmasını destekler; kullanıcılar sağlamayı etkinleştirmek için aşağıdaki özellikleri kullanabilir.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      eventhubs:
        resource:
          resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}

Not

tenant-id için izin verilen değerler şunlardır: common, organizations, consumersveya kiracı kimliği. Bu değerler hakkında daha fazla bilgi için, Hata AADSTS50020 - Kimlik sağlayıcısından kullanıcı hesabı kiracımevcut değil bölümünün Yanlış uç nokta (kişisel ve kuruluş hesapları) kullanıldı bölümüne bakın. Tek kiracılı uygulamanızı dönüştürme hakkında bilgi için bkz. Tek kiracılı uygulamayı Microsoft Entra IDüzerinde çok kiracılıya dönüştürme.

Örnekleri

Daha fazla bilgi için GitHub'daki azure-spring-boot-samples deposuna bakın.

Azure Service Bus için Spring Cloud Stream Binder

Temel kavramlar

Azure Service Bus için Spring Cloud Stream Binder, Spring Cloud Stream Framework için bağlama uygulamasını sağlar. Bu uygulama, temelinde Spring Integration Service Bus Kanal Bağdaştırıcılarını kullanır.

Zamanlanmış ileti

Bu bağlayıcı, gecikmeli işleme için bir konuya ileti göndermeyi destekler. Kullanıcılar, iletinin gecikme süresini milisaniye cinsinden ifade eden üst bilgi x-delay zamanlanmış iletiler gönderebilir. İleti, x-delay milisaniye sonra ilgili konulara teslim edilir.

Tüketici grubu

Service Bus Konusu, tüketici grubu için Apache Kafka ile benzer destek sağlar, ancak biraz farklı mantık sunar. Bu bağlayıcı, tüketici grubu olarak hareket etmek için bir konunun Subscription dayanır.

Bağımlılık kurulumu

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>

Alternatif olarak, aşağıdaki Maven örneğinde gösterildiği gibi Spring Cloud Azure Stream Service Bus Starter'ı da kullanabilirsiniz:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>

Konfigürasyon

Bağlayıcı, yapılandırma seçeneklerinin aşağıdaki iki bölümünü sağlar:

Bağlantı yapılandırma özellikleri

Bu bölüm, Azure Service Bus'a bağlanmak için kullanılan yapılandırma seçeneklerini içerir.

Not

Azure kaynağına erişim için Microsoft Entra Id ile kimlik doğrulaması yapmak ve yetkilendirmek için bir güvenlik sorumlusu kullanmayı seçerseniz güvenlik sorumlusuna Azure kaynağına erişmek için yeterli izin verildiğinden emin olmak için microsoft entra id erişimi yetkilendirme bölümüne bakın.

spring-cloud-azure-stream-binder-servicebus'ın bağlantı yapılandırılabilir özellikleri:

Mülk Tür Açıklama
spring.cloud.azure.servicebus.enabled Boolean Azure Service Bus'ın etkinleştirilip etkinleştirilmediği.
spring.cloud.azure.servicebus.connection-string Dizgi Service Bus Ad Alanı bağlantı dizesi değeri.
spring.cloud.azure.servicebus.custom-endpoint-address Dizgi Service Bus'a bağlanırken kullanılacak özel uç nokta adresi.
spring.cloud.azure.servicebus.namespace Dizgi FQDN ön eki olan Service Bus Ad Alanı değeri. FQDN, NamespaceName.DomainName dosyasından oluşmalıdır
spring.cloud.azure.servicebus.etki alanı-adı Dizgi Azure Service Bus Ad Alanı değerinin etki alanı adı.

Not

Yaygın Azure Hizmet SDK'sı yapılandırma seçenekleri Spring Cloud Azure Stream Service Bus bağlayıcısı için de yapılandırılabilir. Desteklenen yapılandırma seçenekleri Spring Cloud Azure yapılandırmasunulmuştur ve birleştirilmiş ön ek spring.cloud.azure. veya spring.cloud.azure.servicebus.ön eki ile yapılandırılabilir.

Bağlayıcı ayrıca Spring Azure Resource Manager'ın varsayılan olarak destekler. İlgili rolle verilmeyen güvenlik sorumlularıyla bağlantı dizesini alma hakkında bilgi edinmek için spring could Azure Resource ManagerTemel kullanım bölümüne bakın.

Azure Service Bus bağlama yapılandırma özellikleri

Aşağıdaki seçenekler dört bölüme ayrılmıştır: Tüketici Özellikleri, Gelişmiş Tüketici Yapılandırmaları, Üretici Özellikleri ve Gelişmiş Üretici Yapılandırmaları.

Tüketici özellikleri

Bu özellikler ServiceBusConsumerPropertiesaracılığıyla kullanıma sunulur.

Not

Sürüm 4.17.0 ve 5.11.0'dan bu yana yinelemeyi önlemek için Spring Cloud Azure Stream Binder Service Bus, spring.cloud.stream.servicebus.default.consumer.<property>=<value>biçiminde tüm kanallar için değerlerin ayarlanmasını destekler.

spring-cloud-azure-stream-binder-servicebus'ın tüketici tarafından yapılandırılabilir özellikleri:

Mülk Tür Temerrüt Açıklama
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-reddedildi Boolean yanlış Başarısız iletiler DLQ'ya yönlendirilirse.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls Tam sayı 1 Service Bus işlemci istemcisinin işlemesi gereken en fazla eşzamanlı ileti. Oturum etkinleştirildiğinde, her oturum için geçerlidir.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-sessions Tam sayı sıfır Herhangi bir zamanda işlenmek üzere en fazla eş zamanlı oturum sayısı.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled Boolean (Boole Mantığı) sıfır Oturumun etkinleştirilip etkinleştirilmediği.
spring.cloud.stream.servicebus.bindings.binding-name.consumer .session-idle-timeout Süre sıfır Etkin durumdaki oturumda bir iletinin alınmasını beklemek için en uzun süreyi (Süre) ayarlar.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count Tam sayı 0 Service Bus işlemci istemcisinin ön işlem sayısı.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue Alt Sorgu hiç kimse Bağlanacak alt kuyruğun türü.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration Süre 5 dk. Kilidi otomatik yenilemeye devam etme süresi.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode ServiceBusReceiveMode peek_lock Service Bus işlemci istemcisinin alma modu.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete Boolean (Boole Mantığı) doğru İletilerin otomatik olarak kapatılıp kapatılmayacağı. False olarak ayarlanırsa, geliştiricilerin iletileri el ile kapatmasını sağlamak için Checkpointer bir ileti üst bilgisi eklenir.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-size-in-megabayt Uzun 1024 Kuyruk/konu başlığı için ayrılan belleğin boyutu olan megabayt cinsinden en büyük kuyruk/konu boyutu.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.default-message-time-to-live Süre P10675199DT2H48M5.4775807S. (10675199 gün, 2 saat, 48 dakika, 5 saniye ve 477 milisaniye) İletinin Service Bus'a gönderilmesinden başlayarak, iletinin süresinin dolma süresi.

Önemli

Azure Resource Manager (ARM) kullandığınızda, özelliğini yapılandırmanız gerekir. Daha fazla bilgi için GitHub'da servicebus-queue-binder-arm örneğine bakın.

Gelişmiş tüketici yapılandırması

Yukarıdaki bağlantı ve ortak Azure SDK istemcisi yapılandırması, her bağlayıcı tüketicisi için özelleştirmeyi destekler ve bu özelleştirmeyi spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.ön ekiyle yapılandırabilirsiniz.

Üretici özellikleri

Bu özellikler ServiceBusProducerPropertiesaracılığıyla kullanıma sunulur.

Not

Sürüm 4.17.0 ve 5.11.0'dan bu yana yinelemeyi önlemek için Spring Cloud Azure Stream Binder Service Bus, spring.cloud.stream.servicebus.default.producer.<property>=<value>biçiminde tüm kanallar için değerlerin ayarlanmasını destekler.

spring-cloud-azure-stream-binder-servicebus üretici tarafından yapılandırılabilir özellikler:

Mülk Tür Temerrüt Açıklama
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync Boolean yanlış Üretici eşitlemesi için switch bayrağı.
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout uzun 10.000 Üreticinin gönderilmesi için zaman aşımı değeri.
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type ServiceBusEntityType sıfır Bağlama üreticisi için gerekli olan üreticinin Service Bus varlık türü.
spring.cloud.stream.servicebus.bindings.binding-name.producer.max-size-in-megabayt Uzun 1024 Kuyruk/konu başlığı için ayrılan belleğin boyutu olan megabayt cinsinden en büyük kuyruk/konu boyutu.
spring.cloud.stream.servicebus.bindings.binding-name.producer.default-message-time-to-live Süre P10675199DT2H48M5.4775807S. (10675199 gün, 2 saat, 48 dakika, 5 saniye ve 477 milisaniye) İletinin Service Bus'a gönderilmesinden başlayarak, iletinin süresinin dolma süresi.

Önemli

Bağlama üreticisi kullanılırken, spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type özelliğinin yapılandırılması gerekir.

Gelişmiş üretici yapılandırması

Yukarıdaki bağlantı ve ortak Azure SDK istemcisi yapılandırması, her bağlayıcı üreticisi için özelleştirmeyi destekler ve bu özelleştirmeyi spring.cloud.stream.servicebus.bindings.<binding-name>.producer.ön ekiyle yapılandırabilirsiniz.

Temel kullanım

Service Bus'tan/Service Bus'a ileti gönderme ve alma

  1. Yapılandırma seçeneklerini kimlik bilgileriyle doldurun.

    • Bağlantı dizesi olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:

          spring:
            cloud:
              azure:
                servicebus:
                  connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      

      Not

      Microsoft, kullanılabilir en güvenli kimlik doğrulama akışının kullanılmasını önerir. Veritabanları, önbellekler, mesajlaşma veya yapay zeka hizmetleri gibi bu yordamda açıklanan kimlik doğrulama akışı, uygulamaya çok yüksek düzeyde güven gerektirir ve diğer akışlarda mevcut olmayan riskler taşır. Bu akışı yalnızca parolasız veya anahtarsız bağlantılar için yönetilen kimlikler gibi daha güvenli seçenekler uygun olmadığında kullanın. Yerel makine işlemleri için parolasız veya anahtarsız bağlantılar için kullanıcı kimliklerini tercih edin.

    • Hizmet sorumlusu olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:

          spring:
            cloud:
              azure:
                credential:
                  client-id: ${AZURE_CLIENT_ID}
                  client-secret: ${AZURE_CLIENT_SECRET}
                profile:
                  tenant-id: <tenant>
                servicebus:
                  namespace: ${SERVICEBUS_NAMESPACE}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      

Not

tenant-id için izin verilen değerler şunlardır: common, organizations, consumersveya kiracı kimliği. Bu değerler hakkında daha fazla bilgi için, Hata AADSTS50020 - Kimlik sağlayıcısından kullanıcı hesabı kiracımevcut değil bölümünün Yanlış uç nokta (kişisel ve kuruluş hesapları) kullanıldı bölümüne bakın. Tek kiracılı uygulamanızı dönüştürme hakkında bilgi için bkz. Tek kiracılı uygulamayı Microsoft Entra IDüzerinde çok kiracılıya dönüştürme.

  • Yönetilen kimlik olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:

        spring:
          cloud:
            azure:
              credential:
                managed-identity-enabled: true
                client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
              servicebus:
                namespace: ${SERVICEBUS_NAMESPACE}
            function:
              definition: consume;supply
            stream:
              bindings:
                consume-in-0:
                  destination: ${SERVICEBUS_ENTITY_NAME}
                  # If you use Service Bus Topic, add the following configuration
                  # group: ${SUBSCRIPTION_NAME}
                supply-out-0:
                  destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
              servicebus:
                bindings:
                  consume-in-0:
                    consumer:
                      auto-complete: false
                  supply-out-0:
                    producer:
                      entity-type: queue # set as "topic" if you use Service Bus Topic
    
  1. Sağlayıcıyı ve tüketiciyi tanımlayın.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}'", message.getPayload());
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Bölüm anahtarı desteği

Bağlayıcı, ileti üst bilgisinde bölüm anahtarı ve oturum kimliği ayarlamaya izin vererek service bus bölümleme destekler. Bu bölümde, iletiler için bölüm anahtarının nasıl ayarlanacağı anlatlenmektedir.

Spring Cloud Stream, spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expressionbir bölüm anahtarı SpEL ifade özelliği sağlar. Örneğin, bu özelliği "'partitionKey-' + headers[<message-header-key>]" olarak ayarlayın ve message-header-key adlı bir üst bilgi ekleyin. Spring Cloud Stream, bölüm anahtarı atamak için ifadeyi değerlendirirken bu üst bilgi için değerini kullanır. Aşağıdaki kod örnek bir üretici sağlar:

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader("<message-header-key>", value.length() % 4)
            .build();
    };
}

Oturum desteği

Bağlayıcı, Service Bus ileti oturumlarını destekler. İletinin oturum kimliği, ileti üst bilgisi aracılığıyla ayarlanabilir.

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
            .build();
    };
}

Not

Service Bus bölümlemegöre, oturum kimliği bölüm anahtarından daha yüksek önceliğe sahiptir. Bu nedenle hem ServiceBusMessageHeaders#SESSION_ID hem de ServiceBusMessageHeaders#PARTITION_KEY üst bilgileri ayarlandığında, bölüm anahtarının değerinin üzerine yazmak için oturum kimliğinin değeri sonunda kullanılır.

Hata iletilerini işleme

  • Giden bağlama hata iletilerini işleme

    Varsayılan olarak Spring Integration, errorChanneladlı bir genel hata kanalı oluşturur. Giden bağlama hata iletisini işlemek için aşağıdaki ileti uç noktasını yapılandırın.

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Gelen bağlama hata iletilerini işleme

    Spring Cloud Stream Service Bus Bağlayıcısı, gelen ileti bağlamalarının hatalarını işlemek için iki çözümü destekler: bağlayıcı hata işleyicisi ve işleyicileri.

    Bağlayıcı hata işleyicisi:

    Varsayılan bağlayıcı hata işleyicisi gelen bağlamayı işler. spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected etkinleştirildiğinde başarısız iletileri teslim edilemeyen ileti kuyruğuna göndermek için bu işleyiciyi kullanırsınız. Aksi takdirde, başarısız iletiler terk edilir. Bağlayıcı hata işleyicisi, sağlanan diğer hata işleyicileriyle birbirini dışlar.

    Hata işleyicisi:

    Spring Cloud Stream, Consumer örnekleri kabul eden bir ErrorMessage ekleyerek özel hata işleyicisi sağlamanız için bir mekanizma sunar. Daha fazla bilgi için Spring Cloud Stream belgelerinde Hata İletilerini İşleme bakın.

    • Bağlama varsayılan hata işleyicisi

      Tüm gelen bağlama hata iletilerini kullanmak için tek bir Consumer çekirdeği yapılandırın. Aşağıdaki varsayılan işlev her gelen bağlama hata kanalına abone olur:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      ayrıca spring.cloud.stream.default.error-handler-definition özelliğini işlev adına ayarlamanız gerekir.

    • Bağlamaya özgü hata işleyicisi

      Belirli gelen bağlama hata iletilerini kullanmak için bir Consumer çekirdeği yapılandırın. Aşağıdaki işlev, bağlama varsayılan hata işleyicisinden daha yüksek önceliğe sahip belirli bir gelen bağlama hata kanalına abone olur.

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      ayrıca spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition özelliğini işlev adına ayarlamanız gerekir.

Service Bus ileti üst bilgileri

Desteklenen temel ileti üst bilgileri için Spring Integration için Spring Cloud Azure desteği Service Bus ileti üst bilgileri bölümüne bakın.

Not

Bölüm anahtarı ayarlanırken, ileti üst bilgisinin önceliği Spring Cloud Stream özelliğinden daha yüksektir. Bu nedenle spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression yalnızca ServiceBusMessageHeaders#SESSION_ID ve ServiceBusMessageHeaders#PARTITION_KEY üst bilgilerinin hiçbiri yapılandırılmadığında geçerlilik kazanır.

Birden çok bağlayıcı desteği

Birden çok Service Bus ad alanına bağlantı, birden çok bağlayıcı kullanılarak da desteklenir. Bu örnek, örnek olarak bağlantı dizesini alır. Hizmet sorumlularının ve yönetilen kimliklerin kimlik bilgileri de desteklenir. Kullanıcılar her bağlayıcının ortam ayarlarında ilgili özellikleri ayarlayabilir.

  1. ServiceBus'ın birden çok bağlayıcısını kullanmak için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${SERVICEBUS_TOPIC_NAME}
              group: ${SUBSCRIPTION_NAME}
            supply1-out-0:
              destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE}
            consume2-in-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME}
            supply2-out-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE}
          binders:
            servicebus-1:
              type: servicebus
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING}
            servicebus-2:
              type: servicebus
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING}
          servicebus:
            bindings:
              consume1-in-0:
                consumer:
                  auto-complete: false
              supply1-out-0:
                producer:
                  entity-type: topic
              consume2-in-0:
                consumer:
                  auto-complete: false
              supply2-out-0:
                producer:
                  entity-type: queue
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Not

    Önceki uygulama dosyası, tüm bağlamalara uygulama için tek bir varsayılan poller yapılandırmayı gösterir. Poller'ı belirli bir bağlama için yapılandırmak istiyorsanız, spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000gibi bir yapılandırma kullanabilirsiniz.

    Not

    Microsoft, kullanılabilir en güvenli kimlik doğrulama akışının kullanılmasını önerir. Veritabanları, önbellekler, mesajlaşma veya yapay zeka hizmetleri gibi bu yordamda açıklanan kimlik doğrulama akışı, uygulamaya çok yüksek düzeyde güven gerektirir ve diğer akışlarda mevcut olmayan riskler taşır. Bu akışı yalnızca parolasız veya anahtarsız bağlantılar için yönetilen kimlikler gibi daha güvenli seçenekler uygun olmadığında kullanın. Yerel makine işlemleri için parolasız veya anahtarsız bağlantılar için kullanıcı kimliklerini tercih edin.

  2. iki tedarikçi ve iki tüketici tanımlamamız gerekiyor

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    
    }
    

Kaynak sağlama

Service Bus bağlayıcısı kuyruk, konu ve aboneliğin sağlanmasını destekler; kullanıcılar sağlamayı etkinleştirmek için aşağıdaki özellikleri kullanabilir.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      servicebus:
        resource:
          resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
    stream:
      servicebus:
        bindings:
          <binding-name>:
            consumer:
              entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}

Not

tenant-id için izin verilen değerler şunlardır: common, organizations, consumersveya kiracı kimliği. Bu değerler hakkında daha fazla bilgi için, Hata AADSTS50020 - Kimlik sağlayıcısından kullanıcı hesabı kiracımevcut değil bölümünün Yanlış uç nokta (kişisel ve kuruluş hesapları) kullanıldı bölümüne bakın. Tek kiracılı uygulamanızı dönüştürme hakkında bilgi için bkz. Tek kiracılı uygulamayı Microsoft Entra IDüzerinde çok kiracılıya dönüştürme.

Service Bus istemci özelliklerini özelleştirme

Geliştiriciler Service Bus İstemcisi özelliklerini özelleştirmek için AzureServiceClientBuilderCustomizer kullanabilir. Aşağıdaki örnek, sessionIdleTimeoutiçindeki ServiceBusClientBuilder özelliğini özelleştirmektedir:

@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
    return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}

Örnekleri

Daha fazla bilgi için GitHub'daki azure-spring-boot-samples deposuna bakın.