Partager via


Comment créer une application Java qui utilise Azure Cosmos DB pour NoSQL et le processeur de flux de modification

Azure Cosmos DB est un service de base de données NoSQL entièrement managé fourni par Microsoft. Il vous permet de créer facilement des applications distribuées et hautement évolutives à l’échelle mondiale. Ce guide pratique vous guide tout au long du processus de création d’une application Java qui utilise la base de données Azure Cosmos DB pour NoSQL et implémente le processeur de flux de modification pour le traitement des données en temps réel. L’application Java communique avec Azure Cosmos DB pour NoSQL à l’aide du Kit de développement logiciel (SDK) Java Azure Cosmos DB v4.

Important

Ce tutoriel concerne uniquement le Kit de développement logiciel (SDK) Java Azure Cosmos DB v4. Pour plus d’informations, consultez les notes de publication du Kit de développement logiciel (SDK) Java v4 Azure Cosmos DB, le référentiel Maven, le processeur de flux de modification dans Azure Cosmos DB et le guide de résolution des problèmes du Kit de développement logiciel (SDK) Java v4 d’Azure Cosmos DB. Si vous utilisez actuellement une version antérieure à v4, consultez le guide Migrer votre application pour utiliser le SDK Java v4 Azure Cosmos DB afin d’obtenir de l’aide sur la mise à niveau vers v4.

Prerequisites

Contexte

Le flux de modification Azure Cosmos DB fournit une interface pilotée par les événements pour déclencher des actions en réponse à l’insertion de document qui a de nombreuses utilisations.

Le travail de gestion des événements de flux de modification est en grande partie pris en charge par la bibliothèque de processeur de flux de modification intégrée au Kit de développement logiciel (SDK). Cette bibliothèque est suffisamment puissante pour distribuer les événements de flux de modification entre plusieurs workers, si vous le souhaitez. Il vous suffit de fournir à la bibliothèque de flux de modification un rappel.

Cet exemple simple d’application Java illustre le traitement des données en temps réel avec Azure Cosmos DB et le processeur de flux de modification. L’application insère des exemples de documents dans un « conteneur de flux » pour simuler un flux de données. Le processeur de flux de modification, lié au conteneur de flux, traite les modifications entrantes et enregistre le contenu du document. Le processeur gère automatiquement les baux pour le traitement parallèle.

Code source

Vous pouvez cloner l’exemple de référentiel du Kit de développement logiciel (SDK) et trouver cet exemple dans SampleChangeFeedProcessor.java:

git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples.git
cd azure-cosmos-java-sql-api-sample/src/main/java/com/azure/cosmos/examples/changefeed/

Walkthrough

  1. Configurez le ChangeFeedProcessorOptions dans une application Java en utilisant Azure Cosmos DB et le Kit de développement logiciel (SDK) Java Azure Cosmos DB V4. Les ChangeFeedProcessorOptions paramètres essentiels permettent de contrôler le comportement du processeur de flux de modification pendant le traitement des données.

    options = new ChangeFeedProcessorOptions();
    options.setStartFromBeginning(false);
    options.setLeasePrefix("myChangeFeedDeploymentUnit");
    options.setFeedPollDelay(Duration.ofSeconds(5));
    options.setFeedPollThroughputControlConfig(throughputControlGroupConfig);
    
  2. Initialisez ChangeFeedProcessor avec des configurations pertinentes, notamment le nom d’hôte, le conteneur de flux, le conteneur de baux et la logique de gestion des données. La méthode start() lance le traitement des données, en activant le traitement simultané et en temps réel des modifications de données entrantes à partir du conteneur de flux.

    logger.info("Start Change Feed Processor on worker (handles changes asynchronously)");
    ChangeFeedProcessor changeFeedProcessorInstance = new ChangeFeedProcessorBuilder()
        .hostName("SampleHost_1")
        .feedContainer(feedContainer)
        .leaseContainer(leaseContainer)
        .handleChanges(handleChanges())
        .options(options)
        .buildChangeFeedProcessor();
    changeFeedProcessorInstance.start()
                               .subscribeOn(Schedulers.boundedElastic())
                               .subscribe();
    
  3. Spécifiez que le délégué gère les modifications de données entrantes à l’aide de la handleChanges() méthode. La méthode traite les documents JsonNode reçus à partir du flux de modification. En tant que développeur, vous avez deux options pour gérer le document JsonNode fourni par le flux de modification. Une option consiste à utiliser le document sous la forme d’un JsonNode. Cela est très utile en particulier si vous n’avez pas de modèle de données uniforme unique pour tous les documents. La deuxième option : transformer le JsonNode en un OBJET POJO ayant la même structure que le JsonNode. Vous pouvez ensuite utiliser le POJO.

    private static Consumer<List<JsonNode>> handleChanges() {
        return (List<JsonNode> docs) -> {
            logger.info("Start handleChanges()");
    
            for (JsonNode document : docs) {
                try {
                    //Change Feed hands the document to you in the form of a JsonNode
                    //As a developer you have two options for handling the JsonNode document provided to you by Change Feed
                    //One option is to operate on the document in the form of a JsonNode, as shown below. This is great
                    //especially if you do not have a single uniform data model for all documents.
                    logger.info("Document received: " + OBJECT_MAPPER.writerWithDefaultPrettyPrinter()
                            .writeValueAsString(document));
    
                    //You can also transform the JsonNode to a POJO having the same structure as the JsonNode,
                    //as shown below. Then you can operate on the POJO.
                    CustomPOJO2 pojo_doc = OBJECT_MAPPER.treeToValue(document, CustomPOJO2.class);
                    logger.info("id: " + pojo_doc.getId());
    
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
            }
            isWorkCompleted = true;
            logger.info("End handleChanges()");
    
        };
    }
    
  4. Générez et exécutez l’application Java. L’application démarre le processeur de flux de modification, insère des exemples de documents dans le conteneur de flux et traite les modifications entrantes.

Conclusion

Dans ce guide, vous avez appris à créer une application Java à l’aide du Kit de développement logiciel (SDK) Java Azure Cosmos DB V4 qui utilise la base de données Azure Cosmos DB pour NoSQL et utilise le processeur de flux de modification pour le traitement des données en temps réel. Vous pouvez étendre cette application pour gérer des cas d’usage plus complexes et créer des applications robustes, évolutives et distribuées à l’échelle mondiale à l’aide d’Azure Cosmos DB.

Ressources supplémentaires

Étapes suivantes

Vous pouvez maintenant continuer à en savoir plus sur l’estimateur de flux de modification dans les articles suivants :