Megosztás:


Spring Cloud Azure-támogatás a Spring Integrationhez

Az Azure Spring Integration Extension spring integration adaptereket biztosít a Azure SDK for Javakülönböző szolgáltatásaihoz. Spring Integration-támogatást biztosítunk ezekhez az Azure-szolgáltatásokhoz: Event Hubs, Service Bus, Storage Queue. A támogatott adapterek listája a következő:

Tavaszi integráció az Azure Event Hubsszal

Főbb fogalmak

Az Azure Event Hubs egy big data streamelési platform és eseménybetöltési szolgáltatás. Másodpercenként több millió esemény fogadására és feldolgozására képes. Az eseményközpontba küldött adatok átalakíthatók és tárolhatók bármely valós idejű elemzési szolgáltató vagy kötegelési/tárolási adapter használatával.

A Spring Integration lehetővé teszi az egyszerű üzenetkezelést a Spring-alapú alkalmazásokban, és deklaratív adaptereken keresztül támogatja a külső rendszerekkel való integrációt. Ezek az adapterek magasabb szintű absztrakciót biztosítanak a Spring újraegyeztetési, üzenetkezelési és ütemezési támogatásával szemben. Az Spring Integration for Event Hubs bővítményprojekt bejövő és kimenő csatornaadaptereket és átjárókat biztosít az Azure Event Hubshoz.

Jegyzet

Az RxJava támogatási API-k a 4.0.0-s verzióról törlődnek. Részletekért lásd: Javadoc.

Fogyasztói csoport

Az Event Hubs hasonló támogatást nyújt a fogyasztói csoportoknak, mint az Apache Kafka, de kismértékben eltérő logikával. Bár a Kafka az összes véglegesített eltolást a közvetítőben tárolja, a manuálisan feldolgozott Event Hubs-üzenetek eltolásait kell tárolnia. Az Event Hubs SDK az ilyen eltolások Azure Storage-ban való tárolására szolgál.

Particionálás támogatása

Az Event Hubs a Kafkához hasonló fizikai partíciót biztosít. A Kafka felhasználói és partíciói közötti automatikus újraelosztással ellentétben azonban az Event Hubs egyfajta megelőző módot biztosít. A tárfiók bérletként működik annak meghatározásához, hogy melyik partíció melyik felhasználó tulajdonában van. Amikor egy új felhasználó elindul, megpróbál ellopni néhány partíciót a legtöbb nehéz terhelésű felhasználótól a számítási feladatok kiegyensúlyozása érdekében.

A terheléselosztási stratégia megadásához a fejlesztők EventHubsContainerProperties használhatnak a konfigurációhoz. A konfigurálásának módjáról az alábbi talál példát.

Batch fogyasztói támogatás

A EventHubsInboundChannelAdapter támogatja a kötegfogyasztó módot. Az engedélyezéshez a felhasználók ListenerMode.BATCH figyelő módot adhatnak meg EventHubsInboundChannelAdapter-példányok létrehozásakor. Ha engedélyezve van, egy üzenet, amelynek hasznos adata a kötegelt események listája, a rendszer megkapja és átadja az alsóbb rétegbeli csatornának. Az egyes üzenetfejlécek is listaként lesznek konvertálva, amelyek tartalma az egyes eseményekhez tartozó fejlécérték. A partícióazonosító, az ellenőrzőpont és az utolsó lekérdezett tulajdonságok közösségi fejlécei egyetlen értékként jelennek meg a teljes eseménykötegben, amelyek ugyanazt a tulajdonságot használják. További információ: Event Hubs-üzenetfejlécek szakasz.

Jegyzet

Az ellenőrzőpont fejléce csak akkor létezik, ha MANUÁLIS ellenőrzőpont mód van használatban.

A kötegfelhasználók ellenőrzőpontozása két módot támogat: BATCH és MANUAL. BATCH mód egy automatikus ellenőrzőpontozási mód, amely az események teljes kötegét együttesen ellenőrzi a beérkezés után. MANUAL mód az események felhasználók általi ellenőrzésére. Használat esetén a Ellenőrzőpont az üzenet fejlécébe kerül, és a felhasználók használhatják az ellenőrzőpont-ellenőrzést.

A kötegfogyasztó házirendet max-size és max-wait-timetulajdonságai adhatja meg, ahol a max-size szükséges tulajdonság, míg a max-wait-time nem kötelező. A köteghasználati stratégia megadásához a fejlesztők EventHubsContainerProperties használhatnak a konfigurációhoz. A konfigurálásának módjáról az alábbi talál példát.

Függőség beállítása

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>

Konfiguráció

Ez az alapindító a következő 3 konfigurációs lehetőséget biztosítja:

Kapcsolatkonfiguráció tulajdonságai

Ez a szakasz az Azure Event Hubshoz való csatlakozáshoz használt konfigurációs beállításokat tartalmazza.

Jegyzet

Ha biztonsági tagot használ egy Azure-erőforrás eléréséhez a Microsoft Entra-azonosítóval történő hitelesítéshez és engedélyezéshez, tekintse meg Hozzáférés engedélyezése a Microsoft Entra-azonosítóval annak biztosításához, hogy a biztonsági tag megkapta-e a megfelelő engedélyt az Azure-erőforrás eléréséhez.

A spring-cloud-azure-starter-integration-eventhubs kapcsolatkonfigurálható tulajdonságai:

Ingatlan Típus Leírás
spring.cloud.azure.eventhubs.enabled Logikai Az Azure Event Hubs engedélyezése.
spring.cloud.azure.eventhubs.connection-string Húr Event Hubs-névtér kapcsolati sztringértéke.
spring.cloud.azure.eventhubs.namespace Húr Event Hubs-névtérérték, amely a teljes tartománynév előtagja. Az FQDN-nek a NamespaceName.DomainName névből kell lennie
spring.cloud.azure.eventhubs.tartománynév Húr Egy Azure Event Hubs-névtérérték tartományneve.
spring.cloud.azure.eventhubs.custom-endpoint-address Húr Egyéni végpont címe.
spring.cloud.azure.eventhubs.shared-connection Logikai Azt, hogy a mögöttes EventProcessorClient és az EventHubProducerAsyncClient ugyanazt a kapcsolatot használja-e. Alapértelmezés szerint minden létrehozott Event Hub-ügyfélhez új kapcsolat jön létre és jön létre.

Ellenőrzőpont konfigurációs tulajdonságai

Ez a szakasz a Storage Blobs szolgáltatás konfigurációs beállításait tartalmazza, amely a partíció tulajdonjogának és ellenőrzőpont-adatainak megőrzésére szolgál.

Jegyzet

Ha a 4.0.0-s verzióban a spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists tulajdonság nincs manuálisan engedélyezve, a rendszer nem hoz létre automatikusan storage-tárolót.

A spring-cloud-azure-starter-integration-eventhubs konfigurálható tulajdonságainak ellenőrzése:

Ingatlan Típus Leírás
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists Logikai Engedélyezi-e a tárolók létrehozását, ha nem létezik.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name Húr A tárfiók neve.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key Húr Tárfiók hozzáférési kulcsa.
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name Húr Tároló neve.

Az Azure Service SDK gyakori konfigurációs beállításai a Storage Blob CheckPoint Store-hoz is konfigurálhatók. A támogatott konfigurációs beállítások Spring Cloud Azure-konfigurációsjelennek meg, és konfigurálhatók az egyesített spring.cloud.azure. előtaggal vagy a spring.cloud.azure.eventhubs.processor.checkpoint-storeelőtagjával.

Az Event Hub processzorkonfigurációs tulajdonságai

A EventHubsInboundChannelAdapter a EventProcessorClient használja egy eseményközpont üzeneteinek felhasználására, egy EventProcessorClientáltalános tulajdonságainak konfigurálására, a fejlesztők EventHubsContainerProperties használhatnak a konfigurációhoz. A használatáról a következő szakaszban olvashat.

Alapszintű használat

Üzenetek küldése az Azure Event Hubsba

  1. Adja meg a hitelesítő adatok konfigurációs beállításait.

    • A hitelesítő adatok kapcsolati sztringként való megadásához konfigurálja a következő tulajdonságokat a application.yml fájlban:

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${AZURE_EVENT_HUBS_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT-CONTAINER}
                  account-name: ${CHECKPOINT-STORAGE-ACCOUNT}
                  account-key: ${CHECKPOINT-ACCESS-KEY}
      

      Jegyzet

      A Microsoft az elérhető legbiztonságosabb hitelesítési folyamat használatát javasolja. Az ebben az eljárásban ismertetett hitelesítési folyamat, például adatbázisok, gyorsítótárak, üzenetkezelés vagy AI-szolgáltatások esetében, nagyon nagy megbízhatóságot igényel az alkalmazásban, és más folyamatokban nem jelenik meg kockázattal. Ezt a folyamatot csak akkor használja, ha a biztonságosabb lehetőségek, például a jelszó nélküli vagy kulcs nélküli kapcsolatok felügyelt identitásai nem életképesek. A helyi gépi műveletekhez előnyben részesítse a jelszó nélküli vagy kulcs nélküli kapcsolatok felhasználói identitásait.

    • A hitelesítő adatok felügyelt identitásként való megadásához konfigurálja a következő tulajdonságokat a application.yml fájlban:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            eventhubs:
              namespace: ${AZURE_EVENT_HUBS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      
    • A szolgáltatásnévként megadott hitelesítő adatokhoz konfigurálja a következő tulajdonságokat a application.yml fájlban:

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${AZURE_EVENT_HUBS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      

Jegyzet

A tenant-id engedélyezett értékek a következők: common, organizations, consumersvagy bérlőazonosító. Ezekről az értékekről további információt a Helytelen végpont (személyes és szervezeti fiókok) szakaszában talál, hiba AADSTS50020 – Az identitásszolgáltatótól származó felhasználói fiók nem létezik a bérlői. Az egybérlős alkalmazás konvertálásáról további információt Az egybérlős alkalmazás átalakítása több-bérlőssé a Microsoft Entra-azonosító.

  1. Hozzon létre DefaultMessageHandler a EventHubsTemplate babbal, hogy üzeneteket küldjön az Event Hubsnak.

    class Demo {
        private static final String OUTPUT_CHANNEL = "output";
        private static final String EVENTHUB_NAME = "eh1";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.error("There was an error sending the message.", ex);
                }
            });
            return handler;
        }
    }
    
  2. Hozzon létre egy üzenetátjáró-kötést a fenti üzenetkezelővel egy üzenetcsatornán keresztül.

    class Demo {
        @Autowired
        EventHubOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface EventHubOutboundGateway {
            void send(String text);
        }
    }
    
  3. Üzenetek küldése az átjáróval.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Üzenetek fogadása az Azure Event Hubsból

  1. Adja meg a hitelesítő adatok konfigurációs beállításait.

  2. Hozzon létre egy bab üzenetcsatornát bemeneti csatornaként.

    @Configuration
    class Demo {
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Hozzon létre EventHubsInboundChannelAdapter a EventHubsMessageListenerContainer bab használatával az Event Hubstól érkező üzenetek fogadásához.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
        private static final String EVENTHUB_NAME = "eh1";
        private static final String CONSUMER_GROUP = "$Default";
    
        @Bean
        public EventHubsInboundChannelAdapter messageChannelAdapter(
                @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
                EventHubsMessageListenerContainer listenerContainer) {
            EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    
        @Bean
        public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
            EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
            containerProperties.setEventHubName(EVENTHUB_NAME);
            containerProperties.setConsumerGroup(CONSUMER_GROUP);
            containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
            return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
        }
    }
    
  4. Hozzon létre egy üzenet fogadó kötést az EventHubsInboundChannelAdapterrel a korábban létrehozott üzenetcsatornán keresztül.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

Az EventHubsMessageConverter konfigurálása az objectMapper testreszabásához

EventHubsMessageConverter konfigurálható babként készült, amely lehetővé teszi a felhasználók számára az ObjectMapper testreszabását.

Batch fogyasztói támogatás

Az Event Hubsból származó üzenetek kötegekben való felhasználása hasonló a fenti mintához, a felhasználókon kívül meg kell adniuk a kötegfelhasználókhoz kapcsolódó konfigurációs beállításokat EventHubsInboundChannelAdapter.

A EventHubsInboundChannelAdapterlétrehozásakor a figyelő üzemmódot BATCHkell beállítani. A EventHubsMessageListenerContainerlétrehozásakor állítsa be az ellenőrzőpont üzemmódot MANUAL vagy BATCH, és a kötegbeállítások igény szerint konfigurálhatók.

@Configuration
class Demo {
    private static final String INPUT_CHANNEL = "input";
    private static final String EVENTHUB_NAME = "eh1";
    private static final String CONSUMER_GROUP = "$Default";

    @Bean
    public EventHubsInboundChannelAdapter messageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            EventHubsMessageListenerContainer listenerContainer) {
        EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
        adapter.setOutputChannel(inputChannel);
        return adapter;
    }

    @Bean
    public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
        EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
        containerProperties.setEventHubName(EVENTHUB_NAME);
        containerProperties.setConsumerGroup(CONSUMER_GROUP);
        containerProperties.getBatch().setMaxSize(100);
        containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
        return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
    }
}

Event Hubs-üzenetfejlécek

Az alábbi táblázat bemutatja, hogyan vannak leképezve az Event Hubs üzenettulajdonságai a Spring-üzenetfejlécekre. Az Azure Event Hubs esetében az üzenet neve event.

Leképezés az Event Hubs-üzenet/eseménytulajdonságok és a spring message fejlécek között rekordfigyelő módban:

Az Event Hubs eseménytulajdonságai Tavaszi üzenetfejléc-állandók Típus Leírás
Várólista idő EventHubsHeaders#ENQUEUED_TIME Azonnali Az eseménynek az Event Hub partícióban való leküldésének pillanata UTC-ben.
Ellensúlyoz EventHubsHeaders#ELTOLÁS Hosszú Az esemény eltolása, amikor a társított Event Hub-partícióról érkezett.
Partíciókulcs AzureHeaders#PARTITION_KEY Húr A partíció kivonatolási kulcsa, ha az esemény eredeti közzétételekor lett beállítva.
Partícióazonosító AzureHeaders#RAW_PARTITION_ID Húr Az Event Hub partícióazonosítója.
Sorozatszáma EventHubsHeaders#SEQUENCE_NUMBER Hosszú Az eseményhez rendelt sorszám, amikor a társított Event Hub-partícióban lekérdezték.
Utolsó enqueued eseménytulajdonságok EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES LastEnqueuedEventProperties A partíció utolsó lekérdezett eseményének tulajdonságai.
N/A AzureHeaders#CHECKPOINTER Ellenőrzőpont Az adott üzenet ellenőrzőpontjának fejléce.

A felhasználók elemezhetik az egyes események kapcsolódó információinak üzenetfejléceit. Az esemény üzenetfejlécének beállításához a program az összes testreszabott fejlécet egy esemény alkalmazástulajdonságaként helyezi el, ahol a fejléc tulajdonságkulcsként van beállítva. Amikor események érkeznek az Event Hubstól, az összes alkalmazástulajdonság az üzenetfejlécre lesz konvertálva.

Jegyzet

A partíciókulcs, a lekérdezett idő, az eltolás és a sorszám üzenetfejlécei nem használhatók manuálisan.

Ha engedélyezve van a batch-consumer mód, a kötegelt üzenetek adott fejlécei a következők, amelyek az egyes Event Hubs-események értékeinek listáját tartalmazzák.

Leképezés az Event Hubs-üzenet/eseménytulajdonságok és a Spring Message-fejlécek között Batch-figyelő módban:

Az Event Hubs eseménytulajdonságai Spring Batch üzenetfejléc-állandók Típus Leírás
Várólista idő EventHubsHeaders#ENQUEUED_TIME Azonnali lista Annak a pillanatnak a listája (UTC), amikor az egyes eseményeket az Event Hub partícióba foglalták.
Ellensúlyoz EventHubsHeaders#ELTOLÁS Hosszú lista Az egyes események eltolásának listája a társított Event Hub-partícióról való fogadáskor.
Partíciókulcs AzureHeaders#PARTITION_KEY Sztringek listája A partíció kivonatolási kulcsának listája, ha az az egyes események eredeti közzétételekor lett beállítva.
Sorozatszáma EventHubsHeaders#SEQUENCE_NUMBER Hosszú lista Az egyes eseményekhez rendelt sorszámok listája, amikor az eseményt a társított Event Hub-partícióban lekérdezték.
Rendszertulajdonságok EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES Térképlista Az egyes események rendszertulajdonságainak listája.
Alkalmazástulajdonságok EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES Térképlista Az egyes események alkalmazástulajdonságainak listája, ahol az összes testreszabott üzenetfejléc vagy eseménytulajdonság el lesz helyezve.

Jegyzet

Üzenetek közzétételekor a fenti kötegfejlécek el lesznek távolítva az üzenetekből, ha léteznek.

Minták

További információ: azure-spring-boot-samples adattár a GitHubon.

Spring integráció az Azure Service Bus-szal

Főbb fogalmak

A Spring Integration lehetővé teszi az egyszerű üzenetkezelést a Spring-alapú alkalmazásokban, és deklaratív adaptereken keresztül támogatja a külső rendszerekkel való integrációt.

Az Azure Service Bus Spring Integration bővítményprojektje bejövő és kimenő csatornaadaptereket biztosít az Azure Service Bushoz.

Jegyzet

Az CompletableFuture támogatási API-k elavultak a 2.10.0-s verzióról, és a Reactor Core a 4.0.0-s verzióról váltja fel. Részletekért lásd: Javadoc.

Függőség beállítása

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>

Konfiguráció

Ez az alapindító a következő 2 konfigurációs lehetőséget biztosítja:

Kapcsolatkonfiguráció tulajdonságai

Ez a szakasz az Azure Service Bushoz való csatlakozáshoz használt konfigurációs beállításokat tartalmazza.

Jegyzet

Ha biztonsági tagot használ egy Azure-erőforrás eléréséhez a Microsoft Entra-azonosítóval történő hitelesítéshez és engedélyezéshez, tekintse meg Hozzáférés engedélyezése a Microsoft Entra-azonosítóval annak biztosításához, hogy a biztonsági tag megkapta-e a megfelelő engedélyt az Azure-erőforrás eléréséhez.

A spring-cloud-azure-starter-integration-servicebus kapcsolatkonfigurálható tulajdonságai:

Ingatlan Típus Leírás
spring.cloud.azure.servicebus.enabled Logikai Az Azure Service Bus engedélyezése.
spring.cloud.azure.servicebus.connection-string Húr Service Bus-névtér kapcsolati sztringértéke.
spring.cloud.azure.servicebus.custom-endpoint-address Húr A Service Bushoz való csatlakozáskor használandó egyéni végpontcím.
spring.cloud.azure.servicebus.namespace Húr Service Bus-névtérérték, amely a teljes tartománynév előtagja. Az FQDN-nek a NamespaceName.DomainName névből kell lennie
spring.cloud.azure.servicebus.domain-name Húr Egy Azure Service Bus-névtérérték tartományneve.

A Service Bus processzor konfigurációs tulajdonságai

A ServiceBusInboundChannelAdapter a ServiceBusProcessorClient használja az üzenetek felhasználására, egy ServiceBusProcessorClientáltalános tulajdonságainak konfigurálására, a fejlesztők pedig ServiceBusContainerProperties használhatnak a konfigurációhoz. A használatáról a következő szakaszban olvashat.

Alapszintű használat

Üzenetek küldése az Azure Service Busba

  1. Adja meg a hitelesítő adatok konfigurációs beállításait.

    • A hitelesítő adatok kapcsolati sztringként való megadásához konfigurálja a következő tulajdonságokat a application.yml fájlban:

      spring:
        cloud:
          azure:
            servicebus:
              connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
      

      Jegyzet

      A Microsoft az elérhető legbiztonságosabb hitelesítési folyamat használatát javasolja. Az ebben az eljárásban ismertetett hitelesítési folyamat, például adatbázisok, gyorsítótárak, üzenetkezelés vagy AI-szolgáltatások esetében, nagyon nagy megbízhatóságot igényel az alkalmazásban, és más folyamatokban nem jelenik meg kockázattal. Ezt a folyamatot csak akkor használja, ha a biztonságosabb lehetőségek, például a jelszó nélküli vagy kulcs nélküli kapcsolatok felügyelt identitásai nem életképesek. A helyi gépi műveletekhez előnyben részesítse a jelszó nélküli vagy kulcs nélküli kapcsolatok felhasználói identitásait.

    • A hitelesítő adatok felügyelt identitásként való megadásához konfigurálja a következő tulajdonságokat a application.yml fájlban:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            servicebus:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
      

Jegyzet

A tenant-id engedélyezett értékek a következők: common, organizations, consumersvagy bérlőazonosító. Ezekről az értékekről további információt a Helytelen végpont (személyes és szervezeti fiókok) szakaszában talál, hiba AADSTS50020 – Az identitásszolgáltatótól származó felhasználói fiók nem létezik a bérlői. Az egybérlős alkalmazás konvertálásáról további információt Az egybérlős alkalmazás átalakítása több-bérlőssé a Microsoft Entra-azonosító.

  • A szolgáltatásnévként megadott hitelesítő adatokhoz konfigurálja a következő tulajdonságokat a application.yml fájlban:

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          servicebus:
            namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
    

Jegyzet

A tenant-id engedélyezett értékek a következők: common, organizations, consumersvagy bérlőazonosító. Ezekről az értékekről további információt a Helytelen végpont (személyes és szervezeti fiókok) szakaszában talál, hiba AADSTS50020 – Az identitásszolgáltatótól származó felhasználói fiók nem létezik a bérlői. Az egybérlős alkalmazás konvertálásáról további információt Az egybérlős alkalmazás átalakítása több-bérlőssé a Microsoft Entra-azonosító.

  1. Hozzon létre DefaultMessageHandler a ServiceBusTemplate beannel, hogy üzeneteket küldjön a Service Busnak, állítsa be a ServiceBusTemplate entitástípusát. Ez a minta a Service Bus-üzenetsort veszi példaként.

    class Demo {
        private static final String OUTPUT_CHANNEL = "queue.output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) {
            serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE);
            DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
    
            return handler;
        }
    }
    
  2. Hozzon létre egy üzenetátjáró-kötést a fenti üzenetkezelővel egy üzenetcsatornán keresztül.

    class Demo {
        @Autowired
        QueueOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface QueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Üzenetek küldése az átjáróval.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Üzenetek fogadása az Azure Service Busból

  1. Adja meg a hitelesítő adatok konfigurációs beállításait.

  2. Hozzon létre egy bab üzenetcsatornát bemeneti csatornaként.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Hozzon létre ServiceBusInboundChannelAdapter a ServiceBusMessageListenerContainer babbal a Service Busba érkező üzenetek fogadásához. Ez a minta a Service Bus-üzenetsort veszi példaként.

    @Configuration
    class Demo {
        private static final String QUEUE_NAME = "queue1";
    
        @Bean
        public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) {
            ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
            containerProperties.setEntityName(QUEUE_NAME);
            containerProperties.setAutoComplete(false);
            return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
        }
    
        @Bean
        public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            ServiceBusMessageListenerContainer listenerContainer) {
            ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    }
    
  4. Hozzon létre egy üzenet fogadó kötést ServiceBusInboundChannelAdapter a korábban létrehozott üzenetcsatornán keresztül.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

A ServiceBusMessageConverter konfigurálása az objectMapper testreszabásához

ServiceBusMessageConverter konfigurálható babként készült, amely lehetővé teszi a felhasználók számára a ObjectMappertestreszabását.

Service Bus-üzenetfejlécek

Egyes Service Bus-fejlécek esetében, amelyek több Spring-fejlécállandóra is leképezhetők, a különböző Spring-fejlécek prioritása megjelenik a listában.

Leképezés a Service Bus-fejlécek és a spring fejlécek között:

Service Bus-üzenetfejlécek és -tulajdonságok Tavaszi üzenetfejléc-állandók Típus Konfigurálható Leírás
Tartalomtípus MessageHeaders#CONTENT_TYPE Húr Igen Az üzenet RFC2045 tartalomtípus-leírója.
Korrelációs azonosító ServiceBusMessageHeaders#CORRELATION_ID Húr Igen Az üzenet korrelációs azonosítója
Üzenetazonosító ServiceBusMessageHeaders#MESSAGE_ID Húr Igen Az üzenet üzenetazonosítója magasabb prioritással rendelkezik, mint MessageHeaders#ID.
Üzenetazonosító MessageHeaders#ID UUID (Univerzálisan Egyedi Azonosító) Igen Az üzenet üzenetazonosítója alacsonyabb prioritással rendelkezik, mint ServiceBusMessageHeaders#MESSAGE_ID.
Partíciókulcs ServiceBusMessageHeaders#PARTITION_KEY Húr Igen Az üzenet particionált entitásnak való küldéséhez használt partíciókulcs.
Válasz erre: MessageHeaders#REPLY_CHANNEL Húr Igen Egy entitás címe, amelybe válaszokat szeretne küldeni.
Válasz a munkamenet-azonosítóra ServiceBusMessageHeaders#REPLY_TO_SESSION_ID Húr Igen Az üzenet ReplyToGroupId tulajdonságértéke.
Ütemezett beiktatás időpontja utc ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME OffsetDateTime Igen Az a dátum, amikor az üzenetet le kell venni a Service Busban, ez a fejléc nagyobb prioritással rendelkezik, mint AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE.
Ütemezett beiktatás időpontja utc AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE Egész szám Igen Az a dátum, amikor az üzenetet be kell helyezni a Service Busban, ez a fejléc alacsonyabb prioritással rendelkezik, mint ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME.
Munkamenet-azonosító ServiceBusMessageHeaders#SESSION_ID Húr Igen A munkamenet-vezérelt entitás munkamenet-IDentifier-azonosítója.
Az élethez való idő ServiceBusMessageHeaders#TIME_TO_LIVE Időtartam Igen Az üzenet lejárata előtti időtartam.
Hoz ServiceBusMessageHeaders#TO Húr Igen Az üzenet "címzett" címe, amely az útválasztási forgatókönyvekben való jövőbeli használatra van fenntartva, és amelyet jelenleg maga a közvetítő figyelmen kívül hagy.
Tárgy ServiceBusMessageHeaders#SUBJECT Húr Igen Az üzenet tárgya.
Holt betű hiba leírása ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION Húr Nem Egy halott betűs üzenet leírása.
Holt betű oka ServiceBusMessageHeaders#DEAD_LETTER_REASON Húr Nem Az oka annak, hogy egy üzenet elhalt betűs volt.
Holt betűforrás ServiceBusMessageHeaders#DEAD_LETTER_SOURCE Húr Nem Az az entitás, amelyben az üzenet elhalt betűs volt.
Kézbesítések száma ServiceBusMessageHeaders#DELIVERY_COUNT hosszú Nem Az üzenet ügyfeleknek való kézbesítésének száma.
Enqueued sorszám ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER hosszú Nem A Service Bus által egy üzenethez hozzárendelt lekérdezett sorszám.
Várólista idő ServiceBusMessageHeaders#ENQUEUED_TIME OffsetDateTime Nem Az az időpont, amikor az üzenet be lett iktatásra a Service Busban.
A lejárat dátuma: ServiceBusMessageHeaders#EXPIRES_AT OffsetDateTime Nem Az az időpont, amikor az üzenet lejár.
Jogkivonat zárolása ServiceBusMessageHeaders#LOCK_TOKEN Húr Nem Az aktuális üzenet zárolási jogkivonata.
Zárolva, amíg ServiceBusMessageHeaders#LOCKED_UNTIL OffsetDateTime Nem Az az időpont, amikor az üzenet zárolása lejár.
Sorozatszáma ServiceBusMessageHeaders#SEQUENCE_NUMBER hosszú Nem A Service Bus által egy üzenethez rendelt egyedi szám.
Állam ServiceBusMessageHeaders#STATE ServiceBusMessageState Nem Az üzenet állapota, amely lehet aktív, késleltetett vagy ütemezett.

