Асинхронное программирование в пакете SDK Azure для Java

В этой статье описывается модель асинхронного программирования в пакете SDK Azure для Java.

Изначально пакет SDK Azure содержал только асинхронные неблокирующие API для взаимодействия со службами Azure. Эти API позволяют использовать пакет SDK Azure для создания масштабируемых приложений, которые эффективно используют системные ресурсы. Однако пакет SDK Azure для Java также содержит синхронные клиенты, предназначенные для более широкой аудитории. Они также делают клиентские библиотеки доступными для пользователей, которые не знакомы с асинхронным программированием. (См. раздел Подходим в рекомендациях по проектированию пакета SDK Для Azure.) Таким образом, все клиентские библиотеки Java в пакете SDK Azure для Java предлагают как асинхронные, так и синхронные клиенты. Однако для производственных систем рекомендуется использовать асинхронные клиенты. Это позволит максимально эффективно использовать системные ресурсы.

Реактивные потоки

В разделе Клиенты асинхронной службы статьи Рекомендации по проектированию пакета Azure SDK для Java указано, что вместо класса CompletableFuture, предоставляемого Java 8, наши асинхронные API используют реактивные типы потоков. Почему мы используем реактивные типы потоков вместо типов, которые изначально доступны в JDK?

В Java 8 появились такие функции, как потоки, лямбда-выражения и CompletableFuture. Эти функции предоставляют множество возможностей, но имеют некоторые ограничения.

CompletableFuture предоставляет неблокирующие возможности на основе обратных вызовов, а интерфейс CompletionStage позволяет легко создавать серию асинхронных операций. Лямбда-выражения делают эти API на основе принудительной передачи более удобными для чтения. Потоки предоставляют функциональные операции для управления коллекцией элементов данных. Однако потоки являются синхронными и не могут использоваться повторно. CompletableFuture позволяет выполнить один запрос, обеспечивает поддержку обратного вызова и ожидает один ответ. Однако многие облачные службы нуждаются в возможности потоковой передачи данных (например, концентраторы событий).

Реактивные потоки могут помочь устранить эти ограничения путем потоковой передачи элементов из источника в базу данных подписчика. Когда подписчик запрашивает данные из источника, источник возвращает любое количество результатов. Их не нужно передавать сразу. Передача происходит в течение определенного периода времени, когда в источнике появляются данные для отправки.

В этой модели подписчик регистрирует обработчики событий для обработки данных при их поступлении. Эти взаимодействия на основе принудительной отправки уведомляют подписчика с помощью различных сигналов:

  • Вызов onSubscribe() указывает, что начнется перемещение данных.
  • Вызов onError() указывает на ошибку, которая также свидетельствует об окончании передачи данных.
  • Вызов onComplete() указывает на успешное завершение передачи данных.

В отличие от потоков Java, реактивные потоки обрабатывают ошибки как события первого класса. Реактивные потоки имеют выделенный канал, чтобы источник мог передавать любые ошибки в базу данных подписчика. Кроме того, реактивные потоки позволяют подписчику согласовать скорость передачи данных для преобразования этих потоков в модель опрашивания и запрашивания.

Спецификация реактивных потоков предоставляет стандарт, который будет использоваться для перемещения данных. На высоком уровне спецификация определяет указанные ниже четыре интерфейса, а также правила их реализации.

  • Издатель является источником потока данных.
  • Подписчик является потребителем потока данных.
  • Подписка позволяет управлять состоянием обмена данными между издателем и подписчиком.
  • Обработчик может быть как издателем, так и подписчиком.

Существуют некоторые хорошо известные библиотеки Java, которые предоставляют реализации этой спецификации, такие как RxJava, Akka Streams, Vert.x и Project Reactor.

В пакете SDK Azure для Java для предоставления асинхронных API-интерфейсов реализован Project Reactor. Основным фактором, повлиявшим на это решение, было обеспечение беспрепятственной интеграции с Spring Webflux, который также использует Project Reactor. Второй фактор, повлиявший на выбор Project Reactor вместо RxJava, заключался в том, что Project Reactor использует Java 8, а RxJava по-прежнему Java 7. Project Reactor также предлагает обширный набор составляемых операторов, которые позволяют писать декларативный код для создания конвейеров обработки данных. Еще одно преимущество Project Reactor заключается в том, что в нем есть адаптеры для преобразования типов Project Reactor в другие популярные типы реализации.

Сравнение интерфейсов API для синхронных и асинхронных операций

Мы рассмотрели синхронные клиенты и параметры для асинхронных клиентов. В следующей таблице приведено краткое описание интерфейсов API, разработанных с использованием следующих параметров:

Тип API Нет значения Одно значение Несколько значений
Стандартные синхронные API-интерфейсы Java void T Iterable<T>
Стандартные асинхронные API-интерфейсы Java CompletableFuture<Void> CompletableFuture<T> CompletableFuture<List<T>>
Интерфейсы реактивных потоков Publisher<Void> Publisher<T> Publisher<T>
Реализация реактивных потоков в Project Reactor Mono<Void> Mono<T> Flux<T>

Для полноты понимания стоит упомянуть, что в Java 9 появился класс Flow, который включает четыре интерфейса для реактивных потоков. Однако этот класс не содержит никакой реализации.

Использование асинхронных API-интерфейсов в пакете SDK Azure для Java

Спецификация реактивных потоков не различает типы издателей. В спецификации реактивных потоков издатели просто создают ноль или более элементов данных. Во многих случаях существует полезное различие между издателем, создающим максимум один элемент данных, и издателем, создающим ноль или более элементов. В облачных API это различие указывает, возвращает ли запрос однозначный ответ или коллекцию. Project Reactor предоставляет два типа, чтобы указать это различие — Mono и Flux. API-интерфейс, возвращающий Mono, будет содержать ответ, который имеет не более одного значения, а API, возвращающий Flux, — ответ, который имеет ноль или более значений.

Предположим, что вы используете ConfigurationAsyncClient для получения конфигурации, хранящейся с помощью службы "Конфигурация приложений Azure". (Дополнительные сведения см. в статье Что такое служба конфигурации приложений Azure?)

Если вы создаете ConfigurationAsyncClient и вызываете getConfigurationSetting() в клиенте, он возвращает Mono. Это означает, что ответ содержит одно значение. Однако при вызове одного метода не будет выполнено никаких действий. Клиент еще не выполнил запрос к службе "Конфигурация приложений Azure". На этом этапе Mono<ConfigurationSetting>, возвращаемый этим API, — это просто "сборка" конвейера обработки данных. Это означает, что необходимая настройка для использования данных завершена. Чтобы непосредственно запустить передачу данных (то есть выполнить запрос к службе и получить ответ), необходимо подписаться на возвращенный Mono. Поэтому при работе с этими реактивными потоками необходимо вызвать subscribe(), так как иначе ничего не произойдет.

В следующем примере показано, как подписаться на Mono и отобразить значение конфигурации в консоли.

ConfigurationAsyncClient asyncClient = new ConfigurationClientBuilder()
    .connectionString("<your connection string>")
    .buildAsyncClient();

asyncClient.getConfigurationSetting("<your config key>", "<your config value>").subscribe(
    config -> System.out.println("Config value: " + config.getValue()),
    ex -> System.out.println("Error getting configuration: " + ex.getMessage()),
    () -> System.out.println("Successfully retrieved configuration setting"));

System.out.println("Done");

Обратите внимание, что после вызова getConfigurationSetting() в клиенте пример кода подписывается на результат и предоставляет три отдельных лямбда-выражения. Первое лямбда-выражение использует данные, полученные от службы, которая активируется после успешного ответа. Второе лямбда-выражение активируется при возникновении ошибки во время получения конфигурации. Третье лямбда-выражение вызывается после завершения потока данных. Это означает, что из этого потока больше не ожидается элементов данных.

Примечание.

Как и при любом асинхронном программировании, после создания подписки выполнение продолжается в обычном режиме. Если нет ничего, что поддерживало бы программу в активном состоянии и состоянии выполнения, ее работа может быть завершена до окончания асинхронной операции. Основной поток, вызвавший subscribe(), не будет ожидать, пока вы сделаете сетевой вызов службы "Конфигурация приложений Azure" и получите ответ. В производственных системах можно продолжить обработку других данных. Однако в этом примере можно добавить небольшую задержку. Для этого вызовите Thread.sleep() или используйте CountDownLatch, чтобы дать возможность асинхронной операции завершиться.

Как показано в следующем примере, интерфейсы API, возвращающие Flux, следуют аналогичному шаблону. Разница заключается в том, что первый обратный вызов, предоставленный методу subscribe(), вызывается несколько раз для каждого элемента данных в ответе. Ошибки или обратные вызовы завершения вызываются только один раз и считаются сигналами терминала. Если от издателя получен любой из этих сигналов, никакие другие обратные вызовы не выполняются.

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(
    event -> System.out.println("Sequence number of received event: " + event.getData().getSequenceNumber()),
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

Обратная реакция

Что происходит, когда источник создает данные с более высокой скоростью, чем может обработать подписчик? Подписчик может получить переизбыток данных, что может привести к ошибкам нехватки памяти. Подписчику требуется способ обмена данными с издателем для замедления потока, когда его невозможно обработать. По умолчанию при вызове subscribe() для Flux, как показано в приведенном выше примере, подписчик запрашивает непривязанный поток данных, который указывает издателю, что данные требуется отправлять как можно быстрее. Такое поведение не всегда желательно, и подписчику может потребоваться управлять скоростью публикации с помощью "обратной реакции". Обратная реакция позволяет подписчику управлять потоком элементов данных. Подписчик запрашивает ограниченное число элементов данных, которые можно обработать. После того как подписчик завершит обработку этих элементов, он может запросить еще. С помощью обратной реакции можно преобразовать модель принудительной отправки в модель опрашивания и запрашивания для передачи данных.

В следующем примере показано, как можно контролировать скорость получения событий потребителем концентраторов событий.

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.request(1); // request another event when the subscriber is ready
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

Когда подписчик впервые подключается к издателю, он передает подписчику экземпляр Subscription, который управляет состоянием передаваемых данных. Этот экземпляр Subscription является средой передачи, с помощью которой подписчик может применять обратную реакцию, вызывая request(), чтобы указать, сколько элементов данных может быть обработано.

Если подписчик запрашивает несколько элементов данных каждый раз при вызове onNext(), например request(10), издатель немедленно отправит следующие 10 элементов, если они доступны (или когда станут доступны). Эти элементы накапливаются в буфере на стороне подписчика. Так как каждый вызов onNext() будет запрашивать следующие 10 элементов, невыполненная работа будет продолжать расти, пока у издателя не закончатся элементы данных для отправки или до переполнения буфера подписчика, что приведет к ошибкам нехватки памяти.

Отмена подписки

Подписка управляет состоянием передачи данными между издателем и подписчиком. Подписка активна до тех пор, пока издатель не завершит передачу всех данных подписчику или подписчик больше не захочет получать данные. Подписку можно отменить несколькими способами, как показано ниже.

В следующем примере подписка отменяется путем удаления подписчика.

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

Disposable disposable = asyncClient.receive().subscribe(
    partitionEvent -> {
        Long num = partitionEvent.getData().getSequenceNumber()
        System.out.println("Sequence number of received event: " + num);
    },
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

// much later on in your code, when you are ready to cancel the subscription,
// you can call the dispose method, as such:
disposable.dispose();

В следующем примере подписка отменяется путем вызова метода cancel() в Subscription.

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.cancel(); // Cancels the subscription. No further event is received.
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

Заключение

Потоки — это дорогостоящие ресурсы, которые не должны простаивать в ожидании ответов от вызовов удаленных служб. По мере увеличения архитектуры микрослужб необходимость масштабирования и эффективного использования ресурсов становится крайне важной. Асинхронные API-интерфейсы являются предпочтительными при наличии операций, связанных с сетью. Пакет SDK Azure для Java предоставляет обширный набор интерфейсов API для асинхронных операций, которые позволяют максимально увеличить объем системных ресурсов. Мы настоятельно рекомендуем попробовать и оценить асинхронные клиенты.

Дополнительные сведения об операторах, которые лучше подходят для конкретных задач, см. в этом разделе справочного руководства по работе с Reactor 3.

Следующие шаги

Теперь, когда вы лучше понимаете различные принципы асинхронного программирования, важно изучить, как выполнять итерацию по результатам. Дополнительные сведения о лучших стратегиях итерации и сведения о том, как работает разбиение на страницы, см. в статье Разбиение на страницы и итерация в пакете SDK Azure для Java.