Azure Service Bus ile Spring Cloud Stream

Bu makalede, Service Bus queues ve topics iletileri göndermek ve almak için Spring Cloud Stream Binder'ın nasıl kullanılacağı gösterilmektedir.

Azure Azure Service Bus ("Service Bus") adlı ve Gelişimli İleti Kuyruğa Alma Protokolü 1.0 ("AMQP 1.0") standardını temel alan zaman uyumsuz bir mesajlaşma platformu sağlar. Service Bus desteklenen Azure platformları arasında kullanılabilir.

Önkoşullar

Not

Hesabınıza Azure Service Bus kaynaklarınıza erişim vermek için, kullanmakta olduğunuz Microsoft Entra hesabına Azure Service Bus Data Sender ve Azure Service Bus Data Receiver rolünü atayın. Erişim rolleri verme hakkında daha fazla bilgi için bkz. Azure portalını kullanarak Azure rolleri atama ve Azure Service Bus varlıklarına erişmek için Microsoft Entra ID ile bir uygulamayı kimlik doğrulama ve yetkilendirme.

Önemli

Bu makaledeki adımları tamamlamak için Spring Boot sürüm 2.5 veya üzeri gereklidir.

Azure Service Bus üzerinden mesaj gönderme ve alma

Azure Service Bus için bir kuyruk veya konu aracılığıyla Spring Cloud Azure Stream Binder Service Bus kullanarak ileti gönderip alabilirsiniz.

Spring Cloud Azure Stream Binder Service Bus modülünü yüklemek için pom.xml dosyanıza aşağıdaki bağımlılıkları ekleyin:

  • Spring Cloud Azure Ürün Reçetesi( BOM):

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.azure.spring</groupId>
          <artifactId>spring-cloud-azure-dependencies</artifactId>
          <version>7.2.0</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
    

    Not

    Spring Boot 4.0.x kullanıyorsanız, spring-cloud-azure-dependencies sürümünü 7.2.0 olarak ayarladığınızdan emin olun.

    Spring Boot 3.5.x kullanıyorsanız, spring-cloud-azure-dependencies sürümünü 6.2.0 olarak ayarladığınızdan emin olun.

    Spring Boot 3.1.x-3.5.x kullanıyorsanız, spring-cloud-azure-dependencies sürümünü 5.25.0 olarak ayarladığınızdan emin olun.

    Eğer Spring Boot 2.x kullanıyorsanız, spring-cloud-azure-dependencies sürümünü 4.20.0 olarak ayarladığınızdan emin olun.

    Bu Parça Listesi (BOM), <dependencyManagement> bölümünde pom.xml dosyanızda yapılandırılmalıdır. Bu, tüm Spring Cloud Azure bağımlılıklarının aynı sürümü kullanmasını sağlar.

    Bu ürün reçetesi için kullanılan sürüm hakkında daha fazla bilgi için bkz: Hangi Spring Cloud Azure Sürümünü Kullanmalıyım.

  • Spring Cloud Azure Stream Binder Service Bus yapıtı:

    <dependency>
         <groupId>com.azure.spring</groupId>
         <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
    </dependency>
    

Uygulamayı kodlama

Uygulamanızı ileti göndermek ve almak için bir Service Bus kuyruğu veya konu başlığı kullanacak şekilde yapılandırmak için aşağıdaki adımları kullanın.

  1. application.properties yapılandırma dosyasındaki Service Bus kimlik bilgilerini yapılandırın.

     spring.cloud.azure.servicebus.namespace=${AZURE_SERVICEBUS_NAMESPACE}
     spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete=false
     spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type=queue
     spring.cloud.function.definition=consume;supply;
     spring.cloud.stream.poller.fixed-delay=60000 
     spring.cloud.stream.poller.initial-delay=0
    

    Aşağıdaki tabloda yapılandırmadaki alanlar açıklanmaktadır:

    Alan Açıklama
    spring.cloud.azure.servicebus.namespace Azure Portalı'ndan aldığınız Service Bus ad alanını belirtin.
    spring.cloud.stream.bindings.consume-in-0.destination Bu öğreticide kullandığınız Service Bus kuyruğu veya Service Bus konusunu belirtin.
    spring.cloud.stream.bindings.supply-out-0.destination Giriş hedefinde kullanılan değerin aynısını belirtin.
    spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete İletilerin otomatik olarak uzlaştırılıp uzlaştırılmayacağını belirtin. falseolarak ayarlanırsa, geliştiricilerin iletileri el ile çözebilmesi için Checkpointer ileti üst bilgisi eklenir.
    spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type Çıkış bağlaması için varlık türünü belirtin; queue veya topic olabilir.
    spring.cloud.function.definition Bağlamalar tarafından kullanıma sunulan dış hedeflere hangi işlevsel çekirdeğin bağlanacağını belirtin.
    spring.cloud.stream.poller.fixed-delay Varsayılan poller için sabit gecikmeyi milisaniye cinsinden belirtin. Varsayılan değer 1000 L değeridir. Önerilen değerdir 60000.
    spring.cloud.stream.poller.initial-delay Düzenli tetikleyiciler için ilk gecikmeyi belirtin. Varsayılan değer 0 değeridir.
  2. Aşağıdaki içeriği göstermek için başlangıç sınıfı dosyasını düzenleyin.

    import com.azure.spring.messaging.checkpoint.Checkpointer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
    import java.util.function.Consumer;
    import java.util.function.Supplier;
    import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER;
    
    @SpringBootApplication
    public class ServiceBusQueueBinderApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueBinderApplication.class);
        private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer();
    
        public static void main(String[] args) {
            SpringApplication.run(ServiceBusQueueBinderApplication.class, args);
        }
    
        @Bean
        public Supplier<Flux<Message<String>>> supply() {
            return ()->many.asFlux()
                           .doOnNext(m->LOGGER.info("Manually sending message {}", m))
                           .doOnError(t->LOGGER.error("Error encountered", t));
        }
    
        @Bean
        public Consumer<Message<String>> consume() {
            return message->{
                Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
                LOGGER.info("New message received: '{}'", message.getPayload());
                checkpointer.success()
                            .doOnSuccess(s->LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                            .doOnError(e->LOGGER.error("Error found", e))
                            .block();
            };
        }
    
        @Override
        public void run(String... args) {
            LOGGER.info("Going to add message {} to Sinks.Many.", "Hello World");
            many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST);
        }
    
    }
    

    İpucu

    Bu öğreticide yapılandırmalarda veya kodda kimlik doğrulama işlemi yoktur. Ancak, Azure hizmetlerine bağlanmak için kimlik doğrulaması gerekir. Kimlik doğrulamasını tamamlamak için Azure Identity kullanmanız gerekir. Spring Cloud Azure, Azure Kimlik kitaplığının herhangi bir kod değişikliği yapmadan kimlik bilgilerini almanıza yardımcı olmak için sağladığı DefaultAzureCredential kullanır.

    DefaultAzureCredential birden çok kimlik doğrulama yöntemini destekler ve çalışma zamanında hangi yöntemin kullanılacağını belirler. Bu yaklaşım, uygulamanızın ortama özgü kod uygulamadan farklı ortamlarda (yerel ve üretim ortamları gibi) farklı kimlik doğrulama yöntemleri kullanmasını sağlar. Daha fazla bilgi için bkz. DefaultAzureCredential.

    Yerel geliştirme ortamlarında kimlik doğrulamasını tamamlamak için Azure CLI, Visual Studio Code, PowerShell veya diğer yöntemleri kullanabilirsiniz. Daha fazla bilgi için bkz. Java geliştirme ortamlarında Azure kimlik doğrulaması. Azure barındırma ortamlarında kimlik doğrulamasını tamamlamak için kullanıcı tarafından atanan yönetilen kimliği kullanmanızı öneririz. Daha fazla bilgi için bkz. Azure kaynakları için yönetilen kimlikler nelerdir?

  3. Uygulamayı başlatın. Aşağıdaki örnek gibi iletiler uygulama günlüğünüze gönderilecektir:

    New message received: 'Hello World'
    Message 'Hello World' successfully checkpointed
    

Sonraki adımlar