Spring Cloud Azure-stöd för Spring Integration
Den här artikeln gäller för: ✔️ Version 4.14.0 ✔️ Version 5.8.0
Spring Integration Extension för Azure tillhandahåller Spring Integration-kort för de olika tjänster som tillhandahålls av Azure SDK för Java. Vi tillhandahåller Spring Integration-stöd för dessa Azure-tjänster: Event Hubs, Service Bus, Storage Queue. Följande är en lista över kort som stöds:
spring-cloud-azure-starter-integration-eventhubs
- Mer information finns i Spring Integration med Azure Event Hubsspring-cloud-azure-starter-integration-servicebus
- Mer information finns i Spring Integration med Azure Service Busspring-cloud-azure-starter-integration-storage-queue
- Mer information finns i Spring Integration med Azure Storage Queue
Spring-integrering med Azure Event Hubs
Nyckelbegrepp
Azure Event Hubs är en stordataströmningsplattform och händelseinmatningstjänst. Den kan ta emot och behandla miljoner händelser per sekund. Data som skickas till en händelsehubb kan omvandlas och lagras med hjälp av valfri provider för realtidsanalys eller batchbearbetnings-/lagringsadaptrar.
Spring Integration möjliggör enklare meddelanden i Spring-baserade program och stöder integrering med externa system via deklarativa kort. Dessa kort ger en högre abstraktionsnivå jämfört med Spring-stöd för fjärrkommunikation, meddelanden och schemaläggning. Spring Integration for Event Hubs-tilläggsprojektet tillhandahåller inkommande och utgående kanalkort och gatewayer för Azure Event Hubs.
Kommentar
RxJava-stöd-API:er tas bort från version 4.0.0. Mer information finns i Javadoc.
Konsumentgrupp
Event Hubs har liknande stöd för konsumentgruppen som Apache Kafka, men med lite annan logik. Medan Kafka lagrar alla incheckade förskjutningar i asynkron meddelandekö måste du lagra förskjutningar av Event Hubs-meddelanden som bearbetas manuellt. Event Hubs SDK tillhandahåller funktionen för att lagra sådana förskjutningar i Azure Storage.
Stöd för partitionering
Event Hubs har ett liknande koncept för fysisk partition som Kafka. Men till skillnad från Kafkas automatiska ombalansering mellan konsumenter och partitioner tillhandahåller Event Hubs ett slags förebyggande läge. Lagringskontot fungerar som ett lån för att avgöra vilken partition som ägs av vilken konsument. När en ny konsument startar försöker den stjäla vissa partitioner från de flesta tunga användare för att uppnå arbetsbelastningsutjämningen.
För att ange belastningsutjämningsstrategin kan utvecklare använda EventHubsContainerProperties
för konfigurationen. I följande avsnitt finns ett exempel på hur du konfigurerar EventHubsContainerProperties
.
Stöd för Batch-konsument
Stöder EventHubsInboundChannelAdapter
batchförbrukningsläget. För att aktivera den kan användarna ange lyssnarläget som ListenerMode.BATCH
när de skapar en EventHubsInboundChannelAdapter
instans.
När det är aktiverat tas ett meddelande som nyttolasten är en lista över batchade händelser emot och skickas till den nedströmskanalen. Varje meddelanderubrik konverteras också som en lista, där innehållet är det associerade rubrikvärdet som parsas från varje händelse. För de gemensamma huvudena för partitions-ID, checkpointer och de senaste egenskaperna visas de som ett enda värde för hela batchen med händelser som delar samma. Mer information finns i avsnittet Meddelandehuvuden för Event Hubs.
Kommentar
Kontrollpunktsrubriken finns bara när MANUELLT kontrollpunktsläge används.
Kontrollpunkter för batchkonsumenten stöder två lägen: BATCH
och MANUAL
. BATCH
läget är ett läge för automatisk kontrollpunkt för att kontrollera hela batchen med händelser tillsammans när de har tagits emot. MANUAL
läge är att kontrollera händelserna av användare. När den används skickas Kontrollpunkten till meddelandehuvudet och användarna kan använda den för att göra kontrollpunkter.
Principen för batchförbrukning kan anges av egenskaperna max-size
och , där max-size
är en nödvändig egenskap medan max-wait-time
max-wait-time
den är valfri.
Utvecklare kan använda EventHubsContainerProperties
för konfigurationen för att ange den batchanvändningsstrategi som används. I följande avsnitt finns ett exempel på hur du konfigurerar EventHubsContainerProperties
.
Beroendekonfiguration
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>
Konfiguration
Den här startprogrammet innehåller följande tre delar av konfigurationsalternativen:
konfigurationsegenskaper för Anslut ion
Det här avsnittet innehåller de konfigurationsalternativ som används för att ansluta till Azure Event Hubs.
Kommentar
Om du väljer att använda ett säkerhetsobjekt för att autentisera och auktorisera med Microsoft Entra-ID för åtkomst till en Azure-resurs läser du Auktorisera åtkomst med Microsoft Entra-ID för att kontrollera att säkerhetsobjektet har beviljats tillräcklig behörighet för att få åtkomst till Azure-resursen.
Anslut ion konfigurerbara egenskaper för spring-cloud-azure-starter-integration-eventhubs:
Property | Type | Beskrivning |
---|---|---|
spring.cloud.azure.eventhubs.enabled | boolean | Om en Azure Event Hubs är aktiverad. |
spring.cloud.azure.eventhubs.connection-string | String | Event Hubs-namnområde anslutningssträng värde. |
spring.cloud.azure.eventhubs.namespace | String | Event Hubs-namnområdesvärde, som är prefixet för det fullständiga domännamnet. Ett FQDN ska bestå av NamespaceName.DomainName |
spring.cloud.azure.eventhubs.domain-name | String | Domännamn för ett Namnområdesvärde för Azure Event Hubs. |
spring.cloud.azure.eventhubs.custom-endpoint-address | String | Anpassad slutpunktsadress. |
spring.cloud.azure.eventhubs.shared-connection | Booleskt | Om den underliggande EventProcessorClient och EventHubProducerAsyncClient använder samma anslutning. Som standard skapas en ny anslutning och används för varje händelsehubbklient som skapas. |
Konfigurationsegenskaper för kontrollpunkt
Det här avsnittet innehåller konfigurationsalternativen för Storage Blobs-tjänsten, som används för att bevara partitionsägarskap och kontrollpunktsinformation.
Kommentar
Från version 4.0.0, när egenskapen spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists inte aktiveras manuellt, skapas ingen Lagringscontainer automatiskt.
Kontrollpunktskonfigurerbara egenskaper för spring-cloud-azure-starter-integration-eventhubs:
Property | Type | Beskrivning |
---|---|---|
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists | Booleskt | Om du vill tillåta att containrar skapas om de inte finns. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name | String | Namn på lagringskontot. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key | String | Åtkomstnyckel för lagringskonto. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name | String | Namn på lagringscontainer. |
Vanliga konfigurationsalternativ för Azure Service SDK kan även konfigureras för Lagringsblobkontrollpunktsarkiv. De konfigurationsalternativ som stöds introduceras i Spring Cloud Azure-konfigurationen och kan konfigureras med antingen det enhetliga prefixet spring.cloud.azure.
eller prefixet spring.cloud.azure.eventhubs.processor.checkpoint-store
.
Konfigurationsegenskaper för Event Hub-processor
EventHubsInboundChannelAdapter
Använder EventProcessorClient
för att använda meddelanden från en händelsehubb för att konfigurera de övergripande egenskaperna för en EventProcessorClient
, som utvecklare kan använda EventHubsContainerProperties
för konfigurationen. Se följande avsnitt om hur du arbetar med EventHubsInboundChannelAdapter
.
Grundläggande användning
Skicka meddelanden till Azure Event Hubs
Fyll i konfigurationsalternativen för autentiseringsuppgifter.
För autentiseringsuppgifter som anslutningssträng konfigurerar du följande egenskaper i filen application.yml:
spring: cloud: azure: eventhubs: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT-CONTAINER} account-name: ${CHECKPOINT-STORAGE-ACCOUNT} account-key: ${CHECKPOINT-ACCESS-KEY}
För autentiseringsuppgifter som hanterade identiteter konfigurerar du följande egenskaper i filen application.yml :
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} eventhubs: namespace: ${AZURE_SERVICE_BUS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
För autentiseringsuppgifter som tjänstens huvudnamn konfigurerar du följande egenskaper i filen application.yml :
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${AZURE_SERVICE_BUS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
Kommentar
De värden som tillåts för tenant-id
är: common
, organizations
, consumers
eller klientorganisations-ID. Mer information om dessa värden finns i avsnittet Använd fel slutpunkt (personliga konton och organisationskonton) i Fel AADSTS50020 – Användarkonto från identitetsprovidern finns inte i klientorganisationen. Information om hur du konverterar din app för en klientorganisation finns i Konvertera en klientorganisationsapp till flera klientorganisationer på Microsoft Entra-ID.
Skapa
DefaultMessageHandler
med bönanEventHubsTemplate
för att skicka meddelanden till Event Hubs.class Demo { private static final String OUTPUT_CHANNEL = "output"; private static final String EVENTHUB_NAME = "eh1"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.error("There was an error sending the message.", ex); } }); return handler; } }
Skapa en meddelandegatewaybindning med meddelandehanteraren ovan via en meddelandekanal.
class Demo { @Autowired EventHubOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface EventHubOutboundGateway { void send(String text); } }
Skicka meddelanden med hjälp av gatewayen.
class Demo { public void demo() { this.messagingGateway.send(message); } }
Ta emot meddelanden från Azure Event Hubs
Fyll i konfigurationsalternativen för autentiseringsuppgifter.
Skapa en böna med meddelandekanalen som indatakanal.
@Configuration class Demo { @Bean public MessageChannel input() { return new DirectChannel(); } }
Skapa
EventHubsInboundChannelAdapter
med bönanEventHubsMessageListenerContainer
för att ta emot meddelanden från Event Hubs.@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; private static final String EVENTHUB_NAME = "eh1"; private static final String CONSUMER_GROUP = "$Default"; @Bean public EventHubsInboundChannelAdapter messageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, EventHubsMessageListenerContainer listenerContainer) { EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer); adapter.setOutputChannel(inputChannel); return adapter; } @Bean public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) { EventHubsContainerProperties containerProperties = new EventHubsContainerProperties(); containerProperties.setEventHubName(EVENTHUB_NAME); containerProperties.setConsumerGroup(CONSUMER_GROUP); containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL)); return new EventHubsMessageListenerContainer(processorFactory, containerProperties); } }
Skapa en meddelandemottagarebindning med EventHubsInboundChannelAdapter via meddelandekanalen som skapades tidigare.
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
Konfigurera EventHubsMessageConverter för att anpassa objectMapper
EventHubsMessageConverter
görs som en konfigurerbar böna så att användarna kan anpassa ObjectMapper.
Stöd för Batch-konsument
Att använda meddelanden från Event Hubs i batchar liknar exemplet ovan, förutom att användarna bör ange de batchkrävande relaterade konfigurationsalternativen för EventHubsInboundChannelAdapter
.
När du skapar EventHubsInboundChannelAdapter
ska lyssnarläget anges som BATCH
. När du skapar böna av EventHubsMessageListenerContainer
anger du kontrollpunktsläget som antingen MANUAL
eller BATCH
, och batchalternativen kan konfigureras efter behov.
@Configuration
class Demo {
private static final String INPUT_CHANNEL = "input";
private static final String EVENTHUB_NAME = "eh1";
private static final String CONSUMER_GROUP = "$Default";
@Bean
public EventHubsInboundChannelAdapter messageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
EventHubsMessageListenerContainer listenerContainer) {
EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
adapter.setOutputChannel(inputChannel);
return adapter;
}
@Bean
public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
containerProperties.setEventHubName(EVENTHUB_NAME);
containerProperties.setConsumerGroup(CONSUMER_GROUP);
containerProperties.getBatch().setMaxSize(100);
containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
}
}
Meddelandehuvuden för Event Hubs
I följande tabell visas hur event hubs-meddelandeegenskaper mappas till Spring-meddelandehuvuden. För Azure Event Hubs anropas meddelandet som event
.
Mappning mellan Event Hubs-meddelande/händelseegenskaper och Spring Message-huvuden i inspelningslyssningsläge:
Händelseegenskaper för Event Hubs | Spring Message-huvudkonstanter | Typ | Beskrivning |
---|---|---|---|
Köad tid | EventHubsHeaders#ENQUEUED_TIME | Direkt | Omedelbart, i UTC, när händelsen angavs i Event Hub-partitionen. |
Förskjutning | EventHubsHeaders#OFFSET | Long | Förskjutningen av händelsen när den togs emot från den associerade Event Hub-partitionen. |
Partitionsnyckel | AzureHeaders#PARTITION_KEY | String | Partitionens hashningsnyckel om den angavs när händelsen ursprungligen publicerades. |
Partitions-ID | AzureHeaders#RAW_PARTITION_ID | String | Partitions-ID för händelsehubben. |
Löpnummer | EventHubsHeaders#SEQUENCE_NUMBER | Long | Sekvensnumret som tilldelades händelsen när den angavs i den associerade Event Hub-partitionen. |
Senaste egenskaper för enqueued-händelse | EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES | LastEnqueuedEventProperties | Egenskaperna för den senaste köade händelsen i den här partitionen. |
NA | AzureHeaders#CHECKPOINTER | Kontrollpunkt | Huvudet för kontrollpunkten det specifika meddelandet. |
Användare kan parsa meddelanderubrikerna för relaterad information om varje händelse. Om du vill ange ett meddelandehuvud för händelsen placeras alla anpassade rubriker som en programegenskap för en händelse, där rubriken anges som egenskapsnyckel. När händelser tas emot från Event Hubs konverteras alla programegenskaper till meddelandehuvudet.
Kommentar
Meddelandehuvuden för partitionsnyckel, köad tid, förskjutning och sekvensnummer stöds inte för att ställas in manuellt.
När batch-konsumentläget är aktiverat visas de specifika rubrikerna för batchmeddelanden på följande sätt, som innehåller en lista med värden från varje händelse i Händelsehubbar.
Mappning mellan Event Hubs-meddelande/händelseegenskaper och Spring Message-huvuden i Batch-lyssnarläge:
Händelseegenskaper för Event Hubs | Spring Batch-meddelandehuvudkonstanter | Typ | Beskrivning |
---|---|---|---|
Köad tid | EventHubsHeaders#ENQUEUED_TIME | Lista över snabbmeddelanden | Lista över ögonblicket, i UTC, över när varje händelse angavs i Event Hub-partitionen. |
Förskjutning | EventHubsHeaders#OFFSET | Lista över långa | Lista över förskjutningen för varje händelse när den togs emot från den associerade Event Hub-partitionen. |
Partitionsnyckel | AzureHeaders#PARTITION_KEY | Lista över sträng | Lista över partitionshashingnyckeln om den angavs när varje händelse ursprungligen publicerades. |
Löpnummer | EventHubsHeaders#SEQUENCE_NUMBER | Lista över långa | Lista över sekvensnumret som tilldelades varje händelse när den angavs i den associerade Event Hub-partitionen. |
Systemegenskaper | EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES | Lista över karta | Lista över systemegenskaperna för varje händelse. |
Egenskaper för program | EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES | Lista över karta | Lista över programegenskaperna för varje händelse, där alla anpassade meddelandehuvuden eller händelseegenskaper placeras. |
Kommentar
När du publicerar meddelanden tas alla batchhuvuden ovan bort från meddelandena om de finns.
Exempel
Mer information finns i lagringsplatsen azure-spring-boot-samples på GitHub.
Spring-integrering med Azure Service Bus
Nyckelbegrepp
Spring Integration möjliggör enklare meddelanden i Spring-baserade program och stöder integrering med externa system via deklarativa kort.
Spring Integration for Azure Service Bus-tilläggsprojektet tillhandahåller inkommande och utgående kanalkort för Azure Service Bus.
Kommentar
CompletableFuture-stöd-API:er har föråldrats från version 2.10.0 och ersätts av Reactor Core från version 4.0.0. Mer information finns i Javadoc.
Beroendekonfiguration
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>
Konfiguration
Den här startprogrammet innehåller följande två delar av konfigurationsalternativen:
konfigurationsegenskaper för Anslut ion
Det här avsnittet innehåller de konfigurationsalternativ som används för att ansluta till Azure Service Bus.
Kommentar
Om du väljer att använda ett säkerhetsobjekt för att autentisera och auktorisera med Microsoft Entra-ID för åtkomst till en Azure-resurs läser du Auktorisera åtkomst med Microsoft Entra-ID för att kontrollera att säkerhetsobjektet har beviljats tillräcklig behörighet för att få åtkomst till Azure-resursen.
Anslut ion konfigurerbara egenskaper för spring-cloud-azure-starter-integration-servicebus:
Property | Type | Beskrivning |
---|---|---|
spring.cloud.azure.servicebus.enabled | boolean | Om en Azure Service Bus är aktiverad. |
spring.cloud.azure.servicebus.connection-string | String | Service Bus-namnområde anslutningssträng värde. |
spring.cloud.azure.servicebus.namespace | String | Service Bus-namnområdesvärde, som är prefixet för FQDN. Ett FQDN ska bestå av NamespaceName.DomainName |
spring.cloud.azure.servicebus.domain-name | String | Domännamn för ett Azure Service Bus-namnområdesvärde. |
Konfigurationsegenskaper för Service Bus-processor
ServiceBusInboundChannelAdapter
Använder ServiceBusProcessorClient
för att använda meddelanden, för att konfigurera de övergripande egenskaperna för en ServiceBusProcessorClient
, kan utvecklare använda ServiceBusContainerProperties
för konfigurationen. Se följande avsnitt om hur du arbetar med ServiceBusInboundChannelAdapter
.
Grundläggande användning
Skicka meddelanden till Azure Service Bus
Fyll i konfigurationsalternativen för autentiseringsuppgifter.
För autentiseringsuppgifter som anslutningssträng konfigurerar du följande egenskaper i filen application.yml:
spring: cloud: azure: servicebus: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
För autentiseringsuppgifter som hanterade identiteter konfigurerar du följande egenskaper i filen application.yml :
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Kommentar
De värden som tillåts för tenant-id
är: common
, organizations
, consumers
eller klientorganisations-ID. Mer information om dessa värden finns i avsnittet Använd fel slutpunkt (personliga konton och organisationskonton) i Fel AADSTS50020 – Användarkonto från identitetsprovidern finns inte i klientorganisationen. Information om hur du konverterar din app för en klientorganisation finns i Konvertera en klientorganisationsapp till flera klientorganisationer på Microsoft Entra-ID.
För autentiseringsuppgifter som tjänstens huvudnamn konfigurerar du följande egenskaper i filen application.yml :
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Kommentar
De värden som tillåts för tenant-id
är: common
, organizations
, consumers
eller klientorganisations-ID. Mer information om dessa värden finns i avsnittet Använd fel slutpunkt (personliga konton och organisationskonton) i Fel AADSTS50020 – Användarkonto från identitetsprovidern finns inte i klientorganisationen. Information om hur du konverterar din app för en klientorganisation finns i Konvertera en klientorganisationsapp till flera klientorganisationer på Microsoft Entra-ID.
Skapa
DefaultMessageHandler
med bönanServiceBusTemplate
för att skicka meddelanden till Service Bus och ange entitetstypen för ServiceBusTemplate. Det här exemplet tar Service Bus Queue som exempel.class Demo { private static final String OUTPUT_CHANNEL = "queue.output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) { serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE); DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
Skapa en meddelandegatewaybindning med meddelandehanteraren ovan via en meddelandekanal.
class Demo { @Autowired QueueOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface QueueOutboundGateway { void send(String text); } }
Skicka meddelanden med hjälp av gatewayen.
class Demo { public void demo() { this.messagingGateway.send(message); } }
Ta emot meddelanden från Azure Service Bus
Fyll i konfigurationsalternativen för autentiseringsuppgifter.
Skapa en böna med meddelandekanalen som indatakanal.
@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
Skapa
ServiceBusInboundChannelAdapter
med bönanServiceBusMessageListenerContainer
för att ta emot meddelanden till Service Bus. Det här exemplet tar Service Bus Queue som exempel.@Configuration class Demo { private static final String QUEUE_NAME = "queue1"; @Bean public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) { ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties(); containerProperties.setEntityName(QUEUE_NAME); containerProperties.setAutoComplete(false); return new ServiceBusMessageListenerContainer(processorFactory, containerProperties); } @Bean public ServiceBusInboundChannelAdapter queueMessageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, ServiceBusMessageListenerContainer listenerContainer) { ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer); adapter.setOutputChannel(inputChannel); return adapter; } }
Skapa en meddelandemottagarebindning med
ServiceBusInboundChannelAdapter
via meddelandekanalen som vi skapade tidigare.class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
Konfigurera ServiceBusMessageConverter för att anpassa objectMapper
ServiceBusMessageConverter
görs som en konfigurerbar böna så att användarna kan anpassa ObjectMapper
.
Service Bus-meddelandehuvuden
För vissa Service Bus-huvuden som kan mappas till flera Spring-huvudkonstanter visas prioriteten för olika Spring-huvuden.
Mappning mellan Service Bus-huvuden och Spring-huvuden:
Service Bus-meddelandehuvuden och egenskaper | Spring-meddelandehuvudkonstanter | Typ | Konfigureringsbart | beskrivning |
---|---|---|---|---|
Content type | MessageHeaders#CONTENT_TYPE |
String | Ja | Beskrivningen RFC2045 innehållstyp för meddelandet. |
Korrelations-ID | ServiceBusMessageHeaders#CORRELATION_ID |
String | Ja | Korrelations-ID för meddelandet |
Meddelande-ID | ServiceBusMessageHeaders#MESSAGE_ID |
String | Ja | Meddelande-ID:t för meddelandet, det här huvudet har högre prioritet än MessageHeaders#ID . |
Meddelande-ID | MessageHeaders#ID |
UUID | Ja | Meddelandets meddelande-ID, det här huvudet har lägre prioritet än ServiceBusMessageHeaders#MESSAGE_ID . |
Partitionsnyckel | ServiceBusMessageHeaders#PARTITION_KEY |
String | Ja | Partitionsnyckeln för att skicka meddelandet till en partitionerad entitet. |
Svara till | MessageHeaders#REPLY_CHANNEL |
String | Ja | Adressen till en entitet som svar ska skickas till. |
Svara på sessions-ID | ServiceBusMessageHeaders#REPLY_TO_SESSION_ID |
String | Ja | Egenskapen ReplyToGroupId för meddelandet. |
Schemalagd kötidsutc | ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME |
OffsetDateTime | Ja | Den datumtid då meddelandet ska anges i Service Bus har den här rubriken högre prioritet än AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE . |
Schemalagd kötidsutc | AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE |
Integer | Ja | Den datumtid då meddelandet ska anges i Service Bus har den här rubriken lägre prioritet än ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME . |
Sessions-ID | ServiceBusMessageHeaders#SESSION_ID |
String | Ja | Sessions-IDentifier för en sessionsmedveten entitet. |
Time to live | ServiceBusMessageHeaders#TIME_TO_LIVE |
Varaktighet | Ja | Hur lång tid det tar innan det här meddelandet upphör att gälla. |
To | ServiceBusMessageHeaders#TO |
String | Ja | Meddelandets "till"-adress, reserverad för framtida användning i routningsscenarier och som för närvarande ignoreras av själva koordinatorn. |
Subject | ServiceBusMessageHeaders#SUBJECT |
String | Ja | Ämnet för meddelandet. |
Beskrivning av fel med obeställbara bokstäver | ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION |
String | Nej | Beskrivningen av ett meddelande som har fått en obeställd bokstav. |
Orsak till död bokstav | ServiceBusMessageHeaders#DEAD_LETTER_REASON |
String | Nej | Anledningen till att ett meddelande var obeställt. |
Källa med obeställbara bokstäver | ServiceBusMessageHeaders#DEAD_LETTER_SOURCE |
String | Nej | Entiteten där meddelandet var obeställt. |
Antal leveranser | ServiceBusMessageHeaders#DELIVERY_COUNT |
lång | Nej | Antalet gånger meddelandet levererades till klienter. |
Kodat sekvensnummer | ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER |
lång | Nej | Det kodade sekvensnumret som tilldelats ett meddelande av Service Bus. |
Köad tid | ServiceBusMessageHeaders#ENQUEUED_TIME |
OffsetDateTime | Nej | Den datetime då det här meddelandet angavs i Service Bus. |
Upphör att gälla kl. | ServiceBusMessageHeaders#EXPIRES_AT |
OffsetDateTime | Nej | Den datumtid då det här meddelandet upphör att gälla. |
Lås token | ServiceBusMessageHeaders#LOCK_TOKEN |
String | Nej | Låstoken för det aktuella meddelandet. |
Låst tills | ServiceBusMessageHeaders#LOCKED_UNTIL |
OffsetDateTime | Nej | Den datumtid då låset för det här meddelandet upphör att gälla. |
Löpnummer | ServiceBusMessageHeaders#SEQUENCE_NUMBER |
lång | Nej | Det unika nummer som tilldelats ett meddelande av Service Bus. |
Tillstånd | ServiceBusMessageHeaders#STATE |
ServiceBusMessageState | Nej | Tillståndet för meddelandet, som kan vara Aktiv, Uppskjuten eller Schemalagd. |
Stöd för partitionsnyckel
Den här startern stöder Service Bus-partitionering genom att tillåta inställning av partitionsnyckel och sessions-ID i meddelandehuvudet. I det här avsnittet beskrivs hur du anger partitionsnyckel för meddelanden.
Rekommenderas: Använd ServiceBusMessageHeaders.PARTITION_KEY
som nyckel i rubriken.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Rekommenderas inte men stöds för närvarande:AzureHeaders.PARTITION_KEY
som nyckeln i rubriken.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Kommentar
När både ServiceBusMessageHeaders.PARTITION_KEY
och AzureHeaders.PARTITION_KEY
anges i meddelandehuvudena föredras ServiceBusMessageHeaders.PARTITION_KEY
.
Sessionsstöd
Det här exemplet visar hur du manuellt anger sessions-ID för ett meddelande i programmet.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Kommentar
ServiceBusMessageHeaders.SESSION_ID
När anges i meddelanderubrikerna och ett annat ServiceBusMessageHeaders.PARTITION_KEY
huvud också anges, används värdet för sessions-ID:t så småningom för att skriva över värdet för partitionsnyckeln.
Exempel
Mer information finns i lagringsplatsen azure-spring-boot-samples på GitHub.
Spring-integrering med Azure Storage Queue
Nyckelbegrepp
Azure Queue Storage är en tjänst för lagring av ett stort antal meddelanden. Du kommer åt meddelanden var som helst i världen via autentiserade anrop med HTTP eller HTTPS. Ett kömeddelande kan vara upp till 64 KB stort. En kö kan innehålla miljontals meddelanden, upp till den totala kapacitetsgränsen för ett lagringskonto. Köer används ofta för att skapa en kvarvarande arbetslogg för att bearbeta asynkront.
Beroendekonfiguration
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>
Konfiguration
Den här startprogrammet innehåller följande konfigurationsalternativ:
konfigurationsegenskaper för Anslut ion
Det här avsnittet innehåller de konfigurationsalternativ som används för att ansluta till Azure Storage Queue.
Kommentar
Om du väljer att använda ett säkerhetsobjekt för att autentisera och auktorisera med Microsoft Entra-ID för åtkomst till en Azure-resurs läser du Auktorisera åtkomst med Microsoft Entra-ID för att kontrollera att säkerhetsobjektet har beviljats tillräcklig behörighet för att få åtkomst till Azure-resursen.
Anslut ion konfigurerbara egenskaper för spring-cloud-azure-starter-integration-storage-queue:
Property | Type | Beskrivning |
---|---|---|
spring.cloud.azure.storage.queue.enabled | boolean | Om en Azure Storage-kö är aktiverad. |
spring.cloud.azure.storage.queue.connection-string | String | Lagringskönamnområde anslutningssträng värde. |
spring.cloud.azure.storage.queue.accountName | String | Namn på lagringskökonto. |
spring.cloud.azure.storage.queue.accountKey | String | Kontonyckel för lagringskö. |
spring.cloud.azure.storage.queue.endpoint | String | Tjänstslutpunkt för lagringskö. |
spring.cloud.azure.storage.queue.sasToken | String | Sas-tokenautentiseringsuppgifter |
spring.cloud.azure.storage.queue.serviceVersion | QueueServiceVersion | QueueServiceVersion som används när API-begäranden görs. |
spring.cloud.azure.storage.queue.messageEncoding | String | Kodning av kömeddelanden. |
Grundläggande användning
Skicka meddelanden till Azure Storage Queue
Fyll i konfigurationsalternativen för autentiseringsuppgifter.
För autentiseringsuppgifter som anslutningssträng konfigurerar du följande egenskaper i filen application.yml:
spring: cloud: azure: storage: queue: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
För autentiseringsuppgifter som hanterade identiteter konfigurerar du följande egenskaper i filen application.yml :
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> storage: queue: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Kommentar
De värden som tillåts för tenant-id
är: common
, organizations
, consumers
eller klientorganisations-ID. Mer information om dessa värden finns i avsnittet Använd fel slutpunkt (personliga konton och organisationskonton) i Fel AADSTS50020 – Användarkonto från identitetsprovidern finns inte i klientorganisationen. Information om hur du konverterar din app för en klientorganisation finns i Konvertera en klientorganisationsapp till flera klientorganisationer på Microsoft Entra-ID.
För autentiseringsuppgifter som tjänstens huvudnamn konfigurerar du följande egenskaper i filen application.yml :
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> storage: queue: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Kommentar
De värden som tillåts för tenant-id
är: common
, organizations
, consumers
eller klientorganisations-ID. Mer information om dessa värden finns i avsnittet Använd fel slutpunkt (personliga konton och organisationskonton) i Fel AADSTS50020 – Användarkonto från identitetsprovidern finns inte i klientorganisationen. Information om hur du konverterar din app för en klientorganisation finns i Konvertera en klientorganisationsapp till flera klientorganisationer på Microsoft Entra-ID.
Skapa
DefaultMessageHandler
med bönanStorageQueueTemplate
för att skicka meddelanden till Lagringskö.class Demo { private static final String STORAGE_QUEUE_NAME = "example"; private static final String OUTPUT_CHANNEL = "output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
Skapa en meddelandegatewaybindning med meddelandehanteraren ovan via en meddelandekanal.
class Demo { @Autowired StorageQueueOutboundGateway storageQueueOutboundGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface StorageQueueOutboundGateway { void send(String text); } }
Skicka meddelanden med hjälp av gatewayen.
class Demo { public void demo() { this.storageQueueOutboundGateway.send(message); } }
Ta emot meddelanden från Azure Storage Queue
Fyll i konfigurationsalternativen för autentiseringsuppgifter.
Skapa en böna med meddelandekanalen som indatakanal.
class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
Skapa
StorageQueueMessageSource
med bönanStorageQueueTemplate
för att ta emot meddelanden till Lagringskö.class Demo { private static final String STORAGE_QUEUE_NAME = "example"; @Bean @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000")) public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) { return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate); } }
Skapa en meddelandemottagarebindning med StorageQueueMessageSource som skapades i det sista steget via meddelandekanalen som vi skapade tidigare.
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnError(Throwable::printStackTrace) .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message)) .block(); } }
Exempel
Mer information finns i lagringsplatsen azure-spring-boot-samples på GitHub.