Compartir vía


Programación asincrónica en Azure SDK para Java

En este artículo se describe el modelo de programación asincrónica en Azure SDK para Java.

Azure SDK contenía inicialmente solo API asincrónicas sin bloqueo para interactuar con los servicios de Azure. Estas API permiten usar Azure SDK para crear aplicaciones escalables que usan los recursos del sistema de forma eficaz. Sin embargo, el SDK de Azure para Java también contiene clientes sincrónicos para satisfacer a un público más amplio y también hacer que las bibliotecas cliente sean más accesibles para aquellos usuarios que no están familiarizados con la programación asincrónica. (Consulte Accesible en las directrices de diseño del SDK de Azure). Por lo tanto, todas las bibliotecas cliente de Java del SDK de Azure para Java ofrecen clientes asincrónicos y sincrónicos. Sin embargo, se recomienda utilizar los clientes asincrónicos para los sistemas de producción con el fin de maximizar el uso de los recursos del sistema.

Flujos reactivos

Si observa la sección Clientes de servicio asincrónicos del documento Directrices de diseño de Azure SDK para Java, observará que, en lugar de usar el elemento CompletableFuture proporcionado por Java 8, nuestras API asincrónicas usan tipos reactivos. ¿Por qué hemos elegido tipos reactivos en lugar de los tipos que están disponibles de forma nativa en el JDK?

Java 8 presentó características como Streams (flujos), Lambdas y CompletableFuture. Estas características proporcionan muchas funcionalidades, pero tienen algunas limitaciones.

CompletableFuture proporciona funcionalidades sin bloqueo basadas en devoluciones de llamada y la interfaz CompletionStage permitida para facilitar la composición de una serie de operaciones asincrónicas. Las expresiones lambda hacen que estas API basadas en inserciones sean más legibles. Los flujos proporcionan operaciones de estilo funcional para controlar una colección de elementos de datos. Sin embargo, los flujos son sincrónicos y no se pueden reutilizar. CompletableFuture permite realizar una solicitud única, proporciona compatibilidad para una devolución de llamada y espera una respuesta única. Sin embargo, muchos servicios en la nube requieren la capacidad de transmitir datos, por ejemplo, Event Hubs.

Los flujos reactivos pueden ayudar a superar estas limitaciones mediante la transmisión de elementos de un origen a un suscriptor. Cuando un suscriptor solicita datos de un origen, el origen envía cualquier número de resultados de vuelta. No tiene por qué enviarlos todos a la vez. La transferencia tiene lugar durante un período de tiempo, siempre que el origen tenga datos para enviar.

En este modelo, el suscriptor registra controladores de eventos para procesar los datos cuando llegan. Estas interacciones basadas en inserciones notifican al suscriptor mediante distintas señales:

  • Una llamada a onSubscribe() indica que la transferencia de datos está a punto de comenzar.
  • Una llamada a onError() indica que se produjo un error, que también marca el final de la transferencia de datos.
  • Una llamada a onComplete() indica la finalización correcta de la transferencia de datos.

A diferencia de los flujos de Java, los flujos reactivos tratan los errores como eventos de primera clase. Los flujos reactivos tienen un canal dedicado para que el origen comunique los errores al suscriptor. Además, los flujos reactivos permiten que el suscriptor negocie la velocidad de transferencia de datos para transformar estos flujos en un modelo de inserción y extracción.

La especificación Flujos reactivos proporciona un estándar para la forma en que se debe realizar la transferencia de datos. En un nivel general, la especificación define las cuatro interfaces siguientes y especifica reglas sobre cómo se deben implementar estas interfaces.

  • Publisher (Publicador) es el origen de un flujo de datos.
  • Subscriber (Suscriptor) es el consumidor de un flujo de datos.
  • Subscription (Suscripción) administra el estado de la transferencia de datos entre un publicador y un suscriptor.
  • Processor (Procesador) es tanto un publicador como un suscriptor.

Existen algunas bibliotecas de Java conocidas que proporcionan implementaciones de esta especificación, como RxJava, Akka Streams, Vert.x y Project Reactor.

Azure SDK para Java ha adoptado Project Reactor para ofrecer sus API asincrónicas. El factor principal que impulsó esta decisión era proporcionar una integración fluida con Spring Webflux, que también utiliza Project Reactor. Otro factor que contribuyó a la elección de Project Reactor sobre RxJava era que Project Reactor usa Java 8 pero RxJava, en ese momento, estaba todavía en Java 7. Project Reactor también ofrece un amplio conjunto de operadores que admiten composición y permiten escribir código declarativo para crear canalizaciones de procesamiento de datos. Otro aspecto importante sobre Project Reactor es que tiene adaptadores para convertir los tipos de Project Reactor en otros tipos de implementación populares.

