Jak utworzyć aplikację Java korzystającą z usługi Azure Cosmos DB dla NoSQL i procesora zestawienia zmian

DOTYCZY: NoSQL

Azure Cosmos DB to w pełni zarządzana usługa bazy danych NoSQL udostępniana przez firmę Microsoft. Umożliwia łatwe tworzenie globalnie rozproszonych i wysoce skalowalnych aplikacji. Ten przewodnik z instrukcjami przeprowadzi Cię przez proces tworzenia aplikacji Java, która korzysta z bazy danych Azure Cosmos DB for NoSQL i implementuje procesor zestawienia zmian na potrzeby przetwarzania danych w czasie rzeczywistym. Aplikacja Java komunikuje się z usługą Azure Cosmos DB for NoSQL przy użyciu zestawu Java SDK usługi Azure Cosmos DB w wersji 4.

Ważne

Ten samouczek dotyczy tylko zestawu Java SDK usługi Azure Cosmos DB w wersji 4. Aby uzyskać więcej informacji, zobacz informacje o wersji zestawu Java SDK usługi Azure Cosmos DB w wersji 4, repozytorium Maven, procesor zestawienia zmian w usłudze Azure Cosmos DB i przewodnik rozwiązywania problemów z zestawem Java SDK usługi Azure Cosmos DB w wersji 4. Jeśli obecnie używasz starszej wersji niż 4, zobacz przewodnik Migrate to Azure Cosmos DB Java SDK v4 (Migrowanie do zestawu Java SDK usługi Azure Cosmos DB w wersji 4), aby uzyskać pomoc dotyczącą uaktualniania do wersji 4.

Wymagania wstępne

Tło

Źródło zmian usługi Azure Cosmos DB udostępnia interfejs sterowany zdarzeniami umożliwiający wyzwalanie akcji w odpowiedzi na wstawienie dokumentu, które ma wiele zastosowań.

Praca zarządzania zdarzeniami zestawienia zmian jest w dużej mierze zajmowana przez bibliotekę procesora zestawienia zmian wbudowaną w zestaw SDK. Ta biblioteka jest wystarczająco zaawansowana, aby w razie potrzeby dystrybuować zdarzenia zestawienia zmian między wieloma procesami roboczymi. Wystarczy podać bibliotekę zestawienia zmian wywołanie zwrotne.

Ten prosty przykład aplikacji Java demonstruje przetwarzanie danych w czasie rzeczywistym za pomocą usługi Azure Cosmos DB i procesora zestawienia zmian. Aplikacja wstawia przykładowe dokumenty do "kontenera źródła danych", aby symulować strumień danych. Procesor zestawienia zmian powiązany z kontenerem kanału informacyjnego przetwarza przychodzące zmiany i rejestruje zawartość dokumentu. Procesor automatycznie zarządza dzierżawami przetwarzania równoległego.

Kod źródłowy

Możesz sklonować przykładowe repozytorium zestawu SDK i znaleźć ten przykład w pliku SampleChangeFeedProcessor.java:

git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples.git
cd azure-cosmos-java-sql-api-sample/src/main/java/com/azure/cosmos/examples/changefeed/

Przewodnik

  1. ChangeFeedProcessorOptions Skonfiguruj aplikację w języku Java przy użyciu usługi Azure Cosmos DB i zestawu Java SDK usługi Azure Cosmos DB w wersji 4. Zapewnia ChangeFeedProcessorOptions podstawowe ustawienia umożliwiające sterowanie zachowaniem procesora zestawienia zmian podczas przetwarzania danych.

    options = new ChangeFeedProcessorOptions();
    options.setStartFromBeginning(false);
    options.setLeasePrefix("myChangeFeedDeploymentUnit");
    options.setFeedPollDelay(Duration.ofSeconds(5));
    
  2. Zainicjuj metodę ChangeFeedProcessor za pomocą odpowiednich konfiguracji, w tym nazwy hosta, kontenera źródła danych, kontenera dzierżawy i logiki obsługi danych. Metoda start() inicjuje przetwarzanie danych, umożliwiając współbieżne i w czasie rzeczywistym przetwarzanie przychodzących zmian danych z kontenera kanału informacyjnego.

    logger.info("Start Change Feed Processor on worker (handles changes asynchronously)");
    ChangeFeedProcessor changeFeedProcessorInstance = new ChangeFeedProcessorBuilder()
        .hostName("SampleHost_1")
        .feedContainer(feedContainer)
        .leaseContainer(leaseContainer)
        .handleChanges(handleChanges())
        .buildChangeFeedProcessor();
    changeFeedProcessorInstance.start()
                               .subscribeOn(Schedulers.boundedElastic())
                               .subscribe();
    
  3. Określ delegata obsługuje zmiany danych przychodzących przy użyciu handleChanges() metody . Metoda przetwarza odebrane dokumenty JsonNode z zestawienia zmian. Jako deweloper masz dwie opcje obsługi dokumentu JsonNode dostarczonego przez zestawienie zmian. Jedną z opcji jest działanie na dokumencie w postaci węzła JsonNode. Jest to doskonałe rozwiązanie, zwłaszcza jeśli nie masz jednego jednolitego modelu danych dla wszystkich dokumentów. Druga opcja — przekształć węzeł JsonNode na obiekt POJO o tej samej strukturze co węzeł JsonNode. Następnie można pracować na poJO.

    private static Consumer<List<JsonNode>> handleChanges() {
        return (List<JsonNode> docs) -> {
            logger.info("Start handleChanges()");
    
            for (JsonNode document : docs) {
                try {
                    //Change Feed hands the document to you in the form of a JsonNode
                    //As a developer you have two options for handling the JsonNode document provided to you by Change Feed
                    //One option is to operate on the document in the form of a JsonNode, as shown below. This is great
                    //especially if you do not have a single uniform data model for all documents.
                    logger.info("Document received: " + OBJECT_MAPPER.writerWithDefaultPrettyPrinter()
                            .writeValueAsString(document));
    
                    //You can also transform the JsonNode to a POJO having the same structure as the JsonNode,
                    //as shown below. Then you can operate on the POJO.
                    CustomPOJO2 pojo_doc = OBJECT_MAPPER.treeToValue(document, CustomPOJO2.class);
                    logger.info("id: " + pojo_doc.getId());
    
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
            }
            isWorkCompleted = true;
            logger.info("End handleChanges()");
    
        };
    }
    
  4. Skompiluj i uruchom aplikację Java. Aplikacja uruchamia procesor zestawienia zmian, wstaw przykładowe dokumenty do kontenera kanału informacyjnego i przetwarza zmiany przychodzące.

Podsumowanie

W tym przewodniku przedstawiono sposób tworzenia aplikacji Java przy użyciu zestawu Java SDK usługi Azure Cosmos DB w wersji 4, który korzysta z bazy danych Azure Cosmos DB for NoSQL i używa procesora zestawienia zmian na potrzeby przetwarzania danych w czasie rzeczywistym. Tę aplikację można rozszerzyć w celu obsługi bardziej złożonych przypadków użycia i tworzenia niezawodnych, skalowalnych i globalnie rozproszonych aplikacji przy użyciu usługi Azure Cosmos DB.

Dodatkowe zasoby

Następne kroki

Teraz możesz dowiedzieć się więcej na temat narzędzia do szacowania zestawienia zmian w następujących artykułach: