Compartir a través de


Programación asincrónica en el SDK de Azure para Java

En este artículo se describe el modelo de programación asincrónica en el SDK de Azure para Java.

Inicialmente, el SDK de Azure solo contenía API asincrónicas y sin bloqueo para interactuar con los servicios de Azure. Estas API permiten usar el SDK de Azure para crear aplicaciones escalables que usan recursos del sistema de forma eficaz. Sin embargo, el SDK de Azure para Java también contiene clientes sincrónicos para atender a un público más amplio y también hacer que nuestras bibliotecas cliente sean accesibles para los usuarios que no están familiarizados con la programación asincrónica. (Consulte Enfoque en las directrices de diseño del SDK de Azure). Por lo tanto, todas las bibliotecas cliente de Java del SDK de Azure para Java ofrecen clientes asincrónicos y sincrónicos. Sin embargo, se recomienda usar los clientes asincrónicos para sistemas de producción para maximizar el uso de recursos del sistema.

Flujos reactivos

Si examina la sección Async Service Clients (Clientes de servicio asincrónico ) en las Directrices de diseño de Azure SDK de Java, observará que, en lugar de usar CompletableFuture proporcionado por Java 8, nuestras API asincrónicas usan tipos reactivos. ¿Por qué hemos elegido tipos reactivos sobre tipos que están disponibles de forma nativa en JDK?

Java 8 introdujo características como Streams, Lambdas y CompletableFuture. Estas características proporcionan muchas funcionalidades, pero tienen algunas limitaciones.

CompletableFuture proporciona funcionalidades sin bloqueo basadas en devoluciones de llamada y la interfaz CompletionStage permitida para facilitar la composición de una serie de operaciones asincrónicas. Las lambdas hacen que estas API impulsadas por eventos sean más legibles. Los flujos proporcionan operaciones de estilo funcional para controlar una colección de elementos de datos. Sin embargo, los flujos son sincrónicos y no se pueden reutilizar. CompletableFuture permite realizar una única solicitud, admite una llamada de retorno y espera una única respuesta. Sin embargo, muchos servicios en la nube requieren la capacidad de transmitir datos: Event Hubs por ejemplo.

Las secuencias reactivas pueden ayudar a superar estas limitaciones mediante la transmisión de elementos desde un origen hacia un suscriptor. Cuando un suscriptor solicita datos de un origen, el origen devuelve cualquier número de resultados. No es necesario enviarlos a la vez. La transferencia se produce durante un período de tiempo, siempre que el origen tenga datos que enviar.

En este modelo, el suscriptor registra controladores de eventos para procesar los datos cuando llega. Estas interacciones basadas en empuje notifican al suscriptor a través de señales únicas.

  • Una onSubscribe() llamada indica que la transferencia de datos está a punto de comenzar.
  • Una onError() llamada indica que se produjo un error, que también marca el final de la transferencia de datos.
  • Una onComplete() llamada indica la finalización correcta de la transferencia de datos.

A diferencia de Java Streams, las secuencias reactivas tratan los errores como eventos de primera clase. Las secuencias reactivas tienen un canal dedicado para que el origen comunique los errores al suscriptor. Además, los flujos reactivos permiten al suscriptor negociar la velocidad de transferencia de datos para transformar estos flujos en un modelo de empuje y extracción.

La especificación De secuencias reactivas proporciona un estándar para cómo debe producirse la transferencia de datos. En un nivel alto, la especificación define las cuatro interfaces siguientes y especifica reglas sobre cómo se deben implementar estas interfaces.

  • Publisher es el origen de un flujo de datos.
  • El suscriptor es el consumidor de un flujo de datos.
  • La suscripción administra el estado de la transferencia de datos entre un publicador y un suscriptor.
  • Procesador es tanto un editor como un suscriptor.

Hay algunas bibliotecas de Java conocidas que proporcionan implementaciones de esta especificación, como RxJava, Akka Streams, Vert.x y Project Reactor.

El SDK de Azure para Java adoptó Project Reactor para ofrecer sus API asincrónicas. El factor principal que impulsa esta decisión era proporcionar una integración fluida con Spring Webflux, que también usa Project Reactor. Otro factor que contribuye a elegir Project Reactor sobre RxJava fue que Project Reactor usa Java 8, pero RxJava, en ese momento, todavía estaba en Java 7. Project Reactor también ofrece un amplio conjunto de operadores que se pueden componer y le permiten escribir código declarativo para crear canalizaciones de procesamiento de datos. Otra cosa agradable sobre Project Reactor es que tiene adaptadores para convertir tipos de Project Reactor a otros tipos de implementación populares.

Comparación de las API de operaciones sincrónicas y asincrónicas

Hemos discutido los clientes sincrónicos y las opciones para los clientes asincrónicos. En la tabla siguiente se resume el aspecto de las API diseñadas con estas opciones:

Tipo de API No hay ningún valor Valor único Varios valores
Java estándar: API sincrónicas void T Iterable<T>
Java estándar: API asincrónicas CompletableFuture<Void> CompletableFuture<T> CompletableFuture<List<T>>
Interfaces de flujos reactivos Publisher<Void> Publisher<T> Publisher<T>
Implementación de Project Reactor de los flujos reactivos Mono<Void> Mono<T> Flux<T>

Por motivos de integridad, merece la pena mencionar que Java 9 introdujo la clase Flow que incluye las cuatro interfaces reactivas de secuencias. Sin embargo, esta clase no incluye ninguna implementación.

Uso de api asincrónicas en el SDK de Azure para Java

La especificación de secuencias reactivas no diferencia entre los tipos de publicadores. En la especificación de flujos reactivos, los publicadores simplemente producen cero o más elementos de datos. En muchos casos, hay una distinción útil entre un publicador que produce como máximo un elemento de datos frente a uno que genera cero o más. En las API basadas en la nube, esta distinción indica si una solicitud devuelve una respuesta con un solo valor o una colección. Project Reactor proporciona dos tipos para hacer esta distinción: Mono y Flux. Una API que devuelve un Mono contiene una respuesta que tiene como máximo un valor y una API que devuelve un Flux valor contendrá una respuesta que tiene cero o más valores.

Por ejemplo, supongamos que usa configurationAsyncClient para recuperar una configuración almacenada mediante el servicio Azure App Configuration. (Para más información, consulte ¿Qué es Azure App Configuration?).

Si crea un ConfigurationAsyncClient y llama al getConfigurationSetting() en el cliente, devuelve un Mono, lo que indica que la respuesta contiene un valor único. Sin embargo, llamar solo a este método no hace nada. El cliente aún no ha realizado una solicitud al servicio Azure App Configuration. En esta fase, el valor Mono<ConfigurationSetting> devuelto por esta API es simplemente un "ensamblado" de la canalización de procesamiento de datos. Esto significa que se ha completado la configuración necesaria para consumir los datos. Para desencadenar realmente la transferencia de datos (es decir, para realizar la solicitud al servicio y obtener la respuesta), debe suscribirse al elemento Monodevuelto. Por lo tanto, al tratar con estos flujos reactivos, debe recordar llamar a subscribe(), porque no sucederá nada hasta que lo haga.

En el ejemplo siguiente se muestra cómo suscribirse a Mono e imprimir el valor de configuración en la consola.

ConfigurationAsyncClient asyncClient = new ConfigurationClientBuilder()
    .connectionString("<your connection string>")
    .buildAsyncClient();

asyncClient.getConfigurationSetting("<your config key>", "<your config value>").subscribe(
    config -> System.out.println("Config value: " + config.getValue()),
    ex -> System.out.println("Error getting configuration: " + ex.getMessage()),
    () -> System.out.println("Successfully retrieved configuration setting"));

System.out.println("Done");

Observe que después de llamar getConfigurationSetting() al cliente, el código de ejemplo se suscribe al resultado y proporciona tres lambdas independientes. La primera expresión lambda consume los datos recibidos del servicio, que se desencadena tras una respuesta correcta. La segunda lambda se desencadena cuando se produce un error al recuperar la configuración. La tercera expresión lambda se invoca cuando se completa el flujo de datos, lo que significa que no se esperan más elementos de datos de esta secuencia.

Nota:

Al igual que con toda la programación asincrónica, una vez creada la suscripción, la ejecución continúa como de costumbre. Si no hay nada para mantener el programa activo y en ejecución, puede finalizar antes de que se complete la operación asincrónica. El subproceso principal que llamó subscribe() no esperará hasta que realice la llamada de red a Azure App Configuration y reciba una respuesta. En los sistemas de producción, podría seguir procesando otra cosa, pero en este ejemplo puede agregar un pequeño retraso llamando a Thread.sleep() o usando CountDownLatch para dar a la operación asincrónica una oportunidad de completarse.

Como se muestra en el ejemplo siguiente, las API que devuelven también Flux siguen un patrón similar. La diferencia es que el primer callback proporcionado al método subscribe() se llama varias veces para cada elemento de datos que hay en la respuesta. Las devoluciones de llamada de error o de finalización se llaman exactamente una vez y se consideran señales de finalización. No se invocan otras devoluciones de llamada si se recibe alguna de estas señales desde el publicador.

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(
    event -> System.out.println("Sequence number of received event: " + event.getData().getSequenceNumber()),
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

Contrapresión

¿Qué ocurre cuando el origen produce los datos a una velocidad más rápida que el suscriptor puede controlar? El suscriptor puede verse abrumado por los datos, lo que puede provocar errores de falta de memoria. El suscriptor necesita una forma de volver a comunicarse con el publicador para que reduzca la velocidad cuando no la puede mantener. De forma predeterminada, cuando se llama a subscribe() en un Flux como se muestra en el ejemplo anterior, el suscriptor solicita una secuencia de datos ilimitada, lo que indica al publicador que envíe los datos lo antes posible. Este comportamiento no siempre es deseable y es posible que el suscriptor tenga que controlar la tasa de publicación a través de la "presión inversa". La contrapresión permite al suscriptor tomar el control del flujo de elementos de datos. Un suscriptor solicitará un número limitado de elementos de datos que pueden controlar. Una vez completado el procesamiento de estos elementos, el suscriptor puede solicitar más. Mediante el uso de la contrapresión, puede transformar un modelo de inserción para la transferencia de datos en un modelo de inserción y extracción.

En el ejemplo siguiente se muestra cómo puede controlar la velocidad a la que el consumidor de Event Hubs recibe los eventos:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.request(1); // request another event when the subscriber is ready
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

Cuando el suscriptor se conecta primero al publicador, el publicador entrega al suscriptor una Subscription instancia, que administra el estado de la transferencia de datos. Este Subscription es el medio a través del cual el suscriptor puede aplicar la contrapresión llamando a request() para especificar cuántos elementos de datos más puede controlar.

Si el suscriptor solicita más de un elemento de datos cada vez que llama a onNext(), request(10) por ejemplo, el publicador enviará los siguientes 10 elementos inmediatamente si están disponibles o cuando estén disponibles. Estos elementos se acumulan en un búfer en el extremo del suscriptor y, puesto que cada llamada a onNext() solicita 10 elemento más, el trabajo pendiente crece hasta que el publicador no tiene más elementos de datos que enviar o el búfer del suscriptor se desborda, lo que provoca errores de memoria insuficiente.

Cancelar una suscripción

Una suscripción administra el estado de la transferencia de datos entre un publicador y un suscriptor. La suscripción está activa hasta que el publicador ha completado la transferencia de todos los datos al suscriptor o el suscriptor ya no está interesado en recibir datos. Hay un par de maneras de cancelar una suscripción, como se muestra a continuación.

En el ejemplo siguiente se cancela la suscripción mediante la eliminación del suscriptor:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

Disposable disposable = asyncClient.receive().subscribe(
    partitionEvent -> {
        Long num = partitionEvent.getData().getSequenceNumber()
        System.out.println("Sequence number of received event: " + num);
    },
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

// much later on in your code, when you are ready to cancel the subscription,
// you can call the dispose method, as such:
disposable.dispose();

En el ejemplo siguiente se cancela la suscripción llamando al cancel() método en Subscription:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.cancel(); // Cancels the subscription. No further event is received.
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

Conclusión

Los subprocesos son recursos caros que no se deben emplear en estar a la espera de respuestas de llamadas de servicios remotos. A medida que aumenta la adopción de arquitecturas de microservicios, la necesidad de escalar y usar recursos de forma eficaz es fundamental. Las API asincrónicas son favorables cuando hay operaciones enlazadas a la red. El SDK de Azure para Java ofrece un amplio conjunto de API para operaciones asincrónicas para ayudar a maximizar los recursos del sistema. Le recomendamos que pruebe nuestros clientes asincrónicos.

Para obtener más información sobre los operadores que mejor se adapten a sus tareas concretas, consulte ¿Qué operador necesito? en la Guía de referencia de Reactor 3.

Pasos siguientes

Ahora que comprende mejor los distintos conceptos de programación asincrónica, es importante aprender a iterar los resultados. Para más información sobre las mejores estrategias de iteración y detalles sobre cómo funciona la paginación, consulte Paginación e iteración en el SDK de Azure para Java.