Comparación de las API de operaciones sincrónicas y asincrónicas

Hemos tratado los clientes sincrónicos y las opciones para los clientes asincrónicos. En la tabla siguiente se resume el aspecto de las API diseñadas con estas opciones:

Tipo de API No hay ningún valor Un solo valor Varios valores
Java estándar: API sincrónicas void T Iterable<T>
Java estándar: API asincrónicas CompletableFuture<Void> CompletableFuture<T> CompletableFuture<List<T>>
Interfaces de flujos reactivos Publisher<Void> Publisher<T> Publisher<T>
Implementación de Project Reactor de los flujos reactivos Mono<Void> Mono<T> Flux<T>

Para tener una visión completa, merece la pena mencionar que Java 9 presentó la clase Flow, que incluye las cuatro interfaces de flujos reactivos. Sin embargo, esta clase no incluye ninguna implementación.

Uso de las API asincrónicas en Azure SDK para Java

La especificación de flujos reactivos no distingue entre tipos de publicadores. En la especificación de flujos reactivos, los publicadores simplemente producen cero o más elementos de datos. En muchos casos, hay una distinción útil entre un publicador que produce a lo sumo un elemento de datos frente a uno que produce cero o más. En las API basadas en la nube, esta distinción indica si una solicitud devuelve una respuesta de un solo valor o una colección. Project Reactor proporciona dos tipos para hacer esta distinción: Mono y Flux. Una API que devuelve un tipo Mono contendrá una respuesta con un valor como máximo y una API que devuelva un tipo Flux contendrá una respuesta que tiene cero o más valores.

Por ejemplo, supongamos que usa un elemento ConfigurationAsyncClient para recuperar una configuración almacenada mediante el servicio Azure App Configuration. (Para más información, consulte ¿Qué es Azure App Configuration?)

Si crea un elemento ConfigurationAsyncClient y llama a getConfigurationSetting() en el cliente, devuelve un tipo Mono, lo que indica que la respuesta contiene un valor único. Sin embargo, llamar a este método por si solo no hace nada. El cliente todavía no ha realizado una solicitud al servicio Azure App Configuration. En esta fase, el valor Mono<ConfigurationSetting> devuelto por esta API es simplemente un "ensamblado" de la canalización de procesamiento de datos. Esto significa que se ha completado la configuración necesaria para consumir los datos. Para desencadenar realmente la transferencia de datos (es decir, para realizar la solicitud al servicio y obtener la respuesta), debe suscribirse al elemento Monodevuelto. Por lo tanto, al tratar con estos flujos reactivos, debe recordar llamar a subscribe(), porque no sucederá nada hasta que lo haga.

En el ejemplo siguiente se muestra cómo suscribirse al elemento Mono e imprimir el valor de configuración en la consola.

ConfigurationAsyncClient asyncClient = new ConfigurationClientBuilder()
    .connectionString("<your connection string>")
    .buildAsyncClient();

asyncClient.getConfigurationSetting("<your config key>", "<your config value>").subscribe(
    config -> System.out.println("Config value: " + config.getValue()),
    ex -> System.out.println("Error getting configuration: " + ex.getMessage()),
    () -> System.out.println("Successfully retrieved configuration setting"));

System.out.println("Done");

Observe que después de llamar a getConfigurationSetting() en el cliente, el código de ejemplo se suscribe al resultado y proporciona tres expresiones lambda independientes. La primera expresión lambda consume los datos recibidos del servicio, que se desencadena tras una respuesta correcta. La segunda expresión lambda se desencadena si se produce un error al recuperar la configuración. La tercera expresión lambda se invoca cuando se completa el flujo de datos, lo que significa que no se esperan más elementos de datos de este flujo.

Nota:

Como con toda la programación asincrónica, una vez creada la suscripción, la ejecución continúa de la forma habitual. Si no hay nada para mantener el programa activo y en ejecución, puede finalizar antes de que se complete la operación asincrónica. El subproceso principal que llamó a subscribe() no esperará hasta que realice la llamada de red a Azure App Configuration y reciba una respuesta. En los sistemas de producción, puede continuar procesando otra cosa, pero en este ejemplo puede agregar un pequeño retraso mediante una llamada a Thread.sleep() o usar un elemento CountDownLatch para que la operación asincrónica tenga la oportunidad de completarse.

Como se muestra en el ejemplo siguiente, las API que devuelven un elemento Flux también siguen un patrón similar. La diferencia es que la primera devolución de llamada proporcionada al método subscribe() es llamada varias veces para cada elemento de datos de la respuesta. Las devoluciones de llamada de error o de finalización se llaman exactamente una vez y se consideran señales de finalización. No se invocan otras devoluciones de llamada si se recibe alguna de estas señales desde el publicador.

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(
    event -> System.out.println("Sequence number of received event: " + event.getData().getSequenceNumber()),
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

Contrapresión

¿Qué ocurre cuando el origen genera los datos a una velocidad mayor que la que el suscriptor puede controlar? El suscriptor se puede saturar con los datos, lo que puede dar lugar a errores de memoria insuficiente. El suscriptor necesita una forma de volver a comunicarse con el publicador para que reduzca la velocidad cuando no la puede mantener. De forma predeterminada, cuando se llama a subscribe() en un elemento Flux como se muestra en el ejemplo anterior, el suscriptor solicita un flujo de datos no asociado, lo que indica al publicador que envíe los datos lo más rápido posible. Este comportamiento no siempre es recomendable y puede que el suscriptor tenga que controlar la velocidad de publicación mediante la "contrapresión". La contrapresión permite al suscriptor tomar el control del flujo de elementos de datos. Un suscriptor solicitará un número limitado de elementos de datos que pueda controlar. Después de que el suscriptor haya completado el procesamiento de estos elementos, el suscriptor puede solicitar más. Mediante el uso de la contrapresión, puede transformar un modelo de inserción para la transferencia de datos en un modelo de inserción y extracción.

En el ejemplo siguiente se muestra cómo puede controlar la velocidad a la que el consumidor de Event Hubs recibe los eventos:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.request(1); // request another event when the subscriber is ready
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

Cuando el suscriptor "se conecta" por primera vez al publicador, el publicador entrega al suscriptor una instancia de Subscription, que administra el estado de la transferencia de datos. Este elemento Subscription es el medio por el que el suscriptor puede aplicar la contrapresión mediante una llamada a request() para especificar cuántos elementos de datos más puede controlar.

Si el suscriptor solicita más de un elemento de datos cada vez que llama a onNext(), por ejemplo, con request(10), el publicador enviará los 10 elementos siguientes inmediatamente si están disponibles o cuando estén disponibles. Estos elementos se acumulan en un búfer en el extremo del suscriptor y, puesto que cada llamada a onNext() solicita 10 elemento más, el trabajo pendiente crece hasta que el publicador no tiene más elementos de datos que enviar o el búfer del suscriptor se desborda, lo que provoca errores de memoria insuficiente.

Cancelación de una suscripción

Una suscripción administra el estado de la transferencia de datos entre un publicador y un suscriptor. La suscripción está activa hasta que el publicador haya completado la transferencia de todos los datos al suscriptor o el suscriptor ya no esté interesado en recibir datos. Hay un par de formas con las que puede cancelar una suscripción, como se muestra a continuación.

En el ejemplo siguiente se cancela la suscripción mediante la eliminación del suscriptor:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

Disposable disposable = asyncClient.receive().subscribe(
    partitionEvent -> {
        Long num = partitionEvent.getData().getSequenceNumber()
        System.out.println("Sequence number of received event: " + num);
    },
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

// much later on in your code, when you are ready to cancel the subscription,
// you can call the dispose method, as such:
disposable.dispose();

En el ejemplo siguiente se cancela la suscripción mediante una llamada al método cancel() en Subscription:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.cancel(); // Cancels the subscription. No further event is received.
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

Conclusión

Los subprocesos son recursos caros que no se deben emplear en estar a la espera de respuestas de llamadas de servicios remotos. A medida que aumenta la adopción de las arquitecturas de microservicios, la necesidad de escalar y usar los recursos de manera eficaz es fundamental. Las API asincrónicas son favorables cuando hay operaciones enlazadas a la red. Azure SDK para Java ofrece un amplio conjunto de API para operaciones asincrónicas que ayudan a maximizar los recursos del sistema. Le recomendamos que pruebe nuestros clientes asincrónicos.

Para más información sobre los operadores que mejor se adaptan a sus tareas específicas, consulte ¿Qué operador necesito? en la Guía de referencia de Reactor 3.

Pasos siguientes

Ahora que comprende mejor los distintos conceptos de la programación asincrónica, es importante aprender a recorrer en iteración los resultados. Para más información sobre las mejores estrategias de iteración y los detalles sobre cómo funciona la paginación, consulte Paginación e iteración en Azure SDK para Java.