Spring Cloud Stream az Azure Service Bus használatával
Ez a cikk bemutatja, hogyan használhatja a Spring Cloud Azure Stream Bindert a Service Bus queues
és topics
elemeinek küldött és innen fogadott üzenetekhez.
Az Azure egy Azure Service Bus („Service Bus”) nevű aszinkron üzenetkezelő platformmal rendelkezik, amely az Advanced Message Queueing Protocol 1.0 („AMQP 1.0”) szabványon alapul. A Service Bus minden támogatott Azure-platformon használható.
Előfeltételek
Azure-előfizetés – hozzon létre egyet ingyenesen.
A Java Development Kit (JDK) 8- vagy újabb verziója.
Apache Maven, 3.2-es vagy újabb verzió.
cURL vagy hasonló HTTP-segédprogram a funkciók teszteléséhez.
Üzenetsor vagy témakör az Azure Service Bushoz. Ha nincs ilyenje, hozzon létre egy Service Bus-üzenetsort , vagy hozzon létre egy Service Bus-témakört.
Egy Spring Boot-alkalmazás. Ha nem rendelkezik ilyennel, hozzon létre egy Maven-projektet a Spring Initializrrel. Mindenképpen válassza a Maven Projectet, és a Függőségek területen adja hozzá a Spring Web és az Azure támogatási függőségeit, majd válassza a Java 8-es vagy újabb verzióját.
Feljegyzés
Ha hozzáférést szeretne adni a fiókjának az Azure Service Bus-erőforrásokhoz, rendelje hozzá a Azure Service Bus Data Sender
szerepkört a Azure Service Bus Data Receiver
jelenleg használt Microsoft Entra-fiókhoz. A hozzáférési szerepkörök engedélyezésével kapcsolatos további információkért lásd : Azure-szerepkörök hozzárendelése az Azure Portallal és hitelesítés, valamint egy Microsoft Entra-azonosítóval rendelkező alkalmazás engedélyezése az Azure Service Bus-entitásokhoz való hozzáféréshez.
Fontos
A cikkben ismertetett lépések elvégzéséhez a Spring Boot 2.5-ös vagy újabb verziójára van szükség.
Üzenetek küldése és fogadása az Azure Service Busból
Az Azure Service Bus üzenetsorával vagy témakörével üzeneteket küldhet és fogadhat a Spring Cloud Azure Stream Binder Service Bus használatával.
A Spring Cloud Azure Stream Binder Service Bus modul telepítéséhez adja hozzá a következő függőségeket a pom.xml fájlhoz:
A Spring Cloud Azure Anyagjegyzéke (BOM):
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>5.13.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
Feljegyzés
Ha Spring Boot 2.x-et használ, mindenképpen állítsa be a verziót
4.19.0
.spring-cloud-azure-dependencies
Ezt az anyagjegyzéket (BOM) a<dependencyManagement>
pom.xml fájl szakaszában kell konfigurálni. Ez biztosítja, hogy minden Spring Cloud Azure-függőség ugyanazt a verziót használja. A BOM-hez használt verzióról további információt a Spring Cloud Azure melyik verzióját érdemes használni.A Spring Cloud Azure Stream Binder Service Bus-összetevő:
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId> </dependency>
Az alkalmazás kódolása
Az alábbi lépésekkel konfigurálhatja az alkalmazást, hogy Service Bus-üzenetsort vagy -témakört használjon az üzenetek küldéséhez és fogadásához.
Konfigurálja a Service Bus hitelesítő adatait a konfigurációs fájlban
application.properties
.spring.cloud.azure.servicebus.namespace=${AZURE_SERVICEBUS_NAMESPACE} spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME} spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME} spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete=false spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type=queue spring.cloud.function.definition=consume;supply; spring.cloud.stream.poller.fixed-delay=60000 spring.cloud.stream.poller.initial-delay=0
Az alábbi táblázat a konfiguráció mezőit ismerteti:
Mező Leírás spring.cloud.azure.servicebus.namespace
Adja meg a Service Busban az Azure Portalról beszerzett névteret. spring.cloud.stream.bindings.consume-in-0.destination
Adja meg az oktatóanyag során használt Service Bus-üzenetsort vagy Service Bus-témakört. spring.cloud.stream.bindings.supply-out-0.destination
A bemeneti célnak használt értéket adja meg. spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete
Adja meg, hogy automatikusan rendezze-e az üzeneteket. Ha hamisként van beállítva, a rendszer hozzáad egy üzenetfejlécet Checkpointer
, hogy a fejlesztők manuálisan rendezhessék az üzeneteket.spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type
Adja meg a kimeneti kötés entitástípusát, lehet queue
vagytopic
.spring.cloud.function.definition
Adja meg, hogy melyik funkcionális babot kell a kötések által közzétett külső célhelyhez kötni. spring.cloud.stream.poller.fixed-delay
Adja meg az alapértelmezett poller rögzített késleltetését ezredmásodpercben. Az alapértelmezett érték 1000 L. A javasolt érték 60000. spring.cloud.stream.poller.initial-delay
Adja meg a rendszeres eseményindítók kezdeti késleltetését. Az alapértelmezett érték 0. Szerkessze az indítási osztályfájlt a következő tartalom megjelenítéséhez.
import com.azure.spring.messaging.checkpoint.Checkpointer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import reactor.core.publisher.Flux; import reactor.core.publisher.Sinks; import java.util.function.Consumer; import java.util.function.Supplier; import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER; @SpringBootApplication public class ServiceBusQueueBinderApplication implements CommandLineRunner { private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueBinderApplication.class); private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer(); public static void main(String[] args) { SpringApplication.run(ServiceBusQueueBinderApplication.class, args); } @Bean public Supplier<Flux<Message<String>>> supply() { return ()->many.asFlux() .doOnNext(m->LOGGER.info("Manually sending message {}", m)) .doOnError(t->LOGGER.error("Error encountered", t)); } @Bean public Consumer<Message<String>> consume() { return message->{ Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}'", message.getPayload()); checkpointer.success() .doOnSuccess(s->LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e->LOGGER.error("Error found", e)) .block(); }; } @Override public void run(String... args) { LOGGER.info("Going to add message {} to Sinks.Many.", "Hello World"); many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST); } }
Tipp.
Ebben az oktatóanyagban nincsenek hitelesítési műveletek a konfigurációkban vagy a kódban. Az Azure-szolgáltatásokhoz való csatlakozáshoz azonban hitelesítés szükséges. A hitelesítés befejezéséhez az Azure Identity-et kell használnia. A Spring Cloud Azure
DefaultAzureCredential
az Azure Identity-kódtár használatával segít a hitelesítő adatok kódmódosítások nélküli beszerzésében.DefaultAzureCredential
több hitelesítési módszert támogat, és meghatározza, hogy melyik metódust használja futásidőben. Ez a megközelítés lehetővé teszi, hogy az alkalmazás különböző hitelesítési módszereket használjon különböző környezetekben (például helyi és éles környezetekben) környezetspecifikus kód implementálása nélkül. További információ: DefaultAzureCredential.A helyi fejlesztési környezetekben a hitelesítés befejezéséhez használhatja az Azure CLI-t, a Visual Studio Code-ot, a PowerShellt vagy más módszereket. További információ: Azure-hitelesítés Java-fejlesztési környezetekben. A hitelesítés azure-beli üzemeltetési környezetekben való elvégzéséhez javasoljuk a felhasználó által hozzárendelt felügyelt identitás használatát. További információ: Mik az Azure-erőforrások felügyelt identitásai?
Indítsa el az alkalmazást. Az alábbi példához hasonló üzenetek megjelennek az alkalmazásnaplóban:
New message received: 'Hello World' Message 'Hello World' successfully checkpointed
Következő lépések
Visszajelzés
https://aka.ms/ContentUserFeedback.
Hamarosan elérhető: 2024-ben fokozatosan kivezetjük a GitHub-problémákat a tartalom visszajelzési mechanizmusaként, és lecseréljük egy új visszajelzési rendszerre. További információ:Visszajelzés küldése és megtekintése a következőhöz: