Share via


Spring Cloud Azure-stöd för Spring Cloud Stream

Den här artikeln gäller för: ✔️ Version 4.14.0 ✔️ Version 5.8.0

Spring Cloud Stream är ett ramverk för att skapa mycket skalbara händelsedrivna mikrotjänster som är anslutna till delade meddelandesystem.

Ramverket tillhandahåller en flexibel programmeringsmodell som bygger på redan etablerade och välbekanta Spring-idiom och bästa praxis. Dessa metodtips omfattar stöd för beständiga pub/sub-semantik, konsumentgrupper och tillståndskänsliga partitioner.

Aktuella implementeringar av pärm är:

Spring Cloud Stream Binder för Azure Event Hubs

Nyckelbegrepp

Spring Cloud Stream Binder för Azure Event Hubs tillhandahåller bindningsimplementeringen för Spring Cloud Stream-ramverket. Den här implementeringen använder Spring Integration Event Hubs-kanalkort i grunden. Från designens perspektiv liknar Event Hubs Kafka. Event Hubs kan också nås via Kafka API. Om ditt projekt har ett nära beroende av Kafka API kan du prova Events Hub med Kafka API Sample

Konsumentgrupp

Event Hubs har liknande stöd för konsumentgruppen som Apache Kafka, men med lite annan logik. Medan Kafka lagrar alla incheckade förskjutningar i asynkron meddelandekö måste du lagra förskjutningar av Event Hubs-meddelanden som bearbetas manuellt. Event Hubs SDK tillhandahåller funktionen för att lagra sådana förskjutningar i Azure Storage.

Stöd för partitionering

Event Hubs har ett liknande koncept för fysisk partition som Kafka. Men till skillnad från Kafkas automatiska ombalansering mellan konsumenter och partitioner tillhandahåller Event Hubs ett slags förebyggande läge. Lagringskontot fungerar som ett lån för att avgöra vilken konsument som äger vilken partition. När en ny konsument startar försöker den stjäla vissa partitioner från de mest belastade konsumenterna för att uppnå arbetsbelastningsbalansen.

För att ange belastningsutjämningsstrategin tillhandahålls spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.* egenskaper för. Mer information finns i avsnittet Konsumentegenskaper .

Stöd för Batch-konsument

Spring Cloud Azure Stream Event Hubs binder stöder Spring Cloud Stream Batch Consumer-funktionen.

Om du vill arbeta med batch-konsumentläget anger du spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode egenskapen till true. När det är aktiverat tas ett meddelande med en nyttolast av en lista över batchade händelser emot och skickas Consumer till funktionen. Varje meddelanderubrik konverteras också till en lista, där innehållet är det associerade rubrikvärdet som parsas från varje händelse. De gemensamma huvudena för partitions-ID, checkpointer och de senaste egenskaperna visas som ett enda värde eftersom hela batchen med händelser delar samma värde. Mer information finns i avsnittet Event Hubs-meddelandehuvudeni Spring Cloud Azure-stöd för Spring Integration.

Kommentar

Kontrollpunktshuvudet finns bara när kontrollpunktsläget MANUAL används.

Kontrollpunkter för batchkonsumenten stöder två lägen: BATCH och MANUAL. BATCH läget är ett läge för automatisk kontrollpunkt för att kontrollera hela batchen med händelser tillsammans när bindemedlet tar emot dem. MANUAL läge är att kontrollera händelserna av användare. När den används skickas den Checkpointer till meddelandehuvudet och användarna kan använda den för att göra kontrollpunkter.

Du kan ange batchstorleken genom att ange max-size egenskaperna och max-wait-time som har prefixet spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.. Egenskapen max-size är nödvändig och egenskapen max-wait-time är valfri. Mer information finns i avsnittet Konsumentegenskaper .

Beroendekonfiguration

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>

Du kan också använda Spring Cloud Azure Stream Event Hubs Starter, som du ser i följande exempel för Maven:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>

Konfiguration

Pärmen innehåller följande tre delar av konfigurationsalternativen:

konfigurationsegenskaper för Anslut ion

Det här avsnittet innehåller de konfigurationsalternativ som används för att ansluta till Azure Event Hubs.

Kommentar

Om du väljer att använda ett säkerhetsobjekt för att autentisera och auktorisera med Microsoft Entra-ID för åtkomst till en Azure-resurs läser du Auktorisera åtkomst med Microsoft Entra-ID för att kontrollera att säkerhetsobjektet har beviljats tillräcklig behörighet för att få åtkomst till Azure-resursen.

Anslut ion konfigurerbara egenskaper för spring-cloud-azure-stream-binder-eventhubs:

Property Type Beskrivning
spring.cloud.azure.eventhubs.enabled boolean Om en Azure Event Hubs är aktiverad.
spring.cloud.azure.eventhubs.connection-string String Event Hubs-namnområde anslutningssträng värde.
spring.cloud.azure.eventhubs.namespace String Event Hubs-namnområdesvärde, som är prefixet för det fullständiga domännamnet. Ett FQDN ska bestå av NamespaceName.DomainName
spring.cloud.azure.eventhubs.domain-name String Domännamn för ett Namnområdesvärde för Azure Event Hubs.
spring.cloud.azure.eventhubs.custom-endpoint-address String Anpassad slutpunktsadress.

Dricks

Vanliga konfigurationsalternativ för Azure Service SDK kan konfigureras även för Spring Cloud Azure Stream Event Hubs-pärmen. De konfigurationsalternativ som stöds introduceras i Spring Cloud Azure-konfigurationen och kan konfigureras med antingen det enhetliga prefixet spring.cloud.azure. eller prefixet spring.cloud.azure.eventhubs..

Pärmen har också stöd för Spring Could Azure Resource Manager som standard. Mer information om hur du hämtar anslutningssträng med säkerhetsobjekt som inte beviljas med Data relaterade roller finns i avsnittet Grundläggande användning iSpring Could Azure Resource Manager.

Konfigurationsegenskaper för kontrollpunkt

Det här avsnittet innehåller konfigurationsalternativen för Storage Blobs-tjänsten, som används för att bevara partitionsägarskap och kontrollpunktsinformation.

Kommentar

Från version 4.0.0, när egenskapen spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists inte är aktiverad manuellt, skapas ingen Lagringscontainer automatiskt med namnet från spring.cloud.stream.bindings.binding-name.destination.

Kontrollpunktskonfigurerbara egenskaper för spring-cloud-azure-stream-binder-eventhubs:

Property Type Beskrivning
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists Booleskt Om du vill tillåta att containrar skapas om de inte finns.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name String Namn på lagringskontot.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key String Åtkomstnyckel för lagringskonto.
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name String Namn på lagringscontainer.

Dricks

Vanliga konfigurationsalternativ för Azure Service SDK kan även konfigureras för Lagringsblobkontrollpunktsarkiv. De konfigurationsalternativ som stöds introduceras i Spring Cloud Azure-konfigurationen och kan konfigureras med antingen det enhetliga prefixet spring.cloud.azure. eller prefixet spring.cloud.azure.eventhubs.processor.checkpoint-store.

Konfigurationsegenskaper för Azure Event Hubs-bindning

Följande alternativ är indelade i fyra avsnitt: Konsumentegenskaper, Avancerade konsumentkonfigurationer, Producentegenskaper och Avancerade producentkonfigurationer.

Konsumentegenskaper

Dessa egenskaper exponeras via EventHubsConsumerProperties.

Konfigurerbara egenskaper för spring-cloud-azure-stream-binder-eventhubs:

Property Type Beskrivning
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode CheckpointMode Kontrollpunktsläge som används när konsumenten bestämmer hur ett kontrollpunktsmeddelande ska visas
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count Integer Bestämmer mängden meddelande för varje partition för att göra en kontrollpunkt. Börjar gälla endast när PARTITION_COUNT kontrollpunktsläget används.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval Varaktighet Bestämmer tidsintervallet för att göra en kontrollpunkt. Börjar gälla endast när TIME kontrollpunktsläget används.
spring.cloud.stream.eventhubs.bindings.<binding-name.consumer.batch.max-size Integer Det maximala antalet händelser i en batch. Krävs för batch-konsumentläge.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time Varaktighet Den maximala tidsperioden för batchförbrukning. Börjar gälla endast när batch-konsumentläget är aktiverat och är valfritt.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.update-interval Varaktighet Varaktighet för intervalltid för uppdatering.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy LoadBalancingStrategy Belastningsutjämningsstrategin.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval Varaktighet Tidsåtgången efter vilken ägarskapet för partitionen upphör att gälla.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties Booleskt Om händelseprocessorn ska begära information om den senaste begärda händelsen på den associerade partitionen och spåra den informationen när händelser tas emot.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count Integer Antalet som används av konsumenten för att kontrollera antalet händelser som händelsehubbens konsument aktivt tar emot och köar lokalt.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position Mappa med nyckeln som partitions-ID och värden för StartPositionProperties Kartan som innehåller den händelseposition som ska användas för varje partition om en kontrollpunkt för partitionen inte finns i kontrollpunktsarkivet. Den här kartan är bortkopplad från partitions-ID:t.

Kommentar

Konfigurationen initial-partition-event-position accepterar en map för att ange den inledande positionen för varje händelsehubb. Dess nyckel är alltså partitions-ID och värdet innehåller StartPositionProperties egenskaper för förskjutning, sekvensnummer, köad datumtid och huruvida inkluderande. Du kan till exempel ange den som

spring:
  cloud:
    stream:
      eventhubs:
        bindings:
          <binding-name>:
            consumer:
              initial-partition-event-position:
                0:
                  offset: earliest
                1:
                  sequence-number: 100
                2:
                  enqueued-date-time: 2022-01-12T13:32:47.650005Z
                4:
                  inclusive: false
Avancerad konsumentkonfiguration

Ovanstående anslutning, kontrollpunkt och vanliga Azure SDK-klientkonfiguration stöder anpassning för varje bindemedelskonsument, som du kan konfigurera med prefixet spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer..

Producentegenskaper

Dessa egenskaper exponeras via EventHubsProducerProperties.

Producentkonfigurerbara egenskaper för spring-cloud-azure-stream-binder-eventhubs:

Property Type Beskrivning
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync boolean Switch-flaggan för synkronisering av producent. Om det är sant väntar producenten på ett svar efter en sändningsåtgärd.
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout lång Hur lång tid det går att vänta på ett svar efter en sändningsåtgärd. Börjar gälla endast när en synkroniseringsproducent är aktiverad.
Avancerad producentkonfiguration

Ovanstående anslutning och den vanliga Azure SDK-klientkonfigurationen stöder anpassning för varje pärmproducent, som du kan konfigurera med prefixet spring.cloud.stream.eventhubs.bindings.<binding-name>.producer..

Grundläggande användning

Skicka och ta emot meddelanden från/till Event Hubs

  1. Fyll i konfigurationsalternativen med information om autentiseringsuppgifter.

    • För autentiseringsuppgifter som anslutningssträng konfigurerar du följande egenskaper i filen application.yml:

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT_CONTAINER}
                  account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                  account-key: ${CHECKPOINT_ACCESS_KEY}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      
    • För autentiseringsuppgifter som tjänstens huvudnamn konfigurerar du följande egenskaper i filen application.yml :

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${EVENTHUB_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      

Kommentar

De värden som tillåts för tenant-id är: common, organizations, consumerseller klientorganisations-ID. Mer information om dessa värden finns i avsnittet Använd fel slutpunkt (personliga konton och organisationskonton) i Fel AADSTS50020 – Användarkonto från identitetsprovidern finns inte i klientorganisationen. Information om hur du konverterar din app för en klientorganisation finns i Konvertera en klientorganisationsapp till flera klientorganisationer på Microsoft Entra-ID.

  • För autentiseringsuppgifter som hanterade identiteter konfigurerar du följande egenskaper i filen application.yml :

    spring:
      cloud:
        azure:
          credential:
            managed-identity-enabled: true
            client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
          eventhubs:
            namespace: ${EVENTHUB_NAMESPACE}
            processor:
              checkpoint-store:
                container-name: ${CONTAINER_NAME}
                account-name: ${ACCOUNT_NAME}
        function:
          definition: consume;supply
        stream:
          bindings:
            consume-in-0:
              destination: ${EVENTHUB_NAME}
              group: ${CONSUMER_GROUP}
            supply-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
    
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
    
  1. Definiera leverantör och konsument.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload(),
                    message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                    message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                    message.getHeaders().get(EventHubsHeaders.OFFSET),
                    message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
            );
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Stöd för partitionering

En PartitionSupplier partitionsinformation som tillhandahålls av användaren skapas för att konfigurera partitionsinformationen om meddelandet som ska skickas. Följande flödesschema visar processen för att hämta olika prioriteringar för partitions-ID och nyckel:

Diagram showing a flowchart of the partitioning support process.

