Megosztás a következőn keresztül:


Spring Cloud Stream az Azure Service Bus használatával

Ez a cikk bemutatja, hogyan használhatja a Spring Cloud Azure Stream Bindert a Service Bus queues és topics elemeinek küldött és innen fogadott üzenetekhez.

Az Azure egy Azure Service Bus („Service Bus”) nevű aszinkron üzenetkezelő platformmal rendelkezik, amely az Advanced Message Queueing Protocol 1.0 („AMQP 1.0”) szabványon alapul. A Service Bus minden támogatott Azure-platformon használható.

Előfeltételek

Feljegyzés

Ha hozzáférést szeretne adni a fiókjának az Azure Service Bus-erőforrásokhoz, rendelje hozzá a Azure Service Bus Data Sender szerepkört a Azure Service Bus Data Receiver jelenleg használt Microsoft Entra-fiókhoz. A hozzáférési szerepkörök engedélyezésével kapcsolatos további információkért lásd : Azure-szerepkörök hozzárendelése az Azure Portallal és hitelesítés, valamint egy Microsoft Entra-azonosítóval rendelkező alkalmazás engedélyezése az Azure Service Bus-entitásokhoz való hozzáféréshez.

Fontos

A cikkben ismertetett lépések elvégzéséhez a Spring Boot 2.5-ös vagy újabb verziójára van szükség.

Üzenetek küldése és fogadása az Azure Service Busból

Az Azure Service Bus üzenetsorával vagy témakörével üzeneteket küldhet és fogadhat a Spring Cloud Azure Stream Binder Service Bus használatával.

A Spring Cloud Azure Stream Binder Service Bus modul telepítéséhez adja hozzá a következő függőségeket a pom.xml fájlhoz:

  • A Spring Cloud Azure Anyagjegyzéke (BOM):

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.azure.spring</groupId>
          <artifactId>spring-cloud-azure-dependencies</artifactId>
          <version>5.13.0</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
    

    Feljegyzés

    Ha Spring Boot 2.x-et használ, mindenképpen állítsa be a verziót 4.19.0.spring-cloud-azure-dependencies Ezt az anyagjegyzéket (BOM) a <dependencyManagement> pom.xml fájl szakaszában kell konfigurálni. Ez biztosítja, hogy minden Spring Cloud Azure-függőség ugyanazt a verziót használja. A BOM-hez használt verzióról további információt a Spring Cloud Azure melyik verzióját érdemes használni.

  • A Spring Cloud Azure Stream Binder Service Bus-összetevő:

    <dependency>
         <groupId>com.azure.spring</groupId>
         <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
    </dependency>
    

Az alkalmazás kódolása

Az alábbi lépésekkel konfigurálhatja az alkalmazást, hogy Service Bus-üzenetsort vagy -témakört használjon az üzenetek küldéséhez és fogadásához.

  1. Konfigurálja a Service Bus hitelesítő adatait a konfigurációs fájlban application.properties.

     spring.cloud.azure.servicebus.namespace=${AZURE_SERVICEBUS_NAMESPACE}
     spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete=false
     spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type=queue
     spring.cloud.function.definition=consume;supply;
     spring.cloud.stream.poller.fixed-delay=60000 
     spring.cloud.stream.poller.initial-delay=0
    

    Az alábbi táblázat a konfiguráció mezőit ismerteti:

    Mező Leírás
    spring.cloud.azure.servicebus.namespace Adja meg a Service Busban az Azure Portalról beszerzett névteret.
    spring.cloud.stream.bindings.consume-in-0.destination Adja meg az oktatóanyag során használt Service Bus-üzenetsort vagy Service Bus-témakört.
    spring.cloud.stream.bindings.supply-out-0.destination A bemeneti célnak használt értéket adja meg.
    spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete Adja meg, hogy automatikusan rendezze-e az üzeneteket. Ha hamisként van beállítva, a rendszer hozzáad egy üzenetfejlécetCheckpointer, hogy a fejlesztők manuálisan rendezhessék az üzeneteket.
    spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type Adja meg a kimeneti kötés entitástípusát, lehet queue vagy topic.
    spring.cloud.function.definition Adja meg, hogy melyik funkcionális babot kell a kötések által közzétett külső célhelyhez kötni.
    spring.cloud.stream.poller.fixed-delay Adja meg az alapértelmezett poller rögzített késleltetését ezredmásodpercben. Az alapértelmezett érték 1000 L. A javasolt érték 60000.
    spring.cloud.stream.poller.initial-delay Adja meg a rendszeres eseményindítók kezdeti késleltetését. Az alapértelmezett érték 0.
  2. Szerkessze az indítási osztályfájlt a következő tartalom megjelenítéséhez.

    import com.azure.spring.messaging.checkpoint.Checkpointer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
    import java.util.function.Consumer;
    import java.util.function.Supplier;
    import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER;
    
    @SpringBootApplication
    public class ServiceBusQueueBinderApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueBinderApplication.class);
        private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer();
    
        public static void main(String[] args) {
            SpringApplication.run(ServiceBusQueueBinderApplication.class, args);
        }
    
        @Bean
        public Supplier<Flux<Message<String>>> supply() {
            return ()->many.asFlux()
                           .doOnNext(m->LOGGER.info("Manually sending message {}", m))
                           .doOnError(t->LOGGER.error("Error encountered", t));
        }
    
        @Bean
        public Consumer<Message<String>> consume() {
            return message->{
                Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
                LOGGER.info("New message received: '{}'", message.getPayload());
                checkpointer.success()
                            .doOnSuccess(s->LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                            .doOnError(e->LOGGER.error("Error found", e))
                            .block();
            };
        }
    
        @Override
        public void run(String... args) {
            LOGGER.info("Going to add message {} to Sinks.Many.", "Hello World");
            many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST);
        }
    
    }
    

    Tipp.

    Ebben az oktatóanyagban nincsenek hitelesítési műveletek a konfigurációkban vagy a kódban. Az Azure-szolgáltatásokhoz való csatlakozáshoz azonban hitelesítés szükséges. A hitelesítés befejezéséhez az Azure Identity-et kell használnia. A Spring Cloud Azure DefaultAzureCredentialaz Azure Identity-kódtár használatával segít a hitelesítő adatok kódmódosítások nélküli beszerzésében.

    DefaultAzureCredential több hitelesítési módszert támogat, és meghatározza, hogy melyik metódust használja futásidőben. Ez a megközelítés lehetővé teszi, hogy az alkalmazás különböző hitelesítési módszereket használjon különböző környezetekben (például helyi és éles környezetekben) környezetspecifikus kód implementálása nélkül. További információ: DefaultAzureCredential.

    A helyi fejlesztési környezetekben a hitelesítés befejezéséhez használhatja az Azure CLI-t, a Visual Studio Code-ot, a PowerShellt vagy más módszereket. További információ: Azure-hitelesítés Java-fejlesztési környezetekben. A hitelesítés azure-beli üzemeltetési környezetekben való elvégzéséhez javasoljuk a felhasználó által hozzárendelt felügyelt identitás használatát. További információ: Mik az Azure-erőforrások felügyelt identitásai?

  3. Indítsa el az alkalmazást. Az alábbi példához hasonló üzenetek megjelennek az alkalmazásnaplóban:

    New message received: 'Hello World'
    Message 'Hello World' successfully checkpointed
    

Következő lépések