Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Spring Cloud Stream, paylaşılan mesajlaşma sistemleriyle bağlantılı yüksek oranda ölçeklenebilir olay odaklı mikro hizmetler oluşturmaya yönelik bir çerçevedir.
Çerçeve, önceden oluşturulmuş ve tanıdık Spring deyimleri ve en iyi uygulamalar üzerine oluşturulmuş esnek bir programlama modeli sağlar. Bu en iyi yöntemler kalıcı pub/sub semantiği, tüketici grupları ve durum bilgisi olan bölümler için destek içerir.
Geçerli bağlayıcı uygulamaları şunlardır:
- Daha fazla bilgi için bkz. Azure Event Hubs için Spring Cloud Stream Binder - Daha fazla bilgi için bkz. Azure Service Bus için Spring Cloud Stream Binder
Azure Event Hubs için Spring Cloud Stream Binder
Temel kavramlar
Azure Event Hubs için Spring Cloud Stream Binder, Spring Cloud Stream çerçevesi için bağlama uygulamasını sağlar.
Bu uygulama, temelinde Spring Integration Event Hubs Kanal Bağdaştırıcılarını kullanır. Tasarım açısından bakıldığında Event Hubs, Kafka ile benzerdir. Ayrıca Event Hubs'a Kafka API aracılığıyla erişilebilir. Projenizin Kafka API'sinde sıkı bağımlılığı varsa, Kafka API Örneği ile Olay Hub'ını
Tüketici grubu
Event Hubs, tüketici grubu için Apache Kafka ile benzer destek sağlar ancak biraz farklı mantık sunar. Kafka, işlenen tüm uzaklıkları aracıda depolasa da, el ile işlenen Event Hubs iletilerinin uzaklıklarını depolamanız gerekir. Event Hubs SDK'sı, bu tür uzaklıkları Azure Depolama'da depolama işlevi sağlar.
Bölümleme desteği
Event Hubs, Kafka ile benzer bir fiziksel bölüm kavramı sağlar. Ancak Kafka'nın tüketiciler ve bölümler arasında otomatik olarak yeniden dengelenmesinden farklı olarak, Event Hubs bir tür önleyici mod sağlar. Depolama hesabı, hangi tüketicinin hangi bölüme sahip olduğunu belirlemek için kira görevi görür. Yeni bir tüketici başladığında, iş yükü bakiyesini elde etmek için en yoğun yüke sahip tüketicilerden bazı bölümleri çalmaya çalışır.
Yük dengeleme stratejisini belirtmek için spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.* özellikleri sağlanır. Daha fazla bilgi için Tüketici özellikleri bölümüne bakın.
Batch tüketici desteği
Spring Cloud Azure Stream Event Hubs bağlayıcısıSpring Cloud Stream Batch Consumer özelliğini
Batch-consumer moduyla çalışmak için spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode özelliğini trueolarak ayarlayın. Etkinleştirildiğinde, toplu olaylar listesinin yükünü içeren bir ileti alınır ve Consumer işlevine geçirilir. Her ileti üst bilgisi, içeriği her olaydan ayrıştırılan ilişkili üst bilgi değeri olan bir listeye de dönüştürülür. Tüm olay grubu aynı değeri paylaştığından bölüm kimliği, denetim noktası ve son sıralanan özelliklerin ortak üst bilgileri tek bir değer olarak sunulur. Daha fazla bilgi için Spring Integrationiçin Spring Cloud Azure desteğinin
Not
Denetim noktası üst bilgisi yalnızca MANUAL denetim noktası modu kullanıldığında bulunur.
Toplu tüketici denetim noktası oluşturma iki modu destekler: BATCH ve MANUAL.
BATCH modu, bağlayıcı bunları aldıktan sonra olayların tamamını birlikte kontrol etmek için otomatik bir denetim noktası modudur.
MANUAL modu, kullanıcılara göre olayları kontrol etmektir. Kullanıldığında, Checkpointer ileti üst bilgisine geçirilir ve kullanıcılar bunu denetim noktası oluşturma amacıyla kullanabilir.
max-sizeön ekini içeren max-wait-time ve spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch. özelliklerini ayarlayarak toplu iş boyutunu belirtebilirsiniz.
max-size özelliği gereklidir ve max-wait-time özelliği isteğe bağlıdır. Daha fazla bilgi için Tüketici özellikleri bölümüne bakın.
Bağımlılık kurulumu
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>
Alternatif olarak, aşağıdaki Maven örneğinde gösterildiği gibi Spring Cloud Azure Stream Event Hubs Starter'ı da kullanabilirsiniz:
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>
Konfigürasyon
Bağlayıcı, yapılandırma seçeneklerinin aşağıdaki üç bölümünü sağlar:
Bağlantı yapılandırma özellikleri
Bu bölüm, Azure Event Hubs'a bağlanmak için kullanılan yapılandırma seçeneklerini içerir.
Not
Azure kaynağına erişim için Microsoft Entra Id ile kimlik doğrulaması yapmak ve yetkilendirmek için bir güvenlik sorumlusu kullanmayı seçerseniz güvenlik sorumlusuna Azure kaynağına erişmek için yeterli izin verildiğinden emin olmak için microsoft entra id erişimi yetkilendirme
spring-cloud-azure-stream-binder-eventhubs'ın bağlantı yapılandırılabilir özellikleri:
| Mülk | Tür | Açıklama |
|---|---|---|
| spring.cloud.azure.eventhubs.enabled | Boolean | Azure Event Hubs'ın etkinleştirilip etkinleştirilmediği. |
| spring.cloud.azure.eventhubs.connection-string |
Dizgi | Event Hubs Ad Alanı bağlantı dizesi değeri. |
| spring.cloud.azure.eventhubs.namespace | Dizgi | FQDN'nin ön eki olan Event Hubs Ad Alanı değeri. FQDN, NamespaceName.DomainName dosyasından oluşmalıdır |
| spring.cloud.azure.eventhubs.domain-name | Dizgi | Azure Event Hubs Ad Alanı değerinin etki alanı adı. |
| spring.cloud.azure.eventhubs.custom-endpoint-address | Dizgi | Özel Uç Nokta adresi. |
Bahşiş
Yaygın Azure Hizmeti SDK yapılandırma seçenekleri Spring Cloud Azure Stream Event Hubs bağlayıcısı için de yapılandırılabilir. Desteklenen yapılandırma seçenekleri Spring Cloud Azure yapılandırmasunulmuştur ve birleştirilmiş ön ek spring.cloud.azure. veya spring.cloud.azure.eventhubs.ön eki ile yapılandırılabilir.
Bağlayıcı ayrıca Spring Azure Resource Manager'ın varsayılan olarak destekler. İlgili
Denetim noktası yapılandırma özellikleri
Bu bölüm, bölüm sahipliğini ve denetim noktası bilgilerini kalıcı hale etmek için kullanılan Depolama Blobları hizmetinin yapılandırma seçeneklerini içerir.
Not
Sürüm 4.0.0'dan itibaren, spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists özelliği el ile etkinleştirilmediğinde, spring.cloud.stream.bindings.binding-name.destinationadlı depolama kapsayıcısı otomatik olarak oluşturulmaz.
spring-cloud-azure-stream-binder-eventhubs'ın yapılandırılabilir özelliklerini denetleme:
| Mülk | Tür | Açıklama |
|---|---|---|
| spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists | Boolean (Boole Mantığı) | Mevcut değilse kapsayıcı oluşturmaya izin verilip verilmeyeceği. |
| spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name |
Dizgi | Depolama hesabının adı. |
| spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key | Dizgi | Depolama hesabı erişim anahtarı. |
| spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name | Dizgi | Depolama kapsayıcısı adı. |
Bahşiş
Yaygın Azure Hizmeti SDK yapılandırma seçenekleri, Depolama Blobu denetim noktası deposu için de yapılandırılabilir. Desteklenen yapılandırma seçenekleri Spring Cloud Azure yapılandırmasunulmuştur ve birleştirilmiş ön ek spring.cloud.azure. veya spring.cloud.azure.eventhubs.processor.checkpoint-storeön eki ile yapılandırılabilir.
Azure Event Hubs Bağlama yapılandırma özellikleri
Aşağıdaki seçenekler dört bölüme ayrılmıştır: Tüketici Özellikleri, Gelişmiş Tüketici Yapılandırmaları, Üretici Özellikleri ve Gelişmiş Üretici Yapılandırmaları.
Tüketici özellikleri
Bu özellikler EventHubsConsumerPropertiesaracılığıyla kullanıma sunulur.
Not
Sürüm 4.17.0 ve 5.11.0'dan bu yana yinelemeyi önlemek için Spring Cloud Azure Stream Binder Event Hubs, spring.cloud.stream.eventhubs.default.consumer.<property>=<value>biçiminde tüm kanallar için değerlerin ayarlanmasını destekler.
Spring-cloud-azure-stream-binder-eventhubs'ın tüketici tarafından yapılandırılabilir özellikleri:
| Mülk | Tür | Açıklama |
|---|---|---|
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode |
Denetim Noktası Modu | Tüketici denetim noktası iletisinin nasıl denetleneceğine karar verince kullanılan denetim noktası modu |
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count | Tam sayı | Her bölümün tek bir denetim noktası yapması için ileti miktarına karar verir. Yalnızca PARTITION_COUNT denetim noktası modu kullanıldığında geçerlilik kazanır. |
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval | Süre | Bir denetim noktası yapmak için zaman aralığına karar verir. Yalnızca TIME denetim noktası modu kullanıldığında geçerlilik kazanır. |
| spring.cloud.stream.eventhubs.bindings .<binding-name.consumer.batch.max-size | Tam sayı | Toplu iş içindeki en fazla olay sayısı. Toplu tüketici modu için gereklidir. |
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time |
Süre | Toplu işlem için en uzun süre. Yalnızca toplu iş tüketici modu etkinleştirildiğinde ve isteğe bağlı olduğunda geçerlilik kazanır. |
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.update-interval | Süre | Güncelleştirme için aralık süresi. |
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy | Yük Dengeleme Stratejisi | Yük dengeleme stratejisi. |
| spring.cloud.stream.eventhubs.bindings.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval |
Süre | Bölümün sahipliğinin sona erdiği süre. |
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties | Boolean (Boole Mantığı) | Olay işlemcisinin ilişkili bölümünde en son sıraya alınan olayla ilgili bilgi isteyip istememesi ve olaylar alınırken bu bilgileri izlemesi gerekip gerekmediği. |
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count |
Tam sayı | Olay Hub'ı tüketicisinin etkin olarak alıp yerel olarak kuyruğa alacağı olay sayısını denetlemek için tüketici tarafından kullanılan sayı. |
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position | Bölüm kimliği ve StartPositionProperties değerleri olarak anahtarla eşleme |
Denetim noktası deposunda bölüm için bir denetim noktası yoksa, her bölüm için kullanılacak olay konumunu içeren eşleme. Bu eşleme bölüm kimliğinin dışında anahtarlanır. |
Not
initial-partition-event-position yapılandırması, her olay hub'ı için başlangıç konumunu belirtmek üzere bir map kabul eder. Bu nedenle, anahtarı bölüm kimliğidir ve değer, uzaklık, sıra numarası, sıralanan tarih saati ve dahil olup olmadığını içeren StartPositionPropertiesdeğeridir. Örneğin, bunu olarak ayarlayabilirsiniz
spring:
cloud:
stream:
eventhubs:
bindings:
<binding-name>:
consumer:
initial-partition-event-position:
0:
offset: earliest
1:
sequence-number: 100
2:
enqueued-date-time: 2022-01-12T13:32:47.650005Z
4:
inclusive: false
Gelişmiş tüketici yapılandırması
Yukarıdaki bağlantıdenetim noktasıve ortak Azure SDK istemcisi yapılandırma desteği her bağlayıcı tüketicisi için özelleştirmeyi destekler ve bu özelleştirmeyi spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.ön ekiyle yapılandırabilirsiniz.
Üretici özellikleri
Bu özellikler EventHubsProducerPropertiesaracılığıyla kullanıma sunulur.
Not
Sürüm 4.17.0 ve 5.11.0'dan bu yana yinelemeyi önlemek için Spring Cloud Azure Stream Binder Event Hubs, spring.cloud.stream.eventhubs.default.producer.<property>=<value>biçiminde tüm kanallar için değerlerin ayarlanmasını destekler.
spring-cloud-azure-stream-binder-eventhubs üretici tarafından yapılandırılabilir özellikler:
| Mülk | Tür | Açıklama |
|---|---|---|
| spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync | Boolean | Üretici eşitlemesi için anahtar bayrağı. True ise, üretici bir gönderme işleminden sonra yanıt bekler. |
| spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout | uzun | Gönderme işleminden sonra yanıt bekleme süresi. Yalnızca eşitleme üreticisi etkinleştirildiğinde geçerlilik kazanır. |
Gelişmiş üretici yapılandırması
Yukarıdaki bağlantı ve ortak Azure SDK istemcisi yapılandırması, her bağlayıcı üreticisi için özelleştirmeyi destekler ve bu özelleştirmeyi spring.cloud.stream.eventhubs.bindings.<binding-name>.producer.ön ekiyle yapılandırabilirsiniz.
Temel kullanım
Event Hubs'dan/Event Hubs'a ileti gönderme ve alma
Yapılandırma seçeneklerini kimlik bilgileriyle doldurun.
Bağlantı dizesi olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:
spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUALNot
Microsoft, kullanılabilir en güvenli kimlik doğrulama akışının kullanılmasını önerir. Veritabanları, önbellekler, mesajlaşma veya yapay zeka hizmetleri gibi bu yordamda açıklanan kimlik doğrulama akışı, uygulamaya çok yüksek düzeyde güven gerektirir ve diğer akışlarda mevcut olmayan riskler taşır. Bu akışı yalnızca parolasız veya anahtarsız bağlantılar için yönetilen kimlikler gibi daha güvenli seçenekler uygun olmadığında kullanın. Yerel makine işlemleri için parolasız veya anahtarsız bağlantılar için kullanıcı kimliklerini tercih edin.
Hizmet sorumlusu olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Not
tenant-id için izin verilen değerler şunlardır: common, organizations, consumersveya kiracı kimliği. Bu değerler hakkında daha fazla bilgi için, Hata AADSTS50020 - Kimlik sağlayıcısından kullanıcı hesabı kiracımevcut değil bölümünün Yanlış uç nokta (kişisel ve kuruluş hesapları) kullanıldı bölümüne bakın. Tek kiracılı uygulamanızı dönüştürme hakkında bilgi için bkz. Tek kiracılı uygulamayı Microsoft Entra IDüzerinde çok kiracılıya dönüştürme.
Yönetilen kimlik olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Sağlayıcıyı ve tüketiciyi tanımlayın.
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload(), message.getHeaders().get(EventHubsHeaders.PARTITION_KEY), message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER), message.getHeaders().get(EventHubsHeaders.OFFSET), message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME) ); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
Bölümleme desteği
Gönderilecek iletiyle ilgili bölüm bilgilerini yapılandırmak için kullanıcı tarafından sağlanan bölüm bilgilerini içeren bir PartitionSupplier oluşturulur. Aşağıdaki akış çizelgesinde bölüm kimliği ve anahtarı için farklı öncelikleri alma işlemi gösterilmektedir:
bölümleme destek işleminin akış çizelgesini gösteren
Batch tüketici desteği
Aşağıdaki örnekte gösterildiği gibi toplu iş yapılandırma seçeneklerini belirtin:
spring: cloud: function: definition: consume stream: bindings: consume-in-0: destination: ${AZURE_EVENTHUB_NAME} group: ${AZURE_EVENTHUB_CONSUMER_GROUP} consumer: batch-mode: true eventhubs: bindings: consume-in-0: consumer: batch: max-batch-size: 10 # Required for batch-consumer mode max-wait-time: 1m # Optional, the default value is null checkpoint: mode: BATCH # or MANUAL as neededSağlayıcıyı ve tüketiciyi tanımlayın.
BATCHolarak denetim noktası oluşturma modu için, iletileri göndermek ve toplu olarak kullanmak için aşağıdaki kodu kullanabilirsiniz.@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }MANUALolarak denetim noktası oluşturma modu için, iletileri göndermek ve toplu olarak kullanmak/kontrol noktası yapmak için aşağıdaki kodu kullanabilirsiniz.@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }
Not
Toplu işlem modunda Spring Cloud Stream bağlayıcısının varsayılan içerik türü application/jsonolduğundan ileti yükünün içerik türüyle hizalandığından emin olun. Örneğin, application/json yüküne sahip iletileri almak için varsayılan String içerik türünü kullanırken yük, özgün JSON String metni için çift tırnak içine alınan Stringolmalıdır.
text/plain içerik türü için ise doğrudan bir String nesnesi olabilir. Daha fazla bilgi için bkz. Spring Cloud Stream İçerik Türü Anlaşması.
Hata iletilerini işleme
- Spring Cloud Azure 6.x/7.x
-
Spring Cloud Azure 5.x
-
Spring Cloud Azure 4.x
Giden bağlama hata iletilerini işleme
Varsayılan olarak Spring Integration,
errorChanneladlı bir genel hata kanalı oluşturur. Giden bağlama hata iletilerini işlemek için aşağıdaki ileti uç noktasını yapılandırın.@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }Gelen bağlama hata iletilerini işleme
Spring Cloud Stream Event Hubs Binder, gelen ileti bağlamalarındaki hataları işlemek için tek bir çözümü destekler: hata işleyicileri.
Hata İşleyicisi:
Spring Cloud Stream,
Consumerörnekleri kabul eden birErrorMessageekleyerek özel hata işleyicisi sağlamanız için bir mekanizma sunar. Daha fazla bilgi için Spring Cloud Stream belgelerinde Hata İletilerini İşlemebakın. Bağlama varsayılan hata işleyicisi
Tüm gelen bağlama hata iletilerini kullanmak için tek bir
Consumerçekirdeği yapılandırın. Aşağıdaki varsayılan işlev her gelen bağlama hata kanalına abone olur:@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }ayrıca
spring.cloud.stream.default.error-handler-definitionözelliğini işlev adına ayarlamanız gerekir.Bağlamaya özgü hata işleyicisi
Belirli gelen bağlama hata iletilerini kullanmak için bir
Consumerçekirdeği yapılandırın. Aşağıdaki işlev, belirli bir gelen bağlama hata kanalına abone olur ve bağlama varsayılan hata işleyicisinden daha yüksek önceliğe sahiptir:@Bean public Consumer<ErrorMessage> myErrorHandler() { return message -> { // consume the error message }; }ayrıca
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definitionözelliğini işlev adına ayarlamanız gerekir.
Event Hubs ileti üst bilgileri
Desteklenen temel ileti üst bilgileri için Spring Integrationiçin Spring Cloud Azure desteğinin
Birden çok bağlayıcı desteği
Birden çok Event Hubs ad alanına bağlantı, birden çok bağlayıcı kullanılarak da desteklenir. Bu örnek, örnek olarak bir bağlantı dizesi alır. Hizmet sorumlularının ve yönetilen kimliklerin kimlik bilgileri de desteklenir. Her bağlayıcının ortam ayarlarında ilgili özellikleri ayarlayabilirsiniz.
Event Hubs ile birden çok bağlayıcı kullanmak için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${EVENTHUB_NAME_01} group: ${CONSUMER_GROUP_01} supply1-out-0: destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE} consume2-in-0: binder: eventhub-2 destination: ${EVENTHUB_NAME_02} group: ${CONSUMER_GROUP_02} supply2-out-0: binder: eventhub-2 destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE} binders: eventhub-1: type: eventhubs default-candidate: true environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_01} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhub-2: type: eventhubs default-candidate: false environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_02} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhubs: bindings: consume1-in-0: consumer: checkpoint: mode: MANUAL consume2-in-0: consumer: checkpoint: mode: MANUAL poller: initial-delay: 0 fixed-delay: 1000Not
Önceki uygulama dosyası, tüm bağlamalara uygulama için tek bir varsayılan poller yapılandırmayı gösterir. Poller'ı belirli bir bağlama için yapılandırmak istiyorsanız,
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000gibi bir yapılandırma kullanabilirsiniz.Not
Microsoft, kullanılabilir en güvenli kimlik doğrulama akışının kullanılmasını önerir. Veritabanları, önbellekler, mesajlaşma veya yapay zeka hizmetleri gibi bu yordamda açıklanan kimlik doğrulama akışı, uygulamaya çok yüksek düzeyde güven gerektirir ve diğer akışlarda mevcut olmayan riskler taşır. Bu akışı yalnızca parolasız veya anahtarsız bağlantılar için yönetilen kimlikler gibi daha güvenli seçenekler uygun olmadığında kullanın. Yerel makine işlemleri için parolasız veya anahtarsız bağlantılar için kullanıcı kimliklerini tercih edin.
İki tedarikçi ve iki tüketici tanımlamamız gerekir:
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; }
Kaynak sağlama
Event Hubs bağlayıcısı olay hub'ının ve tüketici grubunun sağlanmasını destekler; kullanıcılar sağlamayı etkinleştirmek için aşağıdaki özellikleri kullanabilir.
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
eventhubs:
resource:
resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}
Not
tenant-id için izin verilen değerler şunlardır: common, organizations, consumersveya kiracı kimliği. Bu değerler hakkında daha fazla bilgi için, Hata AADSTS50020 - Kimlik sağlayıcısından kullanıcı hesabı kiracımevcut değil bölümünün Yanlış uç nokta (kişisel ve kuruluş hesapları) kullanıldı bölümüne bakın. Tek kiracılı uygulamanızı dönüştürme hakkında bilgi için bkz. Tek kiracılı uygulamayı Microsoft Entra IDüzerinde çok kiracılıya dönüştürme.
Örnekleri
Daha fazla bilgi için GitHub'daki azure-spring-boot-samples
Azure Service Bus için Spring Cloud Stream Binder
Temel kavramlar
Azure Service Bus için Spring Cloud Stream Binder, Spring Cloud Stream Framework için bağlama uygulamasını sağlar. Bu uygulama, temelinde Spring Integration Service Bus Kanal Bağdaştırıcılarını kullanır.
Zamanlanmış ileti
Bu bağlayıcı, gecikmeli işleme için bir konuya ileti göndermeyi destekler. Kullanıcılar, iletinin gecikme süresini milisaniye cinsinden ifade eden üst bilgi x-delay zamanlanmış iletiler gönderebilir. İleti, x-delay milisaniye sonra ilgili konulara teslim edilir.
Tüketici grubu
Service Bus Konusu, tüketici grubu için Apache Kafka ile benzer destek sağlar, ancak biraz farklı mantık sunar.
Bu bağlayıcı, tüketici grubu olarak hareket etmek için bir konunun Subscription dayanır.
Bağımlılık kurulumu
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>
Alternatif olarak, aşağıdaki Maven örneğinde gösterildiği gibi Spring Cloud Azure Stream Service Bus Starter'ı da kullanabilirsiniz:
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>
Konfigürasyon
Bağlayıcı, yapılandırma seçeneklerinin aşağıdaki iki bölümünü sağlar:
Bağlantı yapılandırma özellikleri
Bu bölüm, Azure Service Bus'a bağlanmak için kullanılan yapılandırma seçeneklerini içerir.
Not
Azure kaynağına erişim için Microsoft Entra Id ile kimlik doğrulaması yapmak ve yetkilendirmek için bir güvenlik sorumlusu kullanmayı seçerseniz güvenlik sorumlusuna Azure kaynağına erişmek için yeterli izin verildiğinden emin olmak için microsoft entra id erişimi yetkilendirme
spring-cloud-azure-stream-binder-servicebus'ın bağlantı yapılandırılabilir özellikleri:
| Mülk | Tür | Açıklama |
|---|---|---|
| spring.cloud.azure.servicebus.enabled |
Boolean | Azure Service Bus'ın etkinleştirilip etkinleştirilmediği. |
| spring.cloud.azure.servicebus.connection-string | Dizgi | Service Bus Ad Alanı bağlantı dizesi değeri. |
| spring.cloud.azure.servicebus.custom-endpoint-address | Dizgi | Service Bus'a bağlanırken kullanılacak özel uç nokta adresi. |
| spring.cloud.azure.servicebus.namespace |
Dizgi | FQDN ön eki olan Service Bus Ad Alanı değeri. FQDN, NamespaceName.DomainName dosyasından oluşmalıdır |
| spring.cloud.azure.servicebus.etki alanı-adı | Dizgi | Azure Service Bus Ad Alanı değerinin etki alanı adı. |
Not
Yaygın Azure Hizmet SDK'sı yapılandırma seçenekleri Spring Cloud Azure Stream Service Bus bağlayıcısı için de yapılandırılabilir. Desteklenen yapılandırma seçenekleri Spring Cloud Azure yapılandırmasunulmuştur ve birleştirilmiş ön ek spring.cloud.azure. veya spring.cloud.azure.servicebus.ön eki ile yapılandırılabilir.
Bağlayıcı ayrıca Spring Azure Resource Manager'ın varsayılan olarak destekler. İlgili
Azure Service Bus bağlama yapılandırma özellikleri
Aşağıdaki seçenekler dört bölüme ayrılmıştır: Tüketici Özellikleri, Gelişmiş Tüketici Yapılandırmaları, Üretici Özellikleri ve Gelişmiş Üretici Yapılandırmaları.
Tüketici özellikleri
Bu özellikler ServiceBusConsumerPropertiesaracılığıyla kullanıma sunulur.
Not
Sürüm 4.17.0 ve 5.11.0'dan bu yana yinelemeyi önlemek için Spring Cloud Azure Stream Binder Service Bus, spring.cloud.stream.servicebus.default.consumer.<property>=<value>biçiminde tüm kanallar için değerlerin ayarlanmasını destekler.
spring-cloud-azure-stream-binder-servicebus'ın tüketici tarafından yapılandırılabilir özellikleri:
| Mülk | Tür | Temerrüt | Açıklama |
|---|---|---|---|
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-reddedildi | Boolean | yanlış | Başarısız iletiler DLQ'ya yönlendirilirse. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls |
Tam sayı | 1 | Service Bus işlemci istemcisinin işlemesi gereken en fazla eşzamanlı ileti. Oturum etkinleştirildiğinde, her oturum için geçerlidir. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-sessions |
Tam sayı | sıfır | Herhangi bir zamanda işlenmek üzere en fazla eş zamanlı oturum sayısı. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled |
Boolean (Boole Mantığı) | sıfır | Oturumun etkinleştirilip etkinleştirilmediği. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer .session-idle-timeout | Süre | sıfır | Etkin durumdaki oturumda bir iletinin alınmasını beklemek için en uzun süreyi (Süre) ayarlar. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count |
Tam sayı | 0 | Service Bus işlemci istemcisinin ön işlem sayısı. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue | Alt Sorgu | hiç kimse | Bağlanacak alt kuyruğun türü. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration | Süre | 5 dk. | Kilidi otomatik yenilemeye devam etme süresi. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode | ServiceBusReceiveMode | peek_lock | Service Bus işlemci istemcisinin alma modu. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete | Boolean (Boole Mantığı) | doğru | İletilerin otomatik olarak kapatılıp kapatılmayacağı. False olarak ayarlanırsa, geliştiricilerin iletileri el ile kapatmasını sağlamak için Checkpointer bir ileti üst bilgisi eklenir. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-size-in-megabayt | Uzun | 1024 | Kuyruk/konu başlığı için ayrılan belleğin boyutu olan megabayt cinsinden en büyük kuyruk/konu boyutu. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.default-message-time-to-live | Süre | P10675199DT2H48M5.4775807S. (10675199 gün, 2 saat, 48 dakika, 5 saniye ve 477 milisaniye) | İletinin Service Bus'a gönderilmesinden başlayarak, iletinin süresinin dolma süresi. |
Önemli
Azure Resource Manager (ARM)
Gelişmiş tüketici yapılandırması
Yukarıdaki bağlantı ve ortak Azure SDK istemcisi yapılandırması, her bağlayıcı tüketicisi için özelleştirmeyi destekler ve bu özelleştirmeyi spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.ön ekiyle yapılandırabilirsiniz.
Üretici özellikleri
Bu özellikler ServiceBusProducerPropertiesaracılığıyla kullanıma sunulur.
Not
Sürüm 4.17.0 ve 5.11.0'dan bu yana yinelemeyi önlemek için Spring Cloud Azure Stream Binder Service Bus, spring.cloud.stream.servicebus.default.producer.<property>=<value>biçiminde tüm kanallar için değerlerin ayarlanmasını destekler.
spring-cloud-azure-stream-binder-servicebus üretici tarafından yapılandırılabilir özellikler:
| Mülk | Tür | Temerrüt | Açıklama |
|---|---|---|---|
| spring.cloud.stream.servicebus.bindings.binding-name.producer.sync | Boolean | yanlış | Üretici eşitlemesi için switch bayrağı. |
| spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout | uzun | 10.000 | Üreticinin gönderilmesi için zaman aşımı değeri. |
| spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type | ServiceBusEntityType | sıfır | Bağlama üreticisi için gerekli olan üreticinin Service Bus varlık türü. |
| spring.cloud.stream.servicebus.bindings.binding-name.producer.max-size-in-megabayt | Uzun | 1024 | Kuyruk/konu başlığı için ayrılan belleğin boyutu olan megabayt cinsinden en büyük kuyruk/konu boyutu. |
| spring.cloud.stream.servicebus.bindings.binding-name.producer.default-message-time-to-live | Süre | P10675199DT2H48M5.4775807S. (10675199 gün, 2 saat, 48 dakika, 5 saniye ve 477 milisaniye) | İletinin Service Bus'a gönderilmesinden başlayarak, iletinin süresinin dolma süresi. |
Önemli
Bağlama üreticisi kullanılırken, spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type özelliğinin yapılandırılması gerekir.
Gelişmiş üretici yapılandırması
Yukarıdaki bağlantı ve ortak Azure SDK istemcisi yapılandırması, her bağlayıcı üreticisi için özelleştirmeyi destekler ve bu özelleştirmeyi spring.cloud.stream.servicebus.bindings.<binding-name>.producer.ön ekiyle yapılandırabilirsiniz.
Temel kullanım
Service Bus'tan/Service Bus'a ileti gönderme ve alma
Yapılandırma seçeneklerini kimlik bilgileriyle doldurun.
Bağlantı dizesi olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:
spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus TopicNot
Microsoft, kullanılabilir en güvenli kimlik doğrulama akışının kullanılmasını önerir. Veritabanları, önbellekler, mesajlaşma veya yapay zeka hizmetleri gibi bu yordamda açıklanan kimlik doğrulama akışı, uygulamaya çok yüksek düzeyde güven gerektirir ve diğer akışlarda mevcut olmayan riskler taşır. Bu akışı yalnızca parolasız veya anahtarsız bağlantılar için yönetilen kimlikler gibi daha güvenli seçenekler uygun olmadığında kullanın. Yerel makine işlemleri için parolasız veya anahtarsız bağlantılar için kullanıcı kimliklerini tercih edin.
Hizmet sorumlusu olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Not
tenant-id için izin verilen değerler şunlardır: common, organizations, consumersveya kiracı kimliği. Bu değerler hakkında daha fazla bilgi için, Hata AADSTS50020 - Kimlik sağlayıcısından kullanıcı hesabı kiracımevcut değil bölümünün Yanlış uç nokta (kişisel ve kuruluş hesapları) kullanıldı bölümüne bakın. Tek kiracılı uygulamanızı dönüştürme hakkında bilgi için bkz. Tek kiracılı uygulamayı Microsoft Entra IDüzerinde çok kiracılıya dönüştürme.
Yönetilen kimlik olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Sağlayıcıyı ve tüketiciyi tanımlayın.
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}'", message.getPayload()); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
Bölüm anahtarı desteği
Bağlayıcı, ileti üst bilgisinde bölüm anahtarı ve oturum kimliği ayarlamaya izin vererek service bus bölümleme
Spring Cloud Stream, spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expressionbir bölüm anahtarı SpEL ifade özelliği sağlar. Örneğin, bu özelliği "'partitionKey-' + headers[<message-header-key>]" olarak ayarlayın ve message-header-key adlı bir üst bilgi ekleyin. Spring Cloud Stream, bölüm anahtarı atamak için ifadeyi değerlendirirken bu üst bilgi için değerini kullanır. Aşağıdaki kod örnek bir üretici sağlar:
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader("<message-header-key>", value.length() % 4)
.build();
};
}
Oturum desteği
Bağlayıcı, Service Bus
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build();
};
}
Not
Service Bus bölümlemegöre, oturum kimliği bölüm anahtarından daha yüksek önceliğe sahiptir. Bu nedenle hem ServiceBusMessageHeaders#SESSION_ID hem de ServiceBusMessageHeaders#PARTITION_KEY üst bilgileri ayarlandığında, bölüm anahtarının değerinin üzerine yazmak için oturum kimliğinin değeri sonunda kullanılır.
Hata iletilerini işleme
- Spring Cloud Azure 6.x/7.x
-
Spring Cloud Azure 5.x
-
Spring Cloud Azure 4.x
Giden bağlama hata iletilerini işleme
Varsayılan olarak Spring Integration,
errorChanneladlı bir genel hata kanalı oluşturur. Giden bağlama hata iletisini işlemek için aşağıdaki ileti uç noktasını yapılandırın.@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }Gelen bağlama hata iletilerini işleme
Spring Cloud Stream Service Bus Bağlayıcısı, gelen ileti bağlamalarının hatalarını işlemek için iki çözümü destekler: bağlayıcı hata işleyicisi ve işleyicileri.
Bağlayıcı hata işleyicisi:
Varsayılan bağlayıcı hata işleyicisi gelen bağlamayı işler.
spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejectedetkinleştirildiğinde başarısız iletileri teslim edilemeyen ileti kuyruğuna göndermek için bu işleyiciyi kullanırsınız. Aksi takdirde, başarısız iletiler terk edilir. Bağlayıcı hata işleyicisi, sağlanan diğer hata işleyicileriyle birbirini dışlar.Hata işleyicisi:
Spring Cloud Stream,
Consumerörnekleri kabul eden birErrorMessageekleyerek özel hata işleyicisi sağlamanız için bir mekanizma sunar. Daha fazla bilgi için Spring Cloud Stream belgelerinde Hata İletilerini İşlemebakın. Bağlama varsayılan hata işleyicisi
Tüm gelen bağlama hata iletilerini kullanmak için tek bir
Consumerçekirdeği yapılandırın. Aşağıdaki varsayılan işlev her gelen bağlama hata kanalına abone olur:@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }ayrıca
spring.cloud.stream.default.error-handler-definitionözelliğini işlev adına ayarlamanız gerekir.Bağlamaya özgü hata işleyicisi
Belirli gelen bağlama hata iletilerini kullanmak için bir
Consumerçekirdeği yapılandırın. Aşağıdaki işlev, bağlama varsayılan hata işleyicisinden daha yüksek önceliğe sahip belirli bir gelen bağlama hata kanalına abone olur.@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }ayrıca
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definitionözelliğini işlev adına ayarlamanız gerekir.
Service Bus ileti üst bilgileri
Desteklenen temel ileti üst bilgileri için Spring Integration
Not
Bölüm anahtarı ayarlanırken, ileti üst bilgisinin önceliği Spring Cloud Stream özelliğinden daha yüksektir. Bu nedenle spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression yalnızca ServiceBusMessageHeaders#SESSION_ID ve ServiceBusMessageHeaders#PARTITION_KEY üst bilgilerinin hiçbiri yapılandırılmadığında geçerlilik kazanır.
Birden çok bağlayıcı desteği
Birden çok Service Bus ad alanına bağlantı, birden çok bağlayıcı kullanılarak da desteklenir. Bu örnek, örnek olarak bağlantı dizesini alır. Hizmet sorumlularının ve yönetilen kimliklerin kimlik bilgileri de desteklenir. Kullanıcılar her bağlayıcının ortam ayarlarında ilgili özellikleri ayarlayabilir.
ServiceBus'ın birden çok bağlayıcısını kullanmak için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${SERVICEBUS_TOPIC_NAME} group: ${SUBSCRIPTION_NAME} supply1-out-0: destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE} consume2-in-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME} supply2-out-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE} binders: servicebus-1: type: servicebus default-candidate: true environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING} servicebus-2: type: servicebus default-candidate: false environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING} servicebus: bindings: consume1-in-0: consumer: auto-complete: false supply1-out-0: producer: entity-type: topic consume2-in-0: consumer: auto-complete: false supply2-out-0: producer: entity-type: queue poller: initial-delay: 0 fixed-delay: 1000Not
Önceki uygulama dosyası, tüm bağlamalara uygulama için tek bir varsayılan poller yapılandırmayı gösterir. Poller'ı belirli bir bağlama için yapılandırmak istiyorsanız,
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000gibi bir yapılandırma kullanabilirsiniz.Not
Microsoft, kullanılabilir en güvenli kimlik doğrulama akışının kullanılmasını önerir. Veritabanları, önbellekler, mesajlaşma veya yapay zeka hizmetleri gibi bu yordamda açıklanan kimlik doğrulama akışı, uygulamaya çok yüksek düzeyde güven gerektirir ve diğer akışlarda mevcut olmayan riskler taşır. Bu akışı yalnızca parolasız veya anahtarsız bağlantılar için yönetilen kimlikler gibi daha güvenli seçenekler uygun olmadığında kullanın. Yerel makine işlemleri için parolasız veya anahtarsız bağlantılar için kullanıcı kimliklerini tercih edin.
iki tedarikçi ve iki tüketici tanımlamamız gerekiyor
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; }
Kaynak sağlama
Service Bus bağlayıcısı kuyruk, konu ve aboneliğin sağlanmasını destekler; kullanıcılar sağlamayı etkinleştirmek için aşağıdaki özellikleri kullanabilir.
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
servicebus:
resource:
resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
stream:
servicebus:
bindings:
<binding-name>:
consumer:
entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}
Not
tenant-id için izin verilen değerler şunlardır: common, organizations, consumersveya kiracı kimliği. Bu değerler hakkında daha fazla bilgi için, Hata AADSTS50020 - Kimlik sağlayıcısından kullanıcı hesabı kiracımevcut değil bölümünün Yanlış uç nokta (kişisel ve kuruluş hesapları) kullanıldı bölümüne bakın. Tek kiracılı uygulamanızı dönüştürme hakkında bilgi için bkz. Tek kiracılı uygulamayı Microsoft Entra IDüzerinde çok kiracılıya dönüştürme.
Service Bus istemci özelliklerini özelleştirme
Geliştiriciler Service Bus İstemcisi özelliklerini özelleştirmek için AzureServiceClientBuilderCustomizer kullanabilir. Aşağıdaki örnek, sessionIdleTimeoutiçindeki ServiceBusClientBuilder özelliğini özelleştirmektedir:
@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}
Örnekleri
Daha fazla bilgi için GitHub'daki azure-spring-boot-samples