Spring Cloud Stream s Azure Service Bus

Tento článek ukazuje, jak pomocí aplikace Spring Cloud Stream Binder odesílat zprávy a přijímat zprávy z Service Bus queues a topics.

Azure poskytuje asynchronní platformu zasílání zpráv s názvem Azure Service Bus ("Service Bus") založenou na standardu Advanced Message Queueing Protocol 1.0 ("AMQP 1.0"). Service Bus lze použít v celé řadě podporovaných platforem Azure.

Požadavky

Poznámka:

Pokud chcete účtu udělit přístup k prostředkům Azure Service Bus, přiřaďte roli Azure Service Bus Data Sender a Azure Service Bus Data Receiver účtu Microsoft Entra, který právě používáte. Další informace o udělování přístupových rolí najdete v tématu Přiřazení rolí Azure pomocí portálu Azure a Ověření a autorizace aplikace pomocí Microsoft Entra ID pro přístup k entitám Azure Service Bus.

Důležité

K dokončení kroků v tomto článku se vyžaduje Spring Boot verze 2.5 nebo vyšší.

Odesílání a příjem zpráv z Azure Service Bus

Pomocí fronty nebo tématu pro Azure Service Bus můžete odesílat a přijímat zprávy pomocí aplikace Spring Cloud Azure Stream Binder Service Bus.

Pokud chcete nainstalovat modul Spring Cloud Azure Stream Binder Service Bus, přidejte do souboru pom.xml následující závislosti:

  • Spring Cloud Azure BOM:

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.azure.spring</groupId>
          <artifactId>spring-cloud-azure-dependencies</artifactId>
          <version>7.2.0</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
    

    Poznámka:

    Pokud používáte Spring Boot 4.0.x, nezapomeňte nastavit spring-cloud-azure-dependencies verzi na 7.2.0.

    Pokud používáte Spring Boot 3.5.x, nezapomeňte nastavit spring-cloud-azure-dependencies verzi na 6.2.0hodnotu.

    Pokud používáte Spring Boot 3.1.x-3.5.x, nezapomeňte nastavit verzi na spring-cloud-azure-dependencies.

    Pokud používáte Spring Boot 2.x, nezapomeňte nastavit spring-cloud-azure-dependencies verzi na 4.20.0.

    Tato faktura materiálu (BOM) by měla být nakonfigurována v <dependencyManagement> části vašeho pom.xml souboru. Tím zajistíte, že všechny závislosti Spring Cloud Azure budou používat stejnou verzi.

    Další informace o verzi použité pro tento kusovník najdete v tématu Kterou verzi Spring Cloud Azure bych měl použít.

  • Komponenta Spring Cloud Azure Stream Binder pro Service Bus:

    <dependency>
         <groupId>com.azure.spring</groupId>
         <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
    </dependency>
    

Vytvoření kódu aplikace

Pomocí následujícího postupu nakonfigurujte aplikaci tak, aby k odesílání a příjmu zpráv používala Service Bus frontu nebo téma.

  1. Nakonfigurujte přihlašovací údaje Service Bus v konfiguračním souboru application.properties.

     spring.cloud.azure.servicebus.namespace=${AZURE_SERVICEBUS_NAMESPACE}
     spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete=false
     spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type=queue
     spring.cloud.function.definition=consume;supply;
     spring.cloud.stream.poller.fixed-delay=60000 
     spring.cloud.stream.poller.initial-delay=0
    

    Následující tabulka popisuje pole v konfiguraci:

    Pole Popis
    spring.cloud.azure.servicebus.namespace Zadejte obor názvů, který jste získali ve vašem prostředí Service Bus z portálu Azure.
    spring.cloud.stream.bindings.consume-in-0.destination Zadejte frontu Service Bus nebo téma Service Busu, které jste použili během tohoto kurzu.
    spring.cloud.stream.bindings.supply-out-0.destination Zadejte stejnou hodnotu jako u cíle vstupu.
    spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete Určete, zda se mají zprávy automaticky urovnat. Pokud je nastavená jako false, přidá se hlavička zprávy Checkpointer, která vývojářům umožní ručně vyřešit zprávy.
    spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type Zadejte typ entity pro výstupní vazbu, může být queue nebo topic.
    spring.cloud.function.definition Určete, která funkční komponenta se má svázat s externími cíli, které jsou dostupné skrze vazby.
    spring.cloud.stream.poller.fixed-delay Zadejte pevné zpoždění pro výchozí sondovací modul v milisekundách. Výchozí hodnota je 1000 L. Doporučená hodnota je 60000.
    spring.cloud.stream.poller.initial-delay Zadejte počáteční zpoždění pro pravidelné aktivační události. Výchozí hodnota je 0.
  2. Upravte soubor spouštěcí třídy, aby se zobrazil následující obsah.

    import com.azure.spring.messaging.checkpoint.Checkpointer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
    import java.util.function.Consumer;
    import java.util.function.Supplier;
    import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER;
    
    @SpringBootApplication
    public class ServiceBusQueueBinderApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueBinderApplication.class);
        private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer();
    
        public static void main(String[] args) {
            SpringApplication.run(ServiceBusQueueBinderApplication.class, args);
        }
    
        @Bean
        public Supplier<Flux<Message<String>>> supply() {
            return ()->many.asFlux()
                           .doOnNext(m->LOGGER.info("Manually sending message {}", m))
                           .doOnError(t->LOGGER.error("Error encountered", t));
        }
    
        @Bean
        public Consumer<Message<String>> consume() {
            return message->{
                Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
                LOGGER.info("New message received: '{}'", message.getPayload());
                checkpointer.success()
                            .doOnSuccess(s->LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                            .doOnError(e->LOGGER.error("Error found", e))
                            .block();
            };
        }
    
        @Override
        public void run(String... args) {
            LOGGER.info("Going to add message {} to Sinks.Many.", "Hello World");
            many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST);
        }
    
    }
    

    Návod

    V tomto kurzu nejsou v konfiguracích ani kódu žádné ověřovací operace. Připojení ke službám Azure ale vyžaduje ověření. K dokončení ověřování je potřeba použít Azure Identity. Spring Cloud Azure používá DefaultAzureCredential, které knihovna identit Azure poskytuje, aby vám pomohla získat přihlašovací údaje bez jakýchkoli změn kódu.

    DefaultAzureCredential podporuje více metod ověřování a určuje, kterou metodu použít za běhu. Tento přístup umožňuje vaší aplikaci používat různé metody ověřování v různých prostředích (například v místních a produkčních prostředích) bez implementace kódu specifického pro prostředí. Další informace naleznete v tématu DefaultAzureCredential.

    K dokončení ověřování v místních vývojových prostředích můžete použít Azure CLI, Visual Studio Code, PowerShell nebo jiné metody. Další informace najdete v tématu ověřování Azure pro vývojová prostředí Java. K dokončení ověřování v Azure hostitelských prostředích doporučujeme použít spravovanou identitu přiřazenou uživatelem. Další informace najdete v tématu Co jsou spravované identity pro prostředky Azure?

  3. Spusťte aplikaci. Zprávy jako v následujícím příkladu se publikuje v protokolu vaší aplikace:

    New message received: 'Hello World'
    Message 'Hello World' successfully checkpointed
    

Další kroky