Partíciókulcs támogatása

Ez a kezdő támogatja Service Bus particionálási azáltal, hogy engedélyezi a partíciókulcs és a munkamenet-azonosító beállítását az üzenetfejlécben. Ez a szakasz bemutatja, hogyan állíthat be partíciókulcsot az üzenetekhez.

Ajánlott: Használja a ServiceBusMessageHeaders.PARTITION_KEY a fejléc kulcsaként.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Nem ajánlott, de jelenleg támogatott: AzureHeaders.PARTITION_KEY a fejléc kulcsaként.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Jegyzet

Ha ServiceBusMessageHeaders.PARTITION_KEY és AzureHeaders.PARTITION_KEY is be van állítva az üzenetfejlécekben, ServiceBusMessageHeaders.PARTITION_KEY előnyben részesítjük.

Munkamenet-támogatás

Ez a példa bemutatja, hogyan állíthatja be manuálisan egy üzenet munkamenet-azonosítóját az alkalmazásban.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Jegyzet

Ha a ServiceBusMessageHeaders.SESSION_ID be van állítva az üzenetfejlécekben, és egy másik ServiceBusMessageHeaders.PARTITION_KEY fejléc is be van állítva, a munkamenet-azonosító értéke idővel felülírja a partíciókulcs értékét.

A Service Bus-ügyfél tulajdonságainak testreszabása

A fejlesztők a AzureServiceClientBuilderCustomizer segítségével testre szabhatják a Service Bus-ügyfél tulajdonságait. Az alábbi példa a sessionIdleTimeoutServiceBusClientBuilder tulajdonságát szabja testre:

@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
    return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}

Minták

További információ: azure-spring-boot-samples adattár a GitHubon.

Spring integráció az Azure Storage Queue-val

Főbb fogalmak

Az Azure Queue Storage nagy mennyiségű üzenet tárolására szolgáló szolgáltatás. A világ bármely pontjáról elérheti az üzeneteket hitelesített hívásokon keresztül HTTP vagy HTTPS használatával. Az üzenetsor-üzenetek mérete legfeljebb 64 KB lehet. Az üzenetsorok több millió üzenetet tartalmazhatnak, akár a tárfiók teljes kapacitáskorlátját is. Az üzenetsorokat gyakran használják az aszinkron feldolgozáshoz használt teendőlista létrehozásához.

Függőség beállítása

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>

Konfiguráció

Ez az alapindító a következő konfigurációs beállításokat biztosítja:

Kapcsolatkonfiguráció tulajdonságai

Ez a szakasz az Azure Storage Queuehoz való csatlakozáshoz használt konfigurációs beállításokat tartalmazza.

Jegyzet

Ha biztonsági tagot használ egy Azure-erőforrás eléréséhez a Microsoft Entra-azonosítóval történő hitelesítéshez és engedélyezéshez, tekintse meg Hozzáférés engedélyezése a Microsoft Entra-azonosítóval annak biztosításához, hogy a biztonsági tag megkapta-e a megfelelő engedélyt az Azure-erőforrás eléréséhez.

A spring-cloud-azure-starter-integration-storage-queue kapcsolatkonfigurálható tulajdonságai:

Ingatlan Típus Leírás
spring.cloud.azure.storage.queue.enabled Logikai Hogy engedélyezve van-e egy Azure Storage-üzenetsor.
spring.cloud.azure.storage.queue.connection-string Húr Storage Queue Namespace kapcsolati sztring értéke.
spring.cloud.azure.storage.queue.accountName Húr Storage Queue-fiók neve.
spring.cloud.azure.storage.queue.accountKey Húr Storage Queue-fiókkulcs.
spring.cloud.azure.storage.queue.endpoint Húr Storage Queue szolgáltatásvégpont.
spring.cloud.azure.storage.queue.sasToken Húr Sas-jogkivonat hitelesítő adatai
spring.cloud.azure.storage.queue.serviceVersion QueueServiceVersion Api-kérések készítésekor használt QueueServiceVersion.
spring.cloud.azure.storage.queue.messageEncoding Húr Üzenetsor üzenetkódolása.

Alapszintű használat

Üzenetek küldése az Azure Storage-üzenetsorba

  1. Adja meg a hitelesítő adatok konfigurációs beállításait.

    • A hitelesítő adatok kapcsolati sztringként való megadásához konfigurálja a következő tulajdonságokat a application.yml fájlban:

      spring:
        cloud:
          azure:
            storage:
              queue:
                connection-string: ${AZURE_STORAGE_QUEUE_CONNECTION_STRING}
      

      Jegyzet

      A Microsoft az elérhető legbiztonságosabb hitelesítési folyamat használatát javasolja. Az ebben az eljárásban ismertetett hitelesítési folyamat, például adatbázisok, gyorsítótárak, üzenetkezelés vagy AI-szolgáltatások esetében, nagyon nagy megbízhatóságot igényel az alkalmazásban, és más folyamatokban nem jelenik meg kockázattal. Ezt a folyamatot csak akkor használja, ha a biztonságosabb lehetőségek, például a jelszó nélküli vagy kulcs nélküli kapcsolatok felügyelt identitásai nem életképesek. A helyi gépi műveletekhez előnyben részesítse a jelszó nélküli vagy kulcs nélküli kapcsolatok felhasználói identitásait.

    • A hitelesítő adatok felügyelt identitásként való megadásához konfigurálja a következő tulajdonságokat a application.yml fájlban:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            storage:
              queue:
                account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
      

Jegyzet

A tenant-id engedélyezett értékek a következők: common, organizations, consumersvagy bérlőazonosító. Ezekről az értékekről további információt a Helytelen végpont (személyes és szervezeti fiókok) szakaszában talál, hiba AADSTS50020 – Az identitásszolgáltatótól származó felhasználói fiók nem létezik a bérlői. Az egybérlős alkalmazás konvertálásáról további információt Az egybérlős alkalmazás átalakítása több-bérlőssé a Microsoft Entra-azonosító.

  • A szolgáltatásnévként megadott hitelesítő adatokhoz konfigurálja a következő tulajdonságokat a application.yml fájlban:

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          storage:
            queue:
              account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
    

Jegyzet

A tenant-id engedélyezett értékek a következők: common, organizations, consumersvagy bérlőazonosító. Ezekről az értékekről további információt a Helytelen végpont (személyes és szervezeti fiókok) szakaszában talál, hiba AADSTS50020 – Az identitásszolgáltatótól származó felhasználói fiók nem létezik a bérlői. Az egybérlős alkalmazás konvertálásáról további információt Az egybérlős alkalmazás átalakítása több-bérlőssé a Microsoft Entra-azonosító.

  1. Hozzon létre DefaultMessageHandler a StorageQueueTemplate bean használatával, hogy üzeneteket küldjön a Tárolási üzenetsorba.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
        private static final String OUTPUT_CHANNEL = "output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
            return handler;
        }
    }
    
  2. Hozzon létre egy Üzenetátjáró-kötést a fenti üzenetkezelővel egy üzenetcsatornán keresztül.

    class Demo {
        @Autowired
        StorageQueueOutboundGateway storageQueueOutboundGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface StorageQueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Üzenetek küldése az átjáróval.

    class Demo {
        public void demo() {
            this.storageQueueOutboundGateway.send(message);
        }
    }
    

Üzenetek fogadása az Azure Storage-üzenetsorból

  1. Adja meg a hitelesítő adatok konfigurációs beállításait.

  2. Hozzon létre egy bab üzenetcsatornát bemeneti csatornaként.

    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Hozzon létre StorageQueueMessageSource a StorageQueueTemplate bab használatával, hogy üzeneteket fogadjon a Tárolási üzenetsorba.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
    
        @Bean
        @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000"))
        public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) {
            return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate);
        }
    }
    
  4. Hozzon létre egy üzenet fogadó kötést a StorageQueueMessageSource használatával, amelyet az előző lépésben hoztunk létre a korábban létrehozott üzenetcsatornán keresztül.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                .doOnError(Throwable::printStackTrace)
                .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message))
                .block();
        }
    }
    

Minták

További információ: azure-spring-boot-samples adattár a GitHubon.