Jak vytvořit aplikaci v Javě, která používá Azure Cosmos DB for NoSQL a procesor kanálu změn

PLATÍ PRO: NoSQL

Azure Cosmos DB je plně spravovaná databázová služba NoSQL poskytovaná Microsoftem. Umožňuje snadno vytvářet globálně distribuované a vysoce škálovatelné aplikace. Tento návod vás provede procesem vytvoření aplikace v Javě, která používá databázi Azure Cosmos DB for NoSQL, a implementuje procesor kanálu změn pro zpracování dat v reálném čase. Aplikace Java komunikuje se službou Azure Cosmos DB for NoSQL pomocí sady Java SDK služby Azure Cosmos DB v4.

Důležité

Tento kurz je určený pouze pro Sadu Java SDK služby Azure Cosmos DB v4. Další informace najdete v poznámkách k verzi sady Java SDK služby Azure Cosmos DB v4, úložišti Maven, procesoru kanálu změn ve službě Azure Cosmos DB a průvodci odstraňováním potíží se sadou Java SDK služby Azure Cosmos DB v4. Pokud aktuálně používáte starší verzi než v4, nápovědu k upgradu na verzi 4 najdete v průvodci migrací na sadu Java SDK služby Azure Cosmos DB v4 .

Požadavky

Pozadí

Kanál změn služby Azure Cosmos DB poskytuje rozhraní řízené událostmi pro aktivaci akcí v reakci na vložení dokumentu, které má mnoho použití.

O správu událostí kanálu změn se z velké části stará knihovna procesoru kanálu změn integrovaná v sadě SDK. Tato knihovna je dostatečně výkonná k distribuci událostí kanálu změn mezi více pracovních procesů, pokud je to žádoucí. Jediné, co musíte udělat, je poskytnout knihovně kanálu změn zpětné volání.

Tento jednoduchý příklad aplikace v Javě demonstruje zpracování dat v reálném čase pomocí služby Azure Cosmos DB a procesoru kanálu změn. Aplikace vloží ukázkové dokumenty do kontejneru informačního kanálu pro simulaci datového proudu. Procesor kanálu změn vázaný na kontejner informačního kanálu zpracovává příchozí změny a protokoluje obsah dokumentu. Procesor automaticky spravuje zapůjčení pro paralelní zpracování.

Zdrojový kód

Ukázkové úložiště sady SDK můžete naklonovat a tento příklad najdete v SampleChangeFeedProcessor.java:

git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples.git
cd azure-cosmos-java-sql-api-sample/src/main/java/com/azure/cosmos/examples/changefeed/

Názorný postup

  1. ChangeFeedProcessorOptions Nakonfigurujte v aplikaci v Javě pomocí služby Azure Cosmos DB a sady Java SDK služby Azure Cosmos DB verze 4. Poskytuje ChangeFeedProcessorOptions základní nastavení pro řízení chování zpracovatele kanálu změn během zpracování dat.

    options = new ChangeFeedProcessorOptions();
    options.setStartFromBeginning(false);
    options.setLeasePrefix("myChangeFeedDeploymentUnit");
    options.setFeedPollDelay(Duration.ofSeconds(5));
    
  2. Inicializuje changeFeedProcessor s příslušnými konfiguracemi, včetně názvu hostitele, kontejneru informačního kanálu, kontejneru zapůjčení a logiky zpracování dat. Metoda start() zahájí zpracování dat a povolí souběžné zpracování změn příchozích dat z kontejneru informačního kanálu v reálném čase.

    logger.info("Start Change Feed Processor on worker (handles changes asynchronously)");
    ChangeFeedProcessor changeFeedProcessorInstance = new ChangeFeedProcessorBuilder()
        .hostName("SampleHost_1")
        .feedContainer(feedContainer)
        .leaseContainer(leaseContainer)
        .handleChanges(handleChanges())
        .buildChangeFeedProcessor();
    changeFeedProcessorInstance.start()
                               .subscribeOn(Schedulers.boundedElastic())
                               .subscribe();
    
  3. Určete, že delegát zpracovává změny příchozích dat pomocí handleChanges() metody . Metoda zpracovává přijaté dokumenty JsonNode z kanálu změn. Jako vývojář máte dvě možnosti, jak zpracovat dokument JsonNode, který vám poskytuje kanál změn. Jednou z možností je pracovat s dokumentem ve formě uzlu JsonNode. To je skvělé zejména v případě, že nemáte jeden jednotný datový model pro všechny dokumenty. Druhá možnost – transformujte JsonNode na POJO se stejnou strukturou jako JsonNode. Pak můžete pracovat s POJO.

    private static Consumer<List<JsonNode>> handleChanges() {
        return (List<JsonNode> docs) -> {
            logger.info("Start handleChanges()");
    
            for (JsonNode document : docs) {
                try {
                    //Change Feed hands the document to you in the form of a JsonNode
                    //As a developer you have two options for handling the JsonNode document provided to you by Change Feed
                    //One option is to operate on the document in the form of a JsonNode, as shown below. This is great
                    //especially if you do not have a single uniform data model for all documents.
                    logger.info("Document received: " + OBJECT_MAPPER.writerWithDefaultPrettyPrinter()
                            .writeValueAsString(document));
    
                    //You can also transform the JsonNode to a POJO having the same structure as the JsonNode,
                    //as shown below. Then you can operate on the POJO.
                    CustomPOJO2 pojo_doc = OBJECT_MAPPER.treeToValue(document, CustomPOJO2.class);
                    logger.info("id: " + pojo_doc.getId());
    
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
            }
            isWorkCompleted = true;
            logger.info("End handleChanges()");
    
        };
    }
    
  4. Sestavte a spusťte aplikaci v Javě. Aplikace spustí procesor kanálu změn, vloží do kontejneru informačního kanálu ukázkové dokumenty a zpracuje příchozí změny.

Závěr

V této příručce jste zjistili, jak pomocí sady Azure Cosmos DB Java SDK verze 4 vytvořit aplikaci v Javě, která používá databázi Azure Cosmos DB for NoSQL a ke zpracování dat v reálném čase používá procesor kanálu změn. Tuto aplikaci můžete rozšířit tak, aby zvládla složitější případy použití, a vytvářet robustní, škálovatelné a globálně distribuované aplikace pomocí služby Azure Cosmos DB.

Další materiály

Další kroky

Další informace o nástroji pro posouzení kanálu změn teď najdete v následujících článcích: