Jegyzet
Az oldalhoz való hozzáférés engedélyezést igényel. Próbálhatod be jelentkezni vagy könyvtárat váltani.
Az oldalhoz való hozzáférés engedélyezést igényel. Megpróbálhatod a könyvtár váltását.
Az Azure Spring Integration Extension spring integration adaptereket biztosít a Azure SDK for Javakülönböző szolgáltatásaihoz. Spring Integration-támogatást biztosítunk ezekhez az Azure-szolgáltatásokhoz: Event Hubs, Service Bus, Storage Queue. A támogatott adapterek listája a következő:
-
spring-cloud-azure-starter-integration-eventhubs– további információ: Spring Integration with Azure Event Hubs -
spring-cloud-azure-starter-integration-servicebus– további információ: Spring Integration with Azure Service Bus -
spring-cloud-azure-starter-integration-storage-queue– további információ: Spring Integration with Azure Storage Queue
Tavaszi integráció az Azure Event Hubsszal
Főbb fogalmak
Az Azure Event Hubs egy big data streamelési platform és eseménybetöltési szolgáltatás. Másodpercenként több millió esemény fogadására és feldolgozására képes. Az eseményközpontba küldött adatok átalakíthatók és tárolhatók bármely valós idejű elemzési szolgáltató vagy kötegelési/tárolási adapter használatával.
A Spring Integration lehetővé teszi az egyszerű üzenetkezelést a Spring-alapú alkalmazásokban, és deklaratív adaptereken keresztül támogatja a külső rendszerekkel való integrációt. Ezek az adapterek magasabb szintű absztrakciót biztosítanak a Spring újraegyeztetési, üzenetkezelési és ütemezési támogatásával szemben. Az Spring Integration for Event Hubs bővítményprojekt bejövő és kimenő csatornaadaptereket és átjárókat biztosít az Azure Event Hubshoz.
Jegyzet
Az RxJava támogatási API-k a 4.0.0-s verzióról törlődnek. Részletekért lásd: Javadoc.
Fogyasztói csoport
Az Event Hubs hasonló támogatást nyújt a fogyasztói csoportoknak, mint az Apache Kafka, de kismértékben eltérő logikával. Bár a Kafka az összes véglegesített eltolást a közvetítőben tárolja, a manuálisan feldolgozott Event Hubs-üzenetek eltolásait kell tárolnia. Az Event Hubs SDK az ilyen eltolások Azure Storage-ban való tárolására szolgál.
Particionálás támogatása
Az Event Hubs a Kafkához hasonló fizikai partíciót biztosít. A Kafka felhasználói és partíciói közötti automatikus újraelosztással ellentétben azonban az Event Hubs egyfajta megelőző módot biztosít. A tárfiók bérletként működik annak meghatározásához, hogy melyik partíció melyik felhasználó tulajdonában van. Amikor egy új felhasználó elindul, megpróbál ellopni néhány partíciót a legtöbb nehéz terhelésű felhasználótól a számítási feladatok kiegyensúlyozása érdekében.
A terheléselosztási stratégia megadásához a fejlesztők EventHubsContainerProperties használhatnak a konfigurációhoz. A
Batch fogyasztói támogatás
A EventHubsInboundChannelAdapter támogatja a kötegfogyasztó módot. Az engedélyezéshez a felhasználók ListenerMode.BATCH figyelő módot adhatnak meg EventHubsInboundChannelAdapter-példányok létrehozásakor.
Ha engedélyezve van, egy üzenet, amelynek hasznos adata a kötegelt események listája, a rendszer megkapja és átadja az alsóbb rétegbeli csatornának. Az egyes üzenetfejlécek is listaként lesznek konvertálva, amelyek tartalma az egyes eseményekhez tartozó fejlécérték. A partícióazonosító, az ellenőrzőpont és az utolsó lekérdezett tulajdonságok közösségi fejlécei egyetlen értékként jelennek meg a teljes eseménykötegben, amelyek ugyanazt a tulajdonságot használják. További információ: Event Hubs-üzenetfejlécek szakasz.
Jegyzet
Az ellenőrzőpont fejléce csak akkor létezik, ha MANUÁLIS ellenőrzőpont mód van használatban.
A kötegfelhasználók ellenőrzőpontozása két módot támogat: BATCH és MANUAL.
BATCH mód egy automatikus ellenőrzőpontozási mód, amely az események teljes kötegét együttesen ellenőrzi a beérkezés után.
MANUAL mód az események felhasználók általi ellenőrzésére. Használat esetén a Ellenőrzőpont az üzenet fejlécébe kerül, és a felhasználók használhatják az ellenőrzőpont-ellenőrzést.
A kötegfogyasztó házirendet max-size és max-wait-timetulajdonságai adhatja meg, ahol a max-size szükséges tulajdonság, míg a max-wait-time nem kötelező.
A köteghasználati stratégia megadásához a fejlesztők EventHubsContainerProperties használhatnak a konfigurációhoz. A
Függőség beállítása
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>
Konfiguráció
Ez az alapindító a következő 3 konfigurációs lehetőséget biztosítja:
Kapcsolatkonfiguráció tulajdonságai
Ez a szakasz az Azure Event Hubshoz való csatlakozáshoz használt konfigurációs beállításokat tartalmazza.
Jegyzet
Ha biztonsági tagot használ egy Azure-erőforrás eléréséhez a Microsoft Entra-azonosítóval történő hitelesítéshez és engedélyezéshez, tekintse meg Hozzáférés engedélyezése a Microsoft Entra-azonosítóval annak biztosításához, hogy a biztonsági tag megkapta-e a megfelelő engedélyt az Azure-erőforrás eléréséhez.
A spring-cloud-azure-starter-integration-eventhubs kapcsolatkonfigurálható tulajdonságai:
| Ingatlan | Típus | Leírás |
|---|---|---|
| spring.cloud.azure.eventhubs.enabled | Logikai | Az Azure Event Hubs engedélyezése. |
| spring.cloud.azure.eventhubs.connection-string | Húr | Event Hubs-névtér kapcsolati sztringértéke. |
| spring.cloud.azure.eventhubs.namespace | Húr | Event Hubs-névtérérték, amely a teljes tartománynév előtagja. Az FQDN-nek a NamespaceName.DomainName névből kell lennie |
| spring.cloud.azure.eventhubs.tartománynév | Húr | Egy Azure Event Hubs-névtérérték tartományneve. |
| spring.cloud.azure.eventhubs.custom-endpoint-address | Húr | Egyéni végpont címe. |
| spring.cloud.azure.eventhubs.shared-connection | Logikai | Azt, hogy a mögöttes EventProcessorClient és az EventHubProducerAsyncClient ugyanazt a kapcsolatot használja-e. Alapértelmezés szerint minden létrehozott Event Hub-ügyfélhez új kapcsolat jön létre és jön létre. |
Ellenőrzőpont konfigurációs tulajdonságai
Ez a szakasz a Storage Blobs szolgáltatás konfigurációs beállításait tartalmazza, amely a partíció tulajdonjogának és ellenőrzőpont-adatainak megőrzésére szolgál.
Jegyzet
Ha a 4.0.0-s verzióban a spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists tulajdonság nincs manuálisan engedélyezve, a rendszer nem hoz létre automatikusan storage-tárolót.
A spring-cloud-azure-starter-integration-eventhubs konfigurálható tulajdonságainak ellenőrzése:
| Ingatlan | Típus | Leírás |
|---|---|---|
| spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists | Logikai | Engedélyezi-e a tárolók létrehozását, ha nem létezik. |
| spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name | Húr | A tárfiók neve. |
| spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key | Húr | Tárfiók hozzáférési kulcsa. |
| spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name | Húr | Tároló neve. |
Az Azure Service SDK gyakori konfigurációs beállításai a Storage Blob CheckPoint Store-hoz is konfigurálhatók. A támogatott konfigurációs beállítások Spring Cloud Azure-konfigurációsjelennek meg, és konfigurálhatók az egyesített spring.cloud.azure. előtaggal vagy a spring.cloud.azure.eventhubs.processor.checkpoint-storeelőtagjával.
Az Event Hub processzorkonfigurációs tulajdonságai
A EventHubsInboundChannelAdapter a EventProcessorClient használja egy eseményközpont üzeneteinek felhasználására, egy EventProcessorClientáltalános tulajdonságainak konfigurálására, a fejlesztők EventHubsContainerProperties használhatnak a konfigurációhoz. A
Alapszintű használat
Üzenetek küldése az Azure Event Hubsba
Adja meg a hitelesítő adatok konfigurációs beállításait.
A hitelesítő adatok kapcsolati sztringként való megadásához konfigurálja a következő tulajdonságokat a application.yml fájlban:
spring: cloud: azure: eventhubs: connection-string: ${AZURE_EVENT_HUBS_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT-CONTAINER} account-name: ${CHECKPOINT-STORAGE-ACCOUNT} account-key: ${CHECKPOINT-ACCESS-KEY}Jegyzet
A Microsoft az elérhető legbiztonságosabb hitelesítési folyamat használatát javasolja. Az ebben az eljárásban ismertetett hitelesítési folyamat, például adatbázisok, gyorsítótárak, üzenetkezelés vagy AI-szolgáltatások esetében, nagyon nagy megbízhatóságot igényel az alkalmazásban, és más folyamatokban nem jelenik meg kockázattal. Ezt a folyamatot csak akkor használja, ha a biztonságosabb lehetőségek, például a jelszó nélküli vagy kulcs nélküli kapcsolatok felügyelt identitásai nem életképesek. A helyi gépi műveletekhez előnyben részesítse a jelszó nélküli vagy kulcs nélküli kapcsolatok felhasználói identitásait.
A hitelesítő adatok felügyelt identitásként való megadásához konfigurálja a következő tulajdonságokat a application.yml fájlban:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} eventhubs: namespace: ${AZURE_EVENT_HUBS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}A szolgáltatásnévként megadott hitelesítő adatokhoz konfigurálja a következő tulajdonságokat a application.yml fájlban:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${AZURE_EVENT_HUBS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
Jegyzet
A tenant-id engedélyezett értékek a következők: common, organizations, consumersvagy bérlőazonosító. Ezekről az értékekről további információt a Helytelen végpont (személyes és szervezeti fiókok) szakaszában talál, hiba AADSTS50020 – Az identitásszolgáltatótól származó felhasználói fiók nem létezik a bérlői. Az egybérlős alkalmazás konvertálásáról további információt Az egybérlős alkalmazás átalakítása több-bérlőssé a Microsoft Entra-azonosító.
Hozzon létre
DefaultMessageHandleraEventHubsTemplatebabbal, hogy üzeneteket küldjön az Event Hubsnak.class Demo { private static final String OUTPUT_CHANNEL = "output"; private static final String EVENTHUB_NAME = "eh1"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.error("There was an error sending the message.", ex); } }); return handler; } }Hozzon létre egy üzenetátjáró-kötést a fenti üzenetkezelővel egy üzenetcsatornán keresztül.
class Demo { @Autowired EventHubOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface EventHubOutboundGateway { void send(String text); } }Üzenetek küldése az átjáróval.
class Demo { public void demo() { this.messagingGateway.send(message); } }
Üzenetek fogadása az Azure Event Hubsból
Adja meg a hitelesítő adatok konfigurációs beállításait.
Hozzon létre egy bab üzenetcsatornát bemeneti csatornaként.
@Configuration class Demo { @Bean public MessageChannel input() { return new DirectChannel(); } }Hozzon létre
EventHubsInboundChannelAdapteraEventHubsMessageListenerContainerbab használatával az Event Hubstól érkező üzenetek fogadásához.@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; private static final String EVENTHUB_NAME = "eh1"; private static final String CONSUMER_GROUP = "$Default"; @Bean public EventHubsInboundChannelAdapter messageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, EventHubsMessageListenerContainer listenerContainer) { EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer); adapter.setOutputChannel(inputChannel); return adapter; } @Bean public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) { EventHubsContainerProperties containerProperties = new EventHubsContainerProperties(); containerProperties.setEventHubName(EVENTHUB_NAME); containerProperties.setConsumerGroup(CONSUMER_GROUP); containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL)); return new EventHubsMessageListenerContainer(processorFactory, containerProperties); } }Hozzon létre egy üzenet fogadó kötést az EventHubsInboundChannelAdapterrel a korábban létrehozott üzenetcsatornán keresztül.
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
Az EventHubsMessageConverter konfigurálása az objectMapper testreszabásához
EventHubsMessageConverter konfigurálható babként készült, amely lehetővé teszi a felhasználók számára az ObjectMapper testreszabását.
Batch fogyasztói támogatás
Az Event Hubsból származó üzenetek kötegekben való felhasználása hasonló a fenti mintához, a felhasználókon kívül meg kell adniuk a kötegfelhasználókhoz kapcsolódó konfigurációs beállításokat EventHubsInboundChannelAdapter.
A EventHubsInboundChannelAdapterlétrehozásakor a figyelő üzemmódot BATCHkell beállítani. A EventHubsMessageListenerContainerlétrehozásakor állítsa be az ellenőrzőpont üzemmódot MANUAL vagy BATCH, és a kötegbeállítások igény szerint konfigurálhatók.
@Configuration
class Demo {
private static final String INPUT_CHANNEL = "input";
private static final String EVENTHUB_NAME = "eh1";
private static final String CONSUMER_GROUP = "$Default";
@Bean
public EventHubsInboundChannelAdapter messageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
EventHubsMessageListenerContainer listenerContainer) {
EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
adapter.setOutputChannel(inputChannel);
return adapter;
}
@Bean
public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
containerProperties.setEventHubName(EVENTHUB_NAME);
containerProperties.setConsumerGroup(CONSUMER_GROUP);
containerProperties.getBatch().setMaxSize(100);
containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
}
}
Event Hubs-üzenetfejlécek
Az alábbi táblázat bemutatja, hogyan vannak leképezve az Event Hubs üzenettulajdonságai a Spring-üzenetfejlécekre. Az Azure Event Hubs esetében az üzenet neve event.
Leképezés az Event Hubs-üzenet/eseménytulajdonságok és a spring message fejlécek között rekordfigyelő módban:
| Az Event Hubs eseménytulajdonságai | Tavaszi üzenetfejléc-állandók | Típus | Leírás |
|---|---|---|---|
| Várólista idő | EventHubsHeaders#ENQUEUED_TIME | Azonnali | Az eseménynek az Event Hub partícióban való leküldésének pillanata UTC-ben. |
| Ellensúlyoz | EventHubsHeaders#ELTOLÁS | Hosszú | Az esemény eltolása, amikor a társított Event Hub-partícióról érkezett. |
| Partíciókulcs | AzureHeaders#PARTITION_KEY | Húr | A partíció kivonatolási kulcsa, ha az esemény eredeti közzétételekor lett beállítva. |
| Partícióazonosító | AzureHeaders#RAW_PARTITION_ID | Húr | Az Event Hub partícióazonosítója. |
| Sorozatszáma | EventHubsHeaders#SEQUENCE_NUMBER | Hosszú | Az eseményhez rendelt sorszám, amikor a társított Event Hub-partícióban lekérdezték. |
| Utolsó enqueued eseménytulajdonságok | EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES | LastEnqueuedEventProperties | A partíció utolsó lekérdezett eseményének tulajdonságai. |
| N/A | AzureHeaders#CHECKPOINTER | Ellenőrzőpont | Az adott üzenet ellenőrzőpontjának fejléce. |
A felhasználók elemezhetik az egyes események kapcsolódó információinak üzenetfejléceit. Az esemény üzenetfejlécének beállításához a program az összes testreszabott fejlécet egy esemény alkalmazástulajdonságaként helyezi el, ahol a fejléc tulajdonságkulcsként van beállítva. Amikor események érkeznek az Event Hubstól, az összes alkalmazástulajdonság az üzenetfejlécre lesz konvertálva.
Jegyzet
A partíciókulcs, a lekérdezett idő, az eltolás és a sorszám üzenetfejlécei nem használhatók manuálisan.
Ha engedélyezve van a batch-consumer mód, a kötegelt üzenetek adott fejlécei a következők, amelyek az egyes Event Hubs-események értékeinek listáját tartalmazzák.
Leképezés az Event Hubs-üzenet/eseménytulajdonságok és a Spring Message-fejlécek között Batch-figyelő módban:
| Az Event Hubs eseménytulajdonságai | Spring Batch üzenetfejléc-állandók | Típus | Leírás |
|---|---|---|---|
| Várólista idő | EventHubsHeaders#ENQUEUED_TIME | Azonnali lista | Annak a pillanatnak a listája (UTC), amikor az egyes eseményeket az Event Hub partícióba foglalták. |
| Ellensúlyoz | EventHubsHeaders#ELTOLÁS | Hosszú lista | Az egyes események eltolásának listája a társított Event Hub-partícióról való fogadáskor. |
| Partíciókulcs | AzureHeaders#PARTITION_KEY | Sztringek listája | A partíció kivonatolási kulcsának listája, ha az az egyes események eredeti közzétételekor lett beállítva. |
| Sorozatszáma | EventHubsHeaders#SEQUENCE_NUMBER | Hosszú lista | Az egyes eseményekhez rendelt sorszámok listája, amikor az eseményt a társított Event Hub-partícióban lekérdezték. |
| Rendszertulajdonságok | EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES | Térképlista | Az egyes események rendszertulajdonságainak listája. |
| Alkalmazástulajdonságok | EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES | Térképlista | Az egyes események alkalmazástulajdonságainak listája, ahol az összes testreszabott üzenetfejléc vagy eseménytulajdonság el lesz helyezve. |
Jegyzet
Üzenetek közzétételekor a fenti kötegfejlécek el lesznek távolítva az üzenetekből, ha léteznek.
Minták
További információ: azure-spring-boot-samples adattár a GitHubon.
Spring integráció az Azure Service Bus-szal
Főbb fogalmak
A Spring Integration lehetővé teszi az egyszerű üzenetkezelést a Spring-alapú alkalmazásokban, és deklaratív adaptereken keresztül támogatja a külső rendszerekkel való integrációt.
Az Azure Service Bus Spring Integration bővítményprojektje bejövő és kimenő csatornaadaptereket biztosít az Azure Service Bushoz.
Jegyzet
Az CompletableFuture támogatási API-k elavultak a 2.10.0-s verzióról, és a Reactor Core a 4.0.0-s verzióról váltja fel. Részletekért lásd: Javadoc.
Függőség beállítása
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>
Konfiguráció
Ez az alapindító a következő 2 konfigurációs lehetőséget biztosítja:
Kapcsolatkonfiguráció tulajdonságai
Ez a szakasz az Azure Service Bushoz való csatlakozáshoz használt konfigurációs beállításokat tartalmazza.
Jegyzet
Ha biztonsági tagot használ egy Azure-erőforrás eléréséhez a Microsoft Entra-azonosítóval történő hitelesítéshez és engedélyezéshez, tekintse meg Hozzáférés engedélyezése a Microsoft Entra-azonosítóval annak biztosításához, hogy a biztonsági tag megkapta-e a megfelelő engedélyt az Azure-erőforrás eléréséhez.
A spring-cloud-azure-starter-integration-servicebus kapcsolatkonfigurálható tulajdonságai:
| Ingatlan | Típus | Leírás |
|---|---|---|
| spring.cloud.azure.servicebus.enabled | Logikai | Az Azure Service Bus engedélyezése. |
| spring.cloud.azure.servicebus.connection-string | Húr | Service Bus-névtér kapcsolati sztringértéke. |
| spring.cloud.azure.servicebus.custom-endpoint-address | Húr | A Service Bushoz való csatlakozáskor használandó egyéni végpontcím. |
| spring.cloud.azure.servicebus.namespace | Húr | Service Bus-névtérérték, amely a teljes tartománynév előtagja. Az FQDN-nek a NamespaceName.DomainName névből kell lennie |
| spring.cloud.azure.servicebus.domain-name | Húr | Egy Azure Service Bus-névtérérték tartományneve. |
A Service Bus processzor konfigurációs tulajdonságai
A ServiceBusInboundChannelAdapter a ServiceBusProcessorClient használja az üzenetek felhasználására, egy ServiceBusProcessorClientáltalános tulajdonságainak konfigurálására, a fejlesztők pedig ServiceBusContainerProperties használhatnak a konfigurációhoz. A
Alapszintű használat
Üzenetek küldése az Azure Service Busba
Adja meg a hitelesítő adatok konfigurációs beállításait.
A hitelesítő adatok kapcsolati sztringként való megadásához konfigurálja a következő tulajdonságokat a application.yml fájlban:
spring: cloud: azure: servicebus: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}Jegyzet
A Microsoft az elérhető legbiztonságosabb hitelesítési folyamat használatát javasolja. Az ebben az eljárásban ismertetett hitelesítési folyamat, például adatbázisok, gyorsítótárak, üzenetkezelés vagy AI-szolgáltatások esetében, nagyon nagy megbízhatóságot igényel az alkalmazásban, és más folyamatokban nem jelenik meg kockázattal. Ezt a folyamatot csak akkor használja, ha a biztonságosabb lehetőségek, például a jelszó nélküli vagy kulcs nélküli kapcsolatok felügyelt identitásai nem életképesek. A helyi gépi műveletekhez előnyben részesítse a jelszó nélküli vagy kulcs nélküli kapcsolatok felhasználói identitásait.
A hitelesítő adatok felügyelt identitásként való megadásához konfigurálja a következő tulajdonságokat a application.yml fájlban:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Jegyzet
A tenant-id engedélyezett értékek a következők: common, organizations, consumersvagy bérlőazonosító. Ezekről az értékekről további információt a Helytelen végpont (személyes és szervezeti fiókok) szakaszában talál, hiba AADSTS50020 – Az identitásszolgáltatótól származó felhasználói fiók nem létezik a bérlői. Az egybérlős alkalmazás konvertálásáról további információt Az egybérlős alkalmazás átalakítása több-bérlőssé a Microsoft Entra-azonosító.
A szolgáltatásnévként megadott hitelesítő adatokhoz konfigurálja a következő tulajdonságokat a application.yml fájlban:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Jegyzet
A tenant-id engedélyezett értékek a következők: common, organizations, consumersvagy bérlőazonosító. Ezekről az értékekről további információt a Helytelen végpont (személyes és szervezeti fiókok) szakaszában talál, hiba AADSTS50020 – Az identitásszolgáltatótól származó felhasználói fiók nem létezik a bérlői. Az egybérlős alkalmazás konvertálásáról további információt Az egybérlős alkalmazás átalakítása több-bérlőssé a Microsoft Entra-azonosító.
Hozzon létre
DefaultMessageHandleraServiceBusTemplatebeannel, hogy üzeneteket küldjön a Service Busnak, állítsa be a ServiceBusTemplate entitástípusát. Ez a minta a Service Bus-üzenetsort veszi példaként.class Demo { private static final String OUTPUT_CHANNEL = "queue.output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) { serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE); DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }Hozzon létre egy üzenetátjáró-kötést a fenti üzenetkezelővel egy üzenetcsatornán keresztül.
class Demo { @Autowired QueueOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface QueueOutboundGateway { void send(String text); } }Üzenetek küldése az átjáróval.
class Demo { public void demo() { this.messagingGateway.send(message); } }
Üzenetek fogadása az Azure Service Busból
Adja meg a hitelesítő adatok konfigurációs beállításait.
Hozzon létre egy bab üzenetcsatornát bemeneti csatornaként.
@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }Hozzon létre
ServiceBusInboundChannelAdapteraServiceBusMessageListenerContainerbabbal a Service Busba érkező üzenetek fogadásához. Ez a minta a Service Bus-üzenetsort veszi példaként.@Configuration class Demo { private static final String QUEUE_NAME = "queue1"; @Bean public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) { ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties(); containerProperties.setEntityName(QUEUE_NAME); containerProperties.setAutoComplete(false); return new ServiceBusMessageListenerContainer(processorFactory, containerProperties); } @Bean public ServiceBusInboundChannelAdapter queueMessageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, ServiceBusMessageListenerContainer listenerContainer) { ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer); adapter.setOutputChannel(inputChannel); return adapter; } }Hozzon létre egy üzenet fogadó kötést
ServiceBusInboundChannelAdaptera korábban létrehozott üzenetcsatornán keresztül.class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
A ServiceBusMessageConverter konfigurálása az objectMapper testreszabásához
ServiceBusMessageConverter konfigurálható babként készült, amely lehetővé teszi a felhasználók számára a ObjectMappertestreszabását.
Service Bus-üzenetfejlécek
Egyes Service Bus-fejlécek esetében, amelyek több Spring-fejlécállandóra is leképezhetők, a különböző Spring-fejlécek prioritása megjelenik a listában.
Leképezés a Service Bus-fejlécek és a spring fejlécek között:
| Service Bus-üzenetfejlécek és -tulajdonságok | Tavaszi üzenetfejléc-állandók | Típus | Konfigurálható | Leírás |
|---|---|---|---|---|
| Tartalomtípus | MessageHeaders#CONTENT_TYPE |
Húr | Igen | Az üzenet RFC2045 tartalomtípus-leírója. |
| Korrelációs azonosító | ServiceBusMessageHeaders#CORRELATION_ID |
Húr | Igen | Az üzenet korrelációs azonosítója |
| Üzenetazonosító | ServiceBusMessageHeaders#MESSAGE_ID |
Húr | Igen | Az üzenet üzenetazonosítója magasabb prioritással rendelkezik, mint MessageHeaders#ID. |
| Üzenetazonosító | MessageHeaders#ID |
UUID (Univerzálisan Egyedi Azonosító) | Igen | Az üzenet üzenetazonosítója alacsonyabb prioritással rendelkezik, mint ServiceBusMessageHeaders#MESSAGE_ID. |
| Partíciókulcs | ServiceBusMessageHeaders#PARTITION_KEY |
Húr | Igen | Az üzenet particionált entitásnak való küldéséhez használt partíciókulcs. |
| Válasz erre: | MessageHeaders#REPLY_CHANNEL |
Húr | Igen | Egy entitás címe, amelybe válaszokat szeretne küldeni. |
| Válasz a munkamenet-azonosítóra | ServiceBusMessageHeaders#REPLY_TO_SESSION_ID |
Húr | Igen | Az üzenet ReplyToGroupId tulajdonságértéke. |
| Ütemezett beiktatás időpontja utc | ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME |
OffsetDateTime | Igen | Az a dátum, amikor az üzenetet le kell venni a Service Busban, ez a fejléc nagyobb prioritással rendelkezik, mint AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE. |
| Ütemezett beiktatás időpontja utc | AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE |
Egész szám | Igen | Az a dátum, amikor az üzenetet be kell helyezni a Service Busban, ez a fejléc alacsonyabb prioritással rendelkezik, mint ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME. |
| Munkamenet-azonosító | ServiceBusMessageHeaders#SESSION_ID |
Húr | Igen | A munkamenet-vezérelt entitás munkamenet-IDentifier-azonosítója. |
| Az élethez való idő | ServiceBusMessageHeaders#TIME_TO_LIVE |
Időtartam | Igen | Az üzenet lejárata előtti időtartam. |
| Hoz | ServiceBusMessageHeaders#TO |
Húr | Igen | Az üzenet "címzett" címe, amely az útválasztási forgatókönyvekben való jövőbeli használatra van fenntartva, és amelyet jelenleg maga a közvetítő figyelmen kívül hagy. |
| Tárgy | ServiceBusMessageHeaders#SUBJECT |
Húr | Igen | Az üzenet tárgya. |
| Holt betű hiba leírása | ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION |
Húr | Nem | Egy halott betűs üzenet leírása. |
| Holt betű oka | ServiceBusMessageHeaders#DEAD_LETTER_REASON |
Húr | Nem | Az oka annak, hogy egy üzenet elhalt betűs volt. |
| Holt betűforrás | ServiceBusMessageHeaders#DEAD_LETTER_SOURCE |
Húr | Nem | Az az entitás, amelyben az üzenet elhalt betűs volt. |
| Kézbesítések száma | ServiceBusMessageHeaders#DELIVERY_COUNT |
hosszú | Nem | Az üzenet ügyfeleknek való kézbesítésének száma. |
| Enqueued sorszám | ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER |
hosszú | Nem | A Service Bus által egy üzenethez hozzárendelt lekérdezett sorszám. |
| Várólista idő | ServiceBusMessageHeaders#ENQUEUED_TIME |
OffsetDateTime | Nem | Az az időpont, amikor az üzenet be lett iktatásra a Service Busban. |
| A lejárat dátuma: | ServiceBusMessageHeaders#EXPIRES_AT |
OffsetDateTime | Nem | Az az időpont, amikor az üzenet lejár. |
| Jogkivonat zárolása | ServiceBusMessageHeaders#LOCK_TOKEN |
Húr | Nem | Az aktuális üzenet zárolási jogkivonata. |
| Zárolva, amíg | ServiceBusMessageHeaders#LOCKED_UNTIL |
OffsetDateTime | Nem | Az az időpont, amikor az üzenet zárolása lejár. |
| Sorozatszáma | ServiceBusMessageHeaders#SEQUENCE_NUMBER |
hosszú | Nem | A Service Bus által egy üzenethez rendelt egyedi szám. |
| Állam | ServiceBusMessageHeaders#STATE |
ServiceBusMessageState | Nem | Az üzenet állapota, amely lehet aktív, késleltetett vagy ütemezett. |
Partíciókulcs támogatása
Ez a kezdő támogatja Service Bus particionálási azáltal, hogy engedélyezi a partíciókulcs és a munkamenet-azonosító beállítását az üzenetfejlécben. Ez a szakasz bemutatja, hogyan állíthat be partíciókulcsot az üzenetekhez.
Ajánlott: Használja a ServiceBusMessageHeaders.PARTITION_KEY a fejléc kulcsaként.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Nem ajánlott, de jelenleg támogatott: AzureHeaders.PARTITION_KEY a fejléc kulcsaként.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Jegyzet
Ha ServiceBusMessageHeaders.PARTITION_KEY és AzureHeaders.PARTITION_KEY is be van állítva az üzenetfejlécekben, ServiceBusMessageHeaders.PARTITION_KEY előnyben részesítjük.
Munkamenet-támogatás
Ez a példa bemutatja, hogyan állíthatja be manuálisan egy üzenet munkamenet-azonosítóját az alkalmazásban.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Jegyzet
Ha a ServiceBusMessageHeaders.SESSION_ID be van állítva az üzenetfejlécekben, és egy másik ServiceBusMessageHeaders.PARTITION_KEY fejléc is be van állítva, a munkamenet-azonosító értéke idővel felülírja a partíciókulcs értékét.
A Service Bus-ügyfél tulajdonságainak testreszabása
A fejlesztők a AzureServiceClientBuilderCustomizer segítségével testre szabhatják a Service Bus-ügyfél tulajdonságait. Az alábbi példa a sessionIdleTimeoutServiceBusClientBuilder tulajdonságát szabja testre:
@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}
Minták
További információ: azure-spring-boot-samples adattár a GitHubon.
Spring integráció az Azure Storage Queue-val
Főbb fogalmak
Az Azure Queue Storage nagy mennyiségű üzenet tárolására szolgáló szolgáltatás. A világ bármely pontjáról elérheti az üzeneteket hitelesített hívásokon keresztül HTTP vagy HTTPS használatával. Az üzenetsor-üzenetek mérete legfeljebb 64 KB lehet. Az üzenetsorok több millió üzenetet tartalmazhatnak, akár a tárfiók teljes kapacitáskorlátját is. Az üzenetsorokat gyakran használják az aszinkron feldolgozáshoz használt teendőlista létrehozásához.
Függőség beállítása
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>
Konfiguráció
Ez az alapindító a következő konfigurációs beállításokat biztosítja:
Kapcsolatkonfiguráció tulajdonságai
Ez a szakasz az Azure Storage Queuehoz való csatlakozáshoz használt konfigurációs beállításokat tartalmazza.
Jegyzet
Ha biztonsági tagot használ egy Azure-erőforrás eléréséhez a Microsoft Entra-azonosítóval történő hitelesítéshez és engedélyezéshez, tekintse meg Hozzáférés engedélyezése a Microsoft Entra-azonosítóval annak biztosításához, hogy a biztonsági tag megkapta-e a megfelelő engedélyt az Azure-erőforrás eléréséhez.
A spring-cloud-azure-starter-integration-storage-queue kapcsolatkonfigurálható tulajdonságai:
| Ingatlan | Típus | Leírás |
|---|---|---|
| spring.cloud.azure.storage.queue.enabled | Logikai | Hogy engedélyezve van-e egy Azure Storage-üzenetsor. |
| spring.cloud.azure.storage.queue.connection-string | Húr | Storage Queue Namespace kapcsolati sztring értéke. |
| spring.cloud.azure.storage.queue.accountName | Húr | Storage Queue-fiók neve. |
| spring.cloud.azure.storage.queue.accountKey | Húr | Storage Queue-fiókkulcs. |
| spring.cloud.azure.storage.queue.endpoint | Húr | Storage Queue szolgáltatásvégpont. |
| spring.cloud.azure.storage.queue.sasToken | Húr | Sas-jogkivonat hitelesítő adatai |
| spring.cloud.azure.storage.queue.serviceVersion | QueueServiceVersion | Api-kérések készítésekor használt QueueServiceVersion. |
| spring.cloud.azure.storage.queue.messageEncoding | Húr | Üzenetsor üzenetkódolása. |
Alapszintű használat
Üzenetek küldése az Azure Storage-üzenetsorba
Adja meg a hitelesítő adatok konfigurációs beállításait.
A hitelesítő adatok kapcsolati sztringként való megadásához konfigurálja a következő tulajdonságokat a application.yml fájlban:
spring: cloud: azure: storage: queue: connection-string: ${AZURE_STORAGE_QUEUE_CONNECTION_STRING}Jegyzet
A Microsoft az elérhető legbiztonságosabb hitelesítési folyamat használatát javasolja. Az ebben az eljárásban ismertetett hitelesítési folyamat, például adatbázisok, gyorsítótárak, üzenetkezelés vagy AI-szolgáltatások esetében, nagyon nagy megbízhatóságot igényel az alkalmazásban, és más folyamatokban nem jelenik meg kockázattal. Ezt a folyamatot csak akkor használja, ha a biztonságosabb lehetőségek, például a jelszó nélküli vagy kulcs nélküli kapcsolatok felügyelt identitásai nem életképesek. A helyi gépi műveletekhez előnyben részesítse a jelszó nélküli vagy kulcs nélküli kapcsolatok felhasználói identitásait.
A hitelesítő adatok felügyelt identitásként való megadásához konfigurálja a következő tulajdonságokat a application.yml fájlban:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> storage: queue: account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
Jegyzet
A tenant-id engedélyezett értékek a következők: common, organizations, consumersvagy bérlőazonosító. Ezekről az értékekről további információt a Helytelen végpont (személyes és szervezeti fiókok) szakaszában talál, hiba AADSTS50020 – Az identitásszolgáltatótól származó felhasználói fiók nem létezik a bérlői. Az egybérlős alkalmazás konvertálásáról további információt Az egybérlős alkalmazás átalakítása több-bérlőssé a Microsoft Entra-azonosító.
A szolgáltatásnévként megadott hitelesítő adatokhoz konfigurálja a következő tulajdonságokat a application.yml fájlban:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> storage: queue: account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
Jegyzet
A tenant-id engedélyezett értékek a következők: common, organizations, consumersvagy bérlőazonosító. Ezekről az értékekről további információt a Helytelen végpont (személyes és szervezeti fiókok) szakaszában talál, hiba AADSTS50020 – Az identitásszolgáltatótól származó felhasználói fiók nem létezik a bérlői. Az egybérlős alkalmazás konvertálásáról további információt Az egybérlős alkalmazás átalakítása több-bérlőssé a Microsoft Entra-azonosító.
Hozzon létre
DefaultMessageHandleraStorageQueueTemplatebean használatával, hogy üzeneteket küldjön a Tárolási üzenetsorba.class Demo { private static final String STORAGE_QUEUE_NAME = "example"; private static final String OUTPUT_CHANNEL = "output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }Hozzon létre egy Üzenetátjáró-kötést a fenti üzenetkezelővel egy üzenetcsatornán keresztül.
class Demo { @Autowired StorageQueueOutboundGateway storageQueueOutboundGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface StorageQueueOutboundGateway { void send(String text); } }Üzenetek küldése az átjáróval.
class Demo { public void demo() { this.storageQueueOutboundGateway.send(message); } }
Üzenetek fogadása az Azure Storage-üzenetsorból
Adja meg a hitelesítő adatok konfigurációs beállításait.
Hozzon létre egy bab üzenetcsatornát bemeneti csatornaként.
class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }Hozzon létre
StorageQueueMessageSourceaStorageQueueTemplatebab használatával, hogy üzeneteket fogadjon a Tárolási üzenetsorba.class Demo { private static final String STORAGE_QUEUE_NAME = "example"; @Bean @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000")) public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) { return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate); } }Hozzon létre egy üzenet fogadó kötést a StorageQueueMessageSource használatával, amelyet az előző lépésben hoztunk létre a korábban létrehozott üzenetcsatornán keresztül.
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnError(Throwable::printStackTrace) .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message)) .block(); } }
Minták
További információ: azure-spring-boot-samples adattár a GitHubon.