Freigeben über


Erstellen einer Java-Anwendung, die die Azure Cosmos DB for NoSQL und den Änderungsfeedprozessor verwendet

GILT FÜR: NoSQL

Azure Cosmos DB ist ein vollständig verwalteter NoSQL-Datenbankdienst, der von Microsoft bereitgestellt wird. Er ermöglicht Ihnen, global verteilte und hochgradig skalierbare Anwendungen mit Leichtigkeit zu erstellen. Diese Schrittanleitung führt Sie durch den Prozess zum Erstellen einer Java-Anwendung, die die Azure Cosmos DB for NoSQL-Datenbank verwendet und den Änderungsfeedprozessor für die Echtzeitdatenverarbeitung implementiert. Die Java-Anwendung kommuniziert mit Azure Cosmos DB for NoSQL unter Verwendung des Azure Cosmos DB Java SDK v4.

Wichtig

Dieses Tutorial gilt nur für Azure Cosmos DB Java SDK V4. Weitere Informationen finden Sie in den Versionshinweisen zu Azure Cosmos DB Java SDK V4, im Maven-Repository, unter Änderungsfeedprozessor in Azure Cosmos DB und im Leitfaden zur Problembehandlung für Azure Cosmos DB Java SDK V4. Wenn Sie aktuell eine ältere Version als V4 verwenden, lesen Sie den Leitfaden Migrieren zu Azure Cosmos DB Java SDK V4, um Hilfe beim Aktualisieren auf V4 zu erhalten.

Voraussetzungen

  • Azure Cosmos DB-Konto: Sie können das Konto über das Azure-Portal erstellen oder auch den Azure Cosmos DB-Emulator verwenden.

  • Java-Entwicklungsumgebung: Stellen Sie sicher, dass das Java Development Kit (JDK) auf Ihrem Computer mit mindestens Version 8 installiert ist.

  • Azure Cosmos DB Java SDK V4: Bietet die erforderlichen Features für die Interaktion mit Azure Cosmos DB.

Hintergrund

Über den Azure Cosmos DB-Änderungsfeed wird eine ereignisgesteuerte Schnittstelle zum Auslösen von Aktionen als Reaktion auf Dokumenteinfügungen bereitgestellt. Dafür gibt es viele Anwendungsfälle.

Die Verwaltung der Ereignisse von Änderungsfeeds wird größtenteils von der Änderungsfeedprozessor-Bibliothek durchgeführt, die in das SDK integriert ist. Diese Bibliothek ist leistungsfähig genug, um die Ereignisse von Änderungsfeeds bei Bedarf auf mehrere Worker zu verteilen. Hierfür müssen Sie die Änderungsfeedbibliothek lediglich mit einem Rückruf versehen.

Dieses einfache Beispiel für eine Java-Anwendung veranschaulicht die Echtzeitdatenverarbeitung mit Azure Cosmos DB und dem Änderungsfeedprozessor. Die Anwendung fügt Beispieldokumente in einen „Feedcontainer“ ein, um einen Datenstrom zu simulieren. Der Änderungsfeedprozessor, der an den Feedcontainer gebunden ist, verarbeitet eingehende Änderungen und protokolliert den Dokumentinhalt. Der Prozessor verwaltet automatisch Leases für die Parallelverarbeitung.

Quellcode

Sie können das SDK-Beispielrepository klonen. Dieses Beispiel finden Sie in SampleChangeFeedProcessor.java:

git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples.git
cd azure-cosmos-java-sql-api-sample/src/main/java/com/azure/cosmos/examples/changefeed/

Exemplarische Vorgehensweise

  1. Konfigurieren Sie ChangeFeedProcessorOptions in einer Java-Anwendung mithilfe von Azure Cosmos DB und Azure Cosmos DB Java SDK V4. ChangeFeedProcessorOptions stellt wichtige Einstellungen bereit, um das Verhalten des Änderungsfeedprozessors während der Datenverarbeitung zu steuern.

    options = new ChangeFeedProcessorOptions();
    options.setStartFromBeginning(false);
    options.setLeasePrefix("myChangeFeedDeploymentUnit");
    options.setFeedPollDelay(Duration.ofSeconds(5));
    
  2. Initialisieren Sie ChangeFeedProcessor mit relevanten Konfigurationen, einschließlich Hostname, Feedcontainer, Leasecontainer und Datenverarbeitungslogik. Die start()-Methode initiiert die Datenverarbeitung und ermöglicht die gleichzeitige Echtzeitverarbeitung eingehender Datenänderungen aus dem Feedcontainer.

    logger.info("Start Change Feed Processor on worker (handles changes asynchronously)");
    ChangeFeedProcessor changeFeedProcessorInstance = new ChangeFeedProcessorBuilder()
        .hostName("SampleHost_1")
        .feedContainer(feedContainer)
        .leaseContainer(leaseContainer)
        .handleChanges(handleChanges())
        .buildChangeFeedProcessor();
    changeFeedProcessorInstance.start()
                               .subscribeOn(Schedulers.boundedElastic())
                               .subscribe();
    
  3. Geben Sie an, dass der Delegat eingehende Datenänderungen mithilfe der handleChanges()-Methode verarbeitet. Die Methode verarbeitet die empfangenen JsonNode-Dokumente aus dem Änderungsfeed. Als Entwickler*in haben Sie zwei Optionen für die Verarbeitung des JsonNode-Dokuments, das Ihnen vom Änderungsfeed bereitgestellt wird. Eine Option besteht darin, das Dokument im JsonNode-Format zu verwenden. Diese Option ist besonders geeignet, wenn Sie nicht über ein einheitliches Datenmodell für alle Dokumente verfügen. Die zweite Option besteht im Transformieren des JsonNode-Dokuments in ein POJO-Dokument mit der gleichen Struktur wie das JsonNode-Dokument. Anschließend können Sie mit dem POJO-Dokument arbeiten.

    private static Consumer<List<JsonNode>> handleChanges() {
        return (List<JsonNode> docs) -> {
            logger.info("Start handleChanges()");
    
            for (JsonNode document : docs) {
                try {
                    //Change Feed hands the document to you in the form of a JsonNode
                    //As a developer you have two options for handling the JsonNode document provided to you by Change Feed
                    //One option is to operate on the document in the form of a JsonNode, as shown below. This is great
                    //especially if you do not have a single uniform data model for all documents.
                    logger.info("Document received: " + OBJECT_MAPPER.writerWithDefaultPrettyPrinter()
                            .writeValueAsString(document));
    
                    //You can also transform the JsonNode to a POJO having the same structure as the JsonNode,
                    //as shown below. Then you can operate on the POJO.
                    CustomPOJO2 pojo_doc = OBJECT_MAPPER.treeToValue(document, CustomPOJO2.class);
                    logger.info("id: " + pojo_doc.getId());
    
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
            }
            isWorkCompleted = true;
            logger.info("End handleChanges()");
    
        };
    }
    
  4. Erstellen Sie die Java-Anwendung, und führen Sie sie aus. Die Anwendung startet den Änderungsfeedprozessor, fügt Beispieldokumente in den Feedcontainer ein und verarbeitet die eingehenden Änderungen.

Schlussbemerkung

In diesem Leitfaden haben Sie erfahren, wie Sie mithilfe von Azure Cosmos DB Java SDK V4 eine Java-Anwendung erstellen, die die Azure Cosmos DB for NoSQL-Datenbank und den Änderungsfeedprozessor für die Datenverarbeitung in Echtzeit verwendet. Sie können diese Anwendung erweitern, um komplexere Anwendungsfälle zu verarbeiten und mithilfe von Azure Cosmos DB stabile, skalierbare und global verteilte Anwendungen zu erstellen.

Zusätzliche Ressourcen

Nächste Schritte

In den folgenden Artikeln erfahren Sie mehr über den Änderungsfeed-Estimator: