Share via


Spring Cloud Azure-stöd för Spring Integration

Den här artikeln gäller för: ✔️ Version 4.14.0 ✔️ Version 5.8.0

Spring Integration Extension för Azure tillhandahåller Spring Integration-kort för de olika tjänster som tillhandahålls av Azure SDK för Java. Vi tillhandahåller Spring Integration-stöd för dessa Azure-tjänster: Event Hubs, Service Bus, Storage Queue. Följande är en lista över kort som stöds:

Spring-integrering med Azure Event Hubs

Nyckelbegrepp

Azure Event Hubs är en stordataströmningsplattform och händelseinmatningstjänst. Den kan ta emot och behandla miljoner händelser per sekund. Data som skickas till en händelsehubb kan omvandlas och lagras med hjälp av valfri provider för realtidsanalys eller batchbearbetnings-/lagringsadaptrar.

Spring Integration möjliggör enklare meddelanden i Spring-baserade program och stöder integrering med externa system via deklarativa kort. Dessa kort ger en högre abstraktionsnivå jämfört med Spring-stöd för fjärrkommunikation, meddelanden och schemaläggning. Spring Integration for Event Hubs-tilläggsprojektet tillhandahåller inkommande och utgående kanalkort och gatewayer för Azure Event Hubs.

Kommentar

RxJava-stöd-API:er tas bort från version 4.0.0. Mer information finns i Javadoc.

Konsumentgrupp

Event Hubs har liknande stöd för konsumentgruppen som Apache Kafka, men med lite annan logik. Medan Kafka lagrar alla incheckade förskjutningar i asynkron meddelandekö måste du lagra förskjutningar av Event Hubs-meddelanden som bearbetas manuellt. Event Hubs SDK tillhandahåller funktionen för att lagra sådana förskjutningar i Azure Storage.

Stöd för partitionering

Event Hubs har ett liknande koncept för fysisk partition som Kafka. Men till skillnad från Kafkas automatiska ombalansering mellan konsumenter och partitioner tillhandahåller Event Hubs ett slags förebyggande läge. Lagringskontot fungerar som ett lån för att avgöra vilken partition som ägs av vilken konsument. När en ny konsument startar försöker den stjäla vissa partitioner från de flesta tunga användare för att uppnå arbetsbelastningsutjämningen.

För att ange belastningsutjämningsstrategin kan utvecklare använda EventHubsContainerProperties för konfigurationen. I följande avsnitt finns ett exempel på hur du konfigurerar EventHubsContainerProperties.

Stöd för Batch-konsument

Stöder EventHubsInboundChannelAdapter batchförbrukningsläget. För att aktivera den kan användarna ange lyssnarläget som ListenerMode.BATCH när de skapar en EventHubsInboundChannelAdapter instans. När det är aktiverat tas ett meddelande som nyttolasten är en lista över batchade händelser emot och skickas till den nedströmskanalen. Varje meddelanderubrik konverteras också som en lista, där innehållet är det associerade rubrikvärdet som parsas från varje händelse. För de gemensamma huvudena för partitions-ID, checkpointer och de senaste egenskaperna visas de som ett enda värde för hela batchen med händelser som delar samma. Mer information finns i avsnittet Meddelandehuvuden för Event Hubs.

Kommentar

Kontrollpunktsrubriken finns bara när MANUELLT kontrollpunktsläge används.

Kontrollpunkter för batchkonsumenten stöder två lägen: BATCH och MANUAL. BATCH läget är ett läge för automatisk kontrollpunkt för att kontrollera hela batchen med händelser tillsammans när de har tagits emot. MANUAL läge är att kontrollera händelserna av användare. När den används skickas Kontrollpunkten till meddelandehuvudet och användarna kan använda den för att göra kontrollpunkter.

Principen för batchförbrukning kan anges av egenskaperna max-size och , där max-size är en nödvändig egenskap medan max-wait-timemax-wait-timeden är valfri. Utvecklare kan använda EventHubsContainerProperties för konfigurationen för att ange den batchanvändningsstrategi som används. I följande avsnitt finns ett exempel på hur du konfigurerar EventHubsContainerProperties.

Beroendekonfiguration

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>

Konfiguration

Den här startprogrammet innehåller följande tre delar av konfigurationsalternativen:

konfigurationsegenskaper för Anslut ion

Det här avsnittet innehåller de konfigurationsalternativ som används för att ansluta till Azure Event Hubs.

Kommentar

Om du väljer att använda ett säkerhetsobjekt för att autentisera och auktorisera med Microsoft Entra-ID för åtkomst till en Azure-resurs läser du Auktorisera åtkomst med Microsoft Entra-ID för att kontrollera att säkerhetsobjektet har beviljats tillräcklig behörighet för att få åtkomst till Azure-resursen.

Anslut ion konfigurerbara egenskaper för spring-cloud-azure-starter-integration-eventhubs:

Property Type Beskrivning
spring.cloud.azure.eventhubs.enabled boolean Om en Azure Event Hubs är aktiverad.
spring.cloud.azure.eventhubs.connection-string String Event Hubs-namnområde anslutningssträng värde.
spring.cloud.azure.eventhubs.namespace String Event Hubs-namnområdesvärde, som är prefixet för det fullständiga domännamnet. Ett FQDN ska bestå av NamespaceName.DomainName
spring.cloud.azure.eventhubs.domain-name String Domännamn för ett Namnområdesvärde för Azure Event Hubs.
spring.cloud.azure.eventhubs.custom-endpoint-address String Anpassad slutpunktsadress.
spring.cloud.azure.eventhubs.shared-connection Booleskt Om den underliggande EventProcessorClient och EventHubProducerAsyncClient använder samma anslutning. Som standard skapas en ny anslutning och används för varje händelsehubbklient som skapas.

Konfigurationsegenskaper för kontrollpunkt

Det här avsnittet innehåller konfigurationsalternativen för Storage Blobs-tjänsten, som används för att bevara partitionsägarskap och kontrollpunktsinformation.

Kommentar

Från version 4.0.0, när egenskapen spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists inte aktiveras manuellt, skapas ingen Lagringscontainer automatiskt.

Kontrollpunktskonfigurerbara egenskaper för spring-cloud-azure-starter-integration-eventhubs:

Property Type Beskrivning
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists Booleskt Om du vill tillåta att containrar skapas om de inte finns.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name String Namn på lagringskontot.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key String Åtkomstnyckel för lagringskonto.
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name String Namn på lagringscontainer.

Vanliga konfigurationsalternativ för Azure Service SDK kan även konfigureras för Lagringsblobkontrollpunktsarkiv. De konfigurationsalternativ som stöds introduceras i Spring Cloud Azure-konfigurationen och kan konfigureras med antingen det enhetliga prefixet spring.cloud.azure. eller prefixet spring.cloud.azure.eventhubs.processor.checkpoint-store.

Konfigurationsegenskaper för Event Hub-processor

EventHubsInboundChannelAdapter Använder EventProcessorClient för att använda meddelanden från en händelsehubb för att konfigurera de övergripande egenskaperna för en EventProcessorClient, som utvecklare kan använda EventHubsContainerProperties för konfigurationen. Se följande avsnitt om hur du arbetar med EventHubsInboundChannelAdapter.

Grundläggande användning

Skicka meddelanden till Azure Event Hubs

  1. Fyll i konfigurationsalternativen för autentiseringsuppgifter.

    • För autentiseringsuppgifter som anslutningssträng konfigurerar du följande egenskaper i filen application.yml:

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT-CONTAINER}
                  account-name: ${CHECKPOINT-STORAGE-ACCOUNT}
                  account-key: ${CHECKPOINT-ACCESS-KEY}
      
    • För autentiseringsuppgifter som hanterade identiteter konfigurerar du följande egenskaper i filen application.yml :

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            eventhubs:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      
    • För autentiseringsuppgifter som tjänstens huvudnamn konfigurerar du följande egenskaper i filen application.yml :

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      

Kommentar

De värden som tillåts för tenant-id är: common, organizations, consumerseller klientorganisations-ID. Mer information om dessa värden finns i avsnittet Använd fel slutpunkt (personliga konton och organisationskonton) i Fel AADSTS50020 – Användarkonto från identitetsprovidern finns inte i klientorganisationen. Information om hur du konverterar din app för en klientorganisation finns i Konvertera en klientorganisationsapp till flera klientorganisationer på Microsoft Entra-ID.

  1. Skapa DefaultMessageHandler med bönan EventHubsTemplate för att skicka meddelanden till Event Hubs.

    class Demo {
        private static final String OUTPUT_CHANNEL = "output";
        private static final String EVENTHUB_NAME = "eh1";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.error("There was an error sending the message.", ex);
                }
            });
            return handler;
        }
    }
    
  2. Skapa en meddelandegatewaybindning med meddelandehanteraren ovan via en meddelandekanal.

    class Demo {
        @Autowired
        EventHubOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface EventHubOutboundGateway {
            void send(String text);
        }
    }
    
  3. Skicka meddelanden med hjälp av gatewayen.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Ta emot meddelanden från Azure Event Hubs

  1. Fyll i konfigurationsalternativen för autentiseringsuppgifter.

  2. Skapa en böna med meddelandekanalen som indatakanal.

    @Configuration
    class Demo {
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Skapa EventHubsInboundChannelAdapter med bönan EventHubsMessageListenerContainer för att ta emot meddelanden från Event Hubs.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
        private static final String EVENTHUB_NAME = "eh1";
        private static final String CONSUMER_GROUP = "$Default";
    
        @Bean
        public EventHubsInboundChannelAdapter messageChannelAdapter(
                @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
                EventHubsMessageListenerContainer listenerContainer) {
            EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    
        @Bean
        public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
            EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
            containerProperties.setEventHubName(EVENTHUB_NAME);
            containerProperties.setConsumerGroup(CONSUMER_GROUP);
            containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
            return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
        }
    }
    
  4. Skapa en meddelandemottagarebindning med EventHubsInboundChannelAdapter via meddelandekanalen som skapades tidigare.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

Konfigurera EventHubsMessageConverter för att anpassa objectMapper

EventHubsMessageConverter görs som en konfigurerbar böna så att användarna kan anpassa ObjectMapper.

Stöd för Batch-konsument

Att använda meddelanden från Event Hubs i batchar liknar exemplet ovan, förutom att användarna bör ange de batchkrävande relaterade konfigurationsalternativen för EventHubsInboundChannelAdapter.

När du skapar EventHubsInboundChannelAdapterska lyssnarläget anges som BATCH. När du skapar böna av EventHubsMessageListenerContaineranger du kontrollpunktsläget som antingen MANUAL eller BATCH, och batchalternativen kan konfigureras efter behov.

@Configuration
class Demo {
    private static final String INPUT_CHANNEL = "input";
    private static final String EVENTHUB_NAME = "eh1";
    private static final String CONSUMER_GROUP = "$Default";

    @Bean
    public EventHubsInboundChannelAdapter messageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            EventHubsMessageListenerContainer listenerContainer) {
        EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
        adapter.setOutputChannel(inputChannel);
        return adapter;
    }

    @Bean
    public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
        EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
        containerProperties.setEventHubName(EVENTHUB_NAME);
        containerProperties.setConsumerGroup(CONSUMER_GROUP);
        containerProperties.getBatch().setMaxSize(100);
        containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
        return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
    }
}

Meddelandehuvuden för Event Hubs

I följande tabell visas hur event hubs-meddelandeegenskaper mappas till Spring-meddelandehuvuden. För Azure Event Hubs anropas meddelandet som event.

Mappning mellan Event Hubs-meddelande/händelseegenskaper och Spring Message-huvuden i inspelningslyssningsläge:

Händelseegenskaper för Event Hubs Spring Message-huvudkonstanter Typ Beskrivning
Köad tid EventHubsHeaders#ENQUEUED_TIME Direkt Omedelbart, i UTC, när händelsen angavs i Event Hub-partitionen.
Förskjutning EventHubsHeaders#OFFSET Long Förskjutningen av händelsen när den togs emot från den associerade Event Hub-partitionen.
Partitionsnyckel AzureHeaders#PARTITION_KEY String Partitionens hashningsnyckel om den angavs när händelsen ursprungligen publicerades.
Partitions-ID AzureHeaders#RAW_PARTITION_ID String Partitions-ID för händelsehubben.
Löpnummer EventHubsHeaders#SEQUENCE_NUMBER Long Sekvensnumret som tilldelades händelsen när den angavs i den associerade Event Hub-partitionen.
Senaste egenskaper för enqueued-händelse EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES LastEnqueuedEventProperties Egenskaperna för den senaste köade händelsen i den här partitionen.
NA AzureHeaders#CHECKPOINTER Kontrollpunkt Huvudet för kontrollpunkten det specifika meddelandet.

Användare kan parsa meddelanderubrikerna för relaterad information om varje händelse. Om du vill ange ett meddelandehuvud för händelsen placeras alla anpassade rubriker som en programegenskap för en händelse, där rubriken anges som egenskapsnyckel. När händelser tas emot från Event Hubs konverteras alla programegenskaper till meddelandehuvudet.

Kommentar

Meddelandehuvuden för partitionsnyckel, köad tid, förskjutning och sekvensnummer stöds inte för att ställas in manuellt.

När batch-konsumentläget är aktiverat visas de specifika rubrikerna för batchmeddelanden på följande sätt, som innehåller en lista med värden från varje händelse i Händelsehubbar.

Mappning mellan Event Hubs-meddelande/händelseegenskaper och Spring Message-huvuden i Batch-lyssnarläge:

Händelseegenskaper för Event Hubs Spring Batch-meddelandehuvudkonstanter Typ Beskrivning
Köad tid EventHubsHeaders#ENQUEUED_TIME Lista över snabbmeddelanden Lista över ögonblicket, i UTC, över när varje händelse angavs i Event Hub-partitionen.
Förskjutning EventHubsHeaders#OFFSET Lista över långa Lista över förskjutningen för varje händelse när den togs emot från den associerade Event Hub-partitionen.
Partitionsnyckel AzureHeaders#PARTITION_KEY Lista över sträng Lista över partitionshashingnyckeln om den angavs när varje händelse ursprungligen publicerades.
Löpnummer EventHubsHeaders#SEQUENCE_NUMBER Lista över långa Lista över sekvensnumret som tilldelades varje händelse när den angavs i den associerade Event Hub-partitionen.
Systemegenskaper EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES Lista över karta Lista över systemegenskaperna för varje händelse.
Egenskaper för program EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES Lista över karta Lista över programegenskaperna för varje händelse, där alla anpassade meddelandehuvuden eller händelseegenskaper placeras.

Kommentar

När du publicerar meddelanden tas alla batchhuvuden ovan bort från meddelandena om de finns.

Exempel

Mer information finns i lagringsplatsen azure-spring-boot-samples på GitHub.

Spring-integrering med Azure Service Bus

Nyckelbegrepp

Spring Integration möjliggör enklare meddelanden i Spring-baserade program och stöder integrering med externa system via deklarativa kort.

Spring Integration for Azure Service Bus-tilläggsprojektet tillhandahåller inkommande och utgående kanalkort för Azure Service Bus.

Kommentar

CompletableFuture-stöd-API:er har föråldrats från version 2.10.0 och ersätts av Reactor Core från version 4.0.0. Mer information finns i Javadoc.

Beroendekonfiguration

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>

Konfiguration

Den här startprogrammet innehåller följande två delar av konfigurationsalternativen:

konfigurationsegenskaper för Anslut ion

Det här avsnittet innehåller de konfigurationsalternativ som används för att ansluta till Azure Service Bus.

Kommentar

Om du väljer att använda ett säkerhetsobjekt för att autentisera och auktorisera med Microsoft Entra-ID för åtkomst till en Azure-resurs läser du Auktorisera åtkomst med Microsoft Entra-ID för att kontrollera att säkerhetsobjektet har beviljats tillräcklig behörighet för att få åtkomst till Azure-resursen.

Anslut ion konfigurerbara egenskaper för spring-cloud-azure-starter-integration-servicebus:

Property Type Beskrivning
spring.cloud.azure.servicebus.enabled boolean Om en Azure Service Bus är aktiverad.
spring.cloud.azure.servicebus.connection-string String Service Bus-namnområde anslutningssträng värde.
spring.cloud.azure.servicebus.namespace String Service Bus-namnområdesvärde, som är prefixet för FQDN. Ett FQDN ska bestå av NamespaceName.DomainName
spring.cloud.azure.servicebus.domain-name String Domännamn för ett Azure Service Bus-namnområdesvärde.

Konfigurationsegenskaper för Service Bus-processor

ServiceBusInboundChannelAdapter Använder ServiceBusProcessorClient för att använda meddelanden, för att konfigurera de övergripande egenskaperna för en ServiceBusProcessorClient, kan utvecklare använda ServiceBusContainerProperties för konfigurationen. Se följande avsnitt om hur du arbetar med ServiceBusInboundChannelAdapter.

Grundläggande användning

Skicka meddelanden till Azure Service Bus

  1. Fyll i konfigurationsalternativen för autentiseringsuppgifter.

    • För autentiseringsuppgifter som anslutningssträng konfigurerar du följande egenskaper i filen application.yml:

      spring:
        cloud:
          azure:
            servicebus:
              connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
      
    • För autentiseringsuppgifter som hanterade identiteter konfigurerar du följande egenskaper i filen application.yml :

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            servicebus:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
      

Kommentar

De värden som tillåts för tenant-id är: common, organizations, consumerseller klientorganisations-ID. Mer information om dessa värden finns i avsnittet Använd fel slutpunkt (personliga konton och organisationskonton) i Fel AADSTS50020 – Användarkonto från identitetsprovidern finns inte i klientorganisationen. Information om hur du konverterar din app för en klientorganisation finns i Konvertera en klientorganisationsapp till flera klientorganisationer på Microsoft Entra-ID.

  • För autentiseringsuppgifter som tjänstens huvudnamn konfigurerar du följande egenskaper i filen application.yml :

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          servicebus:
            namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
    

Kommentar

De värden som tillåts för tenant-id är: common, organizations, consumerseller klientorganisations-ID. Mer information om dessa värden finns i avsnittet Använd fel slutpunkt (personliga konton och organisationskonton) i Fel AADSTS50020 – Användarkonto från identitetsprovidern finns inte i klientorganisationen. Information om hur du konverterar din app för en klientorganisation finns i Konvertera en klientorganisationsapp till flera klientorganisationer på Microsoft Entra-ID.

  1. Skapa DefaultMessageHandler med bönan ServiceBusTemplate för att skicka meddelanden till Service Bus och ange entitetstypen för ServiceBusTemplate. Det här exemplet tar Service Bus Queue som exempel.

    class Demo {
        private static final String OUTPUT_CHANNEL = "queue.output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) {
            serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE);
            DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
    
            return handler;
        }
    }
    
  2. Skapa en meddelandegatewaybindning med meddelandehanteraren ovan via en meddelandekanal.

    class Demo {
        @Autowired
        QueueOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface QueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Skicka meddelanden med hjälp av gatewayen.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Ta emot meddelanden från Azure Service Bus

  1. Fyll i konfigurationsalternativen för autentiseringsuppgifter.

  2. Skapa en böna med meddelandekanalen som indatakanal.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Skapa ServiceBusInboundChannelAdapter med bönan ServiceBusMessageListenerContainer för att ta emot meddelanden till Service Bus. Det här exemplet tar Service Bus Queue som exempel.

    @Configuration
    class Demo {
        private static final String QUEUE_NAME = "queue1";
    
        @Bean
        public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) {
            ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
            containerProperties.setEntityName(QUEUE_NAME);
            containerProperties.setAutoComplete(false);
            return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
        }
    
        @Bean
        public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            ServiceBusMessageListenerContainer listenerContainer) {
            ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    }
    
  4. Skapa en meddelandemottagarebindning med ServiceBusInboundChannelAdapter via meddelandekanalen som vi skapade tidigare.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

Konfigurera ServiceBusMessageConverter för att anpassa objectMapper

ServiceBusMessageConverter görs som en konfigurerbar böna så att användarna kan anpassa ObjectMapper.

Service Bus-meddelandehuvuden

För vissa Service Bus-huvuden som kan mappas till flera Spring-huvudkonstanter visas prioriteten för olika Spring-huvuden.

Mappning mellan Service Bus-huvuden och Spring-huvuden:

Service Bus-meddelandehuvuden och egenskaper Spring-meddelandehuvudkonstanter Typ Konfigureringsbart beskrivning
Content type MessageHeaders#CONTENT_TYPE String Ja Beskrivningen RFC2045 innehållstyp för meddelandet.
Korrelations-ID ServiceBusMessageHeaders#CORRELATION_ID String Ja Korrelations-ID för meddelandet
Meddelande-ID ServiceBusMessageHeaders#MESSAGE_ID String Ja Meddelande-ID:t för meddelandet, det här huvudet har högre prioritet än MessageHeaders#ID.
Meddelande-ID MessageHeaders#ID UUID Ja Meddelandets meddelande-ID, det här huvudet har lägre prioritet än ServiceBusMessageHeaders#MESSAGE_ID.
Partitionsnyckel ServiceBusMessageHeaders#PARTITION_KEY String Ja Partitionsnyckeln för att skicka meddelandet till en partitionerad entitet.
Svara till MessageHeaders#REPLY_CHANNEL String Ja Adressen till en entitet som svar ska skickas till.
Svara på sessions-ID ServiceBusMessageHeaders#REPLY_TO_SESSION_ID String Ja Egenskapen ReplyToGroupId för meddelandet.
Schemalagd kötidsutc ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME OffsetDateTime Ja Den datumtid då meddelandet ska anges i Service Bus har den här rubriken högre prioritet än AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE.
Schemalagd kötidsutc AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE Integer Ja Den datumtid då meddelandet ska anges i Service Bus har den här rubriken lägre prioritet än ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME.
Sessions-ID ServiceBusMessageHeaders#SESSION_ID String Ja Sessions-IDentifier för en sessionsmedveten entitet.
Time to live ServiceBusMessageHeaders#TIME_TO_LIVE Varaktighet Ja Hur lång tid det tar innan det här meddelandet upphör att gälla.
To ServiceBusMessageHeaders#TO String Ja Meddelandets "till"-adress, reserverad för framtida användning i routningsscenarier och som för närvarande ignoreras av själva koordinatorn.
Subject ServiceBusMessageHeaders#SUBJECT String Ja Ämnet för meddelandet.
Beskrivning av fel med obeställbara bokstäver ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION String Nej Beskrivningen av ett meddelande som har fått en obeställd bokstav.
Orsak till död bokstav ServiceBusMessageHeaders#DEAD_LETTER_REASON String Nej Anledningen till att ett meddelande var obeställt.
Källa med obeställbara bokstäver ServiceBusMessageHeaders#DEAD_LETTER_SOURCE String Nej Entiteten där meddelandet var obeställt.
Antal leveranser ServiceBusMessageHeaders#DELIVERY_COUNT lång Nej Antalet gånger meddelandet levererades till klienter.
Kodat sekvensnummer ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER lång Nej Det kodade sekvensnumret som tilldelats ett meddelande av Service Bus.
Köad tid ServiceBusMessageHeaders#ENQUEUED_TIME OffsetDateTime Nej Den datetime då det här meddelandet angavs i Service Bus.
Upphör att gälla kl. ServiceBusMessageHeaders#EXPIRES_AT OffsetDateTime Nej Den datumtid då det här meddelandet upphör att gälla.
Lås token ServiceBusMessageHeaders#LOCK_TOKEN String Nej Låstoken för det aktuella meddelandet.
Låst tills ServiceBusMessageHeaders#LOCKED_UNTIL OffsetDateTime Nej Den datumtid då låset för det här meddelandet upphör att gälla.
Löpnummer ServiceBusMessageHeaders#SEQUENCE_NUMBER lång Nej Det unika nummer som tilldelats ett meddelande av Service Bus.
Tillstånd ServiceBusMessageHeaders#STATE ServiceBusMessageState Nej Tillståndet för meddelandet, som kan vara Aktiv, Uppskjuten eller Schemalagd.

Stöd för partitionsnyckel

Den här startern stöder Service Bus-partitionering genom att tillåta inställning av partitionsnyckel och sessions-ID i meddelandehuvudet. I det här avsnittet beskrivs hur du anger partitionsnyckel för meddelanden.

Rekommenderas: Använd ServiceBusMessageHeaders.PARTITION_KEY som nyckel i rubriken.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Rekommenderas inte men stöds för närvarande:AzureHeaders.PARTITION_KEY som nyckeln i rubriken.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Kommentar

När både ServiceBusMessageHeaders.PARTITION_KEY och AzureHeaders.PARTITION_KEY anges i meddelandehuvudena föredras ServiceBusMessageHeaders.PARTITION_KEY .

Sessionsstöd

Det här exemplet visar hur du manuellt anger sessions-ID för ett meddelande i programmet.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Kommentar

ServiceBusMessageHeaders.SESSION_ID När anges i meddelanderubrikerna och ett annat ServiceBusMessageHeaders.PARTITION_KEY huvud också anges, används värdet för sessions-ID:t så småningom för att skriva över värdet för partitionsnyckeln.

Exempel

Mer information finns i lagringsplatsen azure-spring-boot-samples på GitHub.

Spring-integrering med Azure Storage Queue

Nyckelbegrepp

Azure Queue Storage är en tjänst för lagring av ett stort antal meddelanden. Du kommer åt meddelanden var som helst i världen via autentiserade anrop med HTTP eller HTTPS. Ett kömeddelande kan vara upp till 64 KB stort. En kö kan innehålla miljontals meddelanden, upp till den totala kapacitetsgränsen för ett lagringskonto. Köer används ofta för att skapa en kvarvarande arbetslogg för att bearbeta asynkront.

Beroendekonfiguration

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>

Konfiguration

Den här startprogrammet innehåller följande konfigurationsalternativ:

konfigurationsegenskaper för Anslut ion

Det här avsnittet innehåller de konfigurationsalternativ som används för att ansluta till Azure Storage Queue.

Kommentar

Om du väljer att använda ett säkerhetsobjekt för att autentisera och auktorisera med Microsoft Entra-ID för åtkomst till en Azure-resurs läser du Auktorisera åtkomst med Microsoft Entra-ID för att kontrollera att säkerhetsobjektet har beviljats tillräcklig behörighet för att få åtkomst till Azure-resursen.

Anslut ion konfigurerbara egenskaper för spring-cloud-azure-starter-integration-storage-queue:

Property Type Beskrivning
spring.cloud.azure.storage.queue.enabled boolean Om en Azure Storage-kö är aktiverad.
spring.cloud.azure.storage.queue.connection-string String Lagringskönamnområde anslutningssträng värde.
spring.cloud.azure.storage.queue.accountName String Namn på lagringskökonto.
spring.cloud.azure.storage.queue.accountKey String Kontonyckel för lagringskö.
spring.cloud.azure.storage.queue.endpoint String Tjänstslutpunkt för lagringskö.
spring.cloud.azure.storage.queue.sasToken String Sas-tokenautentiseringsuppgifter
spring.cloud.azure.storage.queue.serviceVersion QueueServiceVersion QueueServiceVersion som används när API-begäranden görs.
spring.cloud.azure.storage.queue.messageEncoding String Kodning av kömeddelanden.

Grundläggande användning

Skicka meddelanden till Azure Storage Queue

  1. Fyll i konfigurationsalternativen för autentiseringsuppgifter.

    • För autentiseringsuppgifter som anslutningssträng konfigurerar du följande egenskaper i filen application.yml:

      spring:
        cloud:
          azure:
            storage:
              queue:
                connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
      
    • För autentiseringsuppgifter som hanterade identiteter konfigurerar du följande egenskaper i filen application.yml :

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            storage:
              queue:
                namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
      

Kommentar

De värden som tillåts för tenant-id är: common, organizations, consumerseller klientorganisations-ID. Mer information om dessa värden finns i avsnittet Använd fel slutpunkt (personliga konton och organisationskonton) i Fel AADSTS50020 – Användarkonto från identitetsprovidern finns inte i klientorganisationen. Information om hur du konverterar din app för en klientorganisation finns i Konvertera en klientorganisationsapp till flera klientorganisationer på Microsoft Entra-ID.

  • För autentiseringsuppgifter som tjänstens huvudnamn konfigurerar du följande egenskaper i filen application.yml :

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          storage:
            queue:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
    

Kommentar

De värden som tillåts för tenant-id är: common, organizations, consumerseller klientorganisations-ID. Mer information om dessa värden finns i avsnittet Använd fel slutpunkt (personliga konton och organisationskonton) i Fel AADSTS50020 – Användarkonto från identitetsprovidern finns inte i klientorganisationen. Information om hur du konverterar din app för en klientorganisation finns i Konvertera en klientorganisationsapp till flera klientorganisationer på Microsoft Entra-ID.

  1. Skapa DefaultMessageHandler med bönan StorageQueueTemplate för att skicka meddelanden till Lagringskö.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
        private static final String OUTPUT_CHANNEL = "output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
            return handler;
        }
    }
    
  2. Skapa en meddelandegatewaybindning med meddelandehanteraren ovan via en meddelandekanal.

    class Demo {
        @Autowired
        StorageQueueOutboundGateway storageQueueOutboundGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface StorageQueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Skicka meddelanden med hjälp av gatewayen.

    class Demo {
        public void demo() {
            this.storageQueueOutboundGateway.send(message);
        }
    }
    

Ta emot meddelanden från Azure Storage Queue

  1. Fyll i konfigurationsalternativen för autentiseringsuppgifter.

  2. Skapa en böna med meddelandekanalen som indatakanal.

    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Skapa StorageQueueMessageSource med bönan StorageQueueTemplate för att ta emot meddelanden till Lagringskö.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
    
        @Bean
        @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000"))
        public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) {
            return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate);
        }
    }
    
  4. Skapa en meddelandemottagarebindning med StorageQueueMessageSource som skapades i det sista steget via meddelandekanalen som vi skapade tidigare.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                .doOnError(Throwable::printStackTrace)
                .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message))
                .block();
        }
    }
    

Exempel

Mer information finns i lagringsplatsen azure-spring-boot-samples på GitHub.