Programação assíncrona no SDK do Azure para Java

Este artigo descreve o modelo de programação assíncrona no SDK do Azure para Java.

Inicialmente, o SDK do Azure continha apenas APIs assíncronas sem bloqueio para interagir com serviços do Azure. Essas APIs permitem que você use o SDK do Azure para criar aplicativos escalonáveis que usam recursos do sistema com eficiência. No entanto, o SDK do Azure para Java também contém clientes síncronos para atender a um público mais amplo e tornar nossas bibliotecas de cliente mais acessíveis aos usuários que não estão familiarizados com a programação assíncrona. (Veja Acessível nas diretrizes de design do SDK do Azure.) Como tal, todas as bibliotecas de cliente Java no SDK do Azure para Java oferecem clientes assíncronos e síncronos. Porém, é recomendável usar os clientes assíncronos para sistemas de produção para maximizar o uso de recursos do sistema.

Fluxos reativos

Se você examinar a seção Clientes de serviço assíncrono nas Diretrizes de design do SDK do Azure para Java, observará que, em vez de usar CompletableFuture fornecido pelo Java 8, nossas APIs assíncronas usarão tipos reativos. Por que escolhemos tipos reativos a tipos que estão disponíveis nativamente no JDK?

O Java 8 introduziu recursos como Streams, Lambdas e CompletableFuture. Esses recursos fornecem muitas funcionalidades, mas têm algumas limitações.

CompletableFuture oferece funcionalidades sem bloqueio baseadas em retorno de chamada, enquanto a interface CompletionStage possibilita uma composição fácil de uma série de operações assíncronas. Os lambdas tornam essas APIs baseadas em push mais legíveis. Os fluxos fornecem operações de estilo funcional para lidar com uma coleção de elementos de dados. No entanto, os fluxos são síncronos e não podem ser reutilizados. CompletableFuture permite fazer uma só solicitação, fornece suporte para um retorno de chamada e espera uma resposta. No entanto, muitos serviços de nuvem exigem a capacidade de transmitir dados, por exemplo, Hubs de Eventos.

Os fluxos reativos podem ajudar a superar essas limitações transmitindo elementos de uma origem para um assinante. Quando um assinante solicita dados de uma origem, a origem envia qualquer número de resultados de volta. Ela não precisa enviá-los todos de uma vez. A transferência ocorre ao longo de um período sempre que a origem tem dados a serem enviados.

Nesse modelo, o assinante registra manipuladores de eventos para processar dados quando eles chegam. Essas interações baseadas em push notificam o assinante por meio de sinais distintos:

  • Uma chamada onSubscribe() indica que a transferência de dados está prestes a começar.
  • Uma chamada onError() indica que houve um erro, o que também marca o fim da transferência de dados.
  • Uma chamada onComplete() indica a conclusão bem-sucedida da transferência de dados.

Ao contrário dos fluxos Java, fluxos reativos tratam erros como eventos de primeira classe. Fluxos reativos têm um canal dedicado para a origem para comunicar todos os erros ao assinante. Além disso, os fluxos reativos permitem que o assinante negocie a taxa de transferência de dados para transformar esses fluxos em um modelo push-pull.

A especificação Fluxos Reativos fornece um padrão de como a transferência de dados deve ocorrer. Em um alto nível, a especificação define as quatro interfaces a seguir e especifica regras de como essas interfaces devem ser implementadas.

  • Editor é a origem de um fluxo de dados.
  • Assinante é o consumidor de um fluxo de dados.
  • Assinatura gerencia o estado de transferência de dados entre um editor e um assinante.
  • Processador é um editor e um assinante.

Há algumas bibliotecas Java conhecidas que fornecem implementações dessa especificação, como RxJava, Akka Streams, Vert.x e Project Reactor.

O SDK do Azure para Java adotou a Project Reactor para oferecer suas APIs assíncronas. O fator principal que determina essa decisão é proporcionar uma integração fácil ao Spring Webflux, que também usa a Project Reactor. Outro fator que influenciou na escolha da Project Reactor à RxJava foi o Project Reactor usar Java 8, enquanto a RxJava naquele momento ainda usava Java 7. A Project Reactor também oferece um conjunto avançado de operadores que podem ser compostos e permitem escrever código declarativo para a criação de pipelines de processamento de dados. Outro dado interessante sobre a Project Reactor é que ela tem adaptadores para converter tipos Project Reactor em outros tipos de implementação populares.

Comparação de APIs de operações síncronas e assíncronas

Discutimos os clientes síncronos e as opções para clientes assíncronos. A seguinte tabela resume como são as APIs usando essas opções:

Tipo de API Sem valor Um único valor Vários valores
Java padrão – APIs síncronas void T Iterable<T>
Java padrão – APIs assíncronas CompletableFuture<Void> CompletableFuture<T> CompletableFuture<List<T>>
Interfaces de fluxos reativos Publisher<Void> Publisher<T> Publisher<T>
Implementação da Project Reactor de fluxos reativos Mono<Void> Mono<T> Flux<T>

Para fornecer informações completas, vale mencionar que o Java 9 introduziu a classe Flow que inclui as quatro interfaces de fluxos reativos. No entanto, essa classe não inclui nenhuma implementação.

Usar APIs assíncronas no SDK do Azure para Java

A especificação de fluxos reativos não diferencia entre os tipos de publicadores. Na especificação de fluxos reativos, os editores simplesmente produzem zero ou mais elementos de dados. Em muitos casos, há uma distinção útil entre um editor que produz no máximo um elemento de dados em comparação a um que produz zero ou mais. Em APIs baseadas em nuvem, essa distinção indica se uma solicitação retorna uma resposta de valor único ou uma coleção. A Project Reactor fornece dois tipos para fazer essa distinção: Mono e Flux. Uma API que retorna um Mono conterá uma resposta com no máximo um valor e uma API que retorna um Flux conterá uma resposta com zero ou mais valores.

Por exemplo, suponha que você use um ConfigurationAsyncClient para recuperar uma configuração armazenada usando o serviço de Configuração de Aplicativos do Azure. (Para obter mais informações, confira O que é a Configuração de Aplicativos do Azure?.)

Se você criar um ConfigurationAsyncClient e chamar getConfigurationSetting() no cliente, ele retornará um Mono, o que indica que a resposta contém um só valor. No entanto, chamar esse método sozinho não faz nada. O cliente ainda não fez uma solicitação para o serviço de Configuração de Aplicativos do Azure. Nessa fase, o Mono<ConfigurationSetting> retornado por essa API é apenas um "assembly" do pipeline de processamento de dados. Isso significa que a configuração necessária para o consumo dos dados foi concluída. Para realmente disparar a transferência de dados (ou seja, fazer a solicitação para o serviço e obter a resposta), você deve assinar o Mono retornado. Portanto, ao lidar com esses fluxos reativos, você deve se lembrar de chamar subscribe(), pois nada acontecerá até que você faça isso.

O exemplo a seguir mostra como assinar o Mono e imprimir o valor de configuração no console.

ConfigurationAsyncClient asyncClient = new ConfigurationClientBuilder()
    .connectionString("<your connection string>")
    .buildAsyncClient();

asyncClient.getConfigurationSetting("<your config key>", "<your config value>").subscribe(
    config -> System.out.println("Config value: " + config.getValue()),
    ex -> System.out.println("Error getting configuration: " + ex.getMessage()),
    () -> System.out.println("Successfully retrieved configuration setting"));

System.out.println("Done");

Observe que, depois de chamar getConfigurationSetting() no cliente, o código de exemplo assina o resultado e fornece três lambdas separados. O primeiro lambda consome dados recebidos do serviço, que são disparados após uma resposta bem-sucedida. O segundo lambda será disparado se houver um erro ao recuperar a configuração. O terceiro lambda é invocado quando o fluxo de dados é concluído, o que significa que não é esperado mais nenhum elemento de dados desse fluxo.

Observação

Assim como acontece com toda a programação assíncrona, depois que a assinatura é criada, a execução prossegue como de costume. Se não houver nada para manter o programa ativo e em execução, ele poderá terminar antes do fim da operação assíncrona. O thread principal que chamou subscribe() não aguardará até que você faça a chamada de rede para a Configuração de Aplicativos do Azure e receba uma resposta. Em sistemas de produção, você pode continuar para processar outra coisa, porém, neste exemplo, você pode adicionar um pequeno atraso chamando Thread.sleep() ou usar um CountDownLatch para dar à operação assíncrona a oportunidade de ser concluída.

Conforme mostrado no exemplo a seguir, as APIs que retornam um Flux também seguem um padrão semelhante. A diferença é que o primeiro retorno de chamada fornecido ao método subscribe() é chamado várias vezes para cada elemento de dados na resposta. O erro ou os retornos de chamada de conclusão são chamados exatamente uma vez e são considerados como sinais de terminal. Nenhum outro retorno de chamada será invocado se um desses sinais for recebido do editor.

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(
    event -> System.out.println("Sequence number of received event: " + event.getData().getSequenceNumber()),
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

Contrapressão

O que acontece quando a origem está produzindo os dados com mais rapidez do que assinante pode processar? O assinante pode sofrer uma sobrecarga de dados, o que pode levar a erros de memória insuficiente. O assinante precisa de uma forma de se comunicar de volta com o editor para reduzir o ritmo quando não conseguir acompanhar. Por padrão, quando você chama subscribe() em um Flux conforme mostrado no exemplo acima, o assinante está solicitando um fluxo de dados não associado, indicando ao editor para enviar os dados o mais rápido possível. Esse comportamento nem sempre é desejável, e o assinante pode precisar controlar a taxa de publicação por meio de "contrapressão". A contrapressão permite que o assinante assuma o controle do fluxo de elementos de dados. Um assinante solicitará um número limitado de elementos de dados que ele consiga processar. Depois que o assinante tiver concluído o processamento desses elementos, ele poderá solicitar mais. Ao usar a contrapressão, você pode transformar um modelo push para transferência de dados em um modelo de push-pull.

O seguinte exemplo mostra como você pode controlar a taxa à qual os eventos são recebidos pelo consumidor dos Hubs de Eventos:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.request(1); // request another event when the subscriber is ready
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

Quando o assinante "conecta-se" pela primeira vez ao editor, o editor passa ao assinante uma instância de Subscription, que gerencia o estado da transferência de dados. É com essa Subscription que o assinante pode aplicar a contrapressão chamando request() para especificar quantos elementos de dados ele consegue processar.

Se o assinante solicitar mais de um elemento de dados sempre que chamar onNext(), request(10) por exemplo, o editor enviará os próximos dez elementos imediatamente se eles estiverem disponíveis ou quando ficarem disponíveis. Esses elementos se acumulam em um buffer na extremidade do assinante e, como cada chamada onNext() solicitará mais 10, a lista de pendências continuará crescendo até o editor não ter mais elementos de dados para enviar ou ocorrer um estouro de buffer no assinante, resultando em erros de memória insuficiente.

Cancelar uma assinatura

Uma assinatura gerencia o estado de transferência de dados entre um editor e um assinante. A assinatura está ativa até que o editor tenha concluído a transferência de todos os dados para o assinante ou o assinante não esteja mais interessado em receber dados. Há algumas maneiras de cancelar uma assinatura, conforme mostrado abaixo.

O seguinte exemplo cancela a assinatura descartando o assinante:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

Disposable disposable = asyncClient.receive().subscribe(
    partitionEvent -> {
        Long num = partitionEvent.getData().getSequenceNumber()
        System.out.println("Sequence number of received event: " + num);
    },
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

// much later on in your code, when you are ready to cancel the subscription,
// you can call the dispose method, as such:
disposable.dispose();

O seguinte exemplo cancela a assinatura chamando o método cancel() em Subscription:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.cancel(); // Cancels the subscription. No further event is received.
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

Conclusão

Os threads são recursos dispendiosos que você não deve desperdiçar aguardando respostas de chamadas de serviço remoto. À medida que a adoção de arquiteturas de microsserviços aumenta, a necessidade de escalar e usar recursos com eficiência torna-se vital. As APIs assíncronas são favoráveis quando há operações relacionadas à rede. O SDK do Azure para Java oferece um amplo conjunto de APIs para operações assíncronas para ajudar a maximizar os recursos do sistema. É altamente recomendável que você experimente nossos clientes assíncronos.

Para obter mais informações sobre os operadores ideais para as suas tarefas específicas, confira De qual operador eu preciso? no Guia de referência do Reactor 3.

Próximas etapas

Agora que você compreende melhor os vários conceitos de programação assíncrona, é importante aprender a iterar os resultados. Para obter mais informações sobre as melhores estratégias de iteração e detalhes de como a paginação funciona, confira paginação e iteração no SDK do Azure para Java.