Share via


Hubs de Eventos do Azure biblioteca de clientes do Checkpoint Store para Java – versão 1.17.0

usando blobs de armazenamento

Hubs de Eventos do Azure Repositório de Pontos de Verificação pode ser usado para armazenar pontos de verificação durante o processamento de eventos de Hubs de Eventos do Azure. Esse pacote usa Blobs de Armazenamento como um repositório persistente para manter pontos de verificação e informações de propriedade de partição. O BlobCheckpointStore fornecido neste pacote pode ser conectado ao EventProcessor.

Código-fonte | Documentação de referência da API | Documentação do produto | Exemplos

Introdução

Pré-requisitos

Incluir o pacote

Incluir o arquivo da BOM

Inclua o azure-sdk-bom em seu projeto para assumir a dependência da versão ga (disponibilidade geral) da biblioteca. No trecho a seguir, substitua o espaço reservado {bom_version_to_target} pelo número de versão. Para saber mais sobre a BOM, consulte o BOM README do SDK do AZURE.

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-sdk-bom</artifactId>
            <version>{bom_version_to_target}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

e inclua a dependência direta na seção dependências sem a marca de versão, conforme mostrado abaixo.

<dependencies>
  <dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
  </dependency>
</dependencies>

Incluir dependência direta

Se você quiser assumir a dependência de uma versão específica da biblioteca que não está presente na BOM, adicione a dependência direta ao seu projeto da seguinte maneira.

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
    <version>1.17.0</version>
</dependency>

Autenticar o cliente de contêiner de armazenamento

Para criar uma instância do , um ContainerAsyncClient deve primeiro ser criado com o token SAS apropriado com acesso de gravação e cadeia de BlobCheckpointStoreconexão. Para tornar isso possível, você precisará da cadeia de caracteres SAS da conta (assinatura de acesso compartilhado) da conta de armazenamento. Saiba mais em Token SAS.

Principais conceitos

Definindo o ponto de verificação

Ponto de verificação é um processo pelo qual os leitores marcam ou confirmam sua posição em uma sequência de eventos da partição. O ponto de verificação é responsabilidade do consumidor e ocorre em uma base por partição dentro de um grupo de consumidores. Essa responsabilidade significa que, para cada grupo de consumidores, cada leitor de partição deve manter o controle da sua posição atual no fluxo de eventos e pode informar o serviço quando considerar o fluxo de dados concluído. Se um leitor se desconecta de uma partição, ao se reconectar, ele começa a ler no ponto de verificação que foi anteriormente enviado pelo último leitor dessa partição nesse grupo de consumidores. Quando o leitor se conecta, ele passa esse deslocamento para o hub de eventos para especificar o local para começar a ler. Assim, você pode usar o ponto de verificação para marcar eventos como "concluídos" por aplicativos de downstream e oferecer resiliência caso ocorra um failover entre leitores em execução em máquinas diferentes. É possível retornar aos dados mais antigos, especificando um deslocamento inferior desse processo de ponto de verificação. Por meio desse mecanismo, o ponto de verificação permite resiliência de failover e reprodução de fluxo de eventos.

Números de sequência de & deslocamentos

Ambos os números de sequência de deslocamento & referem-se à posição de um evento dentro de uma partição. Você pode pensar neles como um cursor do lado do cliente. O deslocamento é uma numeração em bytes do evento. O número de deslocamento/sequência permite que um consumidor de evento (leitor) especifique um ponto no fluxo de eventos do qual deseja começar a ler eventos. Você pode especificar o carimbo de data/hora de modo que receba eventos que foram enfileirados somente após o carimbo de data/hora fornecido. Os consumidores são responsáveis por armazenar seus próprios valores de deslocamento fora do serviço hubs de eventos. Dentro de uma partição, cada evento inclui um deslocamento, um número de sequência e o carimbo de data/hora de quando ele foi enfileirado.

Exemplos

Criar uma instância do contêiner de armazenamento com o token SAS

BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
    .connectionString("<STORAGE_ACCOUNT_CONNECTION_STRING>")
    .containerName("<CONTAINER_NAME>")
    .sasToken("<SAS_TOKEN>")
    .buildAsyncClient();

Consumir eventos usando um cliente do processador de eventos

Para consumir eventos para todas as partições de um Hub de Eventos, você criará um EventProcessorClient para um grupo de consumidores específico. Quando um Hub de Eventos é criado, ele fornece um grupo de consumidores padrão que pode ser usado para começar.

O EventProcessorClient delegará o processamento de eventos a uma função de retorno de chamada que você fornecer, permitindo que você se concentre na lógica necessária para fornecer valor enquanto o processador é responsável por gerenciar as operações de consumidor subjacentes.

Em nosso exemplo, nos concentraremos na criação do EventProcessor, usaremos o BlobCheckpointStoree uma função de retorno de chamada simples para processar os eventos recebidos dos Hubs de Eventos, gravar no console e atualizar o ponto de verificação no Armazenamento de Blobs após cada evento.

BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
    .connectionString("<STORAGE_ACCOUNT_CONNECTION_STRING>")
    .containerName("<CONTAINER_NAME>")
    .sasToken("<SAS_TOKEN>")
    .buildAsyncClient();

EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
    .consumerGroup("<< CONSUMER GROUP NAME >>")
    .connectionString("<< EVENT HUB CONNECTION STRING >>")
    .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))
    .processEvent(eventContext -> {
        System.out.println("Partition id = " + eventContext.getPartitionContext().getPartitionId() + " and "
            + "sequence number of event = " + eventContext.getEventData().getSequenceNumber());
    })
    .processError(errorContext -> {
        System.out.println("Error occurred while processing events " + errorContext.getThrowable().getMessage());
    })
    .buildEventProcessorClient();

// This will start the processor. It will start processing events from all partitions.
eventProcessorClient.start();

// (for demo purposes only - adding sleep to wait for receiving events)
TimeUnit.SECONDS.sleep(2);

// When the user wishes to stop processing events, they can call `stop()`.
eventProcessorClient.stop();

Solução de problemas

Habilitar o log do cliente

O SDK do Azure para Java oferece uma história de log consistente para ajudar a solucionar problemas de erros do aplicativo e agilizar a resolução. Os logs produzidos capturam o fluxo de um aplicativo antes que acessem o estado do terminal para ajudar a localizar o problema raiz. Exiba o wiki de log para obter diretrizes sobre como habilitar o registro em log.

Biblioteca SSL padrão

Todas as bibliotecas de cliente, por padrão, usam a biblioteca SSL com o uso do Tomcat nativo para habilitar o desempenho de nível nativo para operações SSL. A biblioteca SSL é um uber jar que contém bibliotecas nativas para Linux/macOS/Windows e fornece melhor desempenho em comparação com a implementação SSL padrão no JDK. Para obter mais informações, incluindo como reduzir o tamanho da dependência, consulte a seção ajuste de desempenho da wiki.

Próximas etapas

Comece explorando os exemplos aqui.

Participante

Se você quiser se tornar um contribuidor ativo para este projeto, consulte nossas Diretrizes de Contribuição para obter mais informações.

Impressões