다음을 통해 공유


Spring Cloud Stream용 Spring Cloud Azure 지원

이 문서는 버전 4.14.0 ✔️ 버전 5.8.0에 적용됩니다. ✔️

Spring Cloud Stream은 공유 메시징 시스템과 연결된 확장성이 뛰어난 이벤트 기반 마이크로 서비스를 빌드하기 위한 프레임워크입니다.

프레임워크는 이미 확립되고 친숙한 Spring 관용구 및 모범 사례를 기반으로 구축된 유연한 프로그래밍 모델을 제공합니다. 이러한 모범 사례에는 영구 게시/하위 의미 체계, 소비자 그룹 및 상태 저장 파티션에 대한 지원이 포함됩니다.

현재 바인더 구현은 다음과 같습니다.

  • spring-cloud-azure-stream-binder-eventhubs - 자세한 내용은 Azure Event Hubs용 Spring Cloud Stream Binder를 참조 하세요.
  • spring-cloud-azure-stream-binder-servicebus - 자세한 내용은 Azure Service Bus용 Spring Cloud Stream Binder를 참조 하세요.

Azure Event Hubs용 Spring Cloud Stream Binder

주요 개념

Azure Event Hubs용 Spring Cloud Stream Binder는 Spring Cloud Stream 프레임워크에 대한 바인딩 구현을 제공합니다. 이 구현에서는 Spring Integration Event Hubs 채널 어댑터를 기초로 사용합니다. 디자인의 관점에서 Event Hubs는 Kafka와 유사합니다. 또한 Kafka API를 통해 Event Hubs에 액세스할 수 있습니다. 프로젝트에 Kafka API에 대한 엄격한 종속성이 있는 경우 Kafka API 샘플을 사용하여 이벤트 허브를 사용해 볼 수 있습니다.

소비자 그룹

Event Hubs는 Apache Kafka와 유사한 소비자 그룹을 지원하지만 논리는 약간 다릅니다. Kafka는 모든 커밋된 오프셋을 broker에 저장하지만 수동으로 처리되는 Event Hubs 메시지의 오프셋을 저장해야 합니다. Event Hubs SDK는 Azure Storage 내에 이러한 오프셋을 저장하는 함수를 제공합니다.

분할 지원

Event Hubs는 Kafka와 유사한 물리적 파티션 개념을 제공합니다. 그러나 소비자와 파티션 간의 Kafka 자동 재조정과 달리 Event Hubs는 일종의 선점 모드를 제공합니다. 스토리지 계정은 어떤 소비자가 어떤 파티션을 소유하는지 결정하는 임대 역할을 합니다. 새 소비자가 시작되면 가장 많이 로드된 소비자로부터 일부 파티션을 도용하여 워크로드 균형을 달성하려고 합니다.

부하 분산 전략을 지정하기 위해 속성 spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.* 이 제공됩니다. 자세한 내용은 소비자 속성 섹션을 참조하세요.

Batch 소비자 지원

Spring Cloud Azure Stream Event Hubs 바인더는 Spring Cloud Stream Batch Consumer 기능을 지원합니다.

일괄 처리 소비자 모드를 사용하려면 속성을 true.로 설정합니다spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode. 사용하도록 설정하면 일괄 처리된 이벤트 목록의 페이로드가 포함된 메시지가 수신되어 함수에 Consumer 전달됩니다. 또한 각 메시지 헤더는 목록으로 변환되며, 그 중 콘텐츠는 각 이벤트에서 구문 분석된 연결된 헤더 값입니다. 파티션 ID, 검사pointer 및 마지막으로 큐에 추가된 속성의 공동 헤더는 이벤트의 전체 일괄 처리가 동일한 값을 공유하므로 단일 값으로 표시됩니다. 자세한 내용은 Spring Integration용 Spring Cloud Azure 지원 Event Hubs 메시지 헤더 섹션을 참조하세요.

참고 항목

검사포인트 헤더는 검사포인트 모드를 MANUAL 사용하는 경우에만 존재합니다.

일괄 처리 소비자의 검사점은 두 가지 모드를 지원합니다. BATCHMANUAL BATCHmode는 바인더가 이벤트를 받으면 전체 이벤트 일괄 처리를 검사점수하는 자동 검사포인트링 모드입니다. MANUAL모드는 검사사용자별 이벤트를 지정하는 것입니다. 사용하는 Checkpointer 경우 메시지 헤더에 전달되고 사용자는 이를 사용하여 검사pointing을 수행할 수 있습니다.

접두spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.사를 가진 속성 및 max-wait-timemax-size 설정하여 일괄 처리 크기를 지정할 수 있습니다. 속성이 max-size 필요하고 속성은 max-wait-time 선택 사항입니다. 자세한 내용은 소비자 속성 섹션을 참조하세요.

종속성 설정

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>

또는 Maven에 대한 다음 예제와 같이 Spring Cloud Azure Stream Event Hubs Starter를 사용할 수도 있습니다.

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>

구성

바인더는 구성 옵션의 다음 세 부분을 제공합니다.

커넥트ion 구성 속성

이 섹션에는 Azure Event Hubs에 연결하는 데 사용되는 구성 옵션이 포함되어 있습니다.

참고 항목

보안 주체를 사용하여 Azure 리소스에 액세스하기 위해 Microsoft Entra ID로 인증하고 권한을 부여하도록 선택한 경우 Microsoft Entra ID를 사용하여 액세스 권한 부여를 참조하여 보안 주체에게 Azure 리소스에 액세스할 수 있는 충분한 권한이 부여되었는지 확인합니다.

spring-cloud-azure-stream-binder-eventhubs의 커넥트ion 구성 가능한 속성:

속성 Type 설명
spring.cloud.azure.eventhubs.enabled 부울 값 Azure Event Hubs를 사용할 수 있는지 여부입니다.
spring.cloud.azure.eventhubs.connection-string 문자열 Event Hubs 네임스페이스 연결 문자열 값입니다.
spring.cloud.azure.eventhubs.namespace 문자열 FQDN의 접두사인 Event Hubs 네임스페이스 값입니다. FQDN은 NamespaceName.Do기본 이름으로 구성되어야 합니다.
spring.cloud.azure.eventhubs.do기본-name 문자열 Azure Event Hubs 네임스페이스 값의 이름을 기본.
spring.cloud.azure.eventhubs.custom-endpoint-address 문자열 사용자 지정 엔드포인트 주소입니다.

일반적인 Azure 서비스 SDK 구성 옵션은 Spring Cloud Azure Stream Event Hubs 바인더에도 구성할 수 있습니다. 지원되는 구성 옵션은 Spring Cloud Azure 구성도입되었으며 통합 접두사 spring.cloud.azure. 또는 접두spring.cloud.azure.eventhubs.사로 구성할 수 있습니다.

바인더는 Spring Could Azure Resource Manager도 기본적으로 지원합니다. 관련 역할로 부여 Data 되지 않은 보안 주체를 사용하여 연결 문자열 검색하는 방법에 대한 자세한 내용은 Spring Could Azure Resource Manager의 기본 사용 섹션을 참조하세요.

검사점 구성 속성

이 섹션에는 파티션 소유권 및 검사포인트 정보를 유지하는 데 사용되는 Storage Blobs 서비스에 대한 구성 옵션이 포함되어 있습니다.

참고 항목

버전 4.0.0부터 spring.cloud.azure.eventhubs.processor.검사point-store.create-container-if-not-exists는 수동으로 사용하도록 설정되지 않으며 spring.cloud.stream.bindings.binding-name.destination이름으로 스토리지 컨테이너가 자동으로 만들어지지 않습니다.

spring-cloud-azure-stream-binder-eventhubs의 구성 가능한 속성 검사점:

속성 Type 설명
spring.cloud.azure.eventhubs.processor. 검사point-store.create-container-if-not-exists 부울 컨테이너가 없는 경우 컨테이너를 만들 수 있도록 허용할지 여부입니다.
spring.cloud.azure.eventhubs.processor. 검사point-store.account-name 문자열 스토리지 계정의 이름입니다.
spring.cloud.azure.eventhubs.processor. 검사point-store.account-key 문자열 스토리지 계정 액세스 키입니다.
spring.cloud.azure.eventhubs.processor. 검사point-store.container-name 문자열 스토리지 컨테이너 이름입니다.

일반적인 Azure 서비스 SDK 구성 옵션은 Storage Blob 검사point 저장소에도 구성할 수 있습니다. 지원되는 구성 옵션은 Spring Cloud Azure 구성도입되었으며 통합 접두사 spring.cloud.azure. 또는 접두spring.cloud.azure.eventhubs.processor.checkpoint-store사로 구성할 수 있습니다.

Azure Event Hubs 바인딩 구성 속성

다음 옵션은 소비자 속성, 고급 소비자 구성, 생산자 속성 및 고급 생산자 구성의 네 가지 섹션으로 나뉩니다.

소비자 속성

이러한 속성은 .를 통해 EventHubsConsumerProperties노출됩니다.

spring-cloud-azure-stream-binder-eventhubs의 소비자 구성 가능한 속성:

속성 Type 설명
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.검사point.mode CheckpointMode 소비자가 메시지를 검사 방법을 결정할 때 사용되는 검사점 모드
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.검사point.count 정수 각 파티션이 하나의 검사포인트를 수행할 메시지 양을 결정합니다. 검사포인트 모드를 사용하는 경우에만 PARTITION_COUNT 적용됩니다.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.검사point.interval 기간 검사 하나를 수행할 시간 간격을 결정합니다. 검사포인트 모드를 사용하는 경우에만 TIME 적용됩니다.
spring.cloud.stream.eventhubs.bindings.<binding-name.consumer.batch.max-size 정수 일괄 처리의 최대 이벤트 수입니다. 일괄 처리 소비자 모드에 필요합니다.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time 기간 일괄 처리 사용의 최대 기간입니다. 일괄 처리 소비자 모드를 사용하도록 설정하고 선택 사항인 경우에만 적용됩니다.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.update-interval 기간 업데이트에 대한 간격 기간입니다.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy LoadBalancingStrategy 부하 분산 전략입니다.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval 기간 파티션 소유권이 만료되는 기간입니다.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties 부울 이벤트 프로세서가 연결된 파티션의 마지막 큐에 포함된 이벤트에 대한 정보를 요청하고 이벤트가 수신될 때 해당 정보를 추적해야 하는지 여부입니다.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count 정수 이벤트 허브 소비자가 적극적으로 수신하고 로컬로 큐에 대기하는 이벤트 수를 제어하기 위해 소비자가 사용하는 수입니다.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position 키와 파티션 ID 및 다음 값으로 매핑 StartPositionProperties 파티션의 검사 지점이 검사point 저장소에 없는 경우 각 파티션에 사용할 이벤트 위치를 포함하는 맵입니다. 이 맵은 파티션 ID에서 키가 지정됩니다.

참고 항목

구성은 initial-partition-event-position 각 이벤트 허브의 초기 위치를 지정하는 데 사용됩니다 map . 따라서 해당 키는 파티션 ID이며, 값에는 StartPositionProperties 오프셋, 시퀀스 번호, 큐에 추가된 날짜 시간 및 포함 여부의 속성이 포함됩니다. 예를 들어 다음과 같이 설정할 수 있습니다.

spring:
  cloud:
    stream:
      eventhubs:
        bindings:
          <binding-name>:
            consumer:
              initial-partition-event-position:
                0:
                  offset: earliest
                1:
                  sequence-number: 100
                2:
                  enqueued-date-time: 2022-01-12T13:32:47.650005Z
                4:
                  inclusive: false
고급 소비자 구성

위의 연결, 검사point일반적인 Azure SDK 클라이언트 구성은 접두사를 사용하여 구성할 수 있는 각 바인더 소비자에 대한 사용자 지정을 지원합니다spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer..

생산자 속성

이러한 속성은 .를 통해 EventHubsProducerProperties노출됩니다.

spring-cloud-azure-stream-binder-eventhubs의 생산자 구성 가능한 속성:

속성 Type 설명
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync 부울 값 생산자 동기화에 대한 스위치 플래그입니다. true이면 생산자는 보내기 작업 후 응답을 기다립니다.
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout long 보내기 작업 후 응답을 기다리는 시간입니다. 동기화 생산자를 사용하도록 설정한 경우에만 적용됩니다.
고급 생산자 구성

위의 연결 및 일반적인 Azure SDK 클라이언트 구성은 접두사를 사용하여 구성할 수 있는 각 바인더 생산자에 대한 사용자 지정을 지원합니다spring.cloud.stream.eventhubs.bindings.<binding-name>.producer..

기본 사용법

Event Hubs에서/받는 메시지 보내기 및 받기

  1. 구성 옵션을 자격 증명 정보로 채웁니다.

    • 연결 문자열 자격 증명의 경우 application.yml 파일에서 다음 속성을 구성합니다.

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT_CONTAINER}
                  account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                  account-key: ${CHECKPOINT_ACCESS_KEY}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      
    • 서비스 주체로서의 자격 증명의 경우 application.yml 파일에서 다음 속성을 구성합니다.

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${EVENTHUB_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      

참고 항목

허용되는 tenant-id 값은 , commonorganizations, consumers또는 테넌트 ID입니다. 이러한 값에 대한 자세한 내용은 오류 AADSTS50020 - ID 공급자의 사용자 계정이 테넌트에 존재하지 않는 잘못된 엔드포인트(개인 및 조직 계정) 사용 섹션을 참조하세요. 단일 테넌트 앱을 변환하는 방법에 대한 자세한 내용은 Microsoft Entra ID에서 단일 테넌트 앱을 다중 테넌트로 변환을 참조하세요.

  • 자격 증명을 관리 ID로 사용하려면 application.yml 파일에서 다음 속성을 구성합니다.

    spring:
      cloud:
        azure:
          credential:
            managed-identity-enabled: true
            client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
          eventhubs:
            namespace: ${EVENTHUB_NAMESPACE}
            processor:
              checkpoint-store:
                container-name: ${CONTAINER_NAME}
                account-name: ${ACCOUNT_NAME}
        function:
          definition: consume;supply
        stream:
          bindings:
            consume-in-0:
              destination: ${EVENTHUB_NAME}
              group: ${CONSUMER_GROUP}
            supply-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
    
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
    
  1. 공급업체 및 소비자를 정의합니다.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload(),
                    message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                    message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                    message.getHeaders().get(EventHubsHeaders.OFFSET),
                    message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
            );
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

