Megosztás:


Spring Cloud Azure-támogatás a Spring Cloud Streamhez

A Spring Cloud Stream egy keretrendszer, amely nagy mértékben skálázható, eseményvezérelt mikroszolgáltatásokat hoz létre, amelyek megosztott üzenetkezelési rendszerekkel vannak összekapcsolva.

A keretrendszer egy rugalmas programozási modellt biztosít, amely már meglévő és jól ismert Spring-kifejezésekre és ajánlott eljárásokra épül. Ezek az ajánlott eljárások közé tartozik az állandó pub/al szemantikák, a fogyasztói csoportok és az állapotalapú partíciók támogatása.

A binder jelenlegi implementációi a következők:

Spring Cloud Stream Binder az Azure Event Hubshoz

Főbb fogalmak

Az Azure Event Hubshoz készült Spring Cloud Stream Binder biztosítja a Spring Cloud Stream-keretrendszer kötési implementációját. Ez az implementáció a Spring Integration Event Hubs-csatornaadaptereket használja az alapításakor. A tervezés szempontjából az Event Hubs hasonló a Kafkához. Az Event Hubs a Kafka API-n keresztül is elérhető. Ha a projekt szorosan függ a Kafka API-tól, kipróbálhatja Event Hubot a Kafka API-minta

Fogyasztói csoport

Az Event Hubs hasonló támogatást nyújt a fogyasztói csoportoknak, mint az Apache Kafka, de kismértékben eltérő logikával. Bár a Kafka az összes véglegesített eltolást a közvetítőben tárolja, a manuálisan feldolgozott Event Hubs-üzenetek eltolásait kell tárolnia. Az Event Hubs SDK az ilyen eltolások Azure Storage-ban való tárolására szolgál.

Particionálás támogatása

Az Event Hubs a Kafkához hasonló fizikai partíciót biztosít. A Kafka által a felhasználók és a partíciók közötti automatikus újraegyensúlyozástól eltérően az Event Hubs egyfajta megelőző módot biztosít. A tárfiók bérletként működik annak meghatározásához, hogy melyik felhasználó melyik partíció tulajdonosa. Amikor egy új felhasználó elindul, megpróbál ellopni néhány partíciót a legnagyobb terhelésű felhasználóktól a számítási feladatok egyensúlyának elérése érdekében.

A terheléselosztási stratégia megadásához meg kell adni a spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.* tulajdonságait. További információ: Fogyasztói tulajdonságok szakasz.

Batch fogyasztói támogatás

A Spring Cloud Azure Stream Event Hubs binder támogatja Spring Cloud Stream Batch Fogyasztói funkció.

A kötegfelhasználói mód használatához állítsa a spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode tulajdonságot trueértékre. Ha engedélyezve van, a rendszer a kötegelt események listájának hasznos adatait tartalmazó üzenetet fogad, és átadja a Consumer függvénynek. A rendszer az egyes üzenetfejléceket is listává alakítja, amelyek tartalma az egyes eseményekhez tartozó fejlécérték. A partícióazonosító, az ellenőrzőpont és az utolsó lekérdezett tulajdonságok közös fejlécei egyetlen értékként jelennek meg, mivel az események teljes kötegének ugyanaz az értéke. További információkért tekintse meg Event Hubs-üzenetfejléceketSpring Cloud Azure-támogatásának Spring Integrationcímű szakaszát.

Jegyzet

Az ellenőrzőpont fejléce csak akkor létezik, ha az MANUAL ellenőrzőpont mód van használatban.

A kötegfelhasználók ellenőrzőpontozása két módot támogat: BATCH és MANUAL. BATCH mód egy automatikus ellenőrzőpontozási mód, a kötőanyag fogadása után az események teljes kötegének közös ellenőrzéséhez. MANUAL mód az események felhasználók általi ellenőrzésére. Használat esetén a Checkpointer az üzenetfejlécbe kerül, és a felhasználók ellenőrzőpontozásra használhatják.

A köteg méretét a max-sizeelőtaggal rendelkező max-wait-time és spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch. tulajdonságok beállításával adhatja meg. A max-size tulajdonság szükséges, és a max-wait-time tulajdonság nem kötelező. További információ: Fogyasztói tulajdonságok szakasz.

Függőség beállítása

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>

Másik lehetőségként használhatja a Spring Cloud Azure Stream Event Hubs Startert is, ahogyan az a Maven esetében az alábbi példában is látható:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>

Konfiguráció

A kötőanyag a konfigurációs beállítások alábbi három részét biztosítja:

Kapcsolatkonfiguráció tulajdonságai

Ez a szakasz az Azure Event Hubshoz való csatlakozáshoz használt konfigurációs beállításokat tartalmazza.

Jegyzet

Ha biztonsági tagot használ egy Azure-erőforrás eléréséhez a Microsoft Entra-azonosítóval történő hitelesítéshez és engedélyezéshez, tekintse meg Hozzáférés engedélyezése a Microsoft Entra-azonosítóval annak biztosításához, hogy a biztonsági tag megkapta-e a megfelelő engedélyt az Azure-erőforrás eléréséhez.

A spring-cloud-azure-stream-binder-eventhubs kapcsolatkonfigurálható tulajdonságai:

Ingatlan Típus Leírás
spring.cloud.azure.eventhubs.enabled Logikai Az Azure Event Hubs engedélyezése.
spring.cloud.azure.eventhubs.connection-string Húr Event Hubs-névtér kapcsolati sztringértéke.
spring.cloud.azure.eventhubs.namespace Húr Event Hubs-névtérérték, amely a teljes tartománynév előtagja. Az FQDN-nek a NamespaceName.DomainName névből kell lennie
spring.cloud.azure.eventhubs.tartománynév Húr Egy Azure Event Hubs-névtérérték tartományneve.
spring.cloud.azure.eventhubs.custom-endpoint-address Húr Egyéni végpont címe.

Borravaló

Az Azure Service SDK gyakori konfigurációs beállításai a Spring Cloud Azure Stream Event Hubs iratgyűjtőhöz is konfigurálhatók. A támogatott konfigurációs beállítások Spring Cloud Azure-konfigurációsjelennek meg, és konfigurálhatók az egyesített spring.cloud.azure. előtaggal vagy a spring.cloud.azure.eventhubs.előtagjával.

A kötőanyag alapértelmezés szerint támogatja Spring Could Azure Resource Manager. A kapcsolati sztring Data kapcsolódó szerepkörökkel nem rendelkező biztonsági tagokkal való lekéréséről a Spring Could Azure Resource ManagerAlapszintű használati című szakaszában olvashat.

Ellenőrzőpont konfigurációs tulajdonságai

Ez a szakasz a Storage Blobs szolgáltatás konfigurációs beállításait tartalmazza, amely a partíció tulajdonjogának és ellenőrzőpont-adatainak megőrzésére szolgál.

Jegyzet

A 4.0.0-s verziótól kezdve, ha a spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists tulajdonság nincs manuálisan engedélyezve, a rendszer nem hoz létre automatikusan egy Storage-tárolót spring.cloud.stream.bindings.binding-name.destination.

A spring-cloud-azure-stream-binder-eventhubs konfigurálható tulajdonságainak ellenőrzése:

Ingatlan Típus Leírás
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists logikai Engedélyezi-e a tárolók létrehozását, ha nem létezik.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name Húr A tárfiók neve.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key Húr Tárfiók hozzáférési kulcsa.
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name Húr Tároló neve.

Borravaló

Az Azure Service SDK gyakori konfigurációs beállításai a Storage Blob CheckPoint Store-hoz is konfigurálhatók. A támogatott konfigurációs beállítások Spring Cloud Azure-konfigurációsjelennek meg, és konfigurálhatók az egyesített spring.cloud.azure. előtaggal vagy a spring.cloud.azure.eventhubs.processor.checkpoint-storeelőtagjával.

Az Azure Event Hubs-kötés konfigurációs tulajdonságai

A következő lehetőségek négy részre vannak osztva: Fogyasztói tulajdonságok, Speciális fogyasztói konfigurációk, Gyártói tulajdonságok és Speciális gyártói konfigurációk.

Fogyasztói tulajdonságok

Ezek a tulajdonságok EventHubsConsumerPropertieskeresztül érhetők el.

Jegyzet

Az ismétlődés elkerülése érdekében a 4.17.0-s és 5.11.0-s verzió óta a Spring Cloud Azure Stream Binder Event Hubs támogatja az összes csatorna értékének beállítását spring.cloud.stream.eventhubs.default.consumer.<property>=<value>formátumban.

A spring-cloud-azure-stream-binder-eventhubs fogyasztói konfigurálható tulajdonságai:

Ingatlan Típus Leírás
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode Ellenőrzőpont mód Ellenőrzőpont mód, amelyet akkor használnak, amikor a fogyasztó dönti el, hogyan kell ellenőrizni az ellenőrzőpont-üzenetet
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count Egész szám Az egyes partíciók üzenetmennyiségét határozza meg egy ellenőrzőponthoz. Csak akkor lép érvénybe, ha PARTITION_COUNT ellenőrzőpont mód van használatban.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval Időtartam Egy ellenőrzőponthoz megadott időintervallumot határozza meg. Csak akkor lép érvénybe, ha TIME ellenőrzőpont mód van használatban.
spring.cloud.stream.eventhubs.bindings.<kötés-név.fogyasztó.batch.max-méret Egész szám A kötegben lévő események maximális száma. A kötegfelhasználói módhoz szükséges.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time Időtartam A köteghasználat maximális időtartama. Csak akkor lép érvénybe, ha a batch-consumer mód engedélyezve van, és nem kötelező.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.update-interval Időtartam A frissítés időközi időtartama.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy Terheléselosztási stratégia A terheléselosztási stratégia.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval Időtartam Az az időtartam, amely után a partíció tulajdonjoga lejár.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties logikai Azt jelzi, hogy az eseményfeldolgozónak adatokat kell-e kérnie a társított partíció utolsó lekéréses eseményéről, és nyomon kell-e követnie ezeket az információkat az események fogadása során.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count Egész szám A fogyasztó által azon események számának szabályozására használt szám, amelyeket az Event Hub-fogyasztó helyileg aktívan fogad és vár.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position Leképezés a kulcssal partícióazonosítóként és StartPositionProperties értékeivel Az egyes partíciókhoz használandó eseményhelyzetet tartalmazó térkép, ha a partíció ellenőrzőpontja nem létezik az Ellenőrzőpont-tárolóban. Ez a térkép a partícióazonosítóból van kulcsra kapcsolva.

Jegyzet

A initial-partition-event-position konfiguráció egy map fogad el az egyes eseményközpontok kezdeti helyének megadásához. Így a kulcs a partícióazonosító, és az érték StartPositionProperties, amely magában foglalja az eltolás tulajdonságait, a sorszámot, a lekérdezett dátumidőt és a befogadót. Beállíthatja például úgy, hogy

spring:
  cloud:
    stream:
      eventhubs:
        bindings:
          <binding-name>:
            consumer:
              initial-partition-event-position:
                0:
                  offset: earliest
                1:
                  sequence-number: 100
                2:
                  enqueued-date-time: 2022-01-12T13:32:47.650005Z
                4:
                  inclusive: false
Speciális fogyasztói konfiguráció

A fenti kapcsolati, ellenőrzőpontés gyakori Azure SDK-ügyfél konfiguráció támogatja az egyes iratgyűjtő-felhasználók testreszabását, amelyeket a spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.előtaggal konfigurálhat.

Gyártói tulajdonságok

Ezek a tulajdonságok EventHubsProducerPropertieskeresztül érhetők el.

Jegyzet

Az ismétlődés elkerülése érdekében a 4.17.0-s és 5.11.0-s verzió óta a Spring Cloud Azure Stream Binder Event Hubs támogatja az összes csatorna értékének beállítását spring.cloud.stream.eventhubs.default.producer.<property>=<value>formátumban.

A spring-cloud-azure-stream-binder-eventhubs gyártó által konfigurálható tulajdonságai:

Ingatlan Típus Leírás
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync Logikai A gyártó szinkronizálásának kapcsolójelzője. Ha igaz, a gyártó a küldési művelet után várja meg a választ.
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout hosszú A küldési művelet után a válaszra váró idő. Csak akkor lép érvénybe, ha egy szinkronizálási gyártó engedélyezve van.
Speciális gyártókonfiguráció

A fenti kapcsolati és gyakori Azure SDK-ügyfél konfigurációja támogatja az egyes iratgyűjtő-előállítók testreszabását, amelyeket a spring.cloud.stream.eventhubs.bindings.<binding-name>.producer.előtaggal konfigurálhat.

Alapszintű használat

Üzenetek küldése és fogadása az Event Hubsból vagy az eseményközpontba

  1. Adja meg a konfigurációs beállításokat hitelesítő adatokkal.

    • A hitelesítő adatok kapcsolati sztringként való megadásához konfigurálja a következő tulajdonságokat a application.yml fájlban:

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT_CONTAINER}
                  account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                  account-key: ${CHECKPOINT_ACCESS_KEY}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      

      Jegyzet

      A Microsoft az elérhető legbiztonságosabb hitelesítési folyamat használatát javasolja. Az ebben az eljárásban ismertetett hitelesítési folyamat, például adatbázisok, gyorsítótárak, üzenetkezelés vagy AI-szolgáltatások esetében, nagyon nagy megbízhatóságot igényel az alkalmazásban, és más folyamatokban nem jelenik meg kockázattal. Ezt a folyamatot csak akkor használja, ha a biztonságosabb lehetőségek, például a jelszó nélküli vagy kulcs nélküli kapcsolatok felügyelt identitásai nem életképesek. A helyi gépi műveletekhez előnyben részesítse a jelszó nélküli vagy kulcs nélküli kapcsolatok felhasználói identitásait.

    • A szolgáltatásnévként megadott hitelesítő adatokhoz konfigurálja a következő tulajdonságokat a application.yml fájlban:

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${EVENTHUB_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      

Jegyzet

A tenant-id engedélyezett értékek a következők: common, organizations, consumersvagy bérlőazonosító. Ezekről az értékekről további információt a Helytelen végpont (személyes és szervezeti fiókok) szakaszában talál, hiba AADSTS50020 – Az identitásszolgáltatótól származó felhasználói fiók nem létezik a bérlői. Az egybérlős alkalmazás konvertálásáról további információt Az egybérlős alkalmazás átalakítása több-bérlőssé a Microsoft Entra-azonosító.

  • A hitelesítő adatok felügyelt identitásként való megadásához konfigurálja a következő tulajdonságokat a application.yml fájlban:

    spring:
      cloud:
        azure:
          credential:
            managed-identity-enabled: true
            client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
          eventhubs:
            namespace: ${EVENTHUB_NAMESPACE}
            processor:
              checkpoint-store:
                container-name: ${CONTAINER_NAME}
                account-name: ${ACCOUNT_NAME}
        function:
          definition: consume;supply
        stream:
          bindings:
            consume-in-0:
              destination: ${EVENTHUB_NAME}
              group: ${CONSUMER_GROUP}
            supply-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
    
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
    
  1. Szállító és fogyasztó meghatározása.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload(),
                    message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                    message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                    message.getHeaders().get(EventHubsHeaders.OFFSET),
                    message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
            );
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Particionálás támogatása

Létrejön egy PartitionSupplier a felhasználó által megadott partícióadatokkal az elküldendő üzenet partícióadatainak konfigurálásához. Az alábbi folyamatábra a partícióazonosító és a kulcs különböző prioritásainak beszerzését mutatja be:

A particionálás támogatási folyamat folyamatábraját bemutató diagram.

Batch fogyasztói támogatás

  1. Adja meg a kötegkonfigurációs beállításokat az alábbi példában látható módon:

    spring:
      cloud:
        function:
          definition: consume
        stream:
          bindings:
            consume-in-0:
              destination: ${AZURE_EVENTHUB_NAME}
              group: ${AZURE_EVENTHUB_CONSUMER_GROUP}
              consumer:
                batch-mode: true
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  batch:
                    max-batch-size: 10 # Required for batch-consumer mode
                    max-wait-time: 1m # Optional, the default value is null
                  checkpoint:
                    mode: BATCH # or MANUAL as needed
    
  2. Szállító és fogyasztó meghatározása.

    A BATCHellenőrzőpont-módban az alábbi kód használatával küldhet üzeneteket, és kötegekben használhatja azokat.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                        message.getPayload().get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

    Ha MANUALellenőrzőpont-módot szeretne használni, az alábbi kóddal küldhet üzeneteket, és kötegekben használhatja a felhasználást/ellenőrzőpontot.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload().get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
    
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            checkpointer.success()
                        .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                        .doOnError(error -> LOGGER.error("Exception found", error))
                        .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

Jegyzet

Kötegfelhasználó módban a Spring Cloud Stream-iratgyűjtő alapértelmezett tartalomtípusa application/json, ezért győződjön meg arról, hogy az üzenet hasznos adatai igazodnak a tartalomtípushoz. Ha például a application/json alapértelmezett tartalomtípusát használja String hasznos adattal rendelkező üzenetek fogadásához, a hasznos adatokat JSON Stringkell használni, és az eredeti String szöveghez dupla idézőjelekkel kell körülvenni. Míg text/plain tartalomtípus esetében ez közvetlenül String objektum lehet. További információ: Spring Cloud Stream tartalomtípus egyeztetési.

Hibaüzenetek kezelése

  • Kimenő kötési hibaüzenetek kezelése

    Alapértelmezés szerint a Spring Integration létrehoz egy errorChannelnevű globális hibacsatornát. Konfigurálja a következő üzenetvégpontot a kimenő kötési hibaüzenetek kezeléséhez.

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Bejövő kötési hibaüzenetek kezelése

    A Spring Cloud Stream Event Hubs Binder egyetlen megoldást támogat a bejövő üzenetkötések hibáinak kezelésére: hibakezelők.

    Hibakezelő:

    A Spring Cloud Stream egy olyan mechanizmust tesz elérhetővé, amellyel egyéni hibakezelőt biztosíthat egy Consumer példányokat elfogadó ErrorMessage hozzáadásával. További információ: Hibaüzenetek kezelése a Spring Cloud Stream dokumentációjában.

    • Kötés alapértelmezett hibakezelője

      Konfiguráljon egyetlen Consumer babot az összes bejövő kötési hibaüzenet felhasználásához. Az alábbi alapértelmezett függvény feliratkozik az egyes bejövő kötési hibacsatornákra:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      A spring.cloud.stream.default.error-handler-definition tulajdonságot a függvény nevére is be kell állítania.

    • Kötésspecifikus hibakezelő

      Konfiguráljon egy Consumer babot az adott bejövő kötési hibaüzenetek felhasználásához. Az alábbi függvény feliratkozik az adott bejövő kötési hibacsatornára, és magasabb prioritással rendelkezik, mint a kötés alapértelmezett hibakezelője:

      @Bean
      public Consumer<ErrorMessage> myErrorHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      A spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition tulajdonságot a függvény nevére is be kell állítania.

Event Hubs-üzenetfejlécek

A támogatott alapvető üzenetfejléceket a Spring IntegrationSpring Cloud Azure-támogatásának Event Hubs-üzenetfejlécek című szakaszában találja.

Több kötőanyag támogatása

A több Event Hubs-névtérhez való csatlakozást több kötés is támogatja. Ez a minta példaként egy kapcsolati sztringet vesz fel. A szolgáltatásnevek és a felügyelt identitások hitelesítő adatai is támogatottak. A kapcsolódó tulajdonságokat az egyes iratgyűjtők környezeti beállításaiban állíthatja be.

  1. Ha több kötést szeretne használni az Event Hubsban, konfigurálja a következő tulajdonságokat a application.yml fájlban:

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${EVENTHUB_NAME_01}
              group: ${CONSUMER_GROUP_01}
            supply1-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE}
            consume2-in-0:
              binder: eventhub-2
              destination: ${EVENTHUB_NAME_02}
              group: ${CONSUMER_GROUP_02}
            supply2-out-0:
              binder: eventhub-2
              destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE}
          binders:
            eventhub-1:
              type: eventhubs
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_01}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
            eventhub-2:
              type: eventhubs
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_02}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
          eventhubs:
            bindings:
              consume1-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
              consume2-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Jegyzet

    Az előző alkalmazásfájl bemutatja, hogyan konfigurálhat egyetlen alapértelmezett lekérdezést az alkalmazáshoz az összes kötésre. Ha egy adott kötéshez szeretné konfigurálni a lekérdezést, használhat olyan konfigurációt, mint a spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.

    Jegyzet

    A Microsoft az elérhető legbiztonságosabb hitelesítési folyamat használatát javasolja. Az ebben az eljárásban ismertetett hitelesítési folyamat, például adatbázisok, gyorsítótárak, üzenetkezelés vagy AI-szolgáltatások esetében, nagyon nagy megbízhatóságot igényel az alkalmazásban, és más folyamatokban nem jelenik meg kockázattal. Ezt a folyamatot csak akkor használja, ha a biztonságosabb lehetőségek, például a jelszó nélküli vagy kulcs nélküli kapcsolatok felügyelt identitásai nem életképesek. A helyi gépi műveletekhez előnyben részesítse a jelszó nélküli vagy kulcs nélküli kapcsolatok felhasználói identitásait.

  2. Két szállítót és két fogyasztót kell definiálnunk:

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    

Erőforrás-kiépítés

Az Event Hubs-iratgyűjtő támogatja az eseményközpont és a fogyasztói csoport kiépítését, a felhasználók az alábbi tulajdonságok használatával engedélyezhetik a kiépítést.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      eventhubs:
        resource:
          resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}

Jegyzet

A tenant-id engedélyezett értékek a következők: common, organizations, consumersvagy bérlőazonosító. Ezekről az értékekről további információt a Helytelen végpont (személyes és szervezeti fiókok) szakaszában talál, hiba AADSTS50020 – Az identitásszolgáltatótól származó felhasználói fiók nem létezik a bérlői. Az egybérlős alkalmazás konvertálásáról további információt Az egybérlős alkalmazás átalakítása több-bérlőssé a Microsoft Entra-azonosító.

Minták

További információ: azure-spring-boot-samples adattár a GitHubon.

Spring Cloud Stream Binder az Azure Service Bushoz

Főbb fogalmak

Az Azure Service Bushoz készült Spring Cloud Stream Binder biztosítja a Spring Cloud Stream-keretrendszer kötési implementációt. Ez az implementáció a Spring Integration Service Bus-csatornaadaptereket használja az alapításakor.

Ütemezett üzenet

Ez a kötés támogatja az üzenetek küldését egy témakörbe késleltetett feldolgozás céljából. A felhasználók olyan ütemezett üzeneteket küldhetnek, x-delay ezredmásodpercben kifejezve késleltetik az üzenetet. Az üzenet x-delay ezredmásodperc után jelenik meg a megfelelő témakörökhöz.

Fogyasztói csoport

A Service Bus-témakör hasonló támogatást nyújt a fogyasztói csoportoknak, mint az Apache Kafka, de kismértékben eltérő logikával. Ez a kötőanyag egy témakör Subscription támaszkodik a fogyasztói csoportként való működéshez.

Függőség beállítása

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>

Másik lehetőségként használhatja a Spring Cloud Azure Stream Service Bus Startert is, ahogyan az a Maven esetében az alábbi példában is látható:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>

Konfiguráció

Az iratgyűjtő a konfigurációs beállítások alábbi két részét biztosítja:

Kapcsolatkonfiguráció tulajdonságai

Ez a szakasz az Azure Service Bushoz való csatlakozáshoz használt konfigurációs beállításokat tartalmazza.

Jegyzet

Ha biztonsági tagot használ egy Azure-erőforrás eléréséhez a Microsoft Entra-azonosítóval történő hitelesítéshez és engedélyezéshez, tekintse meg Hozzáférés engedélyezése a Microsoft Entra-azonosítóval annak biztosításához, hogy a biztonsági tag megkapta-e a megfelelő engedélyt az Azure-erőforrás eléréséhez.

A spring-cloud-azure-stream-binder-servicebus kapcsolatkonfigurálható tulajdonságai:

Ingatlan Típus Leírás
spring.cloud.azure.servicebus.enabled Logikai Az Azure Service Bus engedélyezése.
spring.cloud.azure.servicebus.connection-string Húr Service Bus-névtér kapcsolati sztringértéke.
spring.cloud.azure.servicebus.custom-endpoint-address Húr A Service Bushoz való csatlakozáskor használandó egyéni végpontcím.
spring.cloud.azure.servicebus.namespace Húr Service Bus-névtérérték, amely a teljes tartománynév előtagja. Az FQDN-nek a NamespaceName.DomainName névből kell lennie
spring.cloud.azure.servicebus.domain-name Húr Egy Azure Service Bus-névtérérték tartományneve.

Jegyzet

Az Azure Service SDK gyakori konfigurációs beállításai a Spring Cloud Azure Stream Service Bus-iratgyűjtőhöz is konfigurálhatók. A támogatott konfigurációs beállítások Spring Cloud Azure-konfigurációsjelennek meg, és konfigurálhatók az egyesített spring.cloud.azure. előtaggal vagy a spring.cloud.azure.servicebus.előtagjával.

A kötőanyag alapértelmezés szerint támogatja Spring Could Azure Resource Manager. A kapcsolati sztring Data kapcsolódó szerepkörökkel nem rendelkező biztonsági tagokkal való lekéréséről a Spring Could Azure Resource ManagerAlapszintű használati című szakaszában olvashat.

Az Azure Service Bus kötéskonfigurációs tulajdonságai

A következő lehetőségek négy részre vannak osztva: Fogyasztói tulajdonságok, Speciális fogyasztói konfigurációk, Gyártói tulajdonságok és Speciális gyártói konfigurációk.

Fogyasztói tulajdonságok

Ezek a tulajdonságok ServiceBusConsumerPropertieskeresztül érhetők el.

Jegyzet

Az ismétlődés elkerülése érdekében a 4.17.0-s és 5.11.0-s verzió óta a Spring Cloud Azure Stream Binder Service Bus támogatja az összes csatorna értékeinek beállítását spring.cloud.stream.servicebus.default.consumer.<property>=<value>formátumban.

A spring-cloud-azure-stream-binder-servicebus fogyasztói konfigurálható tulajdonságai:

Ingatlan Típus Alapértelmezett Leírás
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected Logikai téves Ha a sikertelen üzeneteket a rendszer a DLQ-hoz irányítja.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls Egész szám 1 A Service Bus processzorügyfél által feldolgozandó egyidejű üzenetek maximális beállítása. Ha a munkamenet engedélyezve van, az minden munkamenetre érvényes.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-egyidejű munkamenetek Egész szám null Egyszerre feldolgozandó munkamenetek maximális száma.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled logikai null Engedélyezve van-e a munkamenet.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-idle-timeout Időtartam null Beállítja, hogy mennyi idő (időtartam) várjon egy üzenet fogadására az aktuális aktív munkamenetben.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count Egész szám 0 A Service Bus processzorügyfél előzetes száma.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue Alvárólista egyik sem Az alsor típusa, amelyhez csatlakozni szeretne.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration Időtartam 5 m A zárolás automatikus megújításának folytatásához szükséges idő.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode ServiceBusReceiveMode peek_lock A Service Bus processzorügyfél fogadási módja.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete logikai igaz Az üzenetek automatikus rendezése. Ha hamisként van beállítva, a rendszer hozzáadja a Checkpointer üzenetfejlécét, hogy a fejlesztők manuálisan rendezhessék az üzeneteket.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-size-in-megabájt Hosszú 1024 Az üzenetsor/témakör maximális mérete megabájtban, amely az üzenetsorhoz/témakörhöz lefoglalt memória mérete.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.default-message-time-to-live Időtartam P10675199DT2H48M5.4775807S. (10675199 nap, 2 óra, 48 perc, 5 másodperc és 477 ezredmásodperc) Az az időtartam, amely után az üzenet lejár, attól kezdve, hogy az üzenet el lesz küldve a Service Busnak.

Fontos

Az Azure Resource Manager (ARM) használatakor konfigurálnia kell a spring.cloud.stream.servicebus.bindings.<binding-name>.consume.entity-type tulajdonságot. További információ: servicebus-queue-binder-arm minta a GitHubon.

Speciális fogyasztói konfiguráció

A fenti kapcsolati és gyakori Azure SDK-ügyfél konfigurációja támogatja az egyes iratgyűjtő-felhasználók testreszabását, amelyeket a spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.előtaggal konfigurálhat.

Gyártói tulajdonságok

Ezek a tulajdonságok ServiceBusProducerPropertieskeresztül érhetők el.

Jegyzet

Az ismétlődés elkerülése érdekében a 4.17.0-s és 5.11.0-s verzió óta a Spring Cloud Azure Stream Binder Service Bus támogatja az összes csatorna értékeinek beállítását spring.cloud.stream.servicebus.default.producer.<property>=<value>formátumban.

A spring-cloud-azure-stream-binder-servicebus gyártó által konfigurálható tulajdonságai:

Ingatlan Típus Alapértelmezett Leírás
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync Logikai téves Kapcsolójelző a gyártó szinkronizálásához.
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout hosszú 10 000 Időtúllépési érték a gyártó elküldéséhez.
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type ServiceBusEntityType null A gyártó Service Bus-entitástípusa, amely a kötelező gyártóhoz szükséges.
spring.cloud.stream.servicebus.bindings.binding-name.producer.max-size-in-megabájt Hosszú 1024 Az üzenetsor/témakör maximális mérete megabájtban, amely az üzenetsorhoz/témakörhöz lefoglalt memória mérete.
spring.cloud.stream.servicebus.bindings.binding-name.producer.default-message-time-to-live Időtartam P10675199DT2H48M5.4775807S. (10675199 nap, 2 óra, 48 perc, 5 másodperc és 477 ezredmásodperc) Az az időtartam, amely után az üzenet lejár, attól kezdve, hogy az üzenet el lesz küldve a Service Busnak.

Fontos

A kötésgyártó használatakor konfigurálni kell a spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type tulajdonságát.

Speciális gyártókonfiguráció

A fenti kapcsolati és gyakori Azure SDK-ügyfél konfigurációja támogatja az egyes iratgyűjtő-előállítók testreszabását, amelyeket a spring.cloud.stream.servicebus.bindings.<binding-name>.producer.előtaggal konfigurálhat.

Alapszintű használat

