Spring Cloud Stream용 Spring Cloud Azure 지원
이 문서는 버전 4.14.0 ✔️ 버전 5.8.0에 적용됩니다. ✔️
Spring Cloud Stream은 공유 메시징 시스템과 연결된 확장성이 뛰어난 이벤트 기반 마이크로 서비스를 빌드하기 위한 프레임워크입니다.
프레임워크는 이미 확립되고 친숙한 Spring 관용구 및 모범 사례를 기반으로 구축된 유연한 프로그래밍 모델을 제공합니다. 이러한 모범 사례에는 영구 게시/하위 의미 체계, 소비자 그룹 및 상태 저장 파티션에 대한 지원이 포함됩니다.
현재 바인더 구현은 다음과 같습니다.
spring-cloud-azure-stream-binder-eventhubs
- 자세한 내용은 Azure Event Hubs용 Spring Cloud Stream Binder를 참조 하세요.spring-cloud-azure-stream-binder-servicebus
- 자세한 내용은 Azure Service Bus용 Spring Cloud Stream Binder를 참조 하세요.
Azure Event Hubs용 Spring Cloud Stream Binder
주요 개념
Azure Event Hubs용 Spring Cloud Stream Binder는 Spring Cloud Stream 프레임워크에 대한 바인딩 구현을 제공합니다. 이 구현에서는 Spring Integration Event Hubs 채널 어댑터를 기초로 사용합니다. 디자인의 관점에서 Event Hubs는 Kafka와 유사합니다. 또한 Kafka API를 통해 Event Hubs에 액세스할 수 있습니다. 프로젝트에 Kafka API에 대한 엄격한 종속성이 있는 경우 Kafka API 샘플을 사용하여 이벤트 허브를 사용해 볼 수 있습니다.
소비자 그룹
Event Hubs는 Apache Kafka와 유사한 소비자 그룹을 지원하지만 논리는 약간 다릅니다. Kafka는 모든 커밋된 오프셋을 broker에 저장하지만 수동으로 처리되는 Event Hubs 메시지의 오프셋을 저장해야 합니다. Event Hubs SDK는 Azure Storage 내에 이러한 오프셋을 저장하는 함수를 제공합니다.
분할 지원
Event Hubs는 Kafka와 유사한 물리적 파티션 개념을 제공합니다. 그러나 소비자와 파티션 간의 Kafka 자동 재조정과 달리 Event Hubs는 일종의 선점 모드를 제공합니다. 스토리지 계정은 어떤 소비자가 어떤 파티션을 소유하는지 결정하는 임대 역할을 합니다. 새 소비자가 시작되면 가장 많이 로드된 소비자로부터 일부 파티션을 도용하여 워크로드 균형을 달성하려고 합니다.
부하 분산 전략을 지정하기 위해 속성 spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.*
이 제공됩니다. 자세한 내용은 소비자 속성 섹션을 참조하세요.
Batch 소비자 지원
Spring Cloud Azure Stream Event Hubs 바인더는 Spring Cloud Stream Batch Consumer 기능을 지원합니다.
일괄 처리 소비자 모드를 사용하려면 속성을 true
.로 설정합니다spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode
. 사용하도록 설정하면 일괄 처리된 이벤트 목록의 페이로드가 포함된 메시지가 수신되어 함수에 Consumer
전달됩니다. 또한 각 메시지 헤더는 목록으로 변환되며, 그 중 콘텐츠는 각 이벤트에서 구문 분석된 연결된 헤더 값입니다. 파티션 ID, 검사pointer 및 마지막으로 큐에 추가된 속성의 공동 헤더는 이벤트의 전체 일괄 처리가 동일한 값을 공유하므로 단일 값으로 표시됩니다. 자세한 내용은 Spring Integration용 Spring Cloud Azure 지원 Event Hubs 메시지 헤더 섹션을 참조하세요.
참고 항목
검사포인트 헤더는 검사포인트 모드를 MANUAL
사용하는 경우에만 존재합니다.
일괄 처리 소비자의 검사점은 두 가지 모드를 지원합니다. BATCH
MANUAL
BATCH
mode는 바인더가 이벤트를 받으면 전체 이벤트 일괄 처리를 검사점수하는 자동 검사포인트링 모드입니다. MANUAL
모드는 검사사용자별 이벤트를 지정하는 것입니다. 사용하는 Checkpointer
경우 메시지 헤더에 전달되고 사용자는 이를 사용하여 검사pointing을 수행할 수 있습니다.
접두spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.
사를 가진 속성 및 max-wait-time
을 max-size
설정하여 일괄 처리 크기를 지정할 수 있습니다. 속성이 max-size
필요하고 속성은 max-wait-time
선택 사항입니다. 자세한 내용은 소비자 속성 섹션을 참조하세요.
종속성 설정
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>
또는 Maven에 대한 다음 예제와 같이 Spring Cloud Azure Stream Event Hubs Starter를 사용할 수도 있습니다.
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>
구성
바인더는 구성 옵션의 다음 세 부분을 제공합니다.
커넥트ion 구성 속성
이 섹션에는 Azure Event Hubs에 연결하는 데 사용되는 구성 옵션이 포함되어 있습니다.
참고 항목
보안 주체를 사용하여 Azure 리소스에 액세스하기 위해 Microsoft Entra ID로 인증하고 권한을 부여하도록 선택한 경우 Microsoft Entra ID를 사용하여 액세스 권한 부여를 참조하여 보안 주체에게 Azure 리소스에 액세스할 수 있는 충분한 권한이 부여되었는지 확인합니다.
spring-cloud-azure-stream-binder-eventhubs의 커넥트ion 구성 가능한 속성:
속성 | Type | 설명 |
---|---|---|
spring.cloud.azure.eventhubs.enabled | 부울 값 | Azure Event Hubs를 사용할 수 있는지 여부입니다. |
spring.cloud.azure.eventhubs.connection-string | 문자열 | Event Hubs 네임스페이스 연결 문자열 값입니다. |
spring.cloud.azure.eventhubs.namespace | 문자열 | FQDN의 접두사인 Event Hubs 네임스페이스 값입니다. FQDN은 NamespaceName.Do기본 이름으로 구성되어야 합니다. |
spring.cloud.azure.eventhubs.do기본-name | 문자열 | Azure Event Hubs 네임스페이스 값의 이름을 기본. |
spring.cloud.azure.eventhubs.custom-endpoint-address | 문자열 | 사용자 지정 엔드포인트 주소입니다. |
팁
일반적인 Azure 서비스 SDK 구성 옵션은 Spring Cloud Azure Stream Event Hubs 바인더에도 구성할 수 있습니다. 지원되는 구성 옵션은 Spring Cloud Azure 구성에 도입되었으며 통합 접두사 spring.cloud.azure.
또는 접두spring.cloud.azure.eventhubs.
사로 구성할 수 있습니다.
바인더는 Spring Could Azure Resource Manager도 기본적으로 지원합니다. 관련 역할로 부여 Data
되지 않은 보안 주체를 사용하여 연결 문자열 검색하는 방법에 대한 자세한 내용은 Spring Could Azure Resource Manager의 기본 사용 섹션을 참조하세요.
검사점 구성 속성
이 섹션에는 파티션 소유권 및 검사포인트 정보를 유지하는 데 사용되는 Storage Blobs 서비스에 대한 구성 옵션이 포함되어 있습니다.
참고 항목
버전 4.0.0부터 spring.cloud.azure.eventhubs.processor.검사point-store.create-container-if-not-exists는 수동으로 사용하도록 설정되지 않으며 spring.cloud.stream.bindings.binding-name.destination의 이름으로 스토리지 컨테이너가 자동으로 만들어지지 않습니다.
spring-cloud-azure-stream-binder-eventhubs의 구성 가능한 속성 검사점:
속성 | Type | 설명 |
---|---|---|
spring.cloud.azure.eventhubs.processor. 검사point-store.create-container-if-not-exists | 부울 | 컨테이너가 없는 경우 컨테이너를 만들 수 있도록 허용할지 여부입니다. |
spring.cloud.azure.eventhubs.processor. 검사point-store.account-name | 문자열 | 스토리지 계정의 이름입니다. |
spring.cloud.azure.eventhubs.processor. 검사point-store.account-key | 문자열 | 스토리지 계정 액세스 키입니다. |
spring.cloud.azure.eventhubs.processor. 검사point-store.container-name | 문자열 | 스토리지 컨테이너 이름입니다. |
팁
일반적인 Azure 서비스 SDK 구성 옵션은 Storage Blob 검사point 저장소에도 구성할 수 있습니다. 지원되는 구성 옵션은 Spring Cloud Azure 구성에 도입되었으며 통합 접두사 spring.cloud.azure.
또는 접두spring.cloud.azure.eventhubs.processor.checkpoint-store
사로 구성할 수 있습니다.
Azure Event Hubs 바인딩 구성 속성
다음 옵션은 소비자 속성, 고급 소비자 구성, 생산자 속성 및 고급 생산자 구성의 네 가지 섹션으로 나뉩니다.
소비자 속성
이러한 속성은 .를 통해 EventHubsConsumerProperties
노출됩니다.
spring-cloud-azure-stream-binder-eventhubs의 소비자 구성 가능한 속성:
속성 | Type | 설명 |
---|---|---|
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.검사point.mode | CheckpointMode | 소비자가 메시지를 검사 방법을 결정할 때 사용되는 검사점 모드 |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.검사point.count | 정수 | 각 파티션이 하나의 검사포인트를 수행할 메시지 양을 결정합니다. 검사포인트 모드를 사용하는 경우에만 PARTITION_COUNT 적용됩니다. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.검사point.interval | 기간 | 검사 하나를 수행할 시간 간격을 결정합니다. 검사포인트 모드를 사용하는 경우에만 TIME 적용됩니다. |
spring.cloud.stream.eventhubs.bindings.<binding-name.consumer.batch.max-size | 정수 | 일괄 처리의 최대 이벤트 수입니다. 일괄 처리 소비자 모드에 필요합니다. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time | 기간 | 일괄 처리 사용의 최대 기간입니다. 일괄 처리 소비자 모드를 사용하도록 설정하고 선택 사항인 경우에만 적용됩니다. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.update-interval | 기간 | 업데이트에 대한 간격 기간입니다. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy | LoadBalancingStrategy | 부하 분산 전략입니다. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval | 기간 | 파티션 소유권이 만료되는 기간입니다. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties | 부울 | 이벤트 프로세서가 연결된 파티션의 마지막 큐에 포함된 이벤트에 대한 정보를 요청하고 이벤트가 수신될 때 해당 정보를 추적해야 하는지 여부입니다. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count | 정수 | 이벤트 허브 소비자가 적극적으로 수신하고 로컬로 큐에 대기하는 이벤트 수를 제어하기 위해 소비자가 사용하는 수입니다. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position | 키와 파티션 ID 및 다음 값으로 매핑 StartPositionProperties |
파티션의 검사 지점이 검사point 저장소에 없는 경우 각 파티션에 사용할 이벤트 위치를 포함하는 맵입니다. 이 맵은 파티션 ID에서 키가 지정됩니다. |
참고 항목
구성은 initial-partition-event-position
각 이벤트 허브의 초기 위치를 지정하는 데 사용됩니다 map
. 따라서 해당 키는 파티션 ID이며, 값에는 StartPositionProperties
오프셋, 시퀀스 번호, 큐에 추가된 날짜 시간 및 포함 여부의 속성이 포함됩니다. 예를 들어 다음과 같이 설정할 수 있습니다.
spring:
cloud:
stream:
eventhubs:
bindings:
<binding-name>:
consumer:
initial-partition-event-position:
0:
offset: earliest
1:
sequence-number: 100
2:
enqueued-date-time: 2022-01-12T13:32:47.650005Z
4:
inclusive: false
고급 소비자 구성
위의 연결, 검사point 및 일반적인 Azure SDK 클라이언트 구성은 접두사를 사용하여 구성할 수 있는 각 바인더 소비자에 대한 사용자 지정을 지원합니다spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.
.
생산자 속성
이러한 속성은 .를 통해 EventHubsProducerProperties
노출됩니다.
spring-cloud-azure-stream-binder-eventhubs의 생산자 구성 가능한 속성:
속성 | Type | 설명 |
---|---|---|
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync | 부울 값 | 생산자 동기화에 대한 스위치 플래그입니다. true이면 생산자는 보내기 작업 후 응답을 기다립니다. |
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout | long | 보내기 작업 후 응답을 기다리는 시간입니다. 동기화 생산자를 사용하도록 설정한 경우에만 적용됩니다. |
고급 생산자 구성
위의 연결 및 일반적인 Azure SDK 클라이언트 구성은 접두사를 사용하여 구성할 수 있는 각 바인더 생산자에 대한 사용자 지정을 지원합니다spring.cloud.stream.eventhubs.bindings.<binding-name>.producer.
.
기본 사용법
Event Hubs에서/받는 메시지 보내기 및 받기
구성 옵션을 자격 증명 정보로 채웁니다.
연결 문자열 자격 증명의 경우 application.yml 파일에서 다음 속성을 구성합니다.
spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
서비스 주체로서의 자격 증명의 경우 application.yml 파일에서 다음 속성을 구성합니다.
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
참고 항목
허용되는 tenant-id
값은 , common
organizations
, consumers
또는 테넌트 ID입니다. 이러한 값에 대한 자세한 내용은 오류 AADSTS50020 - ID 공급자의 사용자 계정이 테넌트에 존재하지 않는 잘못된 엔드포인트(개인 및 조직 계정) 사용 섹션을 참조하세요. 단일 테넌트 앱을 변환하는 방법에 대한 자세한 내용은 Microsoft Entra ID에서 단일 테넌트 앱을 다중 테넌트로 변환을 참조하세요.
자격 증명을 관리 ID로 사용하려면 application.yml 파일에서 다음 속성을 구성합니다.
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
공급업체 및 소비자를 정의합니다.
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload(), message.getHeaders().get(EventHubsHeaders.PARTITION_KEY), message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER), message.getHeaders().get(EventHubsHeaders.OFFSET), message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME) ); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
분할 지원
PartitionSupplier
보낼 메시지에 대한 파티션 정보를 구성하기 위해 사용자가 제공한 파티션 정보가 생성됩니다. 다음 순서도는 파티션 ID 및 키에 대해 다른 우선 순위를 가져오는 프로세스를 보여줍니다.
Batch 소비자 지원
다음 예제와 같이 일괄 처리 구성 옵션을 제공합니다.
spring: cloud: function: definition: consume stream: bindings: consume-in-0: destination: ${AZURE_EVENTHUB_NAME} group: ${AZURE_EVENTHUB_CONSUMER_GROUP} consumer: batch-mode: true eventhubs: bindings: consume-in-0: consumer: batch: max-batch-size: 10 # Required for batch-consumer mode max-wait-time: 1m # Optional, the default value is null checkpoint: mode: BATCH # or MANUAL as needed
공급업체 및 소비자를 정의합니다.
검사포인트링 모드의
BATCH
경우 다음 코드를 사용하여 메시지를 보내고 일괄 처리로 사용할 수 있습니다.@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }
검사포인트 지정 모드의
MANUAL
경우 다음 코드를 사용하여 메시지를 보내고 일괄 처리로 검사/검사.@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }
참고 항목
일괄 처리 사용 모드에서 Spring Cloud Stream 바인더의 기본 콘텐츠 형식은 application/json
메시지 페이로드가 콘텐츠 형식과 정렬되어 있는지 확인합니다. 예를 들어 페이로드가 있는 메시지를 String
수신하는 기본 콘텐츠 형식 application/json
을 사용하는 경우 페이로드는 원래 String
텍스트의 큰따옴표로 묶어야 합니다JSON String
. 콘텐츠 형식의 경우 text/plain
개체일 String
수 있습니다. 자세한 내용은 Spring Cloud Stream 콘텐츠 형식 협상을 참조하세요.
오류 메시지 처리
아웃바운드 바인딩 오류 메시지 처리
기본적으로 Spring Integration은 .라는
errorChannel
전역 오류 채널을 만듭니다. 아웃바운드 바인딩 오류 메시지를 처리하도록 다음 메시지 엔드포인트를 구성합니다.@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }
인바운드 바인딩 오류 메시지 처리
Spring Cloud Stream Event Hubs Binder는 인바운드 메시지 바인딩에 대한 오류를 처리하는 두 가지 솔루션인 사용자 지정 오류 채널 및 처리기를 지원합니다.
오류 채널:
Spring Cloud Stream은 각 인바운드 바인딩에 대한 오류 채널을 제공합니다. 오류
ErrorMessage
채널로 전송됩니다. 자세한 내용은 Spring Cloud Stream 설명서의 오류 처리를 참조하세요.기본 오류 채널
명명
errorChannel
된 전역 오류 채널을 사용하여 모든 인바운드 바인딩 오류 메시지를 사용할 수 있습니다. 이러한 메시지를 처리하려면 다음 메시지 엔드포인트를 구성합니다.@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling inbound binding error: " + message); }
바인딩 관련 오류 채널
특정 오류 채널을 사용하여 기본 오류 채널보다 우선 순위가 높은 특정 인바운드 바인딩 오류 메시지를 사용할 수 있습니다. 이러한 메시지를 처리하려면 다음 메시지 엔드포인트를 구성합니다.
// Replace destination with spring.cloud.stream.bindings.<input-binding-name>.destination // Replace group with spring.cloud.stream.bindings.<input-binding-name>.group @ServiceActivator(inputChannel = "{destination}.{group}.errors") public void handleError(ErrorMessage message) { LOGGER.error("Handling inbound binding error: " + message); }
참고 항목
바인딩 관련 오류 채널은 제공된 다른 오류 처리기 및 채널과 함께 사용할 수 없습니다.
오류 처리기:
Spring Cloud Stream은 인스턴스를 허용하는 메커니즘을 추가하여 사용자 지정 오류 처리기를 제공하는 메커니즘을
Consumer
ErrorMessage
노출합니다. 자세한 내용은 Spring Cloud Stream 설명서의 오류 처리를 참조하세요.참고 항목
바인딩 오류 처리기가 구성되면 기본 오류 채널에서 작동할 수 있습니다.
바인딩 기본 오류 처리기
모든 인바운드 바인딩 오류 메시지를 사용하도록 단일
Consumer
콩을 구성합니다. 다음 기본 함수는 각 인바운드 바인딩 오류 채널을 구독합니다.@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
또한 함수 이름으로 속성을 설정
spring.cloud.stream.default.error-handler-definition
해야 합니다.바인딩 관련 오류 처리기
Consumer
특정 인바운드 바인딩 오류 메시지를 사용하도록 bean을 구성합니다. 다음 함수는 특정 인바운드 바인딩 오류 채널을 구독하고 바인딩 기본 오류 처리기보다 우선 순위가 높습니다.@Bean public Consumer<ErrorMessage> myErrorHandler() { return message -> { // consume the error message }; }
또한 함수 이름으로 속성을 설정
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition
해야 합니다.
Event Hubs 메시지 헤더
지원되는 기본 메시지 헤더는 Spring Integration용 Spring Cloud Azure 지원 Event Hubs 메시지 헤더 섹션을 참조하세요.
여러 바인더 지원
여러 Event Hubs 네임스페이스에 대한 커넥트 또한 여러 바인더를 사용하여 지원됩니다. 이 샘플에서는 연결 문자열 예로 사용합니다. 서비스 주체 및 관리 ID의 자격 증명도 지원됩니다. 각 바인더의 환경 설정에서 관련 속성을 설정할 수 있습니다.
Event Hubs에서 여러 바인더를 사용하려면 application.yml 파일에서 다음 속성을 구성합니다.
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${EVENTHUB_NAME_01} group: ${CONSUMER_GROUP_01} supply1-out-0: destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE} consume2-in-0: binder: eventhub-2 destination: ${EVENTHUB_NAME_02} group: ${CONSUMER_GROUP_02} supply2-out-0: binder: eventhub-2 destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE} binders: eventhub-1: type: eventhubs default-candidate: true environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_01} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhub-2: type: eventhubs default-candidate: false environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_02} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhubs: bindings: consume1-in-0: consumer: checkpoint: mode: MANUAL consume2-in-0: consumer: checkpoint: mode: MANUAL poller: initial-delay: 0 fixed-delay: 1000
참고 항목
이전 애플리케이션 파일은 모든 바인딩에 애플리케이션에 대한 단일 기본 폴러를 구성하는 방법을 보여 줍니다. 특정 바인딩에 대해 폴러를 구성하려면 다음과 같은
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000
구성을 사용할 수 있습니다.두 개의 공급업체와 두 명의 소비자를 정의해야 합니다.
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; }
리소스 프로비저닝
Event Hubs 바인더는 이벤트 허브 및 소비자 그룹의 프로비저닝을 지원하며, 사용자는 다음 속성을 사용하여 프로비저닝을 사용하도록 설정할 수 있습니다.
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
eventhubs:
resource:
resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}
참고 항목
허용되는 tenant-id
값은 , common
organizations
, consumers
또는 테넌트 ID입니다. 이러한 값에 대한 자세한 내용은 오류 AADSTS50020 - ID 공급자의 사용자 계정이 테넌트에 존재하지 않는 잘못된 엔드포인트(개인 및 조직 계정) 사용 섹션을 참조하세요. 단일 테넌트 앱을 변환하는 방법에 대한 자세한 내용은 Microsoft Entra ID에서 단일 테넌트 앱을 다중 테넌트로 변환을 참조하세요.
샘플
자세한 내용은 GitHub의 azure-spring-boot-samples 리포지토리를 참조하세요.
Azure Service Bus용 Spring Cloud 스트림 바인더
주요 개념
Azure Service Bus용 Spring Cloud Stream Binder는 Spring Cloud Stream Framework에 대한 바인딩 구현을 제공합니다. 이 구현에서는 Spring Integration Service Bus 채널 어댑터를 기초로 사용합니다.
예약된 메시지
이 바인더는 지연된 처리를 위해 토픽에 메시지 전송을 지원합니다. 사용자는 메시지의 지연 시간을 밀리초 단위로 표현하는 헤더 x-delay
를 사용하여 예약된 메시지를 보낼 수 있습니다. 메시지는 밀리초 후에 x-delay
해당 토픽에 전달됩니다.
소비자 그룹
Service Bus 토픽은 Apache Kafka와 비슷한 소비자 그룹을 지원하지만 논리는 약간 다릅니다.
이 바인더는 토픽을 사용하여 Subscription
소비자 그룹으로 작동합니다.
종속성 설정
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>
또는 Maven에 대한 다음 예제와 같이 Spring Cloud Azure Stream Service Bus Starter를 사용할 수도 있습니다.
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>
구성
바인더는 구성 옵션의 다음 두 부분을 제공합니다.
커넥트ion 구성 속성
이 섹션에는 Azure Service Bus에 연결하는 데 사용되는 구성 옵션이 포함되어 있습니다.
참고 항목
보안 주체를 사용하여 Azure 리소스에 액세스하기 위해 Microsoft Entra ID로 인증하고 권한을 부여하도록 선택한 경우 Microsoft Entra ID를 사용하여 액세스 권한 부여를 참조하여 보안 주체에게 Azure 리소스에 액세스할 수 있는 충분한 권한이 부여되었는지 확인합니다.
spring-cloud-azure-stream-binder-servicebus의 커넥트ion 구성 가능한 속성:
속성 | Type | 설명 |
---|---|---|
spring.cloud.azure.servicebus.enabled | 부울 값 | Azure Service Bus를 사용할 수 있는지 여부입니다. |
spring.cloud.azure.servicebus.connection-string | 문자열 | Service Bus 네임스페이스 연결 문자열 값입니다. |
spring.cloud.azure.servicebus.namespace | 문자열 | FQDN의 접두사인 Service Bus 네임스페이스 값입니다. FQDN은 NamespaceName.Do기본 이름으로 구성되어야 합니다. |
spring.cloud.azure.servicebus.do기본-name | 문자열 | Azure Service Bus 네임스페이스 값의 이름을 기본. |
참고 항목
일반적인 Azure 서비스 SDK 구성 옵션은 Spring Cloud Azure Stream Service Bus 바인더에도 구성할 수 있습니다. 지원되는 구성 옵션은 Spring Cloud Azure 구성에 도입되었으며 통합 접두사 spring.cloud.azure.
또는 접두spring.cloud.azure.servicebus.
사로 구성할 수 있습니다.
바인더는 Spring Could Azure Resource Manager도 기본적으로 지원합니다. 관련 역할로 부여 Data
되지 않은 보안 주체를 사용하여 연결 문자열 검색하는 방법에 대한 자세한 내용은 Spring Could Azure Resource Manager의 기본 사용 섹션을 참조하세요.
Azure Service Bus 바인딩 구성 속성
다음 옵션은 소비자 속성, 고급 소비자 구성, 생산자 속성 및 고급 생산자 구성의 네 가지 섹션으로 나뉩니다.
소비자 속성
이러한 속성은 .를 통해 ServiceBusConsumerProperties
노출됩니다.
spring-cloud-azure-stream-binder-servicebus의 소비자 구성 가능한 속성:
속성 | Type | 기본값 | 설명 |
---|---|---|---|
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected | 부울 값 | false | 실패한 메시지가 DLQ로 라우팅되는 경우 |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls | 정수 | 1 | Service Bus 프로세서 클라이언트에서 처리해야 하는 최대 동시 메시지입니다. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-sessions | 정수 | null | 지정된 시간에 처리할 최대 동시 세션 수입니다. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled | 부울 | null | 세션을 사용할 수 있는지 여부입니다. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count | 정수 | 0 | Service Bus 프로세서 클라이언트의 프리페치 수입니다. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue | 하위 큐 | 없음 | 연결할 하위 큐의 형식입니다. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration | 기간 | 5m | 잠금 자동 갱신을 계속할 시간입니다. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode | ServiceBusReceiveMode | peek_lock | Service Bus 프로세서 클라이언트의 수신 모드입니다. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete | 부울 | true | 메시지를 자동으로 정정할지 여부입니다. false로 설정하면 개발자가 메시지를 수동으로 해결할 수 있도록 메시지 헤더 Checkpointer 가 추가됩니다. |
고급 소비자 구성
위의 연결 및 일반적인 Azure SDK 클라이언트 구성은 접두사를 사용하여 구성할 수 있는 각 바인더 소비자에 대한 사용자 지정을 지원합니다spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.
.
생산자 속성
이러한 속성은 .를 통해 ServiceBusProducerProperties
노출됩니다.
spring-cloud-azure-stream-binder-servicebus의 생산자 구성 가능한 속성:
속성 | Type | 기본값 | 설명 |
---|---|---|---|
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync | 부울 값 | false | 생산자 동기화에 대한 스위치 플래그입니다. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout | long | 10000 | 생산자를 보내기 위한 시간 제한 값입니다. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type | ServiceBusEntityType | null | 바인딩 생산자에 필요한 생산자의 Service Bus 엔터티 형식입니다. |
Important
바인딩 생산자를 사용하는 경우 속성을 spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type
구성해야 합니다.
고급 생산자 구성
위의 연결 및 일반적인 Azure SDK 클라이언트 구성은 접두사를 사용하여 구성할 수 있는 각 바인더 생산자에 대한 사용자 지정을 지원합니다spring.cloud.stream.servicebus.bindings.<binding-name>.producer.
.
기본 사용법
Service Bus에서/받는 메시지 보내기 및 받기
구성 옵션을 자격 증명 정보로 채웁니다.
연결 문자열 자격 증명의 경우 application.yml 파일에서 다음 속성을 구성합니다.
spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
서비스 주체로서의 자격 증명의 경우 application.yml 파일에서 다음 속성을 구성합니다.
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
참고 항목
허용되는 tenant-id
값은 , common
organizations
, consumers
또는 테넌트 ID입니다. 이러한 값에 대한 자세한 내용은 오류 AADSTS50020 - ID 공급자의 사용자 계정이 테넌트에 존재하지 않는 잘못된 엔드포인트(개인 및 조직 계정) 사용 섹션을 참조하세요. 단일 테넌트 앱을 변환하는 방법에 대한 자세한 내용은 Microsoft Entra ID에서 단일 테넌트 앱을 다중 테넌트로 변환을 참조하세요.
자격 증명을 관리 ID로 사용하려면 application.yml 파일에서 다음 속성을 구성합니다.
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
공급업체 및 소비자를 정의합니다.
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}'", message.getPayload()); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
파티션 키 지원
바인더는 메시지 헤더에서 파티션 키 및 세션 ID를 설정하도록 허용하여 Service Bus 분할을 지원합니다. 이 섹션에서는 메시지의 파티션 키를 설정하는 방법을 소개합니다.
Spring Cloud Stream은 파티션 키 SpEL 식 속성을 spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression
제공합니다. 예를 들어 이 속성을 설정하여 "'partitionKey-' + headers[<message-header-key>]"
message-header-key라는 헤더를 추가합니다. Spring Cloud Stream은 식을 평가할 때 이 헤더의 값을 사용하여 파티션 키를 할당합니다. 다음 코드는 예제 생산자를 제공합니다.
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader("<message-header-key>", value.length() % 4)
.build();
};
}
세션 지원
바인더는 Service Bus의 메시지 세션을 지원 합니다 . 메시지 헤더를 통해 메시지의 세션 ID를 설정할 수 있습니다.
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build();
};
}
참고 항목
Service Bus 분할에 따르면 세션 ID는 파티션 키보다 우선 순위가 높습니다. 따라서 두 헤더가 모두 ServiceBusMessageHeaders#SESSION_ID
ServiceBusMessageHeaders#PARTITION_KEY
설정되면 세션 ID의 값이 결국 파티션 키의 값을 덮어쓰는 데 사용됩니다.
오류 메시지 처리
아웃바운드 바인딩 오류 메시지 처리
기본적으로 Spring Integration은 .라는
errorChannel
전역 오류 채널을 만듭니다. 아웃바운드 바인딩 오류 메시지를 처리하도록 다음 메시지 엔드포인트를 구성합니다.@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }
인바운드 바인딩 오류 메시지 처리
Spring Cloud Stream Service Bus Binder는 인바운드 메시지 바인딩에 대한 오류를 처리하는 세 가지 솔루션인 바인더 오류 처리기, 사용자 지정 오류 채널 및 처리기를 지원합니다.
바인더 오류 처리기:
기본 바인더 오류 처리기는 인바운드 바인딩을 처리합니다. 이 처리기를 사용하여 설정된 경우
spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected
배달 못한 편지 큐에 실패한 메시지를 보냅니다. 그렇지 않으면 실패한 메시지가 중단됩니다. 바인딩별 오류 채널을 구성하는 경우를 제외하고 바인더 오류 처리기는 다른 사용자 지정 오류 처리기 또는 채널이 있는지 여부에 관계없이 항상 적용됩니다.오류 채널:
Spring Cloud Stream은 각 인바운드 바인딩에 대한 오류 채널을 제공합니다. 오류
ErrorMessage
채널로 전송됩니다. 자세한 내용은 Spring Cloud Stream 설명서의 오류 처리를 참조하세요.기본 오류 채널
명명
errorChannel
된 전역 오류 채널을 사용하여 모든 인바운드 바인딩 오류 메시지를 사용할 수 있습니다. 이러한 메시지를 처리하려면 다음 메시지 엔드포인트를 구성합니다.@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling inbound binding error: " + message); }
바인딩 관련 오류 채널
특정 오류 채널을 사용하여 기본 오류 채널보다 우선 순위가 높은 특정 인바운드 바인딩 오류 메시지를 사용할 수 있습니다. 이러한 메시지를 처리하려면 다음 메시지 엔드포인트를 구성합니다.
// Replace destination with spring.cloud.stream.bindings.<input-binding-name>.destination // Replace group with spring.cloud.stream.bindings.<input-binding-name>.group @ServiceActivator(inputChannel = "{destination}.{group}.errors") public void handleError(ErrorMessage message) { LOGGER.error("Handling inbound binding error: " + message); }
참고 항목
바인딩 관련 오류 채널은 제공된 다른 오류 처리기 및 채널과 함께 사용할 수 없습니다.
오류 처리기:
Spring Cloud Stream은 인스턴스를 허용하는 메커니즘을 추가하여 사용자 지정 오류 처리기를 제공하는 메커니즘을
Consumer
ErrorMessage
노출합니다. 자세한 내용은 Spring Cloud Stream 설명서의 오류 처리를 참조하세요.참고 항목
바인딩 오류 처리기가 구성되면 기본 오류 채널 및 바인더 오류 처리기에서 작동할 수 있습니다.
바인딩 기본 오류 처리기
모든 인바운드 바인딩 오류 메시지를 사용하도록 단일
Consumer
콩을 구성합니다. 다음 기본 함수는 각 인바운드 바인딩 오류 채널을 구독합니다.@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
또한 함수 이름으로 속성을 설정
spring.cloud.stream.default.error-handler-definition
해야 합니다.바인딩 관련 오류 처리기
Consumer
특정 인바운드 바인딩 오류 메시지를 사용하도록 bean을 구성합니다. 다음 함수는 바인딩 기본 오류 처리기보다 우선 순위가 높은 특정 인바운드 바인딩 오류 채널을 구독합니다.@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
또한 함수 이름으로 속성을 설정
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition
해야 합니다.
Service Bus 메시지 헤더
지원되는 기본 메시지 헤더는 Spring Integration용 Spring Cloud Azure 지원 Service Bus 메시지 헤더 섹션을 참조하세요.
참고 항목
파티션 키를 설정할 때 메시지 헤더의 우선 순위가 Spring Cloud Stream 속성보다 높습니다. 따라서 spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression
헤더 및 ServiceBusMessageHeaders#PARTITION_KEY
헤더가 ServiceBusMessageHeaders#SESSION_ID
구성되지 않은 경우에만 적용됩니다.
여러 바인더 지원
여러 Service Bus 네임스페이스에 대한 커넥트 또한 여러 바인더를 사용하여 지원됩니다. 이 샘플에서는 연결 문자열 예로 사용합니다. 서비스 주체 및 관리 ID의 자격 증명도 지원되며, 사용자는 각 바인더의 환경 설정에서 관련 속성을 설정할 수 있습니다.
ServiceBus의 여러 바인더를 사용하려면 application.yml 파일에서 다음 속성을 구성합니다.
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${SERVICEBUS_TOPIC_NAME} group: ${SUBSCRIPTION_NAME} supply1-out-0: destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE} consume2-in-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME} supply2-out-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE} binders: servicebus-1: type: servicebus default-candidate: true environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING} servicebus-2: type: servicebus default-candidate: false environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING} servicebus: bindings: consume1-in-0: consumer: auto-complete: false supply1-out-0: producer: entity-type: topic consume2-in-0: consumer: auto-complete: false supply2-out-0: producer: entity-type: queue poller: initial-delay: 0 fixed-delay: 1000
참고 항목
이전 애플리케이션 파일은 모든 바인딩에 애플리케이션에 대한 단일 기본 폴러를 구성하는 방법을 보여 줍니다. 특정 바인딩에 대해 폴러를 구성하려면 다음과 같은
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000
구성을 사용할 수 있습니다.공급업체 2개와 소비자 2개를 정의해야 합니다.
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; }
리소스 프로비저닝
Service Bus 바인더는 큐, 토픽 및 구독의 프로비저닝을 지원하며, 사용자는 다음 속성을 사용하여 프로비저닝을 사용하도록 설정할 수 있습니다.
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
servicebus:
resource:
resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
stream:
servicebus:
bindings:
<binding-name>:
consumer:
entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}
참고 항목
허용되는 tenant-id
값은 , common
organizations
, consumers
또는 테넌트 ID입니다. 이러한 값에 대한 자세한 내용은 오류 AADSTS50020 - ID 공급자의 사용자 계정이 테넌트에 존재하지 않는 잘못된 엔드포인트(개인 및 조직 계정) 사용 섹션을 참조하세요. 단일 테넌트 앱을 변환하는 방법에 대한 자세한 내용은 Microsoft Entra ID에서 단일 테넌트 앱을 다중 테넌트로 변환을 참조하세요.
샘플
자세한 내용은 GitHub의 azure-spring-boot-samples 리포지토리를 참조하세요.
피드백
https://aka.ms/ContentUserFeedback
출시 예정: 2024년 내내 콘텐츠에 대한 피드백 메커니즘으로 GitHub 문제를 단계적으로 폐지하고 이를 새로운 피드백 시스템으로 바꿀 예정입니다. 자세한 내용은 다음을 참조하세요.다음에 대한 사용자 의견 제출 및 보기