Compartir a través de


biblioteca cliente de Azure Event Hubs para Java: versión 5.16.0

Azure Event Hubs es un servicio de publicación y suscripción altamente escalable que puede ingerir millones de eventos por segundo y transmitirlos a varios consumidores. Esto le permite procesar y analizar las grandes cantidades de datos generados por los dispositivos y aplicaciones conectados. Una vez que Event Hubs ha recopilado los datos, puede recuperarlos, transformarlos y almacenarlos mediante cualquier proveedor de análisis en tiempo real o con adaptadores de procesamiento por lotes o almacenamiento. Si desea obtener más información sobre Azure Event Hubs, puede que desee revisar: ¿Qué es Event Hubs?

La biblioteca cliente de Azure Event Hubs permite publicar y consumir eventos de Azure Event Hubs y se puede usar para lo siguiente:

  • Emitir telemetría sobre la aplicación con fines de diagnóstico e inteligencia empresarial.
  • Publicar datos sobre el estado de la aplicación que las partes interesadas pueden consultar y usar como desencadenadores para tomar medidas.
  • Observar las operaciones e interacciones interesantes que se están produciendo en el negocio u otro ecosistema, lo que permite que los sistemas de acoplamiento flexible interactúen sin necesidad de enlazarlos.
  • Recibir eventos de uno o varios editores, transformarlos para satisfacer mejor las necesidades del ecosistema y, a continuación, publicar los eventos transformados en un nuevo flujo para que los consumidores los observen.

Código | fuenteDocumentación | de referencia de API | Documentación del productoMuestras | Solución de problemas

Tabla de contenido

Introducción

Requisitos previos

Inclusión del paquete

Inclusión del archivo BOM

Incluya azure-sdk-bom en el proyecto para depender de la versión de disponibilidad general (GA) de la biblioteca. En el fragmento de código siguiente, reemplace el marcador de posición {bom_version_to_target} por el número de versión. Para más información sobre la lista de materiales, consulte el archivo Léame bom del SDK de AZURE.

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-sdk-bom</artifactId>
            <version>{bom_version_to_target}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

y, a continuación, incluya la dependencia directa en la sección de dependencias sin la etiqueta de versión, como se muestra a continuación.

<dependencies>
  <dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs</artifactId>
  </dependency>
</dependencies>

Inclusión de dependencias directas

Si desea depender de una versión determinada de la biblioteca que no está presente en la lista de materiales, agregue la dependencia directa al proyecto como se indica a continuación.

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs</artifactId>
    <version>5.16.0</version>
</dependency>

Autenticar el cliente

Para que la biblioteca cliente de Event Hubs interactúe con un centro de eventos, deberá comprender cómo conectarse y autorizarla.

Creación de un productor del centro de eventos mediante una cadena de conexión

El medio más sencillo para hacerlo es usar una cadena de conexión, que se crea automáticamente al crear un espacio de nombres de Event Hubs. Si no está familiarizado con las directivas de acceso compartido en Azure, puede seguir la guía paso a paso para obtener una cadena de conexión de Event Hubs.

Tanto el productor de Event Hubs como los clientes de consumidor y asincrónicos y asincrónicos se pueden crear mediante EventHubClientBuilder. La invocación build*Client() crea un productor o consumidor sincrónico mientras build*AsyncClient() crea su homólogo asincrónico.

El fragmento de código siguiente crea un productor sincrónico del centro de eventos.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventHubProducerClient producer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .buildProducerClient();

Creación de un cliente del centro de eventos mediante Plataforma de identidad de Microsoft (anteriormente Azure Active Directory)

Azure SDK para Java admite un paquete de identidad de Azure, lo que facilita la obtención de credenciales de Plataforma de identidad de Microsoft. En primer lugar, agregue el paquete:

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-identity</artifactId>
    <version>1.10.1</version>
</dependency>

Todas las formas implementadas de solicitar una credencial se pueden encontrar en el com.azure.identity.credential paquete. En el ejemplo siguiente se muestra cómo usar un secreto de cliente de aplicación de Azure Active Directory (AAD) para autorizar con Azure Event Hubs.

Autorización con DefaultAzureCredential

La autorización es más fácil mediante DefaultAzureCredential. Encuentra la mejor credencial para usar en su entorno en ejecución. Para más información sobre el uso de la autorización de Azure Active Directory con Event Hubs, consulte la documentación asociada.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventHubProducerClient producer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .buildProducerClient();

Conceptos clave

  • Un productor del centro de eventos es un origen de datos de telemetría, información de diagnóstico, registros de uso u otros datos de registro, como parte de una solución de dispositivo insertada, una aplicación de dispositivo móvil, un título de juego que se ejecuta en una consola u otro dispositivo, alguna solución empresarial basada en cliente o servidor o un sitio web.

  • Un consumidor de Event Hubs recoge dicha información del centro de eventos y la procesa. El procesamiento puede implicar la agregación, el cálculo complejo y el filtrado. El procesamiento también puede implicar la distribución o el almacenamiento de la información sin procesar o transformada. Los consumidores de centros de eventos suelen ser componentes sólidos y a gran escala de la infraestructura de la plataforma con funcionalidades de análisis integradas, como Azure Stream Analytics, Apache Spark o Apache Storm.

  • Una partición es una secuencia ordenada de eventos que se mantiene en un centro de eventos. Azure Event Hubs proporciona streaming de mensajes a través de un patrón de consumidor con particiones en el que cada consumidor solo lee un subconjunto específico, o partición, de la secuencia de mensajes. A medida que llegan eventos más recientes, se agregan al final de esta secuencia. El número de particiones se especifica en el momento en que se crea un centro de eventos y no se puede modificar.

  • Un grupo de consumidores es una vista de un centro de eventos completo. Los grupos de consumidores habilitan a varias aplicaciones de consumo para que cada una de ellas tenga una vista independiente de la secuencia de eventos, y para que lea la secuencia de manera independiente a su propio ritmo y desde su propia ubicación. Puede haber como máximo cinco lectores simultáneos en una partición por grupo de consumidores; sin embargo, se recomienda que solo haya un consumidor activo para un emparejamiento determinado de partición y grupo de consumidores. Cada lector activo recibe los eventos de su partición; si hay varios lectores en la misma partición, recibirán eventos duplicados.

Para obtener más conceptos y una explicación más detallada, consulte : Características de Event Hubs. Además, los conceptos de AMQP están bien documentados en OASIS Advanced Messaging Queuing Protocol (AMQP) Versión 1.0.

Ejemplos

Publicación de eventos en un centro de eventos

Para publicar eventos, deberá crear un asincrónico o sincrónico EventHubProducerAsyncClientEventHubProducerClient. Cada productor puede enviar eventos a una partición específica o permitir que el servicio Event Hubs decida en qué eventos de partición se deben publicar. Se recomienda usar el enrutamiento automático cuando la publicación de eventos debe ser de alta disponibilidad o cuando los datos del evento se deben distribuir uniformemente entre las particiones.

Creación de un productor del centro de eventos y publicación de eventos

Los desarrolladores pueden crear un productor mediante EventHubClientBuilder y llamar a buildProducer*Client(). Especificar CreateBatchOptions.setPartitionId(String) enviará eventos a una partición específica. Si partitionId no se especifica , los eventos se enrutan automáticamente a una partición. Al especificar CreateBatchOptions.setPartitionKey(String) , se indicará al servicio Event Hubs que aplica un algoritmo hash a los eventos y los envía a la misma partición.

El fragmento de código siguiente crea un productor sincrónico y envía eventos a cualquier partición, lo que permite al servicio Event Hubs enrutar el evento a una partición disponible.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

EventHubProducerClient producer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .buildProducerClient();

List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));
EventDataBatch eventDataBatch = producer.createBatch();

for (EventData eventData : allEvents) {
    if (!eventDataBatch.tryAdd(eventData)) {
        producer.send(eventDataBatch);
        eventDataBatch = producer.createBatch();

        // Try to add that event that couldn't fit before.
        if (!eventDataBatch.tryAdd(eventData)) {
            throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
                + eventDataBatch.getMaxSizeInBytes());
        }
    }
}

// send the last batch of remaining events
if (eventDataBatch.getCount() > 0) {
    producer.send(eventDataBatch);
}

// Clients are expected to be long-lived objects.
// Dispose of the producer to close any underlying resources when we are finished with it.
producer.close();

Tenga en cuenta que EventDataBatch.tryAdd(EventData) no es seguro para subprocesos. Asegúrese de sincronizar el acceso al método al usar varios subprocesos para agregar eventos.

Publicación de eventos mediante el identificador de partición

Muchas operaciones del centro de eventos tienen lugar dentro del ámbito de una partición específica. Cualquier cliente puede llamar getPartitionIds() a o getEventHubProperties() para obtener los identificadores de partición y los metadatos de en su instancia del centro de eventos.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

EventHubProducerClient producer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .buildProducerClient();

// Creating a batch with partitionId set will route all events in that batch to partition `0`.
CreateBatchOptions options = new CreateBatchOptions().setPartitionId("0");
EventDataBatch batch = producer.createBatch(options);

// Add events to batch and when you want to send the batch, send it using the producer.
producer.send(batch);

Publicación de eventos mediante la clave de partición

Cuando un conjunto de eventos no está asociado a ninguna partición específica, puede ser conveniente solicitar que el servicio Event Hubs mantenga diferentes eventos o lotes de eventos juntos en la misma partición. Esto se puede lograr estableciendo al partition key publicar los eventos. En el escenario siguiente, todos los eventos están relacionados con las ciudades, por lo que se envían con la clave de partición establecida en "ciudades".

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

EventHubProducerClient producer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .buildProducerClient();

List<EventData> events = Arrays.asList(new EventData("Melbourne"), new EventData("London"),
    new EventData("New York"));

SendOptions sendOptions = new SendOptions().setPartitionKey("cities");
producer.send(events, sendOptions);

Consumo de eventos desde una partición del centro de eventos

Para consumir eventos, cree o EventHubConsumerAsyncClientEventHubConsumerClient para un grupo de consumidores específico. Además, un consumidor debe especificar dónde en el flujo de eventos para empezar a recibir eventos.

Consumo de eventos con EventHubConsumerAsyncClient

En el fragmento de código siguiente, creamos un consumidor asincrónico que recibe eventos de partitionId y solo escucha los eventos más recientes que se insertan en la partición. Los desarrolladores pueden empezar a recibir eventos de varias particiones mediante la misma EventHubConsumerAsyncClient llamada a receiveFromPartition(String, EventPosition) con otro identificador de partición.

EventHubConsumerAsyncClient consumer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        new DefaultAzureCredentialBuilder().build())
    .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
    .buildAsyncConsumerClient();

// Obtain partitionId from EventHubConsumerAsyncClient.getPartitionIds()
String partitionId = "0";
EventPosition startingPosition = EventPosition.latest();

// Keep a reference to `subscription`. When the program is finished receiving events, call
// subscription.dispose(). This will stop fetching events from the Event Hub.
//
// NOTE: This is a non-blocking call and will move to the next line of code after setting up the async
// operation.  If the program ends after this, or the class is immediately disposed, no events will be
// received.
Disposable subscription = consumer.receiveFromPartition(partitionId, startingPosition)
    .subscribe(partitionEvent -> {
        PartitionContext partitionContext = partitionEvent.getPartitionContext();
        EventData event = partitionEvent.getData();

        System.out.printf("Received event from partition '%s'%n", partitionContext.getPartitionId());
        System.out.printf("Contents of event as string: '%s'%n", event.getBodyAsString());
    }, error -> {
        // This is a terminal signal.  No more events will be received from the same Flux object.
        System.err.print("An error occurred:" + error);
    }, () -> {
        // This is a terminal signal.  No more events will be received from the same Flux object.
        System.out.print("Stream has ended.");
    });

Consumo de eventos con EventHubConsumerClient

Los desarrolladores pueden crear un consumidor sincrónico que devuelva eventos en lotes mediante .EventHubConsumerClient En el fragmento de código siguiente, se crea un consumidor que comienza a leer eventos desde el principio de la secuencia de eventos de la partición.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventHubConsumerClient consumer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
    .buildConsumerClient();

Instant twelveHoursAgo = Instant.now().minus(Duration.ofHours(12));
EventPosition startingPosition = EventPosition.fromEnqueuedTime(twelveHoursAgo);
String partitionId = "0";

// Reads events from partition '0' and returns the first 100 received or until the 30 seconds has elapsed.
IterableStream<PartitionEvent> events = consumer.receiveFromPartition(partitionId, 100,
    startingPosition, Duration.ofSeconds(30));

Long lastSequenceNumber = -1L;
for (PartitionEvent partitionEvent : events) {
    // For each event, perform some sort of processing.
    System.out.print("Event received: " + partitionEvent.getData().getSequenceNumber());
    lastSequenceNumber = partitionEvent.getData().getSequenceNumber();
}

// Figure out what the next EventPosition to receive from is based on last event we processed in the stream.
// If lastSequenceNumber is -1L, then we didn't see any events the first time we fetched events from the
// partition.
if (lastSequenceNumber != -1L) {
    EventPosition nextPosition = EventPosition.fromSequenceNumber(lastSequenceNumber, false);

    // Gets the next set of events from partition '0' to consume and process.
    IterableStream<PartitionEvent> nextEvents = consumer.receiveFromPartition(partitionId, 100,
        nextPosition, Duration.ofSeconds(30));
}

Consumo de eventos mediante EventProcessorClient

Para consumir eventos para todas las particiones de un centro de eventos, puede crear un EventProcessorClient para un grupo de consumidores específico.

EventProcessorClient delegará el procesamiento de eventos en una función de devolución de llamada que proporcione, lo que le permite centrarse en la lógica necesaria para proporcionar valor, mientras que el procesador es responsable de administrar las operaciones de consumidor subyacentes.

En nuestro ejemplo, nos centraremos en la creación de EventProcessorClient, usaremos los SampleCheckpointStore disponibles en los ejemplos y una función de devolución de llamada que procesa los eventos recibidos del centro de eventos y escribe en la consola. En el caso de las aplicaciones de producción, se recomienda usar un almacén duradero, como almacén de puntos de control con blobs de Azure Storage.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
    .consumerGroup("<< CONSUMER GROUP NAME >>")
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .checkpointStore(new SampleCheckpointStore())
    .processEvent(eventContext -> {
        System.out.printf("Partition id = %s and sequence number of event = %s%n",
            eventContext.getPartitionContext().getPartitionId(),
            eventContext.getEventData().getSequenceNumber());
    })
    .processError(errorContext -> {
        System.out.printf("Error occurred in partition processor for partition %s, %s%n",
            errorContext.getPartitionContext().getPartitionId(),
            errorContext.getThrowable());
    })
    .buildEventProcessorClient();

Solución de problemas

Consulte TROUBLESHOOTING.md.

Pasos siguientes

Más allá de lo descrito, la biblioteca cliente de Azure Event Hubs ofrece compatibilidad con muchos otros escenarios para aprovechar el conjunto de características completo del servicio Azure Event Hubs. Para explorar algunos de estos escenarios, consulte el archivo Léame de ejemplos.

Contribuciones

Si desea convertirse en colaborador activo de este proyecto, consulte nuestras Directrices de contribución para obtener más información.

Impresiones