Üzenetek küldése és fogadása a Service Busból vagy a Service Busba

  1. Adja meg a konfigurációs beállításokat hitelesítő adatokkal.

    • A hitelesítő adatok kapcsolati sztringként való megadásához konfigurálja a következő tulajdonságokat a application.yml fájlban:

          spring:
            cloud:
              azure:
                servicebus:
                  connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      

      Jegyzet

      A Microsoft az elérhető legbiztonságosabb hitelesítési folyamat használatát javasolja. Az ebben az eljárásban ismertetett hitelesítési folyamat, például adatbázisok, gyorsítótárak, üzenetkezelés vagy AI-szolgáltatások esetében, nagyon nagy megbízhatóságot igényel az alkalmazásban, és más folyamatokban nem jelenik meg kockázattal. Ezt a folyamatot csak akkor használja, ha a biztonságosabb lehetőségek, például a jelszó nélküli vagy kulcs nélküli kapcsolatok felügyelt identitásai nem életképesek. A helyi gépi műveletekhez előnyben részesítse a jelszó nélküli vagy kulcs nélküli kapcsolatok felhasználói identitásait.

    • A szolgáltatásnévként megadott hitelesítő adatokhoz konfigurálja a következő tulajdonságokat a application.yml fájlban:

          spring:
            cloud:
              azure:
                credential:
                  client-id: ${AZURE_CLIENT_ID}
                  client-secret: ${AZURE_CLIENT_SECRET}
                profile:
                  tenant-id: <tenant>
                servicebus:
                  namespace: ${SERVICEBUS_NAMESPACE}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      

Jegyzet

A tenant-id engedélyezett értékek a következők: common, organizations, consumersvagy bérlőazonosító. Ezekről az értékekről további információt a Helytelen végpont (személyes és szervezeti fiókok) szakaszában talál, hiba AADSTS50020 – Az identitásszolgáltatótól származó felhasználói fiók nem létezik a bérlői. Az egybérlős alkalmazás konvertálásáról további információt Az egybérlős alkalmazás átalakítása több-bérlőssé a Microsoft Entra-azonosító.

  • A hitelesítő adatok felügyelt identitásként való megadásához konfigurálja a következő tulajdonságokat a application.yml fájlban:

        spring:
          cloud:
            azure:
              credential:
                managed-identity-enabled: true
                client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
              servicebus:
                namespace: ${SERVICEBUS_NAMESPACE}
            function:
              definition: consume;supply
            stream:
              bindings:
                consume-in-0:
                  destination: ${SERVICEBUS_ENTITY_NAME}
                  # If you use Service Bus Topic, add the following configuration
                  # group: ${SUBSCRIPTION_NAME}
                supply-out-0:
                  destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
              servicebus:
                bindings:
                  consume-in-0:
                    consumer:
                      auto-complete: false
                  supply-out-0:
                    producer:
                      entity-type: queue # set as "topic" if you use Service Bus Topic
    
  1. Szállító és fogyasztó meghatározása.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}'", message.getPayload());
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Partíciókulcs támogatása

A kötőanyag támogatja Service Bus particionálási azáltal, hogy engedélyezi a partíciókulcs és a munkamenet-azonosító beállítását az üzenetfejlécben. Ez a szakasz bemutatja, hogyan állíthat be partíciókulcsot az üzenetekhez.

A Spring Cloud Stream egy partíciókulcs SpEL-kifejezéstulajdonságot biztosít spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression. Például állítsa be ezt a tulajdonságot "'partitionKey-' + headers[<message-header-key>]", és adjon hozzá egy message-header-key nevű fejlécet. A Spring Cloud Stream a fejléc értékét használja a kifejezés kiértékelésekor egy partíciókulcs hozzárendeléséhez. Az alábbi kód egy példakészítőt tartalmaz:

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader("<message-header-key>", value.length() % 4)
            .build();
    };
}

Munkamenet-támogatás

A kötőanyag támogatja a Service Bus üzenet munkameneteit. Az üzenet munkamenet-azonosítója az üzenet fejlécén keresztül állítható be.

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
            .build();
    };
}

Jegyzet

Service Bus particionálásiszerint a munkamenet-azonosító prioritása magasabb, mint a partíciókulcs. Így ha mind a ServiceBusMessageHeaders#SESSION_ID, mind a ServiceBusMessageHeaders#PARTITION_KEY fejléc be van állítva, a munkamenet-azonosító értéke végül a partíciókulcs értékének felülírására szolgál.