Stöd för Batch-konsument

  1. Ange konfigurationsalternativen för batchen enligt följande exempel:

    spring:
      cloud:
        function:
          definition: consume
        stream:
          bindings:
            consume-in-0:
              destination: ${AZURE_EVENTHUB_NAME}
              group: ${AZURE_EVENTHUB_CONSUMER_GROUP}
              consumer:
                batch-mode: true
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  batch:
                    max-batch-size: 10 # Required for batch-consumer mode
                    max-wait-time: 1m # Optional, the default value is null
                  checkpoint:
                    mode: BATCH # or MANUAL as needed
    
  2. Definiera leverantör och konsument.

    För kontrollpunktsläge som BATCHkan du använda följande kod för att skicka meddelanden och använda i batchar.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                        message.getPayload().get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

    För kontrollpunktsläge som MANUALkan du använda följande kod för att skicka meddelanden och använda/kontrollpunkt i batchar.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload().get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
    
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            checkpointer.success()
                        .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                        .doOnError(error -> LOGGER.error("Exception found", error))
                        .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

Kommentar

I batchförbrukningsläget är application/jsonstandardinnehållstypen för Spring Cloud Stream-bindemedel , så se till att meddelandenyttolasten är anpassad till innehållstypen. När du till exempel använder standardinnehållstypen application/json för för att ta emot meddelanden med String nyttolast ska nyttolasten vara JSON String, omgiven av dubbla citattecken för den ursprungliga String texten. När det gäller text/plain innehållstyp kan det vara ett String objekt direkt. Mer information finns i Spring Cloud Stream Content Type Negotiation(Förhandlingar om Spring Cloud Stream-innehållstyp).

Hantera felmeddelanden

  • Hantera utgående bindningsfelmeddelanden

    Som standard skapar Spring Integration en global felkanal med namnet errorChannel. Konfigurera följande meddelandeslutpunkt för att hantera utgående bindningsfel:

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Hantera inkommande bindningsfelmeddelanden

    Spring Cloud Stream Event Hubs Binder stöder två lösningar för att hantera fel för inkommande meddelandebindningar: anpassade felkanaler och hanterare.

    Felkanal:

    Spring Cloud Stream tillhandahåller en felkanal för varje inkommande bindning. En ErrorMessage skickas till felkanalen. Mer information finns i Hantera fel i Spring Cloud Stream-dokumentationen.

    • Standardfelkanal

      Du kan använda en global felkanal med namnet errorChannel för att använda alla inkommande bindningsfelmeddelanden. Om du vill hantera dessa meddelanden konfigurerar du följande meddelandeslutpunkt:

      @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
      public void handleError(ErrorMessage message) {
          LOGGER.error("Handling inbound binding error: " + message);
      }
      
    • Bindningsspecifik felkanal

      Du kan använda en specifik felkanal för att använda de specifika inkommande bindningsfelmeddelandena med högre prioritet än standardfelkanalen. Om du vill hantera dessa meddelanden konfigurerar du följande meddelandeslutpunkt:

      // Replace destination with spring.cloud.stream.bindings.<input-binding-name>.destination
      // Replace group with spring.cloud.stream.bindings.<input-binding-name>.group
      @ServiceActivator(inputChannel = "{destination}.{group}.errors")
      public void handleError(ErrorMessage message) {
          LOGGER.error("Handling inbound binding error: " + message);
      }
      

      Kommentar

      Den bindningsspecifika felkanalen är ömsesidigt uteslutande med andra tillhandahållna felhanterare och kanaler.

    Felhanterare:

    Spring Cloud Stream exponerar en mekanism som du kan använda för att tillhandahålla en anpassad felhanterare genom att lägga till en Consumer som accepterar ErrorMessage instanser. Mer information finns i Felhantering i Spring Cloud Stream-dokumentationen.

    Kommentar

    När en bindningsfelhanterare har konfigurerats kan den fungera med standardfelkanalen.

    • Felhanterare för bindningsstandard

      Konfigurera en enda Consumer böna för att använda alla inkommande bindningsfelmeddelanden. Följande standardfunktion prenumererar på varje inkommande bindningsfelkanal:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Du måste också ange spring.cloud.stream.default.error-handler-definition egenskapen till funktionsnamnet.

    • Bindningsspecifik felhanterare

      Konfigurera en Consumer böna för att använda de specifika inkommande bindningsfelmeddelandena. Följande funktion prenumererar på den specifika felkanalen för inkommande bindning och har högre prioritet än felhanteraren för bindningsstandard:

      @Bean
      public Consumer<ErrorMessage> myErrorHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Du måste också ange spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition egenskapen till funktionsnamnet.

Meddelandehuvuden för Event Hubs

De grundläggande meddelandehuvuden som stöds finns i avsnittet Event Hubs-meddelandehuvuden i Spring Cloud Azure-stöd för Spring Integration.

Stöd för flera pärmar

Anslut ion till flera Event Hubs-namnområden stöds också med hjälp av flera pärmar. Det här exemplet tar ett anslutningssträng som exempel. Autentiseringsuppgifter för tjänstens huvudnamn och hanterade identiteter stöds också. Du kan ange relaterade egenskaper i varje bindemedels miljöinställningar.

  1. Om du vill använda flera pärmar med Event Hubs konfigurerar du följande egenskaper i filen application.yml :

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${EVENTHUB_NAME_01}
              group: ${CONSUMER_GROUP_01}
            supply1-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE}
            consume2-in-0:
              binder: eventhub-2
              destination: ${EVENTHUB_NAME_02}
              group: ${CONSUMER_GROUP_02}
            supply2-out-0:
              binder: eventhub-2
              destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE}
          binders:
            eventhub-1:
              type: eventhubs
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_01}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
            eventhub-2:
              type: eventhubs
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_02}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
          eventhubs:
            bindings:
              consume1-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
              consume2-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Kommentar

    Den tidigare programfilen visar hur du konfigurerar en enda standardsökare för programmet till alla bindningar. Om du vill konfigurera polleraren för en specifik bindning kan du använda en konfiguration som spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.

  2. Vi behöver definiera två leverantörer och två konsumenter:

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    

Resursetablering

Event Hubs binder stöder etablering av händelsehubb och konsumentgrupp. Användarna kan använda följande egenskaper för att aktivera etablering.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      eventhubs:
        resource:
          resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}

Kommentar

De värden som tillåts för tenant-id är: common, organizations, consumerseller klientorganisations-ID. Mer information om dessa värden finns i avsnittet Använd fel slutpunkt (personliga konton och organisationskonton) i Fel AADSTS50020 – Användarkonto från identitetsprovidern finns inte i klientorganisationen. Information om hur du konverterar din app för en klientorganisation finns i Konvertera en klientorganisationsapp till flera klientorganisationer på Microsoft Entra-ID.

Exempel

Mer information finns i lagringsplatsen azure-spring-boot-samples på GitHub.

Spring Cloud Stream Binder för Azure Service Bus

Nyckelbegrepp

Spring Cloud Stream Binder för Azure Service Bus tillhandahåller bindningsimplementeringen för Spring Cloud Stream Framework. Den här implementeringen använder Spring Integration Service Bus-kanalkort i grunden.

Schemalagt meddelande

Den här pärmen stöder sändning av meddelanden till ett ämne för fördröjd bearbetning. Användare kan skicka schemalagda meddelanden med sidhuvud x-delay som i millisekunder uttrycker en fördröjningstid för meddelandet. Meddelandet levereras till respektive avsnitt efter x-delay millisekunder.

Konsumentgrupp

Service Bus-ämnet ger liknande stöd för konsumentgruppen som Apache Kafka, men med lite annan logik. Den här pärmen förlitar sig på Subscription ett ämne för att fungera som en konsumentgrupp.

Beroendekonfiguration

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>

Du kan också använda Spring Cloud Azure Stream Service Bus Starter, som du ser i följande exempel för Maven:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>

Konfiguration

Pärmen innehåller följande två delar av konfigurationsalternativen:

konfigurationsegenskaper för Anslut ion

Det här avsnittet innehåller de konfigurationsalternativ som används för att ansluta till Azure Service Bus.

Kommentar

Om du väljer att använda ett säkerhetsobjekt för att autentisera och auktorisera med Microsoft Entra-ID för åtkomst till en Azure-resurs läser du Auktorisera åtkomst med Microsoft Entra-ID för att kontrollera att säkerhetsobjektet har beviljats tillräcklig behörighet för att få åtkomst till Azure-resursen.

Anslut ion konfigurerbara egenskaper för spring-cloud-azure-stream-binder-servicebus:

