Przeczytaj w języku angielskim

Udostępnij za pośrednictwem


Spring Cloud Stream z usługą Azure Service Bus

W tym artykule pokazano, jak używać bindera Spring Cloud Stream do wysyłania i odbierania komunikatów za pomocą usługi Service Bus queues i topics.

Platforma Azure udostępnia rozwiązanie do asynchronicznej obsługi komunikatów o nazwie Azure Service Bus („Service Bus”) oparte na standardzie Advanced Message Queueing Protocol 1.0 („AMQP 1.0”). Usługi Service Bus można używać na różnych obsługiwanych platformach Azure.

Wymagania wstępne

Uwaga

Aby udzielić dostępu do zasobów usługi Azure Service Bus, przypisz role Azure Service Bus Data Sender i Azure Service Bus Data Receiver do konta Microsoft Entra, którego obecnie używasz. Aby uzyskać więcej informacji na temat udzielania ról dostępu, zobacz Przypisywanie ról platformy Azure przy użyciu witryny Azure Portal i Uwierzytelnianie i autoryzowanie aplikacji przy użyciu identyfikatora Entra firmy Microsoft w celu uzyskania dostępu do jednostek usługi Azure Service Bus.

Ważne

Do wykonania kroków opisanych w tym artykule jest wymagany program Spring Boot w wersji 2.5 lub nowszej.

Wysyłanie i odbieranie komunikatów z usługi Azure Service Bus

Za pomocą kolejki lub tematu dla usługi Azure Service Bus można wysyłać i odbierać komunikaty przy użyciu Spring Cloud Azure Stream Binder Service Bus.

Aby zainstalować moduł Spring Cloud Azure Stream Binder Service Bus, dodaj następujące zależności do pliku pom.xml :

  • Projekt Spring Cloud Azure Bill of Materials (BOM):

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.azure.spring</groupId>
          <artifactId>spring-cloud-azure-dependencies</artifactId>
          <version>5.21.0</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
    

    Uwaga

    Jeśli używasz środowiska Spring Boot 2.x, pamiętaj, aby ustawić spring-cloud-azure-dependencies wersję na 4.19.0. Ta struktura materiałowa (BOM) powinna być skonfigurowana w sekcji <dependencyManagement> twojego pliku pom.xml. Gwarantuje to, że wszystkie zależności Spring Cloud Azure korzystają z jednej wersji. Aby uzyskać więcej informacji na temat wersji używanej dla tego BOM, zobacz Która wersja Spring Cloud Azure powinna być używana.

  • Artefakt Spring Cloud Azure Stream Binder Service Bus

    <dependency>
         <groupId>com.azure.spring</groupId>
         <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
    </dependency>
    

Kodowanie aplikacji

Wykonaj poniższe kroki, aby skonfigurować aplikację do używania kolejki lub tematu usługi Service Bus do wysyłania i odbierania komunikatów.

  1. Skonfiguruj poświadczenia usługi Service Bus w pliku application.properties konfiguracji.

     spring.cloud.azure.servicebus.namespace=${AZURE_SERVICEBUS_NAMESPACE}
     spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete=false
     spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type=queue
     spring.cloud.function.definition=consume;supply;
     spring.cloud.stream.poller.fixed-delay=60000 
     spring.cloud.stream.poller.initial-delay=0
    

    W poniższej tabeli opisano pola w konfiguracji:

    Pole opis
    spring.cloud.azure.servicebus.namespace Określ przestrzeń nazw uzyskaną w usłudze Service Bus w portalu Azure.
    spring.cloud.stream.bindings.consume-in-0.destination Podaj kolejkę lub temat usługi Service Bus, których użyto w tym samouczku.
    spring.cloud.stream.bindings.supply-out-0.destination Podaj tę samą wartość, której użyto dla miejsca docelowego elementu wejściowego.
    spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete Określ, czy komunikaty mają być automatycznie rozliczane. Jeśli ustawiono wartość false, zostanie dodany nagłówek komunikatu Checkpointer, aby umożliwić programistom ręczne zarządzanie komunikatami.
    spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type Określ typ jednostki dla powiązania wyjściowego, może to być queue lub topic.
    spring.cloud.function.definition Określ, który komponent funkcjonalny ma być powiązany z zewnętrznymi miejscami docelowymi udostępnianymi przez powiązania.
    spring.cloud.stream.poller.fixed-delay Określ stałe opóźnienie domyślnego pollera w milisekundach. Wartość domyślna to 1000 L. Zalecana wartość to 60000.
    spring.cloud.stream.poller.initial-delay Określ początkowe opóźnienie dla wyzwalaczy okresowych. Wartość domyślna to 0.
  2. Edytuj plik klasy uruchamiania, aby wyświetlić następującą zawartość.

    import com.azure.spring.messaging.checkpoint.Checkpointer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
    import java.util.function.Consumer;
    import java.util.function.Supplier;
    import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER;
    
    @SpringBootApplication
    public class ServiceBusQueueBinderApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueBinderApplication.class);
        private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer();
    
        public static void main(String[] args) {
            SpringApplication.run(ServiceBusQueueBinderApplication.class, args);
        }
    
        @Bean
        public Supplier<Flux<Message<String>>> supply() {
            return ()->many.asFlux()
                           .doOnNext(m->LOGGER.info("Manually sending message {}", m))
                           .doOnError(t->LOGGER.error("Error encountered", t));
        }
    
        @Bean
        public Consumer<Message<String>> consume() {
            return message->{
                Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
                LOGGER.info("New message received: '{}'", message.getPayload());
                checkpointer.success()
                            .doOnSuccess(s->LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                            .doOnError(e->LOGGER.error("Error found", e))
                            .block();
            };
        }
    
        @Override
        public void run(String... args) {
            LOGGER.info("Going to add message {} to Sinks.Many.", "Hello World");
            many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST);
        }
    
    }
    

    Napiwek

    W tym samouczku nie ma żadnych procedur uwierzytelniania w konfiguracji czy kodzie. Jednak nawiązywanie połączenia z usługami platformy Azure wymaga uwierzytelniania. Aby ukończyć uwierzytelnianie, musisz użyć usługi Azure Identity. Platforma Spring Cloud na platformie Azure używa DefaultAzureCredentialbiblioteki tożsamości platformy Azure, która ułatwia uzyskiwanie poświadczeń bez żadnych zmian w kodzie.

    DefaultAzureCredential obsługuje wiele metod uwierzytelniania i określa, która metoda ma być używana w czasie wykonywania. Takie podejście umożliwia aplikacji używanie różnych metod uwierzytelniania w różnych środowiskach (takich jak środowiska lokalne i produkcyjne) bez implementowania kodu specyficznego dla środowiska. Aby uzyskać więcej informacji, zobacz DefaultAzureCredential.

    Aby ukończyć uwierzytelnianie w lokalnych środowiskach deweloperskich, możesz użyć interfejsu wiersza polecenia platformy Azure, programu Visual Studio Code, programu PowerShell lub innych metod. Aby uzyskać więcej informacji, zobacz Uwierzytelnianie platformy Azure w środowiskach deweloperskich Java. Aby ukończyć uwierzytelnianie w środowiskach hostingu platformy Azure, zalecamy użycie tożsamości zarządzanej przypisanej przez użytkownika. Aby uzyskać więcej informacji, zobacz Co to są tożsamości zarządzane dla zasobów platformy Azure?

  3. Uruchom aplikację. Komunikaty podobne do poniższego przykładu zostaną opublikowane w dzienniku aplikacji:

    New message received: 'Hello World'
    Message 'Hello World' successfully checkpointed
    

Następne kroki