Partage via


Guide pratique pour créer une application Java qui utilise Azure Cosmos DB for NoSQL et un processeur de flux de modification

S’APPLIQUE À : NoSQL

Azure Cosmos DB est un service de base de données NoSQL entièrement managé fourni par Microsoft. Il vous permet de créer facilement des applications distribuées à l’échelle mondiale et hautement évolutives. Ce guide pratique vous guide tout au long du processus de création d’une application Java qui utilise la base de données Azure Cosmos DB for NoSQL et implémente le processeur de flux de modification pour le traitement des données en temps réel. L’application Java communique avec l’Azure Cosmos DB for NoSQL en utilisant le Kit de développement logiciel (SDK) Java v4 Azure Cosmos DB.

Important

Ce tutoriel s’applique uniquement au SDK Java v4 Azure Cosmos DB. Pour plus d’informations, consultez les Notes de publication du SDK Java v4 Azure Cosmos DB, le Référentiel Maven, le Processeur de flux de modification dans Azure Cosmos DB et le guide de résolution des problèmes du SDK Java v4 Azure Cosmos DB. Si vous utilisez actuellement une version antérieure à v4, consultez le guide Migrer votre application pour utiliser le SDK Java v4 Azure Cosmos DB pour obtenir de l’aide sur la mise à niveau vers v4.

Prérequis

Arrière-plan

Le flux de modification Azure Cosmos DB fournit une interface pilotée par les événements pour déclencher des actions en réponse à l’insertion de documents qui a de nombreuses utilisations.

La gestion des événements du flux de modification est en grande partie prise en charge par la bibliothèque du processeur de flux de modification qui est intégrée au SDK. Cette bibliothèque est suffisamment puissante pour distribuer les événements du flux de modification entre plusieurs workers quand cela est nécessaire. Il vous suffit de fournir un rappel à la bibliothèque du processeur de flux de modification.

Cet exemple simple d’application Java illustre le traitement des données en temps réel avec Azure Cosmos DB et le processeur de flux de modification. L’application insère des exemples de documents dans un « conteneur de flux » pour simuler un flux de données. Le processeur de flux de modification, lié au conteneur de flux, traite les modifications entrantes et enregistre le contenu du document. Le processeur gère automatiquement les baux pour le traitement parallèle.

Code source

Vous pouvez cloner l’exemple référentiel du Kit de développement logiciel (SDK) et trouver cet exemple dans SampleChangeFeedProcessor.java:

git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples.git
cd azure-cosmos-java-sql-api-sample/src/main/java/com/azure/cosmos/examples/changefeed/

Procédure pas à pas

  1. Configurez ChangeFeedProcessorOptions dans une application Java à l’aide d’Azure Cosmos DB et du SDK Java Azure Cosmos DB V4. ChangeFeedProcessorOptions fournit des paramètres essentiels pour contrôler le comportement du processeur de flux de modification pendant le traitement des données.

    options = new ChangeFeedProcessorOptions();
    options.setStartFromBeginning(false);
    options.setLeasePrefix("myChangeFeedDeploymentUnit");
    options.setFeedPollDelay(Duration.ofSeconds(5));
    options.setFeedPollThroughputControlConfig(throughputControlGroupConfig);
    
  2. Initialisez ChangeFeedProcessor avec des configurations pertinentes, notamment le nom d’hôte, le conteneur de flux, le conteneur de bail et la logique de gestion des données. La méthode start() lance le traitement des données, ce qui permet le traitement simultané et en temps réel des modifications de données entrantes à partir du conteneur de flux.

    logger.info("Start Change Feed Processor on worker (handles changes asynchronously)");
    ChangeFeedProcessor changeFeedProcessorInstance = new ChangeFeedProcessorBuilder()
        .hostName("SampleHost_1")
        .feedContainer(feedContainer)
        .leaseContainer(leaseContainer)
        .handleChanges(handleChanges())
        .options(options)
        .buildChangeFeedProcessor();
    changeFeedProcessorInstance.start()
                               .subscribeOn(Schedulers.boundedElastic())
                               .subscribe();
    
  3. Spécifiez que le délégué gère les modifications de données entrantes à l’aide de la méthode handleChanges(). La méthode traite les documents JsonNode reçus à partir du flux de modification. En tant que développeur, vous disposez de deux options pour gérer le document JsonNode fourni par le flux de modification. Une option consiste à opérer sur le document sous la forme d’un JsonNode. C’est idéal, surtout si vous n’avez pas un seul modèle de données uniforme pour tous les documents. La deuxième option consiste à transformer le JsonNode en un POJO ayant la même structure que JsonNode. Vous pouvez ensuite opérer sur le POJO.

    private static Consumer<List<JsonNode>> handleChanges() {
        return (List<JsonNode> docs) -> {
            logger.info("Start handleChanges()");
    
            for (JsonNode document : docs) {
                try {
                    //Change Feed hands the document to you in the form of a JsonNode
                    //As a developer you have two options for handling the JsonNode document provided to you by Change Feed
                    //One option is to operate on the document in the form of a JsonNode, as shown below. This is great
                    //especially if you do not have a single uniform data model for all documents.
                    logger.info("Document received: " + OBJECT_MAPPER.writerWithDefaultPrettyPrinter()
                            .writeValueAsString(document));
    
                    //You can also transform the JsonNode to a POJO having the same structure as the JsonNode,
                    //as shown below. Then you can operate on the POJO.
                    CustomPOJO2 pojo_doc = OBJECT_MAPPER.treeToValue(document, CustomPOJO2.class);
                    logger.info("id: " + pojo_doc.getId());
    
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
            }
            isWorkCompleted = true;
            logger.info("End handleChanges()");
    
        };
    }
    
  4. Générer et exécuter votre application Java. L’application démarre le processeur de flux de modification, insère des échantillons de documents dans le conteneur de flux et traite les modifications entrantes.

Conclusion

Dans ce guide, vous avez appris à créer une application Java à l’aide du Kit de développement logiciel (SDK) Java Azure Cosmos DB V4 qui utilise la base de données Azure Cosmos DB for NoSQL et utilise le processeur de flux de modification pour le traitement des données en temps réel. Vous pouvez étendre cette application pour gérer des cas d’usage plus complexes et créer des applications robustes, évolutives et distribuées à l’échelle mondiale à l’aide d’Azure Cosmos DB.

Ressources supplémentaires

Étapes suivantes

Vous pouvez maintenant en apprendre davantage sur le processeur de flux de modifications en consultant les articles suivants :