Property Type Beskrivning
spring.cloud.azure.servicebus.enabled boolean Om en Azure Service Bus är aktiverad.
spring.cloud.azure.servicebus.connection-string String Service Bus-namnområde anslutningssträng värde.
spring.cloud.azure.servicebus.namespace String Service Bus-namnområdesvärde, som är prefixet för FQDN. Ett FQDN ska bestå av NamespaceName.DomainName
spring.cloud.azure.servicebus.domain-name String Domännamn för ett Azure Service Bus-namnområdesvärde.

Kommentar

Vanliga konfigurationsalternativ för Azure Service SDK kan konfigureras även för Spring Cloud Azure Stream Service Bus-pärmen. De konfigurationsalternativ som stöds introduceras i Spring Cloud Azure-konfigurationen och kan konfigureras med antingen det enhetliga prefixet spring.cloud.azure. eller prefixet spring.cloud.azure.servicebus..

Pärmen har också stöd för Spring Could Azure Resource Manager som standard. Mer information om hur du hämtar anslutningssträng med säkerhetsobjekt som inte beviljas med Data relaterade roller finns i avsnittet Grundläggande användning iSpring Could Azure Resource Manager.

Konfigurationsegenskaper för Azure Service Bus-bindning

Följande alternativ är indelade i fyra avsnitt: Konsumentegenskaper, Avancerade konsumentkonfigurationer, Producentegenskaper och Avancerade producentkonfigurationer.

Konsumentegenskaper

Dessa egenskaper exponeras via ServiceBusConsumerProperties.

Konsumentkonfigurerbara egenskaper för spring-cloud-azure-stream-binder-servicebus:

Property Type Standardvärde beskrivning
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected boolean falskt Om de misslyckade meddelandena dirigeras till DLQ.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls Integer 1 Maximalt antal samtidiga meddelanden som Service Bus-processorklienten ska bearbeta.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-sessions Integer NULL Maximalt antal samtidiga sessioner som ska bearbetas vid en viss tidpunkt.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled Booleskt NULL Om sessionen är aktiverad.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count Integer 0 Antalet prefetch för Service Bus-processorklienten.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue Underkö inget Typen av underkö som ska anslutas till.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration Varaktighet 5 m Hur lång tid det går att fortsätta förnya låset automatiskt.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode ServiceBusReceiveMode peek_lock Mottagningsläget för Service Bus-processorklienten.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete Booleskt true Om meddelanden ska regleras automatiskt. Om det anges som falskt läggs ett meddelandehuvud Checkpointer till för att göra det möjligt för utvecklare att reglera meddelanden manuellt.
Avancerad konsumentkonfiguration

Ovanstående anslutning och den vanliga Azure SDK-klientkonfigurationen stöder anpassning för varje binder-konsument, som du kan konfigurera med prefixet spring.cloud.stream.servicebus.bindings.<binding-name>.consumer..

Producentegenskaper

Dessa egenskaper exponeras via ServiceBusProducerProperties.

Producentkonfigurerbara egenskaper för spring-cloud-azure-stream-binder-servicebus:

Property Type Standardvärde beskrivning
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync boolean falskt Växla flagga för synkronisering av producent.
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout lång 10000 Timeout-värde för sändning av producent.
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type ServiceBusEntityType NULL Service Bus-entitetstypen för producenten, som krävs för den bindande producenten.

Viktigt!

När du använder bindningsproducenten måste egenskapen spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type för konfigureras.

Avancerad producentkonfiguration

Ovanstående anslutning och den vanliga Azure SDK-klientkonfigurationen stöder anpassning för varje pärmproducent, som du kan konfigurera med prefixet spring.cloud.stream.servicebus.bindings.<binding-name>.producer..

Grundläggande användning

Skicka och ta emot meddelanden från/till Service Bus

  1. Fyll i konfigurationsalternativen med information om autentiseringsuppgifter.

    • För autentiseringsuppgifter som anslutningssträng konfigurerar du följande egenskaper i filen application.yml:

          spring:
            cloud:
              azure:
                servicebus:
                  connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      
    • För autentiseringsuppgifter som tjänstens huvudnamn konfigurerar du följande egenskaper i filen application.yml :

          spring:
            cloud:
              azure:
                credential:
                  client-id: ${AZURE_CLIENT_ID}
                  client-secret: ${AZURE_CLIENT_SECRET}
                profile:
                  tenant-id: <tenant>
                servicebus:
                  namespace: ${SERVICEBUS_NAMESPACE}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      

Kommentar

