Spring Cloud Stream z usługą Azure Service Bus
W tym artykule pokazano, jak używać integratora strumienia Spring Cloud do wysyłania komunikatów do elementów queues
i topics
usługi Service Bus oraz odbierania komunikatów z usługi Service Bus.
Platforma Azure udostępnia rozwiązanie do asynchronicznej obsługi komunikatów o nazwie Azure Service Bus („Service Bus”) oparte na standardzie Advanced Message Queueing Protocol 1.0 („AMQP 1.0”). Usługi Service Bus można używać na różnych obsługiwanych platformach Azure.
Wymagania wstępne
Subskrypcja platformy Azure — utwórz bezpłatnie.
Zestaw Java Development Kit (JDK) w wersji 8 lub nowszej.
Apache Maven, wersja 3.2 lub nowsza.
cURL lub podobne narzędzie HTTP do testowania funkcjonalności.
Kolejka lub temat dla usługi Azure Service Bus. Jeśli go nie masz, utwórz kolejkę usługi Service Bus lub utwórz temat usługi Service Bus.
Aplikacja Spring Boot. Jeśli go nie masz, utwórz projekt Maven za pomocą narzędzia Spring Initializr. Pamiętaj, aby wybrać pozycję Projekt Maven i w obszarze Zależności dodaj zależności platformy Spring Web i Pomoc techniczna platformy Azure, a następnie wybierz pozycję Java w wersji 8 lub nowszej.
Uwaga
Aby udzielić kontu dostępu do zasobów usługi Azure Service Bus, przypisz Azure Service Bus Data Sender
rolę i Azure Service Bus Data Receiver
do konta Microsoft Entra, którego obecnie używasz. Aby uzyskać więcej informacji na temat udzielania ról dostępu, zobacz Przypisywanie ról platformy Azure przy użyciu witryny Azure Portal i Uwierzytelnianie i autoryzowanie aplikacji przy użyciu identyfikatora Entra firmy Microsoft w celu uzyskania dostępu do jednostek usługi Azure Service Bus.
Ważne
Do wykonania kroków opisanych w tym artykule jest wymagany program Spring Boot w wersji 2.5 lub nowszej.
Wysyłanie i odbieranie komunikatów z usługi Azure Service Bus
Za pomocą kolejki lub tematu dla usługi Azure Service Bus można wysyłać i odbierać komunikaty przy użyciu usługi Service Bus powiązania usługi Azure Stream Spring Cloud.
Aby zainstalować moduł Spring Cloud Azure Stream Binder Service Bus, dodaj następujące zależności do pliku pom.xml :
Projekt Spring Cloud Azure Bill of Materials (BOM):
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>5.17.1</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
Uwaga
Jeśli używasz środowiska Spring Boot 2.x, pamiętaj, aby ustawić
spring-cloud-azure-dependencies
wersję na4.19.0
. Ten rachunek materiału (BOM) należy skonfigurować w<dependencyManagement>
sekcji pliku pom.xml . Gwarantuje to, że wszystkie zależności platformy Azure platformy Spring Cloud korzystają z tej samej wersji. Aby uzyskać więcej informacji na temat wersji używanej dla tego modelu BOM, zobacz Która wersja platformy Spring Cloud platformy Azure powinna być używana.Artefakt usługi Service Bus binder usługi Azure Stream Spring Cloud:
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId> </dependency>
Kodowanie aplikacji
Wykonaj poniższe kroki, aby skonfigurować aplikację do używania kolejki lub tematu usługi Service Bus do wysyłania i odbierania komunikatów.
Skonfiguruj poświadczenia usługi Service Bus w pliku
application.properties
konfiguracji .spring.cloud.azure.servicebus.namespace=${AZURE_SERVICEBUS_NAMESPACE} spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME} spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME} spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete=false spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type=queue spring.cloud.function.definition=consume;supply; spring.cloud.stream.poller.fixed-delay=60000 spring.cloud.stream.poller.initial-delay=0
W poniższej tabeli opisano pola w konfiguracji:
Pole opis spring.cloud.azure.servicebus.namespace
Określ przestrzeń nazw uzyskaną w usłudze Service Bus w witrynie Azure Portal. spring.cloud.stream.bindings.consume-in-0.destination
Podaj kolejkę lub temat usługi Service Bus, których użyto w tym samouczku. spring.cloud.stream.bindings.supply-out-0.destination
Podaj tę samą wartość, której użyto dla miejsca docelowego elementu wejściowego. spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete
Określ, czy komunikaty mają być automatycznie rozliczane. W przypadku ustawienia wartości false nagłówek komunikatu Checkpointer
zostanie dodany, aby umożliwić deweloperom ręczne rozliczanie komunikatów.spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type
Określ typ jednostki dla powiązania wyjściowego, może to być queue
lubtopic
.spring.cloud.function.definition
Określ, która fasola funkcjonalna ma być powiązana z zewnętrznymi miejscami docelowymi udostępnianymi przez powiązania. spring.cloud.stream.poller.fixed-delay
Określ stałe opóźnienie domyślnegollera w milisekundach. Wartość domyślna to 1000 L. Zalecana wartość to 60000. spring.cloud.stream.poller.initial-delay
Określ początkowe opóźnienie dla wyzwalaczy okresowych. Wartość domyślna to 0. Edytuj plik klasy uruchamiania, aby wyświetlić następującą zawartość.
import com.azure.spring.messaging.checkpoint.Checkpointer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import reactor.core.publisher.Flux; import reactor.core.publisher.Sinks; import java.util.function.Consumer; import java.util.function.Supplier; import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER; @SpringBootApplication public class ServiceBusQueueBinderApplication implements CommandLineRunner { private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueBinderApplication.class); private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer(); public static void main(String[] args) { SpringApplication.run(ServiceBusQueueBinderApplication.class, args); } @Bean public Supplier<Flux<Message<String>>> supply() { return ()->many.asFlux() .doOnNext(m->LOGGER.info("Manually sending message {}", m)) .doOnError(t->LOGGER.error("Error encountered", t)); } @Bean public Consumer<Message<String>> consume() { return message->{ Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}'", message.getPayload()); checkpointer.success() .doOnSuccess(s->LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e->LOGGER.error("Error found", e)) .block(); }; } @Override public void run(String... args) { LOGGER.info("Going to add message {} to Sinks.Many.", "Hello World"); many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST); } }
Napiwek
W tym samouczku nie ma żadnych operacji uwierzytelniania w konfiguracjach ani kodzie. Jednak nawiązywanie połączenia z usługami platformy Azure wymaga uwierzytelniania. Aby ukończyć uwierzytelnianie, musisz użyć usługi Azure Identity. Platforma Spring Cloud na platformie Azure używa
DefaultAzureCredential
biblioteki tożsamości platformy Azure, która ułatwia uzyskiwanie poświadczeń bez żadnych zmian w kodzie.DefaultAzureCredential
obsługuje wiele metod uwierzytelniania i określa, która metoda ma być używana w czasie wykonywania. Takie podejście umożliwia aplikacji używanie różnych metod uwierzytelniania w różnych środowiskach (takich jak środowiska lokalne i produkcyjne) bez implementowania kodu specyficznego dla środowiska. Aby uzyskać więcej informacji, zobacz DefaultAzureCredential.Aby ukończyć uwierzytelnianie w lokalnych środowiskach deweloperskich, możesz użyć interfejsu wiersza polecenia platformy Azure, programu Visual Studio Code, programu PowerShell lub innych metod. Aby uzyskać więcej informacji, zobacz Uwierzytelnianie platformy Azure w środowiskach deweloperskich Java. Aby ukończyć uwierzytelnianie w środowiskach hostingu platformy Azure, zalecamy użycie tożsamości zarządzanej przypisanej przez użytkownika. Aby uzyskać więcej informacji, zobacz Co to są tożsamości zarządzane dla zasobów platformy Azure?
Uruchom aplikację. Komunikaty podobne do poniższego przykładu zostaną opublikowane w dzienniku aplikacji:
New message received: 'Hello World' Message 'Hello World' successfully checkpointed