분할 지원

PartitionSupplier 보낼 메시지에 대한 파티션 정보를 구성하기 위해 사용자가 제공한 파티션 정보가 생성됩니다. 다음 순서도는 파티션 ID 및 키에 대해 다른 우선 순위를 가져오는 프로세스를 보여줍니다.

Diagram showing a flowchart of the partitioning support process.

Batch 소비자 지원

  1. 다음 예제와 같이 일괄 처리 구성 옵션을 제공합니다.

    spring:
      cloud:
        function:
          definition: consume
        stream:
          bindings:
            consume-in-0:
              destination: ${AZURE_EVENTHUB_NAME}
              group: ${AZURE_EVENTHUB_CONSUMER_GROUP}
              consumer:
                batch-mode: true
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  batch:
                    max-batch-size: 10 # Required for batch-consumer mode
                    max-wait-time: 1m # Optional, the default value is null
                  checkpoint:
                    mode: BATCH # or MANUAL as needed
    
  2. 공급업체 및 소비자를 정의합니다.

    검사포인트링 모드의 BATCH경우 다음 코드를 사용하여 메시지를 보내고 일괄 처리로 사용할 수 있습니다.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                        message.getPayload().get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

    검사포인트 지정 모드의 MANUAL경우 다음 코드를 사용하여 메시지를 보내고 일괄 처리로 검사/검사.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload().get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
    
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            checkpointer.success()
                        .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                        .doOnError(error -> LOGGER.error("Exception found", error))
                        .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

참고 항목

일괄 처리 사용 모드에서 Spring Cloud Stream 바인더의 기본 콘텐츠 형식은 application/json메시지 페이로드가 콘텐츠 형식과 정렬되어 있는지 확인합니다. 예를 들어 페이로드가 있는 메시지를 String 수신하는 기본 콘텐츠 형식 application/json 을 사용하는 경우 페이로드는 원래 String 텍스트의 큰따옴표로 묶어야 합니다JSON String. 콘텐츠 형식의 경우 text/plain 개체일 String 수 있습니다. 자세한 내용은 Spring Cloud Stream 콘텐츠 형식 협상을 참조하세요.

오류 메시지 처리

  • 아웃바운드 바인딩 오류 메시지 처리

    기본적으로 Spring Integration은 .라는 errorChannel전역 오류 채널을 만듭니다. 아웃바운드 바인딩 오류 메시지를 처리하도록 다음 메시지 엔드포인트를 구성합니다.

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • 인바운드 바인딩 오류 메시지 처리

    Spring Cloud Stream Event Hubs Binder는 인바운드 메시지 바인딩에 대한 오류를 처리하는 두 가지 솔루션인 사용자 지정 오류 채널 및 처리기를 지원합니다.

    오류 채널:

    Spring Cloud Stream은 각 인바운드 바인딩에 대한 오류 채널을 제공합니다. 오류 ErrorMessage 채널로 전송됩니다. 자세한 내용은 Spring Cloud Stream 설명서의 오류 처리를 참조하세요.

    • 기본 오류 채널

      명명 errorChannel 된 전역 오류 채널을 사용하여 모든 인바운드 바인딩 오류 메시지를 사용할 수 있습니다. 이러한 메시지를 처리하려면 다음 메시지 엔드포인트를 구성합니다.

      @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
      public void handleError(ErrorMessage message) {
          LOGGER.error("Handling inbound binding error: " + message);
      }
      
    • 바인딩 관련 오류 채널

      특정 오류 채널을 사용하여 기본 오류 채널보다 우선 순위가 높은 특정 인바운드 바인딩 오류 메시지를 사용할 수 있습니다. 이러한 메시지를 처리하려면 다음 메시지 엔드포인트를 구성합니다.

      // Replace destination with spring.cloud.stream.bindings.<input-binding-name>.destination
      // Replace group with spring.cloud.stream.bindings.<input-binding-name>.group
      @ServiceActivator(inputChannel = "{destination}.{group}.errors")
      public void handleError(ErrorMessage message) {
          LOGGER.error("Handling inbound binding error: " + message);
      }
      

      참고 항목

      바인딩 관련 오류 채널은 제공된 다른 오류 처리기 및 채널과 함께 사용할 수 없습니다.

    오류 처리기:

    Spring Cloud Stream은 인스턴스를 허용하는 메커니즘을 추가하여 사용자 지정 오류 처리기를 제공하는 메커니즘을 ConsumerErrorMessage 노출합니다. 자세한 내용은 Spring Cloud Stream 설명서의 오류 처리를 참조하세요.

    참고 항목

    바인딩 오류 처리기가 구성되면 기본 오류 채널에서 작동할 수 있습니다.

    • 바인딩 기본 오류 처리기

      모든 인바운드 바인딩 오류 메시지를 사용하도록 단일 Consumer 콩을 구성합니다. 다음 기본 함수는 각 인바운드 바인딩 오류 채널을 구독합니다.

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      또한 함수 이름으로 속성을 설정 spring.cloud.stream.default.error-handler-definition 해야 합니다.

    • 바인딩 관련 오류 처리기

      Consumer 특정 인바운드 바인딩 오류 메시지를 사용하도록 bean을 구성합니다. 다음 함수는 특정 인바운드 바인딩 오류 채널을 구독하고 바인딩 기본 오류 처리기보다 우선 순위가 높습니다.

      @Bean
      public Consumer<ErrorMessage> myErrorHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      또한 함수 이름으로 속성을 설정 spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition 해야 합니다.

