Поделиться через


Создание Java-приложения, использующего Azure Cosmos DB в качестве NoSQL базы данных и обработчик ленты изменений

Azure Cosmos DB — это полностью управляемая служба базы данных NoSQL, предоставляемая корпорацией Майкрософт. Это позволяет легко создавать глобально распределенные и высокомасштабируемые приложения. В этом руководстве описывается процесс создания приложения Java, использующего базу данных Azure Cosmos DB для NoSQL и реализующий обработчик канала изменений для обработки данных в режиме реального времени. Приложение Java взаимодействует с Azure Cosmos DB для NoSQL с помощью пакета SDK Java для Azure Cosmos DB версии 4.

Это важно

Это руководство предназначено только для пакета SDK Java для Azure Cosmos DB версии 4. Дополнительные сведения см. в заметках о выпуске SDK Java для Azure Cosmos DB версии 4, репозитории Maven, обработчике веб-канала изменений в Azure Cosmos DB и руководстве по устранению неполадок SDK Java для Azure Cosmos DB версии 4. Если сейчас вы используете более раннюю версию, чем версия 4, руководство Перевод приложения на использование пакета средств разработки Java для Azure Cosmos DB версии 4 поможет вам обновить его до версии 4.

Предпосылки

  • Учетная запись Azure Cosmos DB: ее можно создать на портале Azure или использовать эмулятор Azure Cosmos DB .

  • Среда разработки Java. Убедитесь, что на компьютере установлен пакет средств разработки Java (JDK) с по крайней мере 8 версиями.

  • Пакет SDK Java для Azure Cosmos DB версии 4: предоставляет необходимые функции для взаимодействия с Azure Cosmos DB.

Предыстория

Лента изменений Azure Cosmos DB предоставляет интерфейс, управляемый событиями, чтобы активировать действия в ответ на вставку документов, которую можно использовать различными способами.

Работа по управлению событиями канала изменений в значительной степени учитывается библиотекой обработчика канала изменений, встроенной в пакет SDK. Эта библиотека достаточно эффективна для распространения событий канала изменений между несколькими рабочими возможностями, если это необходимо. Все, что нужно сделать, — это предоставить библиотеку потока изменений в функцию обратного вызова.

В этом простом примере приложения Java демонстрируется обработка данных в режиме реального времени с помощью Azure Cosmos DB и обработчика канала изменений. Приложение вставляет примеры документов в контейнер потока, чтобы имитировать поток данных. Обработчик канала изменений, привязанный к контейнеру веб-канала, обрабатывает входящие изменения и регистрирует содержимое документа. Обработчик автоматически управляет арендами для параллельной обработки.

Исходный код

Вы можете клонировать репозиторий пакета SDK и найти этот пример в SampleChangeFeedProcessor.java:

git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples.git
cd azure-cosmos-java-sql-api-sample/src/main/java/com/azure/cosmos/examples/changefeed/

Walkthrough

  1. Настройте ChangeFeedProcessorOptions в Java-приложении, используя Azure Cosmos DB и Azure Cosmos DB Java SDK V4. Предоставляет ChangeFeedProcessorOptions важные настройки для управления работой Change Feed Processora при обработке данных.

    options = new ChangeFeedProcessorOptions();
    options.setStartFromBeginning(false);
    options.setLeasePrefix("myChangeFeedDeploymentUnit");
    options.setFeedPollDelay(Duration.ofSeconds(5));
    options.setFeedPollThroughputControlConfig(throughputControlGroupConfig);
    
  2. Инициализировать ChangeFeedProcessor с соответствующими конфигурациями, включая имя узла, контейнер веб-канала, контейнер аренды и логику обработки данных. Метод start() инициирует обработку данных, что обеспечивает параллельную и оперативную обработку входящих изменений данных из фид-контейнера.

    logger.info("Start Change Feed Processor on worker (handles changes asynchronously)");
    ChangeFeedProcessor changeFeedProcessorInstance = new ChangeFeedProcessorBuilder()
        .hostName("SampleHost_1")
        .feedContainer(feedContainer)
        .leaseContainer(leaseContainer)
        .handleChanges(handleChanges())
        .options(options)
        .buildChangeFeedProcessor();
    changeFeedProcessorInstance.start()
                               .subscribeOn(Schedulers.boundedElastic())
                               .subscribe();
    
  3. Укажите делегат, обрабатывающий входящие изменения данных с помощью handleChanges() метода. Метод обрабатывает полученные документы JsonNode из канала Change Feed. У разработчика есть два варианта обработки документа JsonNode, предоставленного веб-каналом изменений. Один из вариантов — работать с документом в виде JsonNode. Это отлично, особенно если у вас нет единой модели данных для всех документов. Второй вариант — преобразование JsonNode в POJO с той же структурой, что и JsonNode. Затем вы можете работать с POJO.

    private static Consumer<List<JsonNode>> handleChanges() {
        return (List<JsonNode> docs) -> {
            logger.info("Start handleChanges()");
    
            for (JsonNode document : docs) {
                try {
                    //Change Feed hands the document to you in the form of a JsonNode
                    //As a developer you have two options for handling the JsonNode document provided to you by Change Feed
                    //One option is to operate on the document in the form of a JsonNode, as shown below. This is great
                    //especially if you do not have a single uniform data model for all documents.
                    logger.info("Document received: " + OBJECT_MAPPER.writerWithDefaultPrettyPrinter()
                            .writeValueAsString(document));
    
                    //You can also transform the JsonNode to a POJO having the same structure as the JsonNode,
                    //as shown below. Then you can operate on the POJO.
                    CustomPOJO2 pojo_doc = OBJECT_MAPPER.treeToValue(document, CustomPOJO2.class);
                    logger.info("id: " + pojo_doc.getId());
    
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
            }
            isWorkCompleted = true;
            logger.info("End handleChanges()");
    
        };
    }
    
  4. Создайте и запустите приложение Java. Приложение запускает обработчик канала данных изменения, вставляет образцы документов в контейнер канала данных и обрабатывает входящие изменения.

Conclusion

В этом руководстве вы узнали, как создать приложение Java с помощью пакета SDK Java для Azure Cosmos DB версии 4 , использующего базу данных Azure Cosmos DB для NoSQL, и использовать обработчик канала изменений для обработки данных в режиме реального времени. Это приложение можно расширить для обработки более сложных вариантов использования и создания надежных, масштабируемых и глобально распределенных приложений с помощью Azure Cosmos DB.

Дополнительные ресурсы

Дальнейшие шаги

Теперь вы можете более подробно узнать об оценщике потока изменений в следующих статьях: