Поделиться через


Создание приложения Java, использующего Azure Cosmos DB для NoSQL и обработчик веб-канала изменений

ОБЛАСТЬ ПРИМЕНЕНИЯ: NoSQL

Azure Cosmos DB — это полностью управляемая служба базы данных NoSQL, предоставляемая корпорацией Майкрософт. Это позволяет легко создавать глобально распределенные и высокомасштабируемые приложения. В этом руководстве описывается процесс создания приложения Java, использующего базу данных Azure Cosmos DB для NoSQL и реализующий обработчик канала изменений для обработки данных в режиме реального времени. Приложение Java взаимодействует с Azure Cosmos DB для NoSQL с помощью пакета SDK Java для Azure Cosmos DB версии 4.

Внимание

Это руководство применимо только к пакету SDK Java для Azure Cosmos DB версии 4. Дополнительные сведения см. в заметках о выпуске пакета SDK Java для Azure Cosmos DB версии 4, репозитории Maven, обработчике веб-канала изменений в Azure Cosmos DB и пакете SDK java для Azure Cosmos DB версии 4. Если сейчас вы используете более раннюю версию, чем версия 4, руководство Перевод приложения на использование пакета средств разработки Java для Azure Cosmos DB версии 4 поможет вам обновить его до версии 4.

Необходимые компоненты

  • Учетная запись Azure Cosmos DB: ее можно создать из портал Azure или использовать эмулятор Azure Cosmos DB.

  • Среда разработки Java. Убедитесь, что на компьютере установлен пакет средств разработки Java (JDK) с по крайней мере 8 версиями.

  • Пакет SDK Java для Azure Cosmos DB версии 4: предоставляет необходимые функции для взаимодействия с Azure Cosmos DB.

Общие сведения

Веб-канал изменений Azure Cosmos DB предоставляет интерфейс, управляемый событиями, чтобы активировать действия в ответ на вставку документов, которая имеет много использования.

Работа по управлению событиями канала изменений выполняется в основном встроенной в пакет SDK библиотекой обработчика канала изменений. Эта библиотека даже умеет распределять события канала изменений между несколькими рабочими ролями, если это потребуется. Вам достаточно лишь предоставить ей обратный вызов.

В этом простом примере приложения Java демонстрируется обработка данных в режиме реального времени с помощью Azure Cosmos DB и обработчика канала изменений. Приложение вставляет примеры документов в контейнер веб-канала для имитации потока данных. Обработчик канала изменений, привязанный к контейнеру веб-канала, обрабатывает входящие изменения и регистрирует содержимое документа. Обработчик автоматически управляет арендами для параллельной обработки.

Исходный код

Вы можете клонировать репозиторий пакета SDK и найти этот пример в SampleChangeFeedProcessor.java:

git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples.git
cd azure-cosmos-java-sql-api-sample/src/main/java/com/azure/cosmos/examples/changefeed/

Пошаговое руководство

  1. ChangeFeedProcessorOptions Настройте приложение Java с помощью Azure Cosmos DB и пакета SDK Java для Azure Cosmos DB версии 4. Предоставляет ChangeFeedProcessorOptions основные параметры для управления поведением обработчика канала изменений во время обработки данных.

    options = new ChangeFeedProcessorOptions();
    options.setStartFromBeginning(false);
    options.setLeasePrefix("myChangeFeedDeploymentUnit");
    options.setFeedPollDelay(Duration.ofSeconds(5));
    
  2. Инициализировать ChangeFeedProcessor с соответствующими конфигурациями, включая имя узла, контейнер веб-канала, контейнер аренды и логику обработки данных. Метод start() инициирует обработку данных, обеспечивая одновременную и в реальном времени обработку входящих изменений данных из контейнера веб-канала.

    logger.info("Start Change Feed Processor on worker (handles changes asynchronously)");
    ChangeFeedProcessor changeFeedProcessorInstance = new ChangeFeedProcessorBuilder()
        .hostName("SampleHost_1")
        .feedContainer(feedContainer)
        .leaseContainer(leaseContainer)
        .handleChanges(handleChanges())
        .buildChangeFeedProcessor();
    changeFeedProcessorInstance.start()
                               .subscribeOn(Schedulers.boundedElastic())
                               .subscribe();
    
  3. Укажите делегат, обрабатывающий входящие изменения данных с помощью handleChanges() метода. Метод обрабатывает полученные документы JsonNode из канала изменений. У разработчика есть два варианта обработки документа JsonNode, предоставленного веб-каналом изменений. Один из вариантов — работать с документом в виде JsonNode. Это отлично, особенно если у вас нет единой модели данных для всех документов. Второй вариант — преобразование JsonNode в POJO с той же структурой, что и JsonNode. Затем вы можете работать с POJO.

    private static Consumer<List<JsonNode>> handleChanges() {
        return (List<JsonNode> docs) -> {
            logger.info("Start handleChanges()");
    
            for (JsonNode document : docs) {
                try {
                    //Change Feed hands the document to you in the form of a JsonNode
                    //As a developer you have two options for handling the JsonNode document provided to you by Change Feed
                    //One option is to operate on the document in the form of a JsonNode, as shown below. This is great
                    //especially if you do not have a single uniform data model for all documents.
                    logger.info("Document received: " + OBJECT_MAPPER.writerWithDefaultPrettyPrinter()
                            .writeValueAsString(document));
    
                    //You can also transform the JsonNode to a POJO having the same structure as the JsonNode,
                    //as shown below. Then you can operate on the POJO.
                    CustomPOJO2 pojo_doc = OBJECT_MAPPER.treeToValue(document, CustomPOJO2.class);
                    logger.info("id: " + pojo_doc.getId());
    
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
            }
            isWorkCompleted = true;
            logger.info("End handleChanges()");
    
        };
    }
    
  4. Создайте и запустите приложение Java. Приложение запускает обработчик канала изменений, вставляет примеры документов в контейнер веб-канала и обрабатывает входящие изменения.

Заключение

В этом руководстве вы узнали, как создать приложение Java с помощью пакета SDK Java для Azure Cosmos DB версии 4 , использующего базу данных Azure Cosmos DB для NoSQL, и использовать обработчик канала изменений для обработки данных в режиме реального времени. Это приложение можно расширить для обработки более сложных вариантов использования и создания надежных, масштабируемых и глобально распределенных приложений с помощью Azure Cosmos DB.

Дополнительные ресурсы

Следующие шаги

Теперь вы можете перейти к дополнительным сведениям об оценке канала изменений в следующих статьях: