Hubs de Eventos do Azure biblioteca de clientes para Java – versão 5.16.0

Hubs de Eventos do Azure é um serviço de publicação-assinatura altamente escalonável que pode ingerir milhões de eventos por segundo e transmiti-los para vários consumidores. Isso permite processar e analisar as grandes quantidades de dados produzidos por seus dispositivos e aplicativos conectados. Depois que os Hubs de Eventos coletarem os dados, você poderá recuperá-los, transformá-los e armazená-los usando qualquer provedor de análise em tempo real ou com adaptadores de armazenamento/lote. Se você quiser saber mais sobre Hubs de Eventos do Azure, convém examinar: O que são os Hubs de Eventos?

A biblioteca de clientes dos Hubs de Eventos do Azure permite publicar e consumir eventos dos Hubs de Eventos do Azure e pode ser usada para:

  • Emitir telemetria sobre seu aplicativo para fins de diagnóstico e business intelligence.
  • Publicar fatos sobre o estado do seu aplicativo, que as partes interessadas podem observar e usar como gatilho para tomar medidas.
  • Observar operações interessantes e interações que ocorrem na sua empresa ou em outro ecossistema, permitindo que sistemas flexíveis interajam sem a necessidade de vinculá-los.
  • Receber eventos de um ou mais editores, transformá-los para atender melhor às necessidades do ecossistema e publicar os eventos transformados em um novo fluxo para que os consumidores observem.

Código-fonte | Documentação | de referência da APIDocumentação do produto | Amostras | Solucionando problemas

Sumário

Introdução

Pré-requisitos

Incluir o pacote

Incluir o arquivo da BOM

Inclua o azure-sdk-bom em seu projeto para assumir a dependência da versão ga (disponibilidade geral) da biblioteca. No trecho a seguir, substitua o espaço reservado {bom_version_to_target} pelo número de versão. Para saber mais sobre o BOM, consulte o BOM README do SDK do AZURE.

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-sdk-bom</artifactId>
            <version>{bom_version_to_target}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

e inclua a dependência direta na seção dependências sem a marca de versão, conforme mostrado abaixo.

<dependencies>
  <dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs</artifactId>
  </dependency>
</dependencies>

Incluir dependência direta

Se você quiser assumir a dependência de uma versão específica da biblioteca que não está presente no BOM, adicione a dependência direta ao seu projeto da seguinte maneira.

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs</artifactId>
    <version>5.16.0</version>
</dependency>

Autenticar o cliente

Para que a biblioteca de clientes dos Hubs de Eventos interaja com um Hub de Eventos, ele precisará entender como se conectar e autorizar com ele.

Criar um produtor do Hub de Eventos usando uma cadeia de conexão

O meio mais fácil para fazer isso é usar uma cadeia de conexão, que é criada automaticamente ao criar um namespace dos Hubs de Eventos. Se você não estiver familiarizado com políticas de acesso compartilhado no Azure, convém seguir o guia passo a passo para obter uma cadeia de conexão dos Hubs de Eventos.

O produtor do Hub de Eventos assíncrono e síncrono e os clientes consumidores podem ser criados usando EventHubClientBuilder. Invocar build*Client() cria um produtor ou consumidor síncrono enquanto build*AsyncClient() cria sua contraparte assíncrona.

O snippet abaixo cria um produtor do Hub de Eventos síncrono.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventHubProducerClient producer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .buildProducerClient();

Criar um cliente do Hub de Eventos usando plataforma de identidade da Microsoft (antigo Azure Active Directory)

O SDK do Azure para Java dá suporte a um pacote de Identidade do Azure, facilitando a obtenção de credenciais de plataforma de identidade da Microsoft. Primeiro, adicione o pacote:

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-identity</artifactId>
    <version>1.10.1</version>
</dependency>

Todas as maneiras implementadas de solicitar uma credencial podem ser encontradas no com.azure.identity.credential pacote. O exemplo a seguir mostra como usar um segredo do cliente do aplicativo AAD (Azure Active Directory) para autorizar com Hubs de Eventos do Azure.

Autorizando com DefaultAzureCredential

A autorização é mais fácil usando DefaultAzureCredential. Ele encontra a melhor credencial a ser usada em seu ambiente de execução. Para obter mais informações sobre como usar a autorização do Azure Active Directory com hubs de eventos, consulte a documentação associada.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventHubProducerClient producer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .buildProducerClient();

Principais conceitos

  • Um produtor do Hub de Eventos é uma fonte de dados de telemetria, diagnóstico informações, logs de uso ou outros dados de log, como parte de uma solução de dispositivo inserido, um aplicativo de dispositivo móvel, um título de jogo em execução em um console ou outro dispositivo, alguma solução de negócios baseada em cliente ou servidor ou um site.

  • Um consumidor do Hub de Eventos pega essas informações no Hub de Eventos e as processa. O processamento pode envolver agregação, computação complexa e filtragem. O processamento também pode envolver a distribuição ou o armazenamento das informações de maneira bruta ou transformada. Os consumidores do Hub de Eventos geralmente são partes de infraestrutura de plataforma robustas e de alta escala com recursos de análise integrados, como Azure Stream Analytics, Apache Spark ou Apache Storm.

  • Uma partição é uma sequência ordenada de eventos que é mantida em um Hub de Eventos. Os Hubs de Eventos do Azure fornecem streaming de mensagens por meio de um padrão de consumidor particionado no qual cada consumidor lê apenas um subconjunto específico, ou partição, do fluxo de mensagens. À medida que novos eventos chegam, eles são adicionados ao final dessa sequência. O número de partições é especificado no momento em que um Hub de Eventos é criado e não pode ser alterado.

  • Um grupo de consumidores é uma exibição de um Hub de Eventos inteiro. Os grupos de consumidores habilitam vários aplicativos de consumo para que cada um tenha um modo de exibição do fluxo de evento separado e para ler o fluxo de forma independente em seu próprio ritmo e com seus próprio deslocamentos. Pode haver no máximo cinco leitores simultâneos em uma partição por grupo de consumidores; no entanto, é recomendável que haja apenas um consumidor ativo para um determinado emparelhamento de partição e grupo de consumidores. Cada leitor ativo recebe os eventos de sua partição; se houver vários leitores na mesma partição, eles receberão eventos duplicados.

Para obter mais conceitos e discussões mais profundas, consulte: Recursos dos Hubs de Eventos. Além disso, os conceitos do AMQP estão bem documentados no PROTOCOLO AMQP (Advanced Messaging Queuing Protocol) versão 1.0.

Exemplos

Envio de eventos para um Hub de Eventos

Para publicar eventos, você precisará criar um assíncrono EventHubProducerAsyncClient ou um síncrono EventHubProducerClient. Cada produtor pode enviar eventos para uma partição específica ou permitir que o serviço dos Hubs de Eventos decida em quais eventos de partição devem ser publicados. É recomendável usar o roteamento automático quando a publicação de eventos precisa estar altamente disponível ou quando os dados de evento devem ser distribuídos uniformemente entre as partições.

Criar um produtor do Hub de Eventos e publicar eventos

Os desenvolvedores podem criar um produtor usando EventHubClientBuilder e chamando buildProducer*Client(). Especificar CreateBatchOptions.setPartitionId(String) enviará eventos para uma partição específica. Se partitionId não for especificado, os eventos serão roteados automaticamente para uma partição. Especificar CreateBatchOptions.setPartitionKey(String) informará o serviço de Hubs de Eventos para hash dos eventos e os enviará para a mesma partição.

O snippet abaixo cria um produtor síncrono e envia eventos para qualquer partição, permitindo que o serviço dos Hubs de Eventos encaminhe o evento para uma partição disponível.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

EventHubProducerClient producer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .buildProducerClient();

List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));
EventDataBatch eventDataBatch = producer.createBatch();

for (EventData eventData : allEvents) {
    if (!eventDataBatch.tryAdd(eventData)) {
        producer.send(eventDataBatch);
        eventDataBatch = producer.createBatch();

        // Try to add that event that couldn't fit before.
        if (!eventDataBatch.tryAdd(eventData)) {
            throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
                + eventDataBatch.getMaxSizeInBytes());
        }
    }
}

// send the last batch of remaining events
if (eventDataBatch.getCount() > 0) {
    producer.send(eventDataBatch);
}

// Clients are expected to be long-lived objects.
// Dispose of the producer to close any underlying resources when we are finished with it.
producer.close();

Observe que EventDataBatch.tryAdd(EventData) não é thread-safe. Certifique-se de sincronizar o acesso ao método ao usar vários threads para adicionar eventos.

Publicar eventos usando o identificador de partição

Muitas operações do Hub de Eventos ocorrem dentro do escopo de uma partição específica. Qualquer cliente pode chamar getPartitionIds() ou getEventHubProperties() para obter as IDs de partição e os metadados sobre em sua instância do Hub de Eventos.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

EventHubProducerClient producer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .buildProducerClient();

// Creating a batch with partitionId set will route all events in that batch to partition `0`.
CreateBatchOptions options = new CreateBatchOptions().setPartitionId("0");
EventDataBatch batch = producer.createBatch(options);

// Add events to batch and when you want to send the batch, send it using the producer.
producer.send(batch);

Publicar eventos usando a chave de partição

Quando um conjunto de eventos não está associado a nenhuma partição específica, pode ser desejável solicitar que o serviço de Hubs de Eventos mantenha diferentes eventos ou lotes de eventos juntos na mesma partição. Isso pode ser feito definindo um partition key ao publicar os eventos. No cenário abaixo, todos os eventos estão relacionados a cidades, portanto, eles são enviados com a chave de partição definida como "cidades".

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

EventHubProducerClient producer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .buildProducerClient();

List<EventData> events = Arrays.asList(new EventData("Melbourne"), new EventData("London"),
    new EventData("New York"));

SendOptions sendOptions = new SendOptions().setPartitionKey("cities");
producer.send(events, sendOptions);

Consumir eventos de uma partição do Hub de Eventos

Para consumir eventos, crie um EventHubConsumerAsyncClient ou EventHubConsumerClient para um grupo de consumidores específico. Além disso, um consumidor precisa especificar onde no fluxo de eventos para começar a receber eventos.

Consumir eventos com EventHubConsumerAsyncClient

No snippet abaixo, criamos um consumidor assíncrono que recebe eventos de partitionId e escuta apenas os eventos mais recentes que são enviados por push para a partição. Os desenvolvedores podem começar a receber eventos de várias partições usando o mesmo EventHubConsumerAsyncClient chamando receiveFromPartition(String, EventPosition) com outra ID de partição.

EventHubConsumerAsyncClient consumer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        new DefaultAzureCredentialBuilder().build())
    .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
    .buildAsyncConsumerClient();

// Obtain partitionId from EventHubConsumerAsyncClient.getPartitionIds()
String partitionId = "0";
EventPosition startingPosition = EventPosition.latest();

// Keep a reference to `subscription`. When the program is finished receiving events, call
// subscription.dispose(). This will stop fetching events from the Event Hub.
//
// NOTE: This is a non-blocking call and will move to the next line of code after setting up the async
// operation.  If the program ends after this, or the class is immediately disposed, no events will be
// received.
Disposable subscription = consumer.receiveFromPartition(partitionId, startingPosition)
    .subscribe(partitionEvent -> {
        PartitionContext partitionContext = partitionEvent.getPartitionContext();
        EventData event = partitionEvent.getData();

        System.out.printf("Received event from partition '%s'%n", partitionContext.getPartitionId());
        System.out.printf("Contents of event as string: '%s'%n", event.getBodyAsString());
    }, error -> {
        // This is a terminal signal.  No more events will be received from the same Flux object.
        System.err.print("An error occurred:" + error);
    }, () -> {
        // This is a terminal signal.  No more events will be received from the same Flux object.
        System.out.print("Stream has ended.");
    });

Consumir eventos com EventHubConsumerClient

Os desenvolvedores podem criar um consumidor síncrono que retorna eventos em lotes usando um EventHubConsumerClient. No snippet abaixo, um consumidor é criado que começa a ler eventos desde o início do fluxo de eventos da partição.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventHubConsumerClient consumer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
    .buildConsumerClient();

Instant twelveHoursAgo = Instant.now().minus(Duration.ofHours(12));
EventPosition startingPosition = EventPosition.fromEnqueuedTime(twelveHoursAgo);
String partitionId = "0";

// Reads events from partition '0' and returns the first 100 received or until the 30 seconds has elapsed.
IterableStream<PartitionEvent> events = consumer.receiveFromPartition(partitionId, 100,
    startingPosition, Duration.ofSeconds(30));

Long lastSequenceNumber = -1L;
for (PartitionEvent partitionEvent : events) {
    // For each event, perform some sort of processing.
    System.out.print("Event received: " + partitionEvent.getData().getSequenceNumber());
    lastSequenceNumber = partitionEvent.getData().getSequenceNumber();
}

// Figure out what the next EventPosition to receive from is based on last event we processed in the stream.
// If lastSequenceNumber is -1L, then we didn't see any events the first time we fetched events from the
// partition.
if (lastSequenceNumber != -1L) {
    EventPosition nextPosition = EventPosition.fromSequenceNumber(lastSequenceNumber, false);

    // Gets the next set of events from partition '0' to consume and process.
    IterableStream<PartitionEvent> nextEvents = consumer.receiveFromPartition(partitionId, 100,
        nextPosition, Duration.ofSeconds(30));
}

Consumir eventos usando um EventProcessorClient

Para consumir eventos para todas as partições de um Hub de Eventos, você pode criar um EventProcessorClient para um grupo de consumidores específico.

O EventProcessorClient delegará o processamento de eventos a uma função de retorno de chamada que você fornecer, permitindo que você se concentre na lógica necessária para fornecer valor enquanto o processador é responsável por gerenciar as operações de consumidor subjacentes.

Em nosso exemplo, vamos nos concentrar na criação do EventProcessorClient, usar o SampleCheckpointStore disponível em exemplos e uma função de retorno de chamada que processa eventos recebidos do Hub de Eventos e grava no console. Para aplicativos de produção, é recomendável usar um repositório durável como o Repositório de Pontos de Verificação com Blobs de Armazenamento do Azure.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
    .consumerGroup("<< CONSUMER GROUP NAME >>")
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .checkpointStore(new SampleCheckpointStore())
    .processEvent(eventContext -> {
        System.out.printf("Partition id = %s and sequence number of event = %s%n",
            eventContext.getPartitionContext().getPartitionId(),
            eventContext.getEventData().getSequenceNumber());
    })
    .processError(errorContext -> {
        System.out.printf("Error occurred in partition processor for partition %s, %s%n",
            errorContext.getPartitionContext().getPartitionId(),
            errorContext.getThrowable());
    })
    .buildEventProcessorClient();

Solução de problemas

Confira TROUBLESHOOTING.md.

Próximas etapas

Além dos discutidos, a biblioteca de clientes Hubs de Eventos do Azure oferece suporte para muitos outros cenários a fim de aproveitar o conjunto completo de recursos do serviço Hubs de Eventos do Azure. Para explorar alguns desses cenários, marcar os exemplos LEIAME.

Participante

Se você quiser se tornar um contribuidor ativo para este projeto, consulte nossas Diretrizes de Contribuição para obter mais informações.

Impressões