다음을 통해 공유


Spring Integration용 Spring Cloud Azure 지원

이 문서는 버전 4.14.0 ✔️ 버전 5.8.0에 적용됩니다. ✔️

Azure용 Spring Integration Extension은 Java용 Azure SDK에서 제공하는 다양한 서비스에 대한 Spring Integration 어댑터를 제공합니다. Event Hubs, Service Bus, Storage 큐와 같은 Azure 서비스에 대한 Spring Integration 지원을 제공합니다. 다음은 지원되는 어댑터 목록입니다.

  • spring-cloud-azure-starter-integration-eventhubs - 자세한 내용은 Azure Event Hubs와 Spring Integration을 참조 하세요.
  • spring-cloud-azure-starter-integration-servicebus - 자세한 내용은 Azure Service Bus와 Spring Integration을 참조하세요 .
  • spring-cloud-azure-starter-integration-storage-queue - 자세한 내용은 Azure Storage 큐와 Spring Integration을 참조 하세요.

Azure Event Hubs와 Spring 통합

주요 개념

Azure Event Hubs는 빅 데이터 스트리밍 플랫폼 및 이벤트 수집 서비스입니다. 초당 수백만 개의 이벤트를 수신하고 처리할 수 있습니다. 이벤트 허브로 전송된 데이터는 실시간 분석 공급자 또는 일괄 처리/스토리지 어댑터를 사용하여 변환하고 저장할 수 있습니다.

Spring Integration은 Spring 기반 애플리케이션 내에서 간단한 메시징을 가능하게 하고 선언적 어댑터를 통해 외부 시스템과의 통합을 지원합니다. 이러한 어댑터는 원격, 메시징 및 일정 예약에 대한 Spring의 지원을 통해 더 높은 수준의 추상화가 제공됩니다. Event Hubs용 Spring Integration 확장 프로젝트는 Azure Event Hubs에 대한 인바운드 및 아웃바운드 채널 어댑터 및 게이트웨이를 제공합니다.

참고 항목

RxJava 지원 API는 버전 4.0.0에서 삭제됩니다. 자세한 내용은 Javadoc를 참조하세요.

소비자 그룹

Event Hubs는 Apache Kafka와 유사한 소비자 그룹을 지원하지만 논리는 약간 다릅니다. Kafka는 모든 커밋된 오프셋을 broker에 저장하지만 수동으로 처리되는 Event Hubs 메시지의 오프셋을 저장해야 합니다. Event Hubs SDK는 Azure Storage 내에 이러한 오프셋을 저장하는 함수를 제공합니다.

분할 지원

Event Hubs는 Kafka와 유사한 물리적 파티션 개념을 제공합니다. 그러나 소비자와 파티션 간에 Kafka의 자동 다시 분산과 달리 Event Hubs는 일종의 선점 모드를 제공합니다. 스토리지 계정은 소비자가 소유하는 파티션을 결정하는 임대 역할을 합니다. 새 소비자가 시작되면 워크로드 분산을 달성하기 위해 가장 많이 로드된 소비자로부터 일부 파티션을 도용하려고 합니다.

부하 분산 전략을 지정하기 위해 개발자는 구성에 사용할 EventHubsContainerProperties 수 있습니다. 구성하는 EventHubsContainerProperties방법에 대한 예제는 다음 섹션을 참조하세요.

Batch 소비자 지원

일괄 EventHubsInboundChannelAdapter 처리 사용 모드를 지원합니다. 이를 사용하도록 설정하기 위해 사용자는 인스턴스를 생성할 때와 같이 ListenerMode.BATCH 수신기 모드를 EventHubsInboundChannelAdapter 지정할 수 있습니다. 사용하도록 설정 하면 페이로드가 일괄 처리된 이벤트 목록인 메시지가 수신되고 다운스트림 채널로 전달됩니다. 또한 각 메시지 헤더는 목록으로 변환되며, 그 중 콘텐츠는 각 이벤트에서 구문 분석된 연결된 헤더 값입니다. 파티션 ID, 검사pointer 및 마지막으로 큐에 추가된 속성의 공용 헤더의 경우 동일한 이벤트를 공유하는 전체 이벤트 일괄 처리에 대한 단일 값으로 표시됩니다. 자세한 내용은 Event Hubs 메시지 헤더 섹션을 참조 하세요 .

참고 항목

검사point 헤더는 MANUAL 검사point 모드를 사용하는 경우에만 존재합니다.

일괄 처리 소비자의 검사점은 두 가지 모드를 지원합니다. BATCHMANUAL BATCHmode는 이벤트가 수신되면 전체 이벤트 일괄 처리를 검사검사포인트 지정 모드입니다. MANUAL모드는 검사사용자별 이벤트를 지정하는 것입니다. 사용하는 경우 검사점메시지 헤더에 전달되고 사용자는 이를 사용하여 검사pointing을 수행할 수 있습니다.

일괄 처리 사용 정책은 선택적이지만 필요한 속성인 경우 및 max-sizemax-wait-timemax-size 해당 속성 max-wait-time 으로 지정할 수 있습니다. 일괄 처리 사용 전략을 지정하기 위해 개발자는 구성에 사용할 EventHubsContainerProperties 수 있습니다. 구성하는 EventHubsContainerProperties방법에 대한 예제는 다음 섹션을 참조하세요.

종속성 설정

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>

구성

이 시작에서는 다음과 같은 세 가지 구성 옵션을 제공합니다.

커넥트ion 구성 속성

이 섹션에는 Azure Event Hubs에 연결하는 데 사용되는 구성 옵션이 포함되어 있습니다.

참고 항목

보안 주체를 사용하여 Azure 리소스에 액세스하기 위해 Microsoft Entra ID로 인증하고 권한을 부여하도록 선택한 경우 Microsoft Entra ID를 사용하여 액세스 권한 부여를 참조하여 보안 주체에게 Azure 리소스에 액세스할 수 있는 충분한 권한이 부여되었는지 확인합니다.

spring-cloud-azure-starter-integration-eventhubs의 커넥트ion 구성 가능한 속성:

속성 Type 설명
spring.cloud.azure.eventhubs.enabled 부울 값 Azure Event Hubs를 사용할 수 있는지 여부입니다.
spring.cloud.azure.eventhubs.connection-string 문자열 Event Hubs 네임스페이스 연결 문자열 값입니다.
spring.cloud.azure.eventhubs.namespace 문자열 FQDN의 접두사인 Event Hubs 네임스페이스 값입니다. FQDN은 NamespaceName.Do기본 이름으로 구성되어야 합니다.
spring.cloud.azure.eventhubs.do기본-name 문자열 Azure Event Hubs 네임스페이스 값의 이름을 기본.
spring.cloud.azure.eventhubs.custom-endpoint-address 문자열 사용자 지정 엔드포인트 주소입니다.
spring.cloud.azure.eventhubs.shared-connection 부울 기본 EventProcessorClient 및 EventHubProducerAsyncClient가 동일한 연결을 사용하는지 여부입니다. 기본적으로 새 연결이 생성되고 생성된 각 Event Hub 클라이언트에 대해 생성됩니다.

검사점 구성 속성

이 섹션에는 파티션 소유권 및 검사포인트 정보를 유지하는 데 사용되는 Storage Blobs 서비스에 대한 구성 옵션이 포함되어 있습니다.

참고 항목

버전 4.0.0부터 spring.cloud.azure.eventhubs.processor.검사point-store.create-container-if-exists는 수동으로 사용하도록 설정되지 않으며 스토리지 컨테이너가 자동으로 만들어지지 않습니다.

spring-cloud-azure-starter-integration-eventhubs의 구성 가능한 속성 검사점:

속성 Type 설명
spring.cloud.azure.eventhubs.processor. 검사point-store.create-container-if-not-exists 부울 컨테이너가 없는 경우 컨테이너를 만들 수 있도록 허용할지 여부입니다.
spring.cloud.azure.eventhubs.processor. 검사point-store.account-name 문자열 스토리지 계정의 이름입니다.
spring.cloud.azure.eventhubs.processor. 검사point-store.account-key 문자열 스토리지 계정 액세스 키입니다.
spring.cloud.azure.eventhubs.processor. 검사point-store.container-name 문자열 스토리지 컨테이너 이름입니다.

일반적인 Azure 서비스 SDK 구성 옵션은 Storage Blob 검사point 저장소에도 구성할 수 있습니다. 지원되는 구성 옵션은 Spring Cloud Azure 구성도입되었으며 통합 접두사 spring.cloud.azure. 또는 접두spring.cloud.azure.eventhubs.processor.checkpoint-store사로 구성할 수 있습니다.

Event Hub 프로세서 구성 속성

EventHubsInboundChannelAdapter 경우 이벤트 허브의 메시지를 사용하여 EventProcessorClient 개발자가 구성에 사용할 EventHubsContainerProperties 수 있는 EventProcessorClient전체 속성을 구성합니다. 작업 EventHubsInboundChannelAdapter방법에 대한 다음 섹션을 참조하세요.

기본 사용법

Azure Event Hubs에 메시지 보내기

  1. 자격 증명 구성 옵션을 채웁니다.

    • 연결 문자열 자격 증명의 경우 application.yml 파일에서 다음 속성을 구성합니다.

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT-CONTAINER}
                  account-name: ${CHECKPOINT-STORAGE-ACCOUNT}
                  account-key: ${CHECKPOINT-ACCESS-KEY}
      
    • 자격 증명을 관리 ID로 사용하려면 application.yml 파일에서 다음 속성을 구성합니다.

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            eventhubs:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      
    • 서비스 주체로서의 자격 증명의 경우 application.yml 파일에서 다음 속성을 구성합니다.

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      

참고 항목

허용되는 tenant-id 값은 , commonorganizations, consumers또는 테넌트 ID입니다. 이러한 값에 대한 자세한 내용은 오류 AADSTS50020 - ID 공급자의 사용자 계정이 테넌트에 존재하지 않는 잘못된 엔드포인트(개인 및 조직 계정) 사용 섹션을 참조하세요. 단일 테넌트 앱을 변환하는 방법에 대한 자세한 내용은 Microsoft Entra ID에서 단일 테넌트 앱을 다중 테넌트로 변환을 참조하세요.

  1. Bean을 EventHubsTemplate 사용하여 만들어 DefaultMessageHandler Event Hubs에 메시지를 보냅니다.

    class Demo {
        private static final String OUTPUT_CHANNEL = "output";
        private static final String EVENTHUB_NAME = "eh1";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.error("There was an error sending the message.", ex);
                }
            });
            return handler;
        }
    }
    
  2. 메시지 채널을 통해 위의 메시지 처리기를 사용하여 메시지 게이트웨이 바인딩을 만듭니다.

    class Demo {
        @Autowired
        EventHubOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface EventHubOutboundGateway {
            void send(String text);
        }
    }
    
  3. 게이트웨이를 사용하여 메시지를 보냅니다.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Azure Event Hubs에서 메시지 받기

  1. 자격 증명 구성 옵션을 채웁니다.

  2. 메시지 채널의 빈을 입력 채널로 만듭니다.

    @Configuration
    class Demo {
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Bean을 EventHubsMessageListenerContainer 사용하여 만들어 EventHubsInboundChannelAdapter Event Hubs에서 메시지를 받습니다.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
        private static final String EVENTHUB_NAME = "eh1";
        private static final String CONSUMER_GROUP = "$Default";
    
        @Bean
        public EventHubsInboundChannelAdapter messageChannelAdapter(
                @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
                EventHubsMessageListenerContainer listenerContainer) {
            EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    
        @Bean
        public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
            EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
            containerProperties.setEventHubName(EVENTHUB_NAME);
            containerProperties.setConsumerGroup(CONSUMER_GROUP);
            containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
            return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
        }
    }
    
  4. 이전에 만든 메시지 채널을 통해 EventHubsInboundChannelAdapter를 사용하여 메시지 수신기 바인딩을 만듭니다.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

objectMapper를 사용자 지정하도록 EventHubsMessageConverter 구성

EventHubsMessageConverter 는 사용자가 ObjectMapper를 사용자 지정할 수 있도록 구성 가능한 콩으로 만들어집니다.

Batch 소비자 지원

Event Hubs의 메시지를 일괄 처리로 사용하려면 위의 샘플과 유사하며, 사용자가 일괄 처리 사용 관련 구성 옵션을 EventHubsInboundChannelAdapter설정해야 합니다.

만들 EventHubsInboundChannelAdapter때 수신기 모드를 .로 BATCH설정해야 합니다. 빈을 EventHubsMessageListenerContainer만들 때 검사포인트 모드를 하나 MANUAL 또는 BATCH로 설정하고 필요에 따라 일괄 처리 옵션을 구성할 수 있습니다.

@Configuration
class Demo {
    private static final String INPUT_CHANNEL = "input";
    private static final String EVENTHUB_NAME = "eh1";
    private static final String CONSUMER_GROUP = "$Default";

