Spring Cloud Stream mit Azure Service Bus
In diesem Artikel wird veranschaulicht, wie Spring Cloud Stream Binder zum Senden und Empfangen von Nachrichten von queues
und topics
von Service Bus verwendet wird.
Azure stellt eine asynchrone Nachrichtenplattform namens Azure Service Bus (auch „Service Bus“) bereit, die auf dem Standard AMQP 1.0 („Advanced Message Queueing Protocol 1.0“) basiert. Service Bus kann für alle unterstützten Azure-Plattformen verwendet werden.
Voraussetzungen
Azure-Abonnement (kostenloses Abonnement erstellen)
Java Development Kit (JDK) Version 8 oder höher.
Apache Maven, Version 3.2 oder höher.
cURL oder ein ähnliches HTTP-Hilfsprogramm zum Testen der Funktionalität.
Eine Warteschlange oder ein Thema für Azure Service Bus. Wenn Sie über keines verfügen, erstellen Sie eine ServiceBus-Warteschlange , oder erstellen Sie ein Service Bus-Thema.
Ein Spring Boot-Anwendung. Wenn Sie keine Spring Boot-Anwendung besitzen, erstellen Sie mit Spring Initializr ein Maven-Projekt. Wählen Sie "Maven Project" aus, und fügen Sie unter "Abhängigkeiten" die Abhängigkeiten "Spring Web" und "Azure Support" hinzu, und wählen Sie dann Java Version 8 oder höher aus.
Hinweis
Um Ihrem Konto Zugriff auf Ihre Azure Service Bus-Ressourcen zu gewähren, weisen Sie die und Azure Service Bus Data Receiver
die Azure Service Bus Data Sender
Rolle dem Microsoft Entra-Konto zu, das Sie derzeit verwenden. Weitere Informationen zum Gewähren von Zugriffsrollen finden Sie unter Zuweisen von Azure-Rollen mithilfe der Azure-Portal und Authentifizieren und Autorisieren einer Anwendung mit Microsoft Entra ID für den Zugriff auf Azure Service Bus-Entitäten.
Wichtig
Spring Boot Version 2.5 oder höher ist erforderlich, um die Schritte in diesem Artikel auszuführen.
Senden und Empfangen von Nachrichten von Azure Service Bus
Mit einer Warteschlange oder einem Thema für Azure Service Bus können Sie Nachrichten mit Spring Cloud Azure Stream Binder Service Bus senden und empfangen.
Um das Spring Cloud Azure Stream Binder Service Bus-Modul zu installieren, fügen Sie Ihrer pom.xml Datei die folgenden Abhängigkeiten hinzu:
Die Spring Cloud Azure Bill of Materials (BOM):
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>5.11.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
Hinweis
Wenn Sie Spring Boot 2.x verwenden, stellen Sie sicher, dass Sie die
spring-cloud-azure-dependencies
Version auf4.17.0
. Diese Stückliste (Bill of Material, BOM) sollte im<dependencyManagement>
Abschnitt Ihrer pom.xml Datei konfiguriert werden. Dadurch wird sichergestellt, dass alle Spring Cloud Azure-Abhängigkeiten dieselbe Version verwenden. Weitere Informationen zu der version, die für diese BOM verwendet wird, finden Sie unter Welche Version von Spring Cloud Azure sollte ich verwenden.Das Spring Cloud Azure Stream Binder Service Bus-Artefakt:
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId> </dependency>
Codieren der Anwendung
Führen Sie die folgenden Schritte aus, um Ihre Anwendung so zu konfigurieren, dass eine ServiceBus-Warteschlange oder ein Thema zum Senden und Empfangen von Nachrichten verwendet wird.
Konfigurieren Sie die Service Bus-Anmeldeinformationen in der Konfigurationsdatei
application.properties
.spring.cloud.azure.servicebus.namespace=${AZURE_SERVICEBUS_NAMESPACE} spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME} spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME} spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete=false spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type=queue spring.cloud.function.definition=consume;supply; spring.cloud.stream.poller.fixed-delay=60000 spring.cloud.stream.poller.initial-delay=0
In der folgenden Tabelle werden die Felder in der Konfiguration beschrieben:
Feld Beschreibung spring.cloud.azure.servicebus.namespace
Geben Sie den Namespace an, den Sie in Ihrem Service Bus aus dem Azure-Portal abgerufen haben. spring.cloud.stream.bindings.consume-in-0.destination
Geben Sie Service Bus-Warteschlange oder das Service Bus-Thema an, das Sie in diesem Tutorial verwendet haben. spring.cloud.stream.bindings.supply-out-0.destination
Geben Sie denselben Wert an, den Sie für das Eingabeziel verwendet haben. spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete
Geben Sie an, ob Nachrichten automatisch abgleichen sollen. Wenn dieser Wert auf "false" festgelegt ist, wird eine Nachrichtenkopfzeile Checkpointer
hinzugefügt, mit der Entwickler Nachrichten manuell abgleichen können.spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type
Geben Sie den Entitätstyp für die Ausgabebindung an, kann oder queue
topic
.spring.cloud.function.definition
Geben Sie an, welches funktionale Bean an die externen Ziele gebunden werden soll, die von den Bindungen verfügbar gemacht werden. spring.cloud.stream.poller.fixed-delay
Geben Sie feste Verzögerung für standardabfrager in Millisekunden an. Der Standardwert ist 1000 L. Der empfohlene Wert ist 60000. spring.cloud.stream.poller.initial-delay
Geben Sie die anfängliche Verzögerung für periodische Trigger an. Der Standardwert ist 0. Bearbeiten Sie die Startklassendatei, um den folgenden Inhalt anzuzeigen.
import com.azure.spring.messaging.checkpoint.Checkpointer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import reactor.core.publisher.Flux; import reactor.core.publisher.Sinks; import java.util.function.Consumer; import java.util.function.Supplier; import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER; @SpringBootApplication public class ServiceBusQueueBinderApplication implements CommandLineRunner { private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueBinderApplication.class); private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer(); public static void main(String[] args) { SpringApplication.run(ServiceBusQueueBinderApplication.class, args); } @Bean public Supplier<Flux<Message<String>>> supply() { return ()->many.asFlux() .doOnNext(m->LOGGER.info("Manually sending message {}", m)) .doOnError(t->LOGGER.error("Error encountered", t)); } @Bean public Consumer<Message<String>> consume() { return message->{ Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}'", message.getPayload()); checkpointer.success() .doOnSuccess(s->LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e->LOGGER.error("Error found", e)) .block(); }; } @Override public void run(String... args) { LOGGER.info("Going to add message {} to Sinks.Many.", "Hello World"); many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST); } }
Tipp
In diesem Lernprogramm gibt es keine Authentifizierungsvorgänge in den Konfigurationen oder im Code. Für die Verbindung mit Azure-Diensten ist jedoch eine Authentifizierung erforderlich. Um die Authentifizierung abzuschließen, müssen Sie Azure Identity verwenden. Spring Cloud Azure verwendet
DefaultAzureCredential
, die von der Azure Identity-Bibliothek bereitgestellt wird, um Anmeldeinformationen ohne Codeänderungen zu erhalten.„
DefaultAzureCredential
“ unterstützt mehrere Authentifizierungsmethoden und bestimmt, welche Methode zur Laufzeit verwendet wird. Mit diesem Ansatz kann Ihre App unterschiedliche Authentifizierungsmethoden in verschiedenen Umgebungen (z. B. in lokalen und Produktionsumgebungen) verwenden, ohne umgebungsspezifischen Code zu implementieren. Weitere Informationen finden Sie unter DefaultAzureCredential.Um die Authentifizierung in lokalen Entwicklungsumgebungen abzuschließen, können Sie Azure CLI, Visual Studio Code, PowerShell oder andere Methoden verwenden. Weitere Informationen finden Sie unter Azure-Authentifizierung in Java-Entwicklungsumgebungen. Um die Authentifizierung in Azure-Hostingumgebungen abzuschließen, empfehlen wir die Verwendung der vom Benutzer zugewiesenen verwalteten Identität. Weitere Informationen finden Sie unter Was sind verwaltete Identitäten für Azure-Ressourcen?.
Starten Sie die Anwendung. Nachrichten wie das folgende Beispiel werden in Ihrem Anwendungsprotokoll gepostet:
New message received: 'Hello World' Message 'Hello World' successfully checkpointed
Nächste Schritte
Feedback
https://aka.ms/ContentUserFeedback.
Bald verfügbar: Im Laufe des Jahres 2024 werden wir GitHub-Issues stufenweise als Feedbackmechanismus für Inhalte abbauen und durch ein neues Feedbacksystem ersetzen. Weitere Informationen finden Sie unterFeedback senden und anzeigen für