Event Hubs 메시지 헤더

지원되는 기본 메시지 헤더는 Spring Integration용 Spring Cloud Azure 지원 Event Hubs 메시지 헤더 섹션을 참조하세요.

여러 바인더 지원

여러 Event Hubs 네임스페이스에 대한 커넥트 또한 여러 바인더를 사용하여 지원됩니다. 이 샘플에서는 연결 문자열 예로 사용합니다. 서비스 주체 및 관리 ID의 자격 증명도 지원됩니다. 각 바인더의 환경 설정에서 관련 속성을 설정할 수 있습니다.

  1. Event Hubs에서 여러 바인더를 사용하려면 application.yml 파일에서 다음 속성을 구성합니다.

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${EVENTHUB_NAME_01}
              group: ${CONSUMER_GROUP_01}
            supply1-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE}
            consume2-in-0:
              binder: eventhub-2
              destination: ${EVENTHUB_NAME_02}
              group: ${CONSUMER_GROUP_02}
            supply2-out-0:
              binder: eventhub-2
              destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE}
          binders:
            eventhub-1:
              type: eventhubs
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_01}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
            eventhub-2:
              type: eventhubs
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_02}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
          eventhubs:
            bindings:
              consume1-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
              consume2-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    참고 항목

    이전 애플리케이션 파일은 모든 바인딩에 애플리케이션에 대한 단일 기본 폴러를 구성하는 방법을 보여 줍니다. 특정 바인딩에 대해 폴러를 구성하려면 다음과 같은 spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000구성을 사용할 수 있습니다.

  2. 두 개의 공급업체와 두 명의 소비자를 정의해야 합니다.

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    

리소스 프로비저닝

Event Hubs 바인더는 이벤트 허브 및 소비자 그룹의 프로비저닝을 지원하며, 사용자는 다음 속성을 사용하여 프로비저닝을 사용하도록 설정할 수 있습니다.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      eventhubs:
        resource:
          resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}

참고 항목

허용되는 tenant-id 값은 , commonorganizations, consumers또는 테넌트 ID입니다. 이러한 값에 대한 자세한 내용은 오류 AADSTS50020 - ID 공급자의 사용자 계정이 테넌트에 존재하지 않는 잘못된 엔드포인트(개인 및 조직 계정) 사용 섹션을 참조하세요. 단일 테넌트 앱을 변환하는 방법에 대한 자세한 내용은 Microsoft Entra ID에서 단일 테넌트 앱을 다중 테넌트로 변환을 참조하세요.

샘플

자세한 내용은 GitHub의 azure-spring-boot-samples 리포지토리를 참조하세요.

Azure Service Bus용 Spring Cloud 스트림 바인더

주요 개념

Azure Service Bus용 Spring Cloud Stream Binder는 Spring Cloud Stream Framework에 대한 바인딩 구현을 제공합니다. 이 구현에서는 Spring Integration Service Bus 채널 어댑터를 기초로 사용합니다.

예약된 메시지

이 바인더는 지연된 처리를 위해 토픽에 메시지 전송을 지원합니다. 사용자는 메시지의 지연 시간을 밀리초 단위로 표현하는 헤더 x-delay 를 사용하여 예약된 메시지를 보낼 수 있습니다. 메시지는 밀리초 후에 x-delay 해당 토픽에 전달됩니다.

소비자 그룹

Service Bus 토픽은 Apache Kafka와 비슷한 소비자 그룹을 지원하지만 논리는 약간 다릅니다. 이 바인더는 토픽을 사용하여 Subscription 소비자 그룹으로 작동합니다.

종속성 설정

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>

또는 Maven에 대한 다음 예제와 같이 Spring Cloud Azure Stream Service Bus Starter를 사용할 수도 있습니다.

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>

구성

바인더는 구성 옵션의 다음 두 부분을 제공합니다.

커넥트ion 구성 속성

이 섹션에는 Azure Service Bus에 연결하는 데 사용되는 구성 옵션이 포함되어 있습니다.

참고 항목

보안 주체를 사용하여 Azure 리소스에 액세스하기 위해 Microsoft Entra ID로 인증하고 권한을 부여하도록 선택한 경우 Microsoft Entra ID를 사용하여 액세스 권한 부여를 참조하여 보안 주체에게 Azure 리소스에 액세스할 수 있는 충분한 권한이 부여되었는지 확인합니다.

spring-cloud-azure-stream-binder-servicebus의 커넥트ion 구성 가능한 속성:

속성 Type 설명
spring.cloud.azure.servicebus.enabled 부울 값 Azure Service Bus를 사용할 수 있는지 여부입니다.
spring.cloud.azure.servicebus.connection-string 문자열 Service Bus 네임스페이스 연결 문자열 값입니다.
spring.cloud.azure.servicebus.namespace 문자열 FQDN의 접두사인 Service Bus 네임스페이스 값입니다. FQDN은 NamespaceName.Do기본 이름으로 구성되어야 합니다.
spring.cloud.azure.servicebus.do기본-name 문자열 Azure Service Bus 네임스페이스 값의 이름을 기본.

참고 항목

일반적인 Azure 서비스 SDK 구성 옵션은 Spring Cloud Azure Stream Service Bus 바인더에도 구성할 수 있습니다. 지원되는 구성 옵션은 Spring Cloud Azure 구성도입되었으며 통합 접두사 spring.cloud.azure. 또는 접두spring.cloud.azure.servicebus.사로 구성할 수 있습니다.

바인더는 Spring Could Azure Resource Manager도 기본적으로 지원합니다. 관련 역할로 부여 Data 되지 않은 보안 주체를 사용하여 연결 문자열 검색하는 방법에 대한 자세한 내용은 Spring Could Azure Resource Manager의 기본 사용 섹션을 참조하세요.

Azure Service Bus 바인딩 구성 속성

다음 옵션은 소비자 속성, 고급 소비자 구성, 생산자 속성 및 고급 생산자 구성의 네 가지 섹션으로 나뉩니다.

소비자 속성

이러한 속성은 .를 통해 ServiceBusConsumerProperties노출됩니다.

spring-cloud-azure-stream-binder-servicebus의 소비자 구성 가능한 속성:

속성 Type 기본값 설명
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected 부울 값 false 실패한 메시지가 DLQ로 라우팅되는 경우
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls 정수 1 Service Bus 프로세서 클라이언트에서 처리해야 하는 최대 동시 메시지입니다.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-sessions 정수 null 지정된 시간에 처리할 최대 동시 세션 수입니다.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled 부울 null 세션을 사용할 수 있는지 여부입니다.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count 정수 0 Service Bus 프로세서 클라이언트의 프리페치 수입니다.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue 하위 큐 없음 연결할 하위 큐의 형식입니다.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration 기간 5m 잠금 자동 갱신을 계속할 시간입니다.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode ServiceBusReceiveMode peek_lock Service Bus 프로세서 클라이언트의 수신 모드입니다.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete 부울 true 메시지를 자동으로 정정할지 여부입니다. false로 설정하면 개발자가 메시지를 수동으로 해결할 수 있도록 메시지 헤더 Checkpointer 가 추가됩니다.
고급 소비자 구성

위의 연결 및 일반적인 Azure SDK 클라이언트 구성은 접두사를 사용하여 구성할 수 있는 각 바인더 소비자에 대한 사용자 지정을 지원합니다spring.cloud.stream.servicebus.bindings.<binding-name>.consumer..

생산자 속성

이러한 속성은 .를 통해 ServiceBusProducerProperties노출됩니다.

spring-cloud-azure-stream-binder-servicebus의 생산자 구성 가능한 속성:

속성 Type 기본값 설명
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync 부울 값 false 생산자 동기화에 대한 스위치 플래그입니다.
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout long 10000 생산자를 보내기 위한 시간 제한 값입니다.
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type ServiceBusEntityType null 바인딩 생산자에 필요한 생산자의 Service Bus 엔터티 형식입니다.

Important

바인딩 생산자를 사용하는 경우 속성을 spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type 구성해야 합니다.

고급 생산자 구성

위의 연결 및 일반적인 Azure SDK 클라이언트 구성은 접두사를 사용하여 구성할 수 있는 각 바인더 생산자에 대한 사용자 지정을 지원합니다spring.cloud.stream.servicebus.bindings.<binding-name>.producer..

기본 사용법

Service Bus에서/받는 메시지 보내기 및 받기

  1. 구성 옵션을 자격 증명 정보로 채웁니다.

    • 연결 문자열 자격 증명의 경우 application.yml 파일에서 다음 속성을 구성합니다.

          spring:
            cloud:
              azure:
                servicebus:
                  connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      
    • 서비스 주체로서의 자격 증명의 경우 application.yml 파일에서 다음 속성을 구성합니다.

          spring:
            cloud:
              azure:
                credential:
                  client-id: ${AZURE_CLIENT_ID}
                  client-secret: ${AZURE_CLIENT_SECRET}
                profile:
                  tenant-id: <tenant>
                servicebus:
                  namespace: ${SERVICEBUS_NAMESPACE}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      

참고 항목

허용되는 tenant-id 값은 , commonorganizations, consumers또는 테넌트 ID입니다. 이러한 값에 대한 자세한 내용은 오류 AADSTS50020 - ID 공급자의 사용자 계정이 테넌트에 존재하지 않는 잘못된 엔드포인트(개인 및 조직 계정) 사용 섹션을 참조하세요. 단일 테넌트 앱을 변환하는 방법에 대한 자세한 내용은 Microsoft Entra ID에서 단일 테넌트 앱을 다중 테넌트로 변환을 참조하세요.

  • 자격 증명을 관리 ID로 사용하려면 application.yml 파일에서 다음 속성을 구성합니다.

        spring:
          cloud:
            azure:
              credential:
                managed-identity-enabled: true
                client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
              servicebus:
                namespace: ${SERVICEBUS_NAMESPACE}
            function:
              definition: consume;supply
            stream:
              bindings:
                consume-in-0:
                  destination: ${SERVICEBUS_ENTITY_NAME}
                  # If you use Service Bus Topic, add the following configuration
                  # group: ${SUBSCRIPTION_NAME}
                supply-out-0:
                  destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
              servicebus:
                bindings:
                  consume-in-0:
                    consumer:
                      auto-complete: false
                  supply-out-0:
                    producer:
                      entity-type: queue # set as "topic" if you use Service Bus Topic
    
  1. 공급업체 및 소비자를 정의합니다.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}'", message.getPayload());
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

파티션 키 지원

바인더는 메시지 헤더에서 파티션 키 및 세션 ID를 설정하도록 허용하여 Service Bus 분할을 지원합니다. 이 섹션에서는 메시지의 파티션 키를 설정하는 방법을 소개합니다.

Spring Cloud Stream은 파티션 키 SpEL 식 속성을 spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression제공합니다. 예를 들어 이 속성을 설정하여 "'partitionKey-' + headers[<message-header-key>]" message-header-key라는 헤더를 추가합니다. Spring Cloud Stream은 식을 평가할 때 이 헤더의 값을 사용하여 파티션 키를 할당합니다. 다음 코드는 예제 생산자를 제공합니다.

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader("<message-header-key>", value.length() % 4)
            .build();
    };
}

세션 지원

바인더는 Service Bus의 메시지 세션을 지원 합니다 . 메시지 헤더를 통해 메시지의 세션 ID를 설정할 수 있습니다.

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
            .build();
    };
}

참고 항목

Service Bus 분할에 따르면 세션 ID는 파티션 키보다 우선 순위가 높습니다. 따라서 두 헤더가 모두 ServiceBusMessageHeaders#SESSION_IDServiceBusMessageHeaders#PARTITION_KEY 설정되면 세션 ID의 값이 결국 파티션 키의 값을 덮어쓰는 데 사용됩니다.

오류 메시지 처리

  • 아웃바운드 바인딩 오류 메시지 처리

    기본적으로 Spring Integration은 .라는 errorChannel전역 오류 채널을 만듭니다. 아웃바운드 바인딩 오류 메시지를 처리하도록 다음 메시지 엔드포인트를 구성합니다.

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • 인바운드 바인딩 오류 메시지 처리

    Spring Cloud Stream Service Bus Binder는 인바운드 메시지 바인딩에 대한 오류를 처리하는 세 가지 솔루션인 바인더 오류 처리기, 사용자 지정 오류 채널 및 처리기를 지원합니다.

    바인더 오류 처리기:

    기본 바인더 오류 처리기는 인바운드 바인딩을 처리합니다. 이 처리기를 사용하여 설정된 경우 spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected 배달 못한 편지 큐에 실패한 메시지를 보냅니다. 그렇지 않으면 실패한 메시지가 중단됩니다. 바인딩별 오류 채널을 구성하는 경우를 제외하고 바인더 오류 처리기는 다른 사용자 지정 오류 처리기 또는 채널이 있는지 여부에 관계없이 항상 적용됩니다.

    오류 채널:

    Spring Cloud Stream은 각 인바운드 바인딩에 대한 오류 채널을 제공합니다. 오류 ErrorMessage 채널로 전송됩니다. 자세한 내용은 Spring Cloud Stream 설명서의 오류 처리를 참조하세요.

    • 기본 오류 채널

      명명 errorChannel 된 전역 오류 채널을 사용하여 모든 인바운드 바인딩 오류 메시지를 사용할 수 있습니다. 이러한 메시지를 처리하려면 다음 메시지 엔드포인트를 구성합니다.

      @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
      public void handleError(ErrorMessage message) {
          LOGGER.error("Handling inbound binding error: " + message);
      }
      
    • 바인딩 관련 오류 채널

      특정 오류 채널을 사용하여 기본 오류 채널보다 우선 순위가 높은 특정 인바운드 바인딩 오류 메시지를 사용할 수 있습니다. 이러한 메시지를 처리하려면 다음 메시지 엔드포인트를 구성합니다.

      // Replace destination with spring.cloud.stream.bindings.<input-binding-name>.destination
      // Replace group with spring.cloud.stream.bindings.<input-binding-name>.group
      @ServiceActivator(inputChannel = "{destination}.{group}.errors")
      public void handleError(ErrorMessage message) {
          LOGGER.error("Handling inbound binding error: " + message);
      }
      

      참고 항목

      바인딩 관련 오류 채널은 제공된 다른 오류 처리기 및 채널과 함께 사용할 수 없습니다.

    오류 처리기:

    Spring Cloud Stream은 인스턴스를 허용하는 메커니즘을 추가하여 사용자 지정 오류 처리기를 제공하는 메커니즘을 ConsumerErrorMessage 노출합니다. 자세한 내용은 Spring Cloud Stream 설명서의 오류 처리를 참조하세요.

    참고 항목

    바인딩 오류 처리기가 구성되면 기본 오류 채널 및 바인더 오류 처리기에서 작동할 수 있습니다.

    • 바인딩 기본 오류 처리기

      모든 인바운드 바인딩 오류 메시지를 사용하도록 단일 Consumer 콩을 구성합니다. 다음 기본 함수는 각 인바운드 바인딩 오류 채널을 구독합니다.

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      또한 함수 이름으로 속성을 설정 spring.cloud.stream.default.error-handler-definition 해야 합니다.

    • 바인딩 관련 오류 처리기

      Consumer 특정 인바운드 바인딩 오류 메시지를 사용하도록 bean을 구성합니다. 다음 함수는 바인딩 기본 오류 처리기보다 우선 순위가 높은 특정 인바운드 바인딩 오류 채널을 구독합니다.

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      또한 함수 이름으로 속성을 설정 spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition 해야 합니다.

Service Bus 메시지 헤더

지원되는 기본 메시지 헤더는 Spring Integration용 Spring Cloud Azure 지원 Service Bus 메시지 헤더 섹션을 참조하세요.

참고 항목

파티션 키를 설정할 때 메시지 헤더의 우선 순위가 Spring Cloud Stream 속성보다 높습니다. 따라서 spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression 헤더 및 ServiceBusMessageHeaders#PARTITION_KEY 헤더가 ServiceBusMessageHeaders#SESSION_ID 구성되지 않은 경우에만 적용됩니다.

여러 바인더 지원

여러 Service Bus 네임스페이스에 대한 커넥트 또한 여러 바인더를 사용하여 지원됩니다. 이 샘플에서는 연결 문자열 예로 사용합니다. 서비스 주체 및 관리 ID의 자격 증명도 지원되며, 사용자는 각 바인더의 환경 설정에서 관련 속성을 설정할 수 있습니다.

  1. ServiceBus의 여러 바인더를 사용하려면 application.yml 파일에서 다음 속성을 구성합니다.

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${SERVICEBUS_TOPIC_NAME}
              group: ${SUBSCRIPTION_NAME}
            supply1-out-0:
              destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE}
            consume2-in-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME}
            supply2-out-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE}
          binders:
            servicebus-1:
              type: servicebus
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING}
            servicebus-2:
              type: servicebus
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING}
          servicebus:
            bindings:
              consume1-in-0:
                consumer:
                  auto-complete: false
              supply1-out-0:
                producer:
                  entity-type: topic
              consume2-in-0:
                consumer:
                  auto-complete: false
              supply2-out-0:
                producer:
                  entity-type: queue
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    참고 항목

    이전 애플리케이션 파일은 모든 바인딩에 애플리케이션에 대한 단일 기본 폴러를 구성하는 방법을 보여 줍니다. 특정 바인딩에 대해 폴러를 구성하려면 다음과 같은 spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000구성을 사용할 수 있습니다.

  2. 공급업체 2개와 소비자 2개를 정의해야 합니다.

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    
    }
    

리소스 프로비저닝

Service Bus 바인더는 큐, 토픽 및 구독의 프로비저닝을 지원하며, 사용자는 다음 속성을 사용하여 프로비저닝을 사용하도록 설정할 수 있습니다.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      servicebus:
        resource:
          resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
    stream:
      servicebus:
        bindings:
          <binding-name>:
            consumer:
              entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}

참고 항목

허용되는 tenant-id 값은 , commonorganizations, consumers또는 테넌트 ID입니다. 이러한 값에 대한 자세한 내용은 오류 AADSTS50020 - ID 공급자의 사용자 계정이 테넌트에 존재하지 않는 잘못된 엔드포인트(개인 및 조직 계정) 사용 섹션을 참조하세요. 단일 테넌트 앱을 변환하는 방법에 대한 자세한 내용은 Microsoft Entra ID에서 단일 테넌트 앱을 다중 테넌트로 변환을 참조하세요.

샘플

자세한 내용은 GitHub의 azure-spring-boot-samples 리포지토리를 참조하세요.