De värden som tillåts för tenant-id är: common, organizations, consumerseller klientorganisations-ID. Mer information om dessa värden finns i avsnittet Använd fel slutpunkt (personliga konton och organisationskonton) i Fel AADSTS50020 – Användarkonto från identitetsprovidern finns inte i klientorganisationen. Information om hur du konverterar din app för en klientorganisation finns i Konvertera en klientorganisationsapp till flera klientorganisationer på Microsoft Entra-ID.

  • För autentiseringsuppgifter som hanterade identiteter konfigurerar du följande egenskaper i filen application.yml :

        spring:
          cloud:
            azure:
              credential:
                managed-identity-enabled: true
                client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
              servicebus:
                namespace: ${SERVICEBUS_NAMESPACE}
            function:
              definition: consume;supply
            stream:
              bindings:
                consume-in-0:
                  destination: ${SERVICEBUS_ENTITY_NAME}
                  # If you use Service Bus Topic, add the following configuration
                  # group: ${SUBSCRIPTION_NAME}
                supply-out-0:
                  destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
              servicebus:
                bindings:
                  consume-in-0:
                    consumer:
                      auto-complete: false
                  supply-out-0:
                    producer:
                      entity-type: queue # set as "topic" if you use Service Bus Topic
    
  1. Definiera leverantör och konsument.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}'", message.getPayload());
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Stöd för partitionsnyckel

Pärmen stöder Service Bus-partitionering genom att tillåta inställning av partitionsnyckel och sessions-ID i meddelandehuvudet. I det här avsnittet beskrivs hur du anger partitionsnyckel för meddelanden.

Spring Cloud Stream tillhandahåller en spEL-uttrycksegenskap spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expressionför partitionsnyckeln . Du kan till exempel ange den här egenskapen som "'partitionKey-' + headers[<message-header-key>]" och lägga till en rubrik med namnet message-header-key. Spring Cloud Stream använder värdet för det här huvudet när uttrycket utvärderas för att tilldela en partitionsnyckel. Följande kod ger en exempelproducent:

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader("<message-header-key>", value.length() % 4)
            .build();
    };
}

Sessionsstöd

Pärmen stöder meddelandesessioner i Service Bus. Sessions-ID för ett meddelande kan anges via meddelanderubriken.

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
            .build();
    };
}

Kommentar

Enligt Service Bus-partitionering har sessions-ID högre prioritet än partitionsnyckel. Så när både av ServiceBusMessageHeaders#SESSION_ID och ServiceBusMessageHeaders#PARTITION_KEY huvuden har angetts används värdet för sessions-ID:t så småningom för att skriva över värdet för partitionsnyckeln.

Hantera felmeddelanden

  • Hantera utgående bindningsfelmeddelanden

    Som standard skapar Spring Integration en global felkanal med namnet errorChannel. Konfigurera följande meddelandeslutpunkt för att hantera utgående bindningsfel.

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Hantera inkommande bindningsfelmeddelanden

    Spring Cloud Stream Service Bus Binder stöder tre lösningar för att hantera fel för inkommande meddelandebindningar: binder-felhanteraren, anpassade felkanaler och hanterare.

    Binder-felhanterare:

    Standardhanteraren för bindemedelsfel hanterar den inkommande bindningen. Du använder den här hanteraren för att skicka misslyckade meddelanden till kön med obeställbara meddelanden när spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected den är aktiverad. Annars avbryts de misslyckade meddelandena. Förutom att konfigurera den bindningsspecifika felkanalen börjar binder-felhanteraren alltid gälla oavsett om det finns andra anpassade felhanterare eller kanaler.

    Felkanal:

    Spring Cloud Stream tillhandahåller en felkanal för varje inkommande bindning. En ErrorMessage skickas till felkanalen. Mer information finns i Hantera fel i Spring Cloud Stream-dokumentationen.

    • Standardfelkanal

      Du kan använda en global felkanal med namnet errorChannel för att använda alla inkommande bindningsfelmeddelanden. Om du vill hantera dessa meddelanden konfigurerar du följande meddelandeslutpunkt:

      @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
      public void handleError(ErrorMessage message) {
          LOGGER.error("Handling inbound binding error: " + message);
      }
      
    • Bindningsspecifik felkanal

      Du kan använda en specifik felkanal för att använda de specifika inkommande bindningsfelmeddelandena med högre prioritet än standardfelkanalen. Om du vill hantera dessa meddelanden konfigurerar du följande meddelandeslutpunkt:

      // Replace destination with spring.cloud.stream.bindings.<input-binding-name>.destination
      // Replace group with spring.cloud.stream.bindings.<input-binding-name>.group
      @ServiceActivator(inputChannel = "{destination}.{group}.errors")
      public void handleError(ErrorMessage message) {
          LOGGER.error("Handling inbound binding error: " + message);
      }
      

      Kommentar

      Den bindningsspecifika felkanalen är ömsesidigt uteslutande med andra tillhandahållna felhanterare och kanaler.

    Felhanterare:

    Spring Cloud Stream exponerar en mekanism som du kan använda för att tillhandahålla en anpassad felhanterare genom att lägga till en Consumer som accepterar ErrorMessage instanser. Mer information finns i Felhantering i Spring Cloud Stream-dokumentationen.

    Kommentar

    När en bindningsfelhanterare har konfigurerats kan den fungera med standardfelkanalen och binder-felhanteraren.

    • Felhanterare för bindningsstandard

      Konfigurera en enda Consumer böna för att använda alla inkommande bindningsfelmeddelanden. Följande standardfunktion prenumererar på varje inkommande bindningsfelkanal:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Du måste också ange spring.cloud.stream.default.error-handler-definition egenskapen till funktionsnamnet.

    • Bindningsspecifik felhanterare

      Konfigurera en Consumer böna för att använda de specifika inkommande bindningsfelmeddelandena. Följande funktion prenumererar på den specifika inkommande bindningsfelkanalen med högre prioritet än felhanteraren för bindningsstandard.

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Du måste också ange spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition egenskapen till funktionsnamnet.

