Delen via


Een Java-toepassing maken die gebruikmaakt van Azure Cosmos DB voor NoSQL en processor voor wijzigingenfeeds

VAN TOEPASSING OP: NoSQL

Azure Cosmos DB is een volledig beheerde NoSQL-databaseservice die wordt geleverd door Microsoft. Hiermee kunt u eenvoudig wereldwijd gedistribueerde en zeer schaalbare toepassingen bouwen. Deze handleiding begeleidt u bij het maken van een Java-toepassing die gebruikmaakt van de Azure Cosmos DB for NoSQL-database en implementeert de wijzigingenfeedprocessor voor realtime gegevensverwerking. De Java-toepassing communiceert met Azure Cosmos DB for NoSQL met behulp van Azure Cosmos DB Java SDK v4.

Belangrijk

Deze zelfstudie geldt alleen voor Azure Cosmos DB Java SDK v4. Bekijk de releaseopmerkingen voor Azure Cosmos DB Java SDK v4, Maven-opslagplaats, wijzigingenfeedprocessor in Azure Cosmos DB en de probleemoplossingsgids voor Azure Cosmos DB Java SDK v4 voor meer informatie. Als u momenteel een oudere versie dan v4 gebruikt, raadpleegt u de gids Migreren naar Azure Cosmos DB Java SDK v4 voor hulp om te upgraden naar v4.

Vereisten

  • Azure Cosmos DB-account: u kunt het maken vanuit Azure Portal of u kunt ook Azure Cosmos DB Emulator gebruiken.

  • Java Development Environment: Zorg ervoor dat u Java Development Kit (JDK) op uw computer hebt geïnstalleerd met ten minste 8 versies.

  • Azure Cosmos DB Java SDK V4: biedt de benodigde functies voor interactie met Azure Cosmos DB.

Achtergrond

De Azure Cosmos DB-wijzigingenfeed biedt een gebeurtenisgestuurde interface voor het activeren van acties als reactie op documentinvoeging die veel wordt gebruikt.

Gebeurtenissen in de wijzigingenfeed bijhouden wordt grotendeels gedaan door de bibliotheek voor wijzigingenfeedverwerking die in de SDK is ingebouwd. Deze bibliotheek is krachtig genoeg om gebeurtenissen in de wijzigingenfeed tussen meerdere werkrollen te verdelen, indien dat is gewenst. U hoeft alleen een callback op te geven in de bibliotheek van de wijzigingenfeed.

Dit eenvoudige voorbeeld van een Java-toepassing demonstreert realtime gegevensverwerking met Azure Cosmos DB en de wijzigingenfeedprocessor. De toepassing voegt voorbeelddocumenten in een 'feedcontainer' in om een gegevensstroom te simuleren. De wijzigingenfeedprocessor, gebonden aan de feedcontainer, verwerkt binnenkomende wijzigingen en registreert de documentinhoud. De processor beheert automatisch leases voor parallelle verwerking.

Broncode

U kunt de SDK-voorbeeldopslagplaats klonen en dit voorbeeld vinden in SampleChangeFeedProcessor.java:

git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples.git
cd azure-cosmos-java-sql-api-sample/src/main/java/com/azure/cosmos/examples/changefeed/

Walkthrough

  1. Configureer de ChangeFeedProcessorOptions in een Java-toepassing met behulp van Azure Cosmos DB en Azure Cosmos DB Java SDK V4. De ChangeFeedProcessorOptions biedt essentiële instellingen voor het beheren van het gedrag van de verwerker van wijzigingenfeeds tijdens gegevensverwerking.

    options = new ChangeFeedProcessorOptions();
    options.setStartFromBeginning(false);
    options.setLeasePrefix("myChangeFeedDeploymentUnit");
    options.setFeedPollDelay(Duration.ofSeconds(5));
    
  2. Initialiseer ChangeFeedProcessor met relevante configuraties, waaronder de hostnaam, feedcontainer, leasecontainer en logica voor gegevensverwerking. Met de methode start() wordt de gegevensverwerking gestart, waardoor gelijktijdige en realtime verwerking van binnenkomende gegevenswijzigingen vanuit de feedcontainer mogelijk wordt.

    logger.info("Start Change Feed Processor on worker (handles changes asynchronously)");
    ChangeFeedProcessor changeFeedProcessorInstance = new ChangeFeedProcessorBuilder()
        .hostName("SampleHost_1")
        .feedContainer(feedContainer)
        .leaseContainer(leaseContainer)
        .handleChanges(handleChanges())
        .buildChangeFeedProcessor();
    changeFeedProcessorInstance.start()
                               .subscribeOn(Schedulers.boundedElastic())
                               .subscribe();
    
  3. Geef de gemachtigde binnenkomende gegevenswijzigingen op met behulp van de handleChanges() methode. De methode verwerkt de ontvangen JsonNode-documenten uit de wijzigingenfeed. Als ontwikkelaar hebt u twee opties voor het afhandelen van het JsonNode-document dat u door de wijzigingenfeed hebt verstrekt. Een optie is om het document te bewerken in de vorm van een JsonNode. Dit is vooral handig als u niet één uniform gegevensmodel voor alle documenten hebt. De tweede optie: transformeer de JsonNode naar een POJO met dezelfde structuur als de JsonNode. Vervolgens kunt u aan de POJO werken.

    private static Consumer<List<JsonNode>> handleChanges() {
        return (List<JsonNode> docs) -> {
            logger.info("Start handleChanges()");
    
            for (JsonNode document : docs) {
                try {
                    //Change Feed hands the document to you in the form of a JsonNode
                    //As a developer you have two options for handling the JsonNode document provided to you by Change Feed
                    //One option is to operate on the document in the form of a JsonNode, as shown below. This is great
                    //especially if you do not have a single uniform data model for all documents.
                    logger.info("Document received: " + OBJECT_MAPPER.writerWithDefaultPrettyPrinter()
                            .writeValueAsString(document));
    
                    //You can also transform the JsonNode to a POJO having the same structure as the JsonNode,
                    //as shown below. Then you can operate on the POJO.
                    CustomPOJO2 pojo_doc = OBJECT_MAPPER.treeToValue(document, CustomPOJO2.class);
                    logger.info("id: " + pojo_doc.getId());
    
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
            }
            isWorkCompleted = true;
            logger.info("End handleChanges()");
    
        };
    }
    
  4. Bouw en voer de Java-toepassing uit. De toepassing start de processor van de wijzigingenfeed, voegt voorbeelddocumenten in de feedcontainer in en verwerkt de binnenkomende wijzigingen.

Conclusie

In deze handleiding hebt u geleerd hoe u een Java-toepassing maakt met behulp van Azure Cosmos DB Java SDK V4 die gebruikmaakt van de Azure Cosmos DB for NoSQL-database en de wijzigingenfeedprocessor gebruikt voor realtime gegevensverwerking. U kunt deze toepassing uitbreiden om complexere gebruiksvoorbeelden af te handelen en robuuste, schaalbare en wereldwijd gedistribueerde toepassingen te bouwen met behulp van Azure Cosmos DB.

Aanvullende bronnen

Volgende stappen

U kunt nu verdergaan met meer informatie over de wijzigingsfeedschatter in de volgende artikelen: