Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
En este artículo se describe el modelo de programación asincrónica en el SDK de Azure para Java.
Inicialmente, el SDK de Azure solo contenía API asincrónicas y sin bloqueo para interactuar con los servicios de Azure. Estas API permiten usar el SDK de Azure para crear aplicaciones escalables que usan recursos del sistema de forma eficaz. Sin embargo, el SDK de Azure para Java también contiene clientes sincrónicos para atender a un público más amplio y hacer que las bibliotecas cliente sean accesibles para los usuarios que no están familiarizados con la programación asincrónica. (Consulte Enfoque en las directrices de diseño del SDK de Azure). Por lo tanto, todas las bibliotecas cliente de Java del SDK de Azure para Java ofrecen clientes asincrónicos y sincrónicos. Sin embargo, use los clientes asincrónicos para los sistemas de producción para maximizar el uso de recursos del sistema.
Flujos reactivos en el SDK de Azure para Java
Si examina la sección Async Service Clients de la Java SDK de Azure Design Guidelines, verá que, en lugar de usar CompletableFuture proporcionado por Java 8, las API asincrónicas usan tipos reactivos. ¿Por qué el equipo de SDK de Azure eligió tipos reactivos sobre los tipos que están disponibles de forma nativa en JDK?
Java 8 introdujo características como Streams, Lambdas y CompletableFuture. Estas características proporcionan muchas funcionalidades, pero tienen algunas limitaciones.
CompletableFuture proporciona funcionalidades de devolución de llamada, sin bloqueo y la CompletionStage interfaz facilita la redacción de una serie de operaciones asincrónicas. Las lambdas hacen que estas API impulsadas por eventos sean más legibles. Los flujos proporcionan operaciones de estilo funcional para controlar una colección de elementos de datos. Sin embargo, los flujos son sincrónicos y no se pueden reutilizar.
CompletableFuture permite realizar una única solicitud, admite una llamada de retorno y espera una única respuesta. Sin embargo, muchos servicios en la nube requieren la capacidad de transmitir datos: Event Hubs por ejemplo.
Los flujos reactivos pueden ayudar a superar estas limitaciones transmitiendo elementos desde una fuente a un suscriptor. Cuando un suscriptor solicita datos de un origen, el origen devuelve cualquier número de resultados. No es necesario enviarlos a la vez. La transferencia se produce durante un período de tiempo, siempre que el origen tenga datos que enviar.
En este modelo, el suscriptor registra controladores de eventos para procesar los datos cuando llega. Estas interacciones basadas en empuje notifican al suscriptor a través de señales únicas.
- Una
onSubscribe()llamada indica que la transferencia de datos está a punto de comenzar. - Una
onError()llamada indica que se produjo un error, que también marca el final de la transferencia de datos. - Una
onComplete()llamada indica la finalización correcta de la transferencia de datos.
A diferencia de Java Streams, las secuencias reactivas tratan los errores como eventos de primera clase. Las secuencias reactivas tienen un canal dedicado para que el origen comunique los errores al suscriptor. Además, los flujos reactivos permiten al suscriptor negociar la velocidad de transferencia de datos para transformar estos flujos en un modelo de empuje y extracción.
La especificación De secuencias reactivas proporciona un estándar para cómo debe producirse la transferencia de datos. En un nivel alto, la especificación define las cuatro interfaces siguientes y especifica reglas sobre cómo se deben implementar estas interfaces.
- Publisher es el origen de un flujo de datos.
- El suscriptor es el consumidor de un flujo de datos.
- La suscripción administra el estado de la transferencia de datos entre un publicador y un suscriptor.
- Procesador es tanto un editor como un suscriptor.
Algunas bibliotecas de Java conocidas proporcionan implementaciones de esta especificación, como RxJava, Akka Streams, Vert.x y Project Reactor.
El SDK de Azure para Java adoptó Project Reactor para ofrecer sus API asincrónicas. El factor principal que impulsa esta decisión era proporcionar una integración fluida con Spring Webflux, que también usa Project Reactor. Otro factor que contribuye a elegir Project Reactor sobre RxJava fue que Project Reactor usa Java 8, pero RxJava, en ese momento, todavía estaba en Java 7. Project Reactor también ofrece un amplio conjunto de operadores que se pueden componer y le permiten escribir código declarativo para crear canalizaciones de procesamiento de datos. Otra cosa agradable sobre Project Reactor es que tiene adaptadores para convertir tipos de Project Reactor a otros tipos de implementación populares.
Comparación de las API de operaciones sincrónicas y asincrónicas
Ha aprendido sobre los clientes sincrónicos y las opciones para clientes asincrónicos. En la tabla siguiente se resume el aspecto de las API diseñadas con estas opciones:
| Tipo de API | No hay ningún valor | Valor único | Varios valores |
|---|---|---|---|
| Java estándar: API sincrónicas | void |
T |
Iterable<T> |
| Java estándar: API asincrónicas | CompletableFuture<Void> |
CompletableFuture<T> |
CompletableFuture<List<T>> |
| Interfaces de flujos reactivos | Publisher<Void> |
Publisher<T> |
Publisher<T> |
| Implementación de Project Reactor de los flujos reactivos | Mono<Void> |
Mono<T> |
Flux<T> |
Por motivos de integridad, merece la pena mencionar que Java 9 introdujo la clase Flow que incluye las cuatro interfaces reactivas de secuencias. Sin embargo, esta clase no incluye ninguna implementación.
Uso de api asincrónicas en el SDK de Azure para Java
La especificación de secuencias reactivas no diferencia entre los tipos de publicadores. En la especificación de flujos reactivos, los publicadores simplemente producen cero o más elementos de datos. En muchos casos, hay una distinción útil entre un publicador que produce como máximo un elemento de datos frente a uno que genera cero o más. En las API basadas en la nube, esta distinción indica si una solicitud devuelve una respuesta con un solo valor o una colección. Project Reactor proporciona dos tipos para hacer esta distinción: Mono y Flux. Una API que devuelve un Mono contiene una respuesta que tiene como máximo un valor y una API que devuelve un Flux valor contendrá una respuesta que tiene cero o más valores.
Por ejemplo, supongamos que usa configurationAsyncClient para recuperar una configuración almacenada mediante el servicio Azure App Configuration. (Para más información, consulte ¿Qué es Azure App Configuration?).
Si crea un ConfigurationAsyncClient y llama al getConfigurationSetting() en el cliente, devuelve un Mono, lo que indica que la respuesta contiene un valor único. Sin embargo, llamar solo a este método no hace nada. El cliente aún no ha realizado una solicitud al servicio Azure App Configuration. En esta fase, el valor Mono<ConfigurationSetting> devuelto por esta API es simplemente un "ensamblado" de la canalización de procesamiento de datos. Esta arquitectura significa que se ha completado la configuración necesaria para consumir los datos. Para desencadenar realmente la transferencia de datos (es decir, para realizar la solicitud al servicio y obtener la respuesta), debe suscribirse al elemento Monodevuelto. Por lo tanto, al tratar con estos flujos reactivos, debe recordar llamar a subscribe(), porque no sucederá nada hasta que lo haga.
En el ejemplo siguiente se muestra cómo suscribirse a Mono e imprimir el valor de configuración en la consola.
ConfigurationAsyncClient asyncClient = new ConfigurationClientBuilder()
.connectionString("<your connection string>")
.buildAsyncClient();
asyncClient.getConfigurationSetting("<your config key>", "<your config value>").subscribe(
config -> System.out.println("Config value: " + config.getValue()),
ex -> System.out.println("Error getting configuration: " + ex.getMessage()),
() -> System.out.println("Successfully retrieved configuration setting"));
System.out.println("Done");
Observe que después de llamar getConfigurationSetting() al cliente, el código de ejemplo se suscribe al resultado y proporciona tres lambdas independientes. La primera expresión lambda consume los datos recibidos del servicio, que se desencadena tras una respuesta correcta. La segunda función lambda se activa si se produce un error al recuperar la configuración. La tercera expresión lambda se invoca cuando se completa el flujo de datos, lo que significa que no se esperan más elementos de datos de esta secuencia.
Nota:
Al igual que con toda la programación asincrónica, una vez creada la suscripción, la ejecución continúa como de costumbre. Si no hay nada para mantener el programa activo y en ejecución, puede finalizar antes de que se complete la operación asincrónica. El subproceso principal que llamó a subscribe() no espera hasta que realice la llamada de red a Azure App Configuration y reciba una respuesta. En los sistemas de producción, podría seguir procesando otra cosa, pero en este ejemplo puede agregar un pequeño retraso llamando a Thread.sleep() o usando CountDownLatch para dar a la operación asincrónica una oportunidad de completarse.
Como se muestra en el ejemplo siguiente, las API que devuelven también Flux siguen un patrón similar. La diferencia es que el primer callback proporcionado al método subscribe() se llama varias veces para cada elemento de datos que hay en la respuesta. Las devoluciones de llamada de error o de finalización se llaman exactamente una vez y se consideran señales de finalización. No se invocan otras devoluciones de llamada si se recibe alguna de estas señales desde el publicador.
EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
.connectionString("<your connection string>")
.consumerGroup("<your consumer group>")
.buildAsyncConsumerClient();
asyncClient.receive().subscribe(
event -> System.out.println("Sequence number of received event: " + event.getData().getSequenceNumber()),
ex -> System.out.println("Error receiving events: " + ex.getMessage()),
() -> System.out.println("Successfully completed receiving all events"));
Contrapresión
¿Qué ocurre cuando el origen produce los datos a una velocidad más rápida que el suscriptor puede controlar? El suscriptor puede verse abrumado por los datos, lo que puede provocar errores de falta de memoria. El suscriptor necesita una forma de volver a comunicarse con el publicador para que reduzca la velocidad cuando no la puede mantener. De forma predeterminada, cuando se invoca subscribe() en un Flux, tal como se muestra en el ejemplo anterior, el suscriptor solicita un flujo de datos ilimitado, lo que indica al publicador que envíe los datos lo antes posible. Este comportamiento no siempre es deseable, y es posible que el suscriptor tenga que controlar el ritmo de publicación mediante "contrapresión". La contrapresión permite al suscriptor tomar el control del flujo de elementos de datos. Un suscriptor solicita un número limitado de elementos de datos que puede controlar. Una vez que el suscriptor complete el procesamiento de estos elementos, puede solicitar más. Mediante el uso de la contrapresión, puede transformar un modelo de inserción para la transferencia de datos en un modelo de inserción y extracción.
En el ejemplo siguiente se muestra cómo puede controlar la velocidad a la que el consumidor de Event Hubs recibe los eventos:
EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
.connectionString("<your connection string>")
.consumerGroup("<your consumer group>")
.buildAsyncConsumerClient();
asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1); // request 1 data element to begin with
}
@Override
public void onNext(PartitionEvent partitionEvent) {
System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
this.subscription.request(1); // request another event when the subscriber is ready
}
@Override
public void onError(Throwable throwable) {
System.out.println("Error receiving events: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Successfully completed receiving all events")
}
});
Cuando el suscriptor se conecta primero al publicador, el publicador entrega al suscriptor una Subscription instancia, que administra el estado de la transferencia de datos. Este Subscription es el medio a través del cual el suscriptor puede aplicar la contrapresión llamando a request() para especificar cuántos elementos de datos más puede controlar.
Si el suscriptor solicita más de un elemento de datos cada vez que llama a onNext(), request(10) por ejemplo, el publicador envía los siguientes 10 elementos inmediatamente si están disponibles o cuando están disponibles. Estos elementos se acumulan en un búfer en el lado del suscriptor y, dado que cada llamada a onNext() solicita 10 más, la acumulación sigue creciendo hasta que el publicador se queda sin más elementos de datos para enviar o hasta que se desborda el búfer del suscriptor, lo que provoca errores de falta de memoria.
Cancelar una suscripción a un flujo reactivo
Una suscripción administra el estado de la transferencia de datos entre un publicador y un suscriptor. La suscripción permanece activa hasta que el publicador termina de transferir todos los datos al suscriptor o al suscriptor ya no está interesado en recibir datos. Puede cancelar una suscripción de dos maneras, como se muestra en los ejemplos siguientes.
En el ejemplo siguiente se cancela la suscripción mediante la eliminación del suscriptor:
EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
.connectionString("<your connection string>")
.consumerGroup("<your consumer group>")
.buildAsyncConsumerClient();
Disposable disposable = asyncClient.receive().subscribe(
partitionEvent -> {
Long num = partitionEvent.getData().getSequenceNumber()
System.out.println("Sequence number of received event: " + num);
},
ex -> System.out.println("Error receiving events: " + ex.getMessage()),
() -> System.out.println("Successfully completed receiving all events"));
// much later on in your code, when you are ready to cancel the subscription,
// you can call the dispose method, as such:
disposable.dispose();
En el ejemplo siguiente se cancela la suscripción llamando al cancel() método en Subscription:
EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
.connectionString("<your connection string>")
.consumerGroup("<your consumer group>")
.buildAsyncConsumerClient();
asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1); // request 1 data element to begin with
}
@Override
public void onNext(PartitionEvent partitionEvent) {
System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
this.subscription.cancel(); // Cancels the subscription. No further event is received.
}
@Override
public void onError(Throwable throwable) {
System.out.println("Error receiving events: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Successfully completed receiving all events")
}
});
Conclusión
Los subprocesos son recursos caros que no se deben emplear en estar a la espera de respuestas de llamadas de servicios remotos. A medida que aumenta la adopción de arquitecturas de microservicios, la necesidad de escalar y usar recursos de forma eficaz es fundamental. Las API asincrónicas son favorables cuando hay operaciones enlazadas a la red. El SDK de Azure para Java ofrece un amplio conjunto de API para operaciones asincrónicas para ayudar a maximizar los recursos del sistema. Pruebe los clientes asincrónicos.
Para obtener más información sobre los operadores que mejor se adapten a sus tareas concretas, consulte ¿Qué operador necesito? en la Guía de referencia de Reactor 3.
Pasos siguientes
Ahora que comprende mejor los distintos conceptos de programación asincrónica, es importante aprender a iterar los resultados. Para más información sobre las mejores estrategias de iteración y detalles sobre cómo funciona la paginación, consulte Paginación e iteración en el SDK de Azure para Java.