Jegyzet
Az oldalhoz való hozzáférés engedélyezést igényel. Próbálhatod be jelentkezni vagy könyvtárat váltani.
Az oldalhoz való hozzáférés engedélyezést igényel. Megpróbálhatod a könyvtár váltását.
A Spring Cloud Stream egy keretrendszer, amely nagy mértékben skálázható, eseményvezérelt mikroszolgáltatásokat hoz létre, amelyek megosztott üzenetkezelési rendszerekkel vannak összekapcsolva.
A keretrendszer egy rugalmas programozási modellt biztosít, amely már meglévő és jól ismert Spring-kifejezésekre és ajánlott eljárásokra épül. Ezek az ajánlott eljárások közé tartozik az állandó pub/al szemantikák, a fogyasztói csoportok és az állapotalapú partíciók támogatása.
A binder jelenlegi implementációi a következők:
-
spring-cloud-azure-stream-binder-eventhubs– további információ: Spring Cloud Stream Binder for Azure Event Hubs -
spring-cloud-azure-stream-binder-servicebus– további információ: Spring Cloud Stream Binder for Azure Service Bus
Spring Cloud Stream Binder az Azure Event Hubshoz
Főbb fogalmak
Az Azure Event Hubshoz készült Spring Cloud Stream Binder biztosítja a Spring Cloud Stream-keretrendszer kötési implementációját. Ez az implementáció a Spring Integration Event Hubs-csatornaadaptereket használja az alapításakor. A tervezés szempontjából az Event Hubs hasonló a Kafkához. Az Event Hubs a Kafka API-n keresztül is elérhető. Ha a projekt szorosan függ a Kafka API-tól, kipróbálhatja Event Hubot a Kafka API-minta
Fogyasztói csoport
Az Event Hubs hasonló támogatást nyújt a fogyasztói csoportoknak, mint az Apache Kafka, de kismértékben eltérő logikával. Bár a Kafka az összes véglegesített eltolást a közvetítőben tárolja, a manuálisan feldolgozott Event Hubs-üzenetek eltolásait kell tárolnia. Az Event Hubs SDK az ilyen eltolások Azure Storage-ban való tárolására szolgál.
Particionálás támogatása
Az Event Hubs a Kafkához hasonló fizikai partíciót biztosít. A Kafka által a felhasználók és a partíciók közötti automatikus újraegyensúlyozástól eltérően az Event Hubs egyfajta megelőző módot biztosít. A tárfiók bérletként működik annak meghatározásához, hogy melyik felhasználó melyik partíció tulajdonosa. Amikor egy új felhasználó elindul, megpróbál ellopni néhány partíciót a legnagyobb terhelésű felhasználóktól a számítási feladatok egyensúlyának elérése érdekében.
A terheléselosztási stratégia megadásához meg kell adni a spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.* tulajdonságait. További információ: Fogyasztói tulajdonságok szakasz.
Batch fogyasztói támogatás
A Spring Cloud Azure Stream Event Hubs binder támogatja Spring Cloud Stream Batch Fogyasztói funkció.
A kötegfelhasználói mód használatához állítsa a spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode tulajdonságot trueértékre. Ha engedélyezve van, a rendszer a kötegelt események listájának hasznos adatait tartalmazó üzenetet fogad, és átadja a Consumer függvénynek. A rendszer az egyes üzenetfejléceket is listává alakítja, amelyek tartalma az egyes eseményekhez tartozó fejlécérték. A partícióazonosító, az ellenőrzőpont és az utolsó lekérdezett tulajdonságok közös fejlécei egyetlen értékként jelennek meg, mivel az események teljes kötegének ugyanaz az értéke. További információkért tekintse meg Event Hubs-üzenetfejléceketSpring Cloud Azure-támogatásának Spring Integrationcímű szakaszát.
Jegyzet
Az ellenőrzőpont fejléce csak akkor létezik, ha az MANUAL ellenőrzőpont mód van használatban.
A kötegfelhasználók ellenőrzőpontozása két módot támogat: BATCH és MANUAL.
BATCH mód egy automatikus ellenőrzőpontozási mód, a kötőanyag fogadása után az események teljes kötegének közös ellenőrzéséhez.
MANUAL mód az események felhasználók általi ellenőrzésére. Használat esetén a Checkpointer az üzenetfejlécbe kerül, és a felhasználók ellenőrzőpontozásra használhatják.
A köteg méretét a max-sizeelőtaggal rendelkező max-wait-time és spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch. tulajdonságok beállításával adhatja meg. A max-size tulajdonság szükséges, és a max-wait-time tulajdonság nem kötelező. További információ: Fogyasztói tulajdonságok szakasz.
Függőség beállítása
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>
Másik lehetőségként használhatja a Spring Cloud Azure Stream Event Hubs Startert is, ahogyan az a Maven esetében az alábbi példában is látható:
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>
Konfiguráció
A kötőanyag a konfigurációs beállítások alábbi három részét biztosítja:
Kapcsolatkonfiguráció tulajdonságai
Ez a szakasz az Azure Event Hubshoz való csatlakozáshoz használt konfigurációs beállításokat tartalmazza.
Jegyzet
Ha biztonsági tagot használ egy Azure-erőforrás eléréséhez a Microsoft Entra-azonosítóval történő hitelesítéshez és engedélyezéshez, tekintse meg Hozzáférés engedélyezése a Microsoft Entra-azonosítóval annak biztosításához, hogy a biztonsági tag megkapta-e a megfelelő engedélyt az Azure-erőforrás eléréséhez.
A spring-cloud-azure-stream-binder-eventhubs kapcsolatkonfigurálható tulajdonságai:
| Ingatlan | Típus | Leírás |
|---|---|---|
| spring.cloud.azure.eventhubs.enabled | Logikai | Az Azure Event Hubs engedélyezése. |
| spring.cloud.azure.eventhubs.connection-string | Húr | Event Hubs-névtér kapcsolati sztringértéke. |
| spring.cloud.azure.eventhubs.namespace | Húr | Event Hubs-névtérérték, amely a teljes tartománynév előtagja. Az FQDN-nek a NamespaceName.DomainName névből kell lennie |
| spring.cloud.azure.eventhubs.tartománynév | Húr | Egy Azure Event Hubs-névtérérték tartományneve. |
| spring.cloud.azure.eventhubs.custom-endpoint-address | Húr | Egyéni végpont címe. |
Borravaló
Az Azure Service SDK gyakori konfigurációs beállításai a Spring Cloud Azure Stream Event Hubs iratgyűjtőhöz is konfigurálhatók. A támogatott konfigurációs beállítások Spring Cloud Azure-konfigurációsjelennek meg, és konfigurálhatók az egyesített spring.cloud.azure. előtaggal vagy a spring.cloud.azure.eventhubs.előtagjával.
A kötőanyag alapértelmezés szerint támogatja Spring Could Azure Resource Manager. A kapcsolati sztring Data kapcsolódó szerepkörökkel nem rendelkező biztonsági tagokkal való lekéréséről a Spring Could Azure Resource ManagerAlapszintű használati című szakaszában olvashat.
Ellenőrzőpont konfigurációs tulajdonságai
Ez a szakasz a Storage Blobs szolgáltatás konfigurációs beállításait tartalmazza, amely a partíció tulajdonjogának és ellenőrzőpont-adatainak megőrzésére szolgál.
Jegyzet
A 4.0.0-s verziótól kezdve, ha a spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists tulajdonság nincs manuálisan engedélyezve, a rendszer nem hoz létre automatikusan egy Storage-tárolót spring.cloud.stream.bindings.binding-name.destination.
A spring-cloud-azure-stream-binder-eventhubs konfigurálható tulajdonságainak ellenőrzése:
| Ingatlan | Típus | Leírás |
|---|---|---|
| spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists | logikai | Engedélyezi-e a tárolók létrehozását, ha nem létezik. |
| spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name | Húr | A tárfiók neve. |
| spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key | Húr | Tárfiók hozzáférési kulcsa. |
| spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name | Húr | Tároló neve. |
Borravaló
Az Azure Service SDK gyakori konfigurációs beállításai a Storage Blob CheckPoint Store-hoz is konfigurálhatók. A támogatott konfigurációs beállítások Spring Cloud Azure-konfigurációsjelennek meg, és konfigurálhatók az egyesített spring.cloud.azure. előtaggal vagy a spring.cloud.azure.eventhubs.processor.checkpoint-storeelőtagjával.
Az Azure Event Hubs-kötés konfigurációs tulajdonságai
A következő lehetőségek négy részre vannak osztva: Fogyasztói tulajdonságok, Speciális fogyasztói konfigurációk, Gyártói tulajdonságok és Speciális gyártói konfigurációk.
Fogyasztói tulajdonságok
Ezek a tulajdonságok EventHubsConsumerPropertieskeresztül érhetők el.
Jegyzet
Az ismétlődés elkerülése érdekében a 4.17.0-s és 5.11.0-s verzió óta a Spring Cloud Azure Stream Binder Event Hubs támogatja az összes csatorna értékének beállítását spring.cloud.stream.eventhubs.default.consumer.<property>=<value>formátumban.
A spring-cloud-azure-stream-binder-eventhubs fogyasztói konfigurálható tulajdonságai:
| Ingatlan | Típus | Leírás |
|---|---|---|
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode | Ellenőrzőpont mód | Ellenőrzőpont mód, amelyet akkor használnak, amikor a fogyasztó dönti el, hogyan kell ellenőrizni az ellenőrzőpont-üzenetet |
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count | Egész szám | Az egyes partíciók üzenetmennyiségét határozza meg egy ellenőrzőponthoz. Csak akkor lép érvénybe, ha PARTITION_COUNT ellenőrzőpont mód van használatban. |
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval | Időtartam | Egy ellenőrzőponthoz megadott időintervallumot határozza meg. Csak akkor lép érvénybe, ha TIME ellenőrzőpont mód van használatban. |
| spring.cloud.stream.eventhubs.bindings.<kötés-név.fogyasztó.batch.max-méret | Egész szám | A kötegben lévő események maximális száma. A kötegfelhasználói módhoz szükséges. |
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time | Időtartam | A köteghasználat maximális időtartama. Csak akkor lép érvénybe, ha a batch-consumer mód engedélyezve van, és nem kötelező. |
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.update-interval | Időtartam | A frissítés időközi időtartama. |
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy | Terheléselosztási stratégia | A terheléselosztási stratégia. |
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval | Időtartam | Az az időtartam, amely után a partíció tulajdonjoga lejár. |
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties | logikai | Azt jelzi, hogy az eseményfeldolgozónak adatokat kell-e kérnie a társított partíció utolsó lekéréses eseményéről, és nyomon kell-e követnie ezeket az információkat az események fogadása során. |
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count | Egész szám | A fogyasztó által azon események számának szabályozására használt szám, amelyeket az Event Hub-fogyasztó helyileg aktívan fogad és vár. |
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position | Leképezés a kulcssal partícióazonosítóként és StartPositionProperties értékeivel |
Az egyes partíciókhoz használandó eseményhelyzetet tartalmazó térkép, ha a partíció ellenőrzőpontja nem létezik az Ellenőrzőpont-tárolóban. Ez a térkép a partícióazonosítóból van kulcsra kapcsolva. |
Jegyzet
A initial-partition-event-position konfiguráció egy map fogad el az egyes eseményközpontok kezdeti helyének megadásához. Így a kulcs a partícióazonosító, és az érték StartPositionProperties, amely magában foglalja az eltolás tulajdonságait, a sorszámot, a lekérdezett dátumidőt és a befogadót. Beállíthatja például úgy, hogy
spring:
cloud:
stream:
eventhubs:
bindings:
<binding-name>:
consumer:
initial-partition-event-position:
0:
offset: earliest
1:
sequence-number: 100
2:
enqueued-date-time: 2022-01-12T13:32:47.650005Z
4:
inclusive: false
Speciális fogyasztói konfiguráció
A fenti kapcsolati, ellenőrzőpontés gyakori Azure SDK-ügyfél konfiguráció támogatja az egyes iratgyűjtő-felhasználók testreszabását, amelyeket a spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.előtaggal konfigurálhat.
Gyártói tulajdonságok
Ezek a tulajdonságok EventHubsProducerPropertieskeresztül érhetők el.
Jegyzet
Az ismétlődés elkerülése érdekében a 4.17.0-s és 5.11.0-s verzió óta a Spring Cloud Azure Stream Binder Event Hubs támogatja az összes csatorna értékének beállítását spring.cloud.stream.eventhubs.default.producer.<property>=<value>formátumban.
A spring-cloud-azure-stream-binder-eventhubs gyártó által konfigurálható tulajdonságai:
| Ingatlan | Típus | Leírás |
|---|---|---|
| spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync | Logikai | A gyártó szinkronizálásának kapcsolójelzője. Ha igaz, a gyártó a küldési művelet után várja meg a választ. |
| spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout | hosszú | A küldési művelet után a válaszra váró idő. Csak akkor lép érvénybe, ha egy szinkronizálási gyártó engedélyezve van. |
Speciális gyártókonfiguráció
A fenti kapcsolati és gyakori Azure SDK-ügyfél konfigurációja támogatja az egyes iratgyűjtő-előállítók testreszabását, amelyeket a spring.cloud.stream.eventhubs.bindings.<binding-name>.producer.előtaggal konfigurálhat.
Alapszintű használat
Üzenetek küldése és fogadása az Event Hubsból vagy az eseményközpontba
Adja meg a konfigurációs beállításokat hitelesítő adatokkal.
A hitelesítő adatok kapcsolati sztringként való megadásához konfigurálja a következő tulajdonságokat a application.yml fájlban:
spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUALJegyzet
A Microsoft az elérhető legbiztonságosabb hitelesítési folyamat használatát javasolja. Az ebben az eljárásban ismertetett hitelesítési folyamat, például adatbázisok, gyorsítótárak, üzenetkezelés vagy AI-szolgáltatások esetében, nagyon nagy megbízhatóságot igényel az alkalmazásban, és más folyamatokban nem jelenik meg kockázattal. Ezt a folyamatot csak akkor használja, ha a biztonságosabb lehetőségek, például a jelszó nélküli vagy kulcs nélküli kapcsolatok felügyelt identitásai nem életképesek. A helyi gépi műveletekhez előnyben részesítse a jelszó nélküli vagy kulcs nélküli kapcsolatok felhasználói identitásait.
A szolgáltatásnévként megadott hitelesítő adatokhoz konfigurálja a következő tulajdonságokat a application.yml fájlban:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Jegyzet
A tenant-id engedélyezett értékek a következők: common, organizations, consumersvagy bérlőazonosító. Ezekről az értékekről további információt a Helytelen végpont (személyes és szervezeti fiókok) szakaszában talál, hiba AADSTS50020 – Az identitásszolgáltatótól származó felhasználói fiók nem létezik a bérlői. Az egybérlős alkalmazás konvertálásáról további információt Az egybérlős alkalmazás átalakítása több-bérlőssé a Microsoft Entra-azonosító.
A hitelesítő adatok felügyelt identitásként való megadásához konfigurálja a következő tulajdonságokat a application.yml fájlban:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Szállító és fogyasztó meghatározása.
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload(), message.getHeaders().get(EventHubsHeaders.PARTITION_KEY), message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER), message.getHeaders().get(EventHubsHeaders.OFFSET), message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME) ); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
Particionálás támogatása
Létrejön egy PartitionSupplier a felhasználó által megadott partícióadatokkal az elküldendő üzenet partícióadatainak konfigurálásához. Az alábbi folyamatábra a partícióazonosító és a kulcs különböző prioritásainak beszerzését mutatja be:
Batch fogyasztói támogatás
Adja meg a kötegkonfigurációs beállításokat az alábbi példában látható módon:
spring: cloud: function: definition: consume stream: bindings: consume-in-0: destination: ${AZURE_EVENTHUB_NAME} group: ${AZURE_EVENTHUB_CONSUMER_GROUP} consumer: batch-mode: true eventhubs: bindings: consume-in-0: consumer: batch: max-batch-size: 10 # Required for batch-consumer mode max-wait-time: 1m # Optional, the default value is null checkpoint: mode: BATCH # or MANUAL as neededSzállító és fogyasztó meghatározása.
A
BATCHellenőrzőpont-módban az alábbi kód használatával küldhet üzeneteket, és kötegekben használhatja azokat.@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }Ha
MANUALellenőrzőpont-módot szeretne használni, az alábbi kóddal küldhet üzeneteket, és kötegekben használhatja a felhasználást/ellenőrzőpontot.@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }
Jegyzet
Kötegfelhasználó módban a Spring Cloud Stream-iratgyűjtő alapértelmezett tartalomtípusa application/json, ezért győződjön meg arról, hogy az üzenet hasznos adatai igazodnak a tartalomtípushoz. Ha például a application/json alapértelmezett tartalomtípusát használja String hasznos adattal rendelkező üzenetek fogadásához, a hasznos adatokat JSON Stringkell használni, és az eredeti String szöveghez dupla idézőjelekkel kell körülvenni. Míg text/plain tartalomtípus esetében ez közvetlenül String objektum lehet. További információ: Spring Cloud Stream tartalomtípus egyeztetési.
Hibaüzenetek kezelése
Kimenő kötési hibaüzenetek kezelése
Alapértelmezés szerint a Spring Integration létrehoz egy
errorChannelnevű globális hibacsatornát. Konfigurálja a következő üzenetvégpontot a kimenő kötési hibaüzenetek kezeléséhez.@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }Bejövő kötési hibaüzenetek kezelése
A Spring Cloud Stream Event Hubs Binder egyetlen megoldást támogat a bejövő üzenetkötések hibáinak kezelésére: hibakezelők.
Hibakezelő:
A Spring Cloud Stream egy olyan mechanizmust tesz elérhetővé, amellyel egyéni hibakezelőt biztosíthat egy
Consumerpéldányokat elfogadóErrorMessagehozzáadásával. További információ: Hibaüzenetek kezelése a Spring Cloud Stream dokumentációjában.Kötés alapértelmezett hibakezelője
Konfiguráljon egyetlen
Consumerbabot az összes bejövő kötési hibaüzenet felhasználásához. Az alábbi alapértelmezett függvény feliratkozik az egyes bejövő kötési hibacsatornákra:@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }A
spring.cloud.stream.default.error-handler-definitiontulajdonságot a függvény nevére is be kell állítania.Kötésspecifikus hibakezelő
Konfiguráljon egy
Consumerbabot az adott bejövő kötési hibaüzenetek felhasználásához. Az alábbi függvény feliratkozik az adott bejövő kötési hibacsatornára, és magasabb prioritással rendelkezik, mint a kötés alapértelmezett hibakezelője:@Bean public Consumer<ErrorMessage> myErrorHandler() { return message -> { // consume the error message }; }A
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definitiontulajdonságot a függvény nevére is be kell állítania.
Event Hubs-üzenetfejlécek
A támogatott alapvető üzenetfejléceket a Spring Integration
Több kötőanyag támogatása
A több Event Hubs-névtérhez való csatlakozást több kötés is támogatja. Ez a minta példaként egy kapcsolati sztringet vesz fel. A szolgáltatásnevek és a felügyelt identitások hitelesítő adatai is támogatottak. A kapcsolódó tulajdonságokat az egyes iratgyűjtők környezeti beállításaiban állíthatja be.
Ha több kötést szeretne használni az Event Hubsban, konfigurálja a következő tulajdonságokat a application.yml fájlban:
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${EVENTHUB_NAME_01} group: ${CONSUMER_GROUP_01} supply1-out-0: destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE} consume2-in-0: binder: eventhub-2 destination: ${EVENTHUB_NAME_02} group: ${CONSUMER_GROUP_02} supply2-out-0: binder: eventhub-2 destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE} binders: eventhub-1: type: eventhubs default-candidate: true environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_01} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhub-2: type: eventhubs default-candidate: false environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_02} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhubs: bindings: consume1-in-0: consumer: checkpoint: mode: MANUAL consume2-in-0: consumer: checkpoint: mode: MANUAL poller: initial-delay: 0 fixed-delay: 1000Jegyzet
Az előző alkalmazásfájl bemutatja, hogyan konfigurálhat egyetlen alapértelmezett lekérdezést az alkalmazáshoz az összes kötésre. Ha egy adott kötéshez szeretné konfigurálni a lekérdezést, használhat olyan konfigurációt, mint a
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.Jegyzet
A Microsoft az elérhető legbiztonságosabb hitelesítési folyamat használatát javasolja. Az ebben az eljárásban ismertetett hitelesítési folyamat, például adatbázisok, gyorsítótárak, üzenetkezelés vagy AI-szolgáltatások esetében, nagyon nagy megbízhatóságot igényel az alkalmazásban, és más folyamatokban nem jelenik meg kockázattal. Ezt a folyamatot csak akkor használja, ha a biztonságosabb lehetőségek, például a jelszó nélküli vagy kulcs nélküli kapcsolatok felügyelt identitásai nem életképesek. A helyi gépi műveletekhez előnyben részesítse a jelszó nélküli vagy kulcs nélküli kapcsolatok felhasználói identitásait.
Két szállítót és két fogyasztót kell definiálnunk:
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; }
Erőforrás-kiépítés
Az Event Hubs-iratgyűjtő támogatja az eseményközpont és a fogyasztói csoport kiépítését, a felhasználók az alábbi tulajdonságok használatával engedélyezhetik a kiépítést.
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
eventhubs:
resource:
resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}
Jegyzet
A tenant-id engedélyezett értékek a következők: common, organizations, consumersvagy bérlőazonosító. Ezekről az értékekről további információt a Helytelen végpont (személyes és szervezeti fiókok) szakaszában talál, hiba AADSTS50020 – Az identitásszolgáltatótól származó felhasználói fiók nem létezik a bérlői. Az egybérlős alkalmazás konvertálásáról további információt Az egybérlős alkalmazás átalakítása több-bérlőssé a Microsoft Entra-azonosító.
Minták
További információ: azure-spring-boot-samples adattár a GitHubon.
Spring Cloud Stream Binder az Azure Service Bushoz
Főbb fogalmak
Az Azure Service Bushoz készült Spring Cloud Stream Binder biztosítja a Spring Cloud Stream-keretrendszer kötési implementációt. Ez az implementáció a Spring Integration Service Bus-csatornaadaptereket használja az alapításakor.
Ütemezett üzenet
Ez a kötés támogatja az üzenetek küldését egy témakörbe késleltetett feldolgozás céljából. A felhasználók olyan ütemezett üzeneteket küldhetnek, x-delay ezredmásodpercben kifejezve késleltetik az üzenetet. Az üzenet x-delay ezredmásodperc után jelenik meg a megfelelő témakörökhöz.
Fogyasztói csoport
A Service Bus-témakör hasonló támogatást nyújt a fogyasztói csoportoknak, mint az Apache Kafka, de kismértékben eltérő logikával.
Ez a kötőanyag egy témakör Subscription támaszkodik a fogyasztói csoportként való működéshez.
Függőség beállítása
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>
Másik lehetőségként használhatja a Spring Cloud Azure Stream Service Bus Startert is, ahogyan az a Maven esetében az alábbi példában is látható:
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>
Konfiguráció
Az iratgyűjtő a konfigurációs beállítások alábbi két részét biztosítja:
Kapcsolatkonfiguráció tulajdonságai
Ez a szakasz az Azure Service Bushoz való csatlakozáshoz használt konfigurációs beállításokat tartalmazza.
Jegyzet
Ha biztonsági tagot használ egy Azure-erőforrás eléréséhez a Microsoft Entra-azonosítóval történő hitelesítéshez és engedélyezéshez, tekintse meg Hozzáférés engedélyezése a Microsoft Entra-azonosítóval annak biztosításához, hogy a biztonsági tag megkapta-e a megfelelő engedélyt az Azure-erőforrás eléréséhez.
A spring-cloud-azure-stream-binder-servicebus kapcsolatkonfigurálható tulajdonságai:
| Ingatlan | Típus | Leírás |
|---|---|---|
| spring.cloud.azure.servicebus.enabled | Logikai | Az Azure Service Bus engedélyezése. |
| spring.cloud.azure.servicebus.connection-string | Húr | Service Bus-névtér kapcsolati sztringértéke. |
| spring.cloud.azure.servicebus.custom-endpoint-address | Húr | A Service Bushoz való csatlakozáskor használandó egyéni végpontcím. |
| spring.cloud.azure.servicebus.namespace | Húr | Service Bus-névtérérték, amely a teljes tartománynév előtagja. Az FQDN-nek a NamespaceName.DomainName névből kell lennie |
| spring.cloud.azure.servicebus.domain-name | Húr | Egy Azure Service Bus-névtérérték tartományneve. |
Jegyzet
Az Azure Service SDK gyakori konfigurációs beállításai a Spring Cloud Azure Stream Service Bus-iratgyűjtőhöz is konfigurálhatók. A támogatott konfigurációs beállítások Spring Cloud Azure-konfigurációsjelennek meg, és konfigurálhatók az egyesített spring.cloud.azure. előtaggal vagy a spring.cloud.azure.servicebus.előtagjával.
A kötőanyag alapértelmezés szerint támogatja Spring Could Azure Resource Manager. A kapcsolati sztring Data kapcsolódó szerepkörökkel nem rendelkező biztonsági tagokkal való lekéréséről a Spring Could Azure Resource ManagerAlapszintű használati című szakaszában olvashat.
Az Azure Service Bus kötéskonfigurációs tulajdonságai
A következő lehetőségek négy részre vannak osztva: Fogyasztói tulajdonságok, Speciális fogyasztói konfigurációk, Gyártói tulajdonságok és Speciális gyártói konfigurációk.
Fogyasztói tulajdonságok
Ezek a tulajdonságok ServiceBusConsumerPropertieskeresztül érhetők el.
Jegyzet
Az ismétlődés elkerülése érdekében a 4.17.0-s és 5.11.0-s verzió óta a Spring Cloud Azure Stream Binder Service Bus támogatja az összes csatorna értékeinek beállítását spring.cloud.stream.servicebus.default.consumer.<property>=<value>formátumban.
A spring-cloud-azure-stream-binder-servicebus fogyasztói konfigurálható tulajdonságai:
| Ingatlan | Típus | Alapértelmezett | Leírás |
|---|---|---|---|
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected | Logikai | téves | Ha a sikertelen üzeneteket a rendszer a DLQ-hoz irányítja. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls | Egész szám | 1 | A Service Bus processzorügyfél által feldolgozandó egyidejű üzenetek maximális beállítása. Ha a munkamenet engedélyezve van, az minden munkamenetre érvényes. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-egyidejű munkamenetek | Egész szám | null | Egyszerre feldolgozandó munkamenetek maximális száma. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled | logikai | null | Engedélyezve van-e a munkamenet. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-idle-timeout | Időtartam | null | Beállítja, hogy mennyi idő (időtartam) várjon egy üzenet fogadására az aktuális aktív munkamenetben. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count | Egész szám | 0 | A Service Bus processzorügyfél előzetes száma. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue | Alvárólista | egyik sem | Az alsor típusa, amelyhez csatlakozni szeretne. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration | Időtartam | 5 m | A zárolás automatikus megújításának folytatásához szükséges idő. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode | ServiceBusReceiveMode | peek_lock | A Service Bus processzorügyfél fogadási módja. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete | logikai | igaz | Az üzenetek automatikus rendezése. Ha hamisként van beállítva, a rendszer hozzáadja a Checkpointer üzenetfejlécét, hogy a fejlesztők manuálisan rendezhessék az üzeneteket. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-size-in-megabájt | Hosszú | 1024 | Az üzenetsor/témakör maximális mérete megabájtban, amely az üzenetsorhoz/témakörhöz lefoglalt memória mérete. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.default-message-time-to-live | Időtartam | P10675199DT2H48M5.4775807S. (10675199 nap, 2 óra, 48 perc, 5 másodperc és 477 ezredmásodperc) | Az az időtartam, amely után az üzenet lejár, attól kezdve, hogy az üzenet el lesz küldve a Service Busnak. |
Fontos
Az Azure Resource Manager (ARM) használatakor konfigurálnia kell a spring.cloud.stream.servicebus.bindings.<binding-name>.consume.entity-type tulajdonságot. További információ: servicebus-queue-binder-arm minta a GitHubon.
Speciális fogyasztói konfiguráció
A fenti kapcsolati és gyakori Azure SDK-ügyfél konfigurációja támogatja az egyes iratgyűjtő-felhasználók testreszabását, amelyeket a spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.előtaggal konfigurálhat.
Gyártói tulajdonságok
Ezek a tulajdonságok ServiceBusProducerPropertieskeresztül érhetők el.
Jegyzet
Az ismétlődés elkerülése érdekében a 4.17.0-s és 5.11.0-s verzió óta a Spring Cloud Azure Stream Binder Service Bus támogatja az összes csatorna értékeinek beállítását spring.cloud.stream.servicebus.default.producer.<property>=<value>formátumban.
A spring-cloud-azure-stream-binder-servicebus gyártó által konfigurálható tulajdonságai:
| Ingatlan | Típus | Alapértelmezett | Leírás |
|---|---|---|---|
| spring.cloud.stream.servicebus.bindings.binding-name.producer.sync | Logikai | téves | Kapcsolójelző a gyártó szinkronizálásához. |
| spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout | hosszú | 10 000 | Időtúllépési érték a gyártó elküldéséhez. |
| spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type | ServiceBusEntityType | null | A gyártó Service Bus-entitástípusa, amely a kötelező gyártóhoz szükséges. |
| spring.cloud.stream.servicebus.bindings.binding-name.producer.max-size-in-megabájt | Hosszú | 1024 | Az üzenetsor/témakör maximális mérete megabájtban, amely az üzenetsorhoz/témakörhöz lefoglalt memória mérete. |
| spring.cloud.stream.servicebus.bindings.binding-name.producer.default-message-time-to-live | Időtartam | P10675199DT2H48M5.4775807S. (10675199 nap, 2 óra, 48 perc, 5 másodperc és 477 ezredmásodperc) | Az az időtartam, amely után az üzenet lejár, attól kezdve, hogy az üzenet el lesz küldve a Service Busnak. |
Fontos
A kötésgyártó használatakor konfigurálni kell a spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type tulajdonságát.
Speciális gyártókonfiguráció
A fenti kapcsolati és gyakori Azure SDK-ügyfél konfigurációja támogatja az egyes iratgyűjtő-előállítók testreszabását, amelyeket a spring.cloud.stream.servicebus.bindings.<binding-name>.producer.előtaggal konfigurálhat.
Alapszintű használat
Üzenetek küldése és fogadása a Service Busból vagy a Service Busba
Adja meg a konfigurációs beállításokat hitelesítő adatokkal.
A hitelesítő adatok kapcsolati sztringként való megadásához konfigurálja a következő tulajdonságokat a application.yml fájlban:
spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus TopicJegyzet
A Microsoft az elérhető legbiztonságosabb hitelesítési folyamat használatát javasolja. Az ebben az eljárásban ismertetett hitelesítési folyamat, például adatbázisok, gyorsítótárak, üzenetkezelés vagy AI-szolgáltatások esetében, nagyon nagy megbízhatóságot igényel az alkalmazásban, és más folyamatokban nem jelenik meg kockázattal. Ezt a folyamatot csak akkor használja, ha a biztonságosabb lehetőségek, például a jelszó nélküli vagy kulcs nélküli kapcsolatok felügyelt identitásai nem életképesek. A helyi gépi műveletekhez előnyben részesítse a jelszó nélküli vagy kulcs nélküli kapcsolatok felhasználói identitásait.
A szolgáltatásnévként megadott hitelesítő adatokhoz konfigurálja a következő tulajdonságokat a application.yml fájlban:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Jegyzet
A tenant-id engedélyezett értékek a következők: common, organizations, consumersvagy bérlőazonosító. Ezekről az értékekről további információt a Helytelen végpont (személyes és szervezeti fiókok) szakaszában talál, hiba AADSTS50020 – Az identitásszolgáltatótól származó felhasználói fiók nem létezik a bérlői. Az egybérlős alkalmazás konvertálásáról további információt Az egybérlős alkalmazás átalakítása több-bérlőssé a Microsoft Entra-azonosító.
A hitelesítő adatok felügyelt identitásként való megadásához konfigurálja a következő tulajdonságokat a application.yml fájlban:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Szállító és fogyasztó meghatározása.
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}'", message.getPayload()); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
Partíciókulcs támogatása
A kötőanyag támogatja Service Bus particionálási azáltal, hogy engedélyezi a partíciókulcs és a munkamenet-azonosító beállítását az üzenetfejlécben. Ez a szakasz bemutatja, hogyan állíthat be partíciókulcsot az üzenetekhez.
A Spring Cloud Stream egy partíciókulcs SpEL-kifejezéstulajdonságot biztosít spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression. Például állítsa be ezt a tulajdonságot "'partitionKey-' + headers[<message-header-key>]", és adjon hozzá egy message-header-key nevű fejlécet. A Spring Cloud Stream a fejléc értékét használja a kifejezés kiértékelésekor egy partíciókulcs hozzárendeléséhez. Az alábbi kód egy példakészítőt tartalmaz:
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader("<message-header-key>", value.length() % 4)
.build();
};
}
Munkamenet-támogatás
A kötőanyag támogatja a Service Bus
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build();
};
}
Jegyzet
Service Bus particionálásiszerint a munkamenet-azonosító prioritása magasabb, mint a partíciókulcs. Így ha mind a ServiceBusMessageHeaders#SESSION_ID, mind a ServiceBusMessageHeaders#PARTITION_KEY fejléc be van állítva, a munkamenet-azonosító értéke végül a partíciókulcs értékének felülírására szolgál.
Hibaüzenetek kezelése
Kimenő kötési hibaüzenetek kezelése
Alapértelmezés szerint a Spring Integration létrehoz egy
errorChannelnevű globális hibacsatornát. Konfigurálja a következő üzenetvégpontot a kimenő kötési hibaüzenet kezeléséhez.@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }Bejövő kötési hibaüzenetek kezelése
A Spring Cloud Stream Service Bus Binder két megoldást támogat a bejövő üzenetkötések hibáinak kezelésére: a binder hibakezelője és a kezelők.
Binder hibakezelő:
Az alapértelmezett kötéskezelő hibakezelő kezeli a bejövő kötést. Ezzel a kezelő használatával sikertelen üzeneteket küldhet a kézbesítetlen levelek üzenetsorába, ha
spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejectedengedélyezve van. Ellenkező esetben a sikertelen üzenetek elhagyva lesznek. A kötőanyag hibakezelője kölcsönösen kizárja a többi megadott hibakezelőt.Hibakezelő:
A Spring Cloud Stream egy olyan mechanizmust tesz elérhetővé, amellyel egyéni hibakezelőt biztosíthat egy
Consumerpéldányokat elfogadóErrorMessagehozzáadásával. További információ: Hibaüzenetek kezelése a Spring Cloud Stream dokumentációjában.Kötés alapértelmezett hibakezelője
Konfiguráljon egyetlen
Consumerbabot az összes bejövő kötési hibaüzenet felhasználásához. Az alábbi alapértelmezett függvény feliratkozik az egyes bejövő kötési hibacsatornákra:@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }A
spring.cloud.stream.default.error-handler-definitiontulajdonságot a függvény nevére is be kell állítania.Kötésspecifikus hibakezelő
Konfiguráljon egy
Consumerbabot az adott bejövő kötési hibaüzenetek felhasználásához. Az alábbi függvény előfizet az adott bejövő kötési hibacsatornára, amelynek prioritása magasabb, mint a kötés alapértelmezett hibakezelője.@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }A
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definitiontulajdonságot a függvény nevére is be kell állítania.
Service Bus-üzenetfejlécek
A támogatott alapvető üzenetfejlécekért tekintse meg Service Bus-üzenetfejléceketSpring Cloud Azure-támogatás spring integrationcímű szakaszát.
Jegyzet
A partíciókulcs beállításakor az üzenetfejléc prioritása magasabb, mint a Spring Cloud Stream tulajdonság. Így spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression csak akkor lép érvénybe, ha egyik ServiceBusMessageHeaders#SESSION_ID és ServiceBusMessageHeaders#PARTITION_KEY fejléc sincs konfigurálva.
Több kötőanyag támogatása
A több Service Bus-névtérhez való csatlakozást több kötés is támogatja. Ez a minta példaként a kapcsolati sztringet veszi fel. A szolgáltatásnevek és a felügyelt identitások hitelesítő adatai is támogatottak, a felhasználók a kapcsolódó tulajdonságokat az egyes iratgyűjtők környezeti beállításaiban állíthatják be.
A ServiceBus több kötésének használatához konfigurálja a következő tulajdonságokat a application.yml fájlban:
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${SERVICEBUS_TOPIC_NAME} group: ${SUBSCRIPTION_NAME} supply1-out-0: destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE} consume2-in-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME} supply2-out-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE} binders: servicebus-1: type: servicebus default-candidate: true environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING} servicebus-2: type: servicebus default-candidate: false environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING} servicebus: bindings: consume1-in-0: consumer: auto-complete: false supply1-out-0: producer: entity-type: topic consume2-in-0: consumer: auto-complete: false supply2-out-0: producer: entity-type: queue poller: initial-delay: 0 fixed-delay: 1000Jegyzet
Az előző alkalmazásfájl bemutatja, hogyan konfigurálhat egyetlen alapértelmezett lekérdezést az alkalmazáshoz az összes kötésre. Ha egy adott kötéshez szeretné konfigurálni a lekérdezést, használhat olyan konfigurációt, mint a
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.Jegyzet
A Microsoft az elérhető legbiztonságosabb hitelesítési folyamat használatát javasolja. Az ebben az eljárásban ismertetett hitelesítési folyamat, például adatbázisok, gyorsítótárak, üzenetkezelés vagy AI-szolgáltatások esetében, nagyon nagy megbízhatóságot igényel az alkalmazásban, és más folyamatokban nem jelenik meg kockázattal. Ezt a folyamatot csak akkor használja, ha a biztonságosabb lehetőségek, például a jelszó nélküli vagy kulcs nélküli kapcsolatok felügyelt identitásai nem életképesek. A helyi gépi műveletekhez előnyben részesítse a jelszó nélküli vagy kulcs nélküli kapcsolatok felhasználói identitásait.
két beszállítót és két fogyasztót kell meghatároznunk
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; }
Erőforrás-kiépítés
A Service Bus binder támogatja az üzenetsor, a témakör és az előfizetés kiépítését, a felhasználók az alábbi tulajdonságok használatával engedélyezhetik a kiépítést.
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
servicebus:
resource:
resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
stream:
servicebus:
bindings:
<binding-name>:
consumer:
entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}
Jegyzet
A tenant-id engedélyezett értékek a következők: common, organizations, consumersvagy bérlőazonosító. Ezekről az értékekről további információt a Helytelen végpont (személyes és szervezeti fiókok) szakaszában talál, hiba AADSTS50020 – Az identitásszolgáltatótól származó felhasználói fiók nem létezik a bérlői. Az egybérlős alkalmazás konvertálásáról további információt Az egybérlős alkalmazás átalakítása több-bérlőssé a Microsoft Entra-azonosító.
A Service Bus-ügyfél tulajdonságainak testreszabása
A fejlesztők a AzureServiceClientBuilderCustomizer segítségével testre szabhatják a Service Bus-ügyfél tulajdonságait. Az alábbi példa a sessionIdleTimeoutServiceBusClientBuilder tulajdonságát szabja testre:
@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}
Minták
További információ: azure-spring-boot-samples adattár a GitHubon.