Service Bus-meddelandehuvuden

De grundläggande meddelandehuvuden som stöds finns i avsnittet Service Bus-meddelandehuvuden i Spring Cloud Azure-stöd för Spring Integration.

Kommentar

När du ställer in partitionsnyckeln är prioriteten för meddelandehuvudet högre än spring cloud stream-egenskapen. Så spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression börjar gälla endast när ingen av huvudena ServiceBusMessageHeaders#SESSION_ID och ServiceBusMessageHeaders#PARTITION_KEY har konfigurerats.

Stöd för flera pärmar

Anslut ion till flera Service Bus-namnområden stöds också med hjälp av flera pärmar. Det här exemplet tar anslutningssträng som exempel. Autentiseringsuppgifter för tjänstens huvudnamn och hanterade identiteter stöds också, användare kan ange relaterade egenskaper i varje pärms miljöinställningar.

  1. Om du vill använda flera pärmar för ServiceBus konfigurerar du följande egenskaper i filen application.yml :

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${SERVICEBUS_TOPIC_NAME}
              group: ${SUBSCRIPTION_NAME}
            supply1-out-0:
              destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE}
            consume2-in-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME}
            supply2-out-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE}
          binders:
            servicebus-1:
              type: servicebus
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING}
            servicebus-2:
              type: servicebus
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING}
          servicebus:
            bindings:
              consume1-in-0:
                consumer:
                  auto-complete: false
              supply1-out-0:
                producer:
                  entity-type: topic
              consume2-in-0:
                consumer:
                  auto-complete: false
              supply2-out-0:
                producer:
                  entity-type: queue
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Kommentar

    Den tidigare programfilen visar hur du konfigurerar en enda standardsökare för programmet till alla bindningar. Om du vill konfigurera polleraren för en specifik bindning kan du använda en konfiguration som spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.

  2. vi behöver definiera två leverantörer och två konsumenter

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    
    }
    

Resursetablering

Service Bus Binder stöder etablering av kö, ämne och prenumeration. Användarna kan använda följande egenskaper för att aktivera etablering.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      servicebus:
        resource:
          resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
    stream:
      servicebus:
        bindings:
          <binding-name>:
            consumer:
              entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}

Kommentar

De värden som tillåts för tenant-id är: common, organizations, consumerseller klientorganisations-ID. Mer information om dessa värden finns i avsnittet Använd fel slutpunkt (personliga konton och organisationskonton) i Fel AADSTS50020 – Användarkonto från identitetsprovidern finns inte i klientorganisationen. Information om hur du konverterar din app för en klientorganisation finns i Konvertera en klientorganisationsapp till flera klientorganisationer på Microsoft Entra-ID.

Exempel

Mer information finns i lagringsplatsen azure-spring-boot-samples på GitHub.