    @Bean
    public EventHubsInboundChannelAdapter messageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            EventHubsMessageListenerContainer listenerContainer) {
        EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
        adapter.setOutputChannel(inputChannel);
        return adapter;
    }

    @Bean
    public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
        EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
        containerProperties.setEventHubName(EVENTHUB_NAME);
        containerProperties.setConsumerGroup(CONSUMER_GROUP);
        containerProperties.getBatch().setMaxSize(100);
        containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
        return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
    }
}

Event Hubs 메시지 헤더

다음 표에서는 Event Hubs 메시지 속성이 Spring 메시지 헤더에 매핑되는 방법을 보여 줍니다. Azure Event Hubs의 경우 메시지를 .로 event호출합니다.

레코드 수신기 모드에서 Event Hubs 메시지/이벤트 속성과 Spring Message 헤더 간의 매핑:

Event Hubs 이벤트 속성 Spring Message 헤더 상수 Type 설명
큐에 담은 시간 EventHubsHeaders#ENQUEUED_TIME 인스턴트 이벤트가 이벤트 허브 파티션에 큐에 포함된 경우의 인스턴스(UTC)입니다.
Offset EventHubsHeaders#OFFSET Long 연결된 이벤트 허브 파티션에서 수신된 이벤트의 오프셋입니다.
파티션 키 AzureHeaders#PARTITION_KEY 문자열 원래 이벤트를 게시할 때 설정된 파티션 해시 키입니다.
파티션 ID AzureHeaders#RAW_PARTITION_ID 문자열 이벤트 허브의 파티션 ID입니다.
시퀀스 번호 EventHubsHeaders#SEQUENCE_NUMBER Long 연결된 이벤트 허브 파티션에 큐에 추가되었을 때 이벤트에 할당된 시퀀스 번호입니다.
마지막으로 큐에 찍힌 이벤트 속성 EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES LastEnqueuedEventProperties 이 파티션에서 마지막으로 큐에 포함된 이벤트의 속성입니다.
해당 없음 AzureHeaders#CHECKPOINTER 검사점 특정 메시지를 검사포인트하기 위한 헤더입니다.

사용자는 각 이벤트의 관련 정보에 대한 메시지 헤더를 구문 분석할 수 있습니다. 이벤트에 대한 메시지 헤더를 설정하려면 모든 사용자 지정된 헤더가 이벤트의 애플리케이션 속성으로 배치됩니다. 여기서 헤더는 속성 키로 설정됩니다. Event Hubs에서 이벤트를 받으면 모든 애플리케이션 속성이 메시지 헤더로 변환됩니다.

참고 항목

파티션 키, 큐에 추가된 시간, 오프셋 및 시퀀스 번호의 메시지 헤더는 수동으로 설정할 수 없습니다.

일괄 처리 소비자 모드를 사용하도록 설정하면 일괄 처리된 메시지의 특정 헤더가 다음과 같이 나열됩니다. 여기에는 각 단일 Event Hubs 이벤트의 값 목록이 포함됩니다.

Batch 수신기 모드에서 Event Hubs 메시지/이벤트 속성과 Spring Message 헤더 간의 매핑:

Event Hubs 이벤트 속성 Spring Batch 메시지 헤더 상수 Type 설명
큐에 담은 시간 EventHubsHeaders#ENQUEUED_TIME 인스턴트 목록 각 이벤트가 이벤트 허브 파티션에 큐에 추가된 시점의 인스턴스 목록(UTC)입니다.
Offset EventHubsHeaders#OFFSET Long 목록 연결된 이벤트 허브 파티션에서 수신된 각 이벤트의 오프셋 목록입니다.
파티션 키 AzureHeaders#PARTITION_KEY 문자열 목록 원래 각 이벤트를 게시할 때 설정된 파티션 해시 키 목록입니다.
시퀀스 번호 EventHubsHeaders#SEQUENCE_NUMBER Long 목록 연결된 이벤트 허브 파티션에 큐에 추가되었을 때 각 이벤트에 할당된 시퀀스 번호 목록입니다.
시스템 속성 EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES 지도 목록 각 이벤트의 시스템 속성 목록입니다.
애플리케이션 속성 EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES 지도 목록 사용자 지정된 모든 메시지 헤더 또는 이벤트 속성이 배치되는 각 이벤트의 애플리케이션 속성 목록입니다.

참고 항목

메시지를 게시할 때 위의 모든 일괄 처리 헤더가 있는 경우 메시지에서 제거됩니다.

샘플

자세한 내용은 GitHub의 azure-spring-boot-samples 리포지토리를 참조하세요.

Azure Service Bus와 Spring 통합

주요 개념

Spring Integration은 Spring 기반 애플리케이션 내에서 간단한 메시징을 가능하게 하고 선언적 어댑터를 통해 외부 시스템과의 통합을 지원합니다.

Azure Service Bus용 Spring Integration 확장 프로젝트는 Azure Service Bus에 대한 인바운드 및 아웃바운드 채널 어댑터를 제공합니다.

참고 항목

CompletableFuture 지원 API는 버전 2.10.0에서 더 이상 사용되지 않으며 버전 4.0.0의 Reactor Core로 대체되었습니다. 자세한 내용은 Javadoc를 참조하세요.

종속성 설정

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>

구성

이 시작에서는 다음과 같은 두 가지 구성 옵션을 제공합니다.

커넥트ion 구성 속성

이 섹션에는 Azure Service Bus에 연결하는 데 사용되는 구성 옵션이 포함되어 있습니다.

참고 항목

보안 주체를 사용하여 Azure 리소스에 액세스하기 위해 Microsoft Entra ID로 인증하고 권한을 부여하도록 선택한 경우 Microsoft Entra ID를 사용하여 액세스 권한 부여를 참조하여 보안 주체에게 Azure 리소스에 액세스할 수 있는 충분한 권한이 부여되었는지 확인합니다.

spring-cloud-azure-starter-integration-servicebus의 커넥트ion 구성 가능한 속성:

속성 Type 설명
spring.cloud.azure.servicebus.enabled 부울 값 Azure Service Bus를 사용할 수 있는지 여부입니다.
spring.cloud.azure.servicebus.connection-string 문자열 Service Bus 네임스페이스 연결 문자열 값입니다.
spring.cloud.azure.servicebus.namespace 문자열 FQDN의 접두사인 Service Bus 네임스페이스 값입니다. FQDN은 NamespaceName.Do기본 이름으로 구성되어야 합니다.
spring.cloud.azure.servicebus.do기본-name 문자열 Azure Service Bus 네임스페이스 값의 이름을 기본.

Service Bus 프로세서 구성 속성

ServiceBusInboundChannelAdapter 경우 메시지를 사용하여 ServiceBusProcessorClient 개발자가 구성에 사용할 ServiceBusContainerProperties 수 있는 ServiceBusProcessorClient전체 속성을 구성합니다. 작업 ServiceBusInboundChannelAdapter방법에 대한 다음 섹션을 참조하세요.

기본 사용법

Azure Service Bus로 메시지 보내기

  1. 자격 증명 구성 옵션을 채웁니다.

    • 연결 문자열 자격 증명의 경우 application.yml 파일에서 다음 속성을 구성합니다.

      spring:
        cloud:
          azure:
            servicebus:
              connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
      
    • 자격 증명을 관리 ID로 사용하려면 application.yml 파일에서 다음 속성을 구성합니다.

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            servicebus:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
      

참고 항목

허용되는 tenant-id 값은 , commonorganizations, consumers또는 테넌트 ID입니다. 이러한 값에 대한 자세한 내용은 오류 AADSTS50020 - ID 공급자의 사용자 계정이 테넌트에 존재하지 않는 잘못된 엔드포인트(개인 및 조직 계정) 사용 섹션을 참조하세요. 단일 테넌트 앱을 변환하는 방법에 대한 자세한 내용은 Microsoft Entra ID에서 단일 테넌트 앱을 다중 테넌트로 변환을 참조하세요.

  • 서비스 주체로서의 자격 증명의 경우 application.yml 파일에서 다음 속성을 구성합니다.

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          servicebus:
            namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
    

참고 항목

허용되는 tenant-id 값은 , commonorganizations, consumers또는 테넌트 ID입니다. 이러한 값에 대한 자세한 내용은 오류 AADSTS50020 - ID 공급자의 사용자 계정이 테넌트에 존재하지 않는 잘못된 엔드포인트(개인 및 조직 계정) 사용 섹션을 참조하세요. 단일 테넌트 앱을 변환하는 방법에 대한 자세한 내용은 Microsoft Entra ID에서 단일 테넌트 앱을 다중 테넌트로 변환을 참조하세요.

  1. Bean을 ServiceBusTemplate 사용하여 만들어 DefaultMessageHandler Service Bus에 메시지를 보내고 ServiceBusTemplate에 대한 엔터티 형식을 설정합니다. 이 샘플에서는 Service Bus 큐를 예로 사용합니다.

    class Demo {
        private static final String OUTPUT_CHANNEL = "queue.output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) {
            serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE);
            DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
    
            return handler;
        }
    }
    
  2. 메시지 채널을 통해 위의 메시지 처리기를 사용하여 메시지 게이트웨이 바인딩을 만듭니다.

    class Demo {
        @Autowired
        QueueOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface QueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. 게이트웨이를 사용하여 메시지를 보냅니다.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Azure Service Bus에서 메시지 받기

  1. 자격 증명 구성 옵션을 채웁니다.

  2. 메시지 채널의 빈을 입력 채널로 만듭니다.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Bean을 ServiceBusMessageListenerContainer 사용하여 만들어 ServiceBusInboundChannelAdapter Service Bus에 메시지를 받습니다. 이 샘플에서는 Service Bus 큐를 예로 사용합니다.

    @Configuration
    class Demo {
        private static final String QUEUE_NAME = "queue1";
    
        @Bean
        public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) {
            ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
            containerProperties.setEntityName(QUEUE_NAME);
            containerProperties.setAutoComplete(false);
            return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
        }
    
        @Bean
        public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            ServiceBusMessageListenerContainer listenerContainer) {
            ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    }
    
  4. 이전에 만든 메시지 채널을 통해 메시지 수신자 바인딩 ServiceBusInboundChannelAdapter 을 만듭니다.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

objectMapper를 사용자 지정하도록 ServiceBusMessageConverter 구성

ServiceBusMessageConverter 는 사용자가 사용자 지정할 수 있도록 구성 가능한 콩으로 만들어집니다 ObjectMapper.

Service Bus 메시지 헤더

여러 Spring 헤더 상수에 매핑할 수 있는 일부 Service Bus 헤더의 경우 다른 Spring 헤더의 우선 순위가 나열됩니다.

Service Bus 헤더와 Spring 헤더 간의 매핑:

Service Bus 메시지 헤더 및 속성 Spring 메시지 헤더 상수 Type 구성 가능 설명
내용 유형 MessageHeaders#CONTENT_TYPE 문자열 메시지의 RFC2045 콘텐츠 형식 설명자입니다.
상관 관계 ID ServiceBusMessageHeaders#CORRELATION_ID 문자열 메시지의 상관 관계 ID
메시지 ID ServiceBusMessageHeaders#MESSAGE_ID 문자열 메시지의 메시지 ID입니다. 이 헤더의 우선 순위 MessageHeaders#ID는 .
메시지 ID MessageHeaders#ID UUID 메시지의 메시지 ID입니다. 이 헤더의 우선 순위는 .보다 ServiceBusMessageHeaders#MESSAGE_ID낮습니다.
파티션 키 ServiceBusMessageHeaders#PARTITION_KEY 문자열 분할된 엔터티에 메시지를 보내기 위한 파티션 키입니다.
회신 MessageHeaders#REPLY_CHANNEL 문자열 회신을 보낼 엔터티의 주소입니다.
세션 ID에 회신 ServiceBusMessageHeaders#REPLY_TO_SESSION_ID 문자열 메시지의 ReplyToGroupId 속성 값입니다.
예약된 큐에 넣기 시간 utc ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME OffsetDateTime Service Bus에서 메시지를 큐에 추가해야 하는 날짜/시간이며, 이 헤더의 우선 순위는 .보다 AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE높습니다.
예약된 큐에 넣기 시간 utc AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE 정수 Service Bus에서 메시지를 큐에 추가해야 하는 날짜/시간이며, 이 헤더의 우선 순위는 .보다 ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME낮습니다.
세션 ID ServiceBusMessageHeaders#SESSION_ID 문자열 세션 인식 엔터티에 대한 세션 ID입니다.
TTL(Time to live) ServiceBusMessageHeaders#TIME_TO_LIVE 기간 이 메시지가 만료되기 전의 기간입니다.
To ServiceBusMessageHeaders#TO 문자열 라우팅 시나리오에서 나중에 사용하도록 예약되고 현재 broker 자체에서 무시되는 메시지의 "to" 주소입니다.
Subject ServiceBusMessageHeaders#SUBJECT 문자열 메시지의 제목입니다.
배달 못한 편지 오류 설명 ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION 문자열 아니요 배달 못한 편지로 된 메시지에 대한 설명입니다.
배달 못한 편지 이유 ServiceBusMessageHeaders#DEAD_LETTER_REASON 문자열 아니요 메시지가 배달 못한 편지인 이유입니다.
배달 못한 편지 원본 ServiceBusMessageHeaders#DEAD_LETTER_SOURCE 문자열 아니요 메시지가 배달 못 한 엔터티입니다.
배달 횟수 ServiceBusMessageHeaders#DELIVERY_COUNT long 아니요 이 메시지가 클라이언트에 전달된 횟수입니다.
큐에 포함된 시퀀스 번호 ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER long 아니요 Service Bus에서 메시지에 할당된 큐에 포함된 시퀀스 번호입니다.
큐에 담은 시간 ServiceBusMessageHeaders#ENQUEUED_TIME OffsetDateTime 아니요 이 메시지가 Service Bus에 큐에 삽입된 날짜/시간입니다.
만료 날짜 ServiceBusMessageHeaders#EXPIRES_AT OffsetDateTime 아니요 이 메시지가 만료되는 날짜/시간입니다.
잠금 토큰 ServiceBusMessageHeaders#LOCK_TOKEN 문자열 아니요 현재 메시지의 잠금 토큰입니다.
잠글 때까지 ServiceBusMessageHeaders#LOCKED_UNTIL OffsetDateTime 아니요 이 메시지의 잠금이 만료되는 날짜/시간입니다.
시퀀스 번호 ServiceBusMessageHeaders#SEQUENCE_NUMBER long 아니요 Service Bus에서 메시지에 할당된 고유 번호입니다.
State(상태) ServiceBusMessageHeaders#STATE ServiceBusMessageState 아니요 활성, 지연 또는 예약될 수 있는 메시지의 상태입니다.

파티션 키 지원

이 시작은 메시지 헤더에서 파티션 키 및 세션 ID를 설정하도록 허용하여 Service Bus 분할 을 지원합니다. 이 섹션에서는 메시지의 파티션 키를 설정하는 방법을 소개합니다.

권장: 헤더의 키로 사용합니다 ServiceBusMessageHeaders.PARTITION_KEY .

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

권장되지 않지만 현재 지원됩니다AzureHeaders.PARTITION_KEY . 헤더의 키입니다.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

참고 항목

메시지 헤더에 둘 다 ServiceBusMessageHeaders.PARTITION_KEYAzureHeaders.PARTITION_KEY 설정되면 기본 설정 ServiceBusMessageHeaders.PARTITION_KEY 됩니다.

세션 지원

이 예제에서는 애플리케이션에서 메시지의 세션 ID를 수동으로 설정하는 방법을 보여 줍니다.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

참고 항목

ServiceBusMessageHeaders.SESSION_ID 메시지 헤더에 설정되고 다른 ServiceBusMessageHeaders.PARTITION_KEY 헤더도 설정되면 세션 ID의 값이 결국 파티션 키의 값을 덮어쓰는 데 사용됩니다.

샘플

자세한 내용은 GitHub의 azure-spring-boot-samples 리포지토리를 참조하세요.

Azure Service Queue와 Spring 통합

주요 개념

Azure Queue Storage는 대량의 메시지를 저장하기 위한 서비스입니다. 전 세계 어디서나 인증된 호출을 통해 HTTP 또는 HTTPS를 사용하여 메시지에 액세스할 수 있습니다. 큐 메시지의 크기는 최대 64KB입니다. 큐는 스토리지 계정의 용량 제한에 도달할 때까지 수백만 개의 메시지를 포함할 수 있습니다. 큐는 비동기적으로 처리할 작업의 백로그를 만드는 데 일반적으로 사용됩니다.

종속성 설정

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>

구성

이 시작에서는 다음과 같은 구성 옵션을 제공합니다.

커넥트ion 구성 속성

이 섹션에는 Azure Storage 큐에 연결하는 데 사용되는 구성 옵션이 포함되어 있습니다.

참고 항목

보안 주체를 사용하여 Azure 리소스에 액세스하기 위해 Microsoft Entra ID로 인증하고 권한을 부여하도록 선택한 경우 Microsoft Entra ID를 사용하여 액세스 권한 부여를 참조하여 보안 주체에게 Azure 리소스에 액세스할 수 있는 충분한 권한이 부여되었는지 확인합니다.

커넥트 spring-cloud-azure-starter-integration-storage-queue의 구성 가능한 속성:

속성 Type 설명
spring.cloud.azure.storage.queue.enabled 부울 값 Azure Storage 큐를 사용할 수 있는지 여부입니다.
spring.cloud.azure.storage.queue.connection-string 문자열 스토리지 큐 네임스페이스 연결 문자열 값입니다.
spring.cloud.azure.storage.queue.accountName 문자열 스토리지 큐 계정 이름입니다.
spring.cloud.azure.storage.queue.accountKey 문자열 스토리지 큐 계정 키입니다.
spring.cloud.azure.storage.queue.endpoint 문자열 스토리지 큐 서비스 엔드포인트.
spring.cloud.azure.storage.queue.sasToken 문자열 Sas 토큰 자격 증명
spring.cloud.azure.storage.queue.serviceVersion QueueServiceVersion API 요청을 만들 때 사용되는 QueueServiceVersion입니다.
spring.cloud.azure.storage.queue.messageEncoding 문자열 큐 메시지 인코딩

기본 사용법

Azure Storage 큐에 메시지 보내기

  1. 자격 증명 구성 옵션을 채웁니다.

    • 연결 문자열 자격 증명의 경우 application.yml 파일에서 다음 속성을 구성합니다.

      spring:
        cloud:
          azure:
            storage:
              queue:
                connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
      
    • 자격 증명을 관리 ID로 사용하려면 application.yml 파일에서 다음 속성을 구성합니다.

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            storage:
              queue:
                namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
      

참고 항목

허용되는 tenant-id 값은 , commonorganizations, consumers또는 테넌트 ID입니다. 이러한 값에 대한 자세한 내용은 오류 AADSTS50020 - ID 공급자의 사용자 계정이 테넌트에 존재하지 않는 잘못된 엔드포인트(개인 및 조직 계정) 사용 섹션을 참조하세요. 단일 테넌트 앱을 변환하는 방법에 대한 자세한 내용은 Microsoft Entra ID에서 단일 테넌트 앱을 다중 테넌트로 변환을 참조하세요.

  • 서비스 주체로서의 자격 증명의 경우 application.yml 파일에서 다음 속성을 구성합니다.

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          storage:
            queue:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
    

참고 항목

허용되는 tenant-id 값은 , commonorganizations, consumers또는 테넌트 ID입니다. 이러한 값에 대한 자세한 내용은 오류 AADSTS50020 - ID 공급자의 사용자 계정이 테넌트에 존재하지 않는 잘못된 엔드포인트(개인 및 조직 계정) 사용 섹션을 참조하세요. 단일 테넌트 앱을 변환하는 방법에 대한 자세한 내용은 Microsoft Entra ID에서 단일 테넌트 앱을 다중 테넌트로 변환을 참조하세요.

  1. Bean을 StorageQueueTemplate 사용하여 만들어 DefaultMessageHandler 스토리지 큐에 메시지를 보냅니다.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
        private static final String OUTPUT_CHANNEL = "output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
            return handler;
        }
    }
    
  2. 메시지 채널을 통해 위의 메시지 처리기를 사용하여 메시지 게이트웨이 바인딩을 만듭니다.

    class Demo {
        @Autowired
        StorageQueueOutboundGateway storageQueueOutboundGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface StorageQueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. 게이트웨이를 사용하여 메시지를 보냅니다.

    class Demo {
        public void demo() {
            this.storageQueueOutboundGateway.send(message);
        }
    }
    

Azure Storage 큐에서 메시지 받기

  1. 자격 증명 구성 옵션을 채웁니다.

  2. 메시지 채널의 빈을 입력 채널로 만듭니다.

    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Bean을 StorageQueueTemplate 사용하여 만들어 StorageQueueMessageSource 스토리지 큐에 메시지를 받습니다.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
    
        @Bean
        @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000"))
        public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) {
            return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate);
        }
    }
    
  4. 이전에 만든 메시지 채널을 통해 마지막 단계에서 만든 StorageQueueMessageSource를 사용하여 메시지 수신자 바인딩을 만듭니다.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                .doOnError(Throwable::printStackTrace)
                .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message))
                .block();
        }
    }
    

샘플

자세한 내용은 GitHub의 azure-spring-boot-samples 리포지토리를 참조하세요.