Hibaüzenetek kezelése

  • Kimenő kötési hibaüzenetek kezelése

    Alapértelmezés szerint a Spring Integration létrehoz egy errorChannelnevű globális hibacsatornát. Konfigurálja a következő üzenetvégpontot a kimenő kötési hibaüzenet kezeléséhez.

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Bejövő kötési hibaüzenetek kezelése

    A Spring Cloud Stream Service Bus Binder két megoldást támogat a bejövő üzenetkötések hibáinak kezelésére: a binder hibakezelője és a kezelők.

    Binder hibakezelő:

    Az alapértelmezett kötéskezelő hibakezelő kezeli a bejövő kötést. Ezzel a kezelő használatával sikertelen üzeneteket küldhet a kézbesítetlen levelek üzenetsorába, ha spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected engedélyezve van. Ellenkező esetben a sikertelen üzenetek elhagyva lesznek. A kötőanyag hibakezelője kölcsönösen kizárja a többi megadott hibakezelőt.

    Hibakezelő:

    A Spring Cloud Stream egy olyan mechanizmust tesz elérhetővé, amellyel egyéni hibakezelőt biztosíthat egy Consumer példányokat elfogadó ErrorMessage hozzáadásával. További információ: Hibaüzenetek kezelése a Spring Cloud Stream dokumentációjában.

    • Kötés alapértelmezett hibakezelője

      Konfiguráljon egyetlen Consumer babot az összes bejövő kötési hibaüzenet felhasználásához. Az alábbi alapértelmezett függvény feliratkozik az egyes bejövő kötési hibacsatornákra:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      A spring.cloud.stream.default.error-handler-definition tulajdonságot a függvény nevére is be kell állítania.

    • Kötésspecifikus hibakezelő

      Konfiguráljon egy Consumer babot az adott bejövő kötési hibaüzenetek felhasználásához. Az alábbi függvény előfizet az adott bejövő kötési hibacsatornára, amelynek prioritása magasabb, mint a kötés alapértelmezett hibakezelője.

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      A spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition tulajdonságot a függvény nevére is be kell állítania.

Service Bus-üzenetfejlécek

A támogatott alapvető üzenetfejlécekért tekintse meg Service Bus-üzenetfejléceketSpring Cloud Azure-támogatás spring integrationcímű szakaszát.

Jegyzet

A partíciókulcs beállításakor az üzenetfejléc prioritása magasabb, mint a Spring Cloud Stream tulajdonság. Így spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression csak akkor lép érvénybe, ha egyik ServiceBusMessageHeaders#SESSION_ID és ServiceBusMessageHeaders#PARTITION_KEY fejléc sincs konfigurálva.

Több kötőanyag támogatása

A több Service Bus-névtérhez való csatlakozást több kötés is támogatja. Ez a minta példaként a kapcsolati sztringet veszi fel. A szolgáltatásnevek és a felügyelt identitások hitelesítő adatai is támogatottak, a felhasználók a kapcsolódó tulajdonságokat az egyes iratgyűjtők környezeti beállításaiban állíthatják be.

  1. A ServiceBus több kötésének használatához konfigurálja a következő tulajdonságokat a application.yml fájlban:

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${SERVICEBUS_TOPIC_NAME}
              group: ${SUBSCRIPTION_NAME}
            supply1-out-0:
              destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE}
            consume2-in-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME}
            supply2-out-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE}
          binders:
            servicebus-1:
              type: servicebus
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING}
            servicebus-2:
              type: servicebus
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING}
          servicebus:
            bindings:
              consume1-in-0:
                consumer:
                  auto-complete: false
              supply1-out-0:
                producer:
                  entity-type: topic
              consume2-in-0:
                consumer:
                  auto-complete: false
              supply2-out-0:
                producer:
                  entity-type: queue
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Jegyzet

    Az előző alkalmazásfájl bemutatja, hogyan konfigurálhat egyetlen alapértelmezett lekérdezést az alkalmazáshoz az összes kötésre. Ha egy adott kötéshez szeretné konfigurálni a lekérdezést, használhat olyan konfigurációt, mint a spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.

    Jegyzet

    A Microsoft az elérhető legbiztonságosabb hitelesítési folyamat használatát javasolja. Az ebben az eljárásban ismertetett hitelesítési folyamat, például adatbázisok, gyorsítótárak, üzenetkezelés vagy AI-szolgáltatások esetében, nagyon nagy megbízhatóságot igényel az alkalmazásban, és más folyamatokban nem jelenik meg kockázattal. Ezt a folyamatot csak akkor használja, ha a biztonságosabb lehetőségek, például a jelszó nélküli vagy kulcs nélküli kapcsolatok felügyelt identitásai nem életképesek. A helyi gépi műveletekhez előnyben részesítse a jelszó nélküli vagy kulcs nélküli kapcsolatok felhasználói identitásait.

  2. két beszállítót és két fogyasztót kell meghatároznunk

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    
    }
    

Erőforrás-kiépítés

A Service Bus binder támogatja az üzenetsor, a témakör és az előfizetés kiépítését, a felhasználók az alábbi tulajdonságok használatával engedélyezhetik a kiépítést.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      servicebus:
        resource:
          resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
    stream:
      servicebus:
        bindings:
          <binding-name>:
            consumer:
              entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}

Jegyzet

A tenant-id engedélyezett értékek a következők: common, organizations, consumersvagy bérlőazonosító. Ezekről az értékekről további információt a Helytelen végpont (személyes és szervezeti fiókok) szakaszában talál, hiba AADSTS50020 – Az identitásszolgáltatótól származó felhasználói fiók nem létezik a bérlői. Az egybérlős alkalmazás konvertálásáról további információt Az egybérlős alkalmazás átalakítása több-bérlőssé a Microsoft Entra-azonosító.

A Service Bus-ügyfél tulajdonságainak testreszabása

A fejlesztők a AzureServiceClientBuilderCustomizer segítségével testre szabhatják a Service Bus-ügyfél tulajdonságait. Az alábbi példa a sessionIdleTimeoutServiceBusClientBuilder tulajdonságát szabja testre:

@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
    return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}

Minták

További információ: azure-spring-boot-samples adattár a GitHubon.