Azure Service Bus를 사용하는 Spring Cloud Stream

이 문서에서는 Spring Cloud Stream Binder를 사용하여 Service Bus queuestopics및 에서 메시지를 보내고 받는 방법을 보여 줍니다.

Azure는 AMQP 1.0("고급 메시지 큐 프로토콜 1.0") 표준을 기반으로 하는 Service Bus("Azure Service Bus")라는 비동기 메시징 플랫폼을 제공합니다. Service Bus는 지원되는 Azure 플랫폼 범위에서 사용할 수 있습니다.

필수 조건

  • Azure 구독 - 체험 구독 만들기

  • JDK(Java Development Kit) 버전 8 이상.

  • Apache Maven, 버전 3.2 이상.

  • cURL 또는 유사한 HTTP 유틸리티를 사용하여 기능을 테스트합니다.

  • Azure Service Bus에 대한 큐 또는 토픽입니다. 없는 경우 Service Bus 큐 를 만들거나 Service Bus 토픽을 만듭니다.

  • Spring Boot 애플리케이션 없는 경우 Spring Initializr를 사용하여 Maven 프로젝트를 만듭니다. Maven 프로젝트를 선택하고 종속성에서 Spring WebAzure 지원 종속성을 추가한 다음 Java 버전 8 이상을 선택합니다.

참고 항목

계정에 Azure Service Bus 리소스에 대한 액세스 권한을 부여하려면 현재 사용 중인 Microsoft Entra 계정에 해당 및 Azure Service Bus Data Receiver 역할을 할당 Azure Service Bus Data Sender 합니다. 액세스 역할 부여에 대한 자세한 내용은 Azure Portal을 사용하여 Azure 역할 할당을 참조하고, Azure Service Bus 엔터티에 액세스하기 위해 Microsoft Entra ID를 사용하여 애플리케이션을 인증하고 권한을 부여합니다.

Important

이 문서의 단계를 완료하려면 Spring Boot 버전 2.5 이상이 필요합니다.

Azure Service Bus에서 메시지 보내기 및 받기

Azure Service Bus에 대한 큐 또는 토픽을 사용하면 Spring Cloud Azure Stream Binder Service Bus를 사용하여 메시지를 보내고 받을 수 있습니다.

Spring Cloud Azure Stream Binder Service Bus 모듈을 설치하려면 pom.xml 파일에 다음 종속성을 추가합니다.

  • Spring Cloud AZURE BOM(청구서)

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.azure.spring</groupId>
          <artifactId>spring-cloud-azure-dependencies</artifactId>
          <version>5.11.0</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
    

    참고 항목

    Spring Boot 2.x를 사용하는 경우 버전을 .로 설정 spring-cloud-azure-dependencies 해야 합니다 4.17.0. 이 BOM(청구서)은 pom.xml 파일의 섹션에서 구성 <dependencyManagement> 해야 합니다. 이렇게 하면 모든 Spring Cloud Azure 종속성이 동일한 버전을 사용합니다. 이 BOM에 사용되는 버전에 대한 자세한 내용은 어떤 버전의 Spring Cloud Azure를 사용해야 하는지를 참조하세요.

  • Spring Cloud Azure Stream Binder Service Bus 아티팩트:

    <dependency>
         <groupId>com.azure.spring</groupId>
         <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
    </dependency>
    

애플리케이션 코딩

다음 단계를 사용하여 Service Bus 큐 또는 토픽을 사용하여 메시지를 보내고 받도록 애플리케이션을 구성합니다.

  1. 구성 파일 application.properties에서 Service Bus 자격 증명을 구성합니다.

     spring.cloud.azure.servicebus.namespace=${AZURE_SERVICEBUS_NAMESPACE}
     spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete=false
     spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type=queue
     spring.cloud.function.definition=consume;supply;
     spring.cloud.stream.poller.fixed-delay=60000 
     spring.cloud.stream.poller.initial-delay=0
    

    다음 표에서는 구성의 필드에 대해 설명합니다.

    필드 설명
    spring.cloud.azure.servicebus.namespace Azure Portal에서 Service Bus에서 가져온 네임스페이스를 지정합니다.
    spring.cloud.stream.bindings.consume-in-0.destination 이 자습서에서 사용한 Service Bus 큐 또는 Service Bus 항목을 지정합니다.
    spring.cloud.stream.bindings.supply-out-0.destination 입력 대상에 사용한 것과 동일한 값을 지정합니다.
    spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete 메시지를 자동으로 정정할지 여부를 지정합니다. false설정하면 개발자가 메시지를 수동으로 해결할 수 있도록 메시지 헤더 Checkpointer 가 추가됩니다.
    spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type 출력 바인딩 queuetopic에 대한 엔터티 형식을 지정합니다.
    spring.cloud.function.definition 바인딩에 의해 노출되는 외부 대상에 바인딩할 기능 빈을 지정합니다.
    spring.cloud.stream.poller.fixed-delay 기본 폴러의 고정 지연 시간(밀리초)을 지정합니다. 기본값은 1000L입니다. 권장 값은 60000입니다.
    spring.cloud.stream.poller.initial-delay 주기적 트리거에 대한 초기 지연을 지정합니다. 기본값은 0입니다.
  2. 시작 클래스 파일을 편집하여 다음 콘텐츠를 표시합니다.

    import com.azure.spring.messaging.checkpoint.Checkpointer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
    import java.util.function.Consumer;
    import java.util.function.Supplier;
    import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER;
    
    @SpringBootApplication
    public class ServiceBusQueueBinderApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueBinderApplication.class);
        private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer();
    
        public static void main(String[] args) {
            SpringApplication.run(ServiceBusQueueBinderApplication.class, args);
        }
    
        @Bean
        public Supplier<Flux<Message<String>>> supply() {
            return ()->many.asFlux()
                           .doOnNext(m->LOGGER.info("Manually sending message {}", m))
                           .doOnError(t->LOGGER.error("Error encountered", t));
        }
    
        @Bean
        public Consumer<Message<String>> consume() {
            return message->{
                Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
                LOGGER.info("New message received: '{}'", message.getPayload());
                checkpointer.success()
                            .doOnSuccess(s->LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                            .doOnError(e->LOGGER.error("Error found", e))
                            .block();
            };
        }
    
        @Override
        public void run(String... args) {
            LOGGER.info("Going to add message {} to Sinks.Many.", "Hello World");
            many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST);
        }
    
    }
    

    이 자습서에서는 구성 또는 코드에 인증 작업이 없습니다. 그러나 Azure 서비스에 연결하려면 인증이 필요합니다. 인증을 완료하려면 Azure ID를 사용해야 합니다. Spring Cloud Azure는 코드 변경 없이 자격 증명을 가져오는 데 도움이 되는 Azure ID 라이브러리를 사용합니다 DefaultAzureCredential.

    DefaultAzureCredential은 여러 인증 방법을 지원하고 런타임에 사용할 방법을 결정합니다. 이 방법을 사용하면 앱이 환경별 코드를 구현하지 않고도 다양한 환경(예: 로컬 및 프로덕션 환경)에서 다양한 인증 방법을 사용할 수 있습니다. 자세한 내용은 DefaultAzureCredential을 참조 하세요.

    로컬 개발 환경에서 인증을 완료하려면 Azure CLI, Visual Studio Code, PowerShell 또는 기타 방법을 사용할 수 있습니다. 자세한 내용은 Java 개발 환경에서 Azure 인증을 참조 하세요. Azure 호스팅 환경에서 인증을 완료하려면 사용자 할당 관리 ID를 사용하는 것이 좋습니다. 자세한 내용은 Azure 리소스에 대한 관리 ID란?을 참조하세요.

  3. 애플리케이션을 시작합니다. 다음 예제와 같은 메시지가 애플리케이션 로그에 게시됩니다.

    New message received: 'Hello World'
    Message 'Hello World' successfully checkpointed
    

다음 단계