EventHubProducerAsyncClient Class

  • java.lang.Object
    • com.azure.messaging.eventhubs.EventHubProducerAsyncClient

Implements

public class EventHubProducerAsyncClient
implements Closeable

An asynchronous producer responsible for transmitting EventData to a specific Event Hub, grouped together in batches. Depending on the CreateBatchOptions specified when creating an EventDataBatch, the events may be automatically routed to an available partition or specific to a partition.

Allowing automatic routing of partitions is recommended when:

  • The sending of events needs to be highly available.
  • The event data should be evenly distributed among all available partitions.

If no partition id is specified, the following rules are used for automatically selecting one:

  1. Distribute the events equally amongst all available partitions using a round-robin approach.
  2. If a partition becomes unavailable, the Event Hubs service will automatically detect it and forward the message to another available partition.

Create a producer and publish events to any partition

// The required parameter is a way to authenticate with Event Hubs using credentials.
 // The connectionString provides a way to authenticate with Event Hub.
 EventHubProducerAsyncClient producer = new EventHubClientBuilder()
     .connectionString(
         "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}",
         "event-hub-name")
     .buildAsyncProducerClient();

 // Creating a batch without options set, will allow for automatic routing of events to any partition.
 producer.createBatch().flatMap(batch -> {
     batch.tryAdd(new EventData("test-event-1"));
     batch.tryAdd(new EventData("test-event-2"));
     return producer.send(batch);
 }).subscribe(unused -> { },
     error -> System.err.println("Error occurred while sending batch:" + error),
     () -> System.out.println("Send complete."));

Publish events to partition "foo"

// Creating a batch with partitionId set will route all events in that batch to partition `foo`.
 CreateBatchOptions options = new CreateBatchOptions().setPartitionId("foo");
 producer.createBatch(options).flatMap(batch -> {
     batch.tryAdd(new EventData("test-event-1"));
     batch.tryAdd(new EventData("test-event-2"));
     return producer.send(batch);
 }).subscribe(unused -> { },
     error -> System.err.println("Error occurred while sending batch:" + error),
     () -> System.out.println("Send complete."));

Publish events to the same partition, grouped together using partition key

// Creating a batch with partitionKey set will tell the service to hash the partitionKey and decide which
 // partition to send the events to. Events with the same partitionKey are always routed to the same partition.
 CreateBatchOptions options = new CreateBatchOptions().setPartitionKey("bread");
 producer.createBatch(options).flatMap(batch -> {
     batch.tryAdd(new EventData("sourdough"));
     batch.tryAdd(new EventData("rye"));
     return producer.send(batch);
 }).subscribe(unused -> { },
     error -> System.err.println("Error occurred while sending batch:" + error),
     () -> System.out.println("Send complete."));

Publish events using a size-limited EventDataBatch

Flux<EventData> telemetryEvents = Flux.just(firstEvent, secondEvent);

 // Setting `setMaximumSizeInBytes` when creating a batch, limits the size of that batch.
 // In this case, all the batches created with these options are limited to 256 bytes.
 CreateBatchOptions options = new CreateBatchOptions().setMaximumSizeInBytes(256);
 AtomicReference<EventDataBatch> currentBatch = new AtomicReference<>(
     producer.createBatch(options).block());

 // The sample Flux contains two events, but it could be an infinite stream of telemetry events.
 telemetryEvents.flatMap(event -> {
     final EventDataBatch batch = currentBatch.get();
     if (batch.tryAdd(event)) {
         return Mono.empty();
     }

     return Mono.when(
         producer.send(batch),
         producer.createBatch(options).map(newBatch -> {
             currentBatch.set(newBatch);

             // Add the event that did not fit in the previous batch.
             if (!newBatch.tryAdd(event)) {
                 throw Exceptions.propagate(new IllegalArgumentException(
                     "Event was too large to fit in an empty batch. Max size: " + newBatch.getMaxSizeInBytes()));
             }

             return newBatch;
         }));
 }).then()
     .doFinally(signal -> {
         final EventDataBatch batch = currentBatch.getAndSet(null);
         if (batch != null && batch.getCount() > 0) {
             producer.send(batch).block();
         }
     });

Method Summary

Modifier and Type Method and Description
void close()

Disposes of the EventHubProducerAsyncClient.

Mono<EventDataBatch>

createBatch()

Creates an EventDataBatch that can fit as many events as the transport allows.

Mono<EventDataBatch>

createBatch(CreateBatchOptions options)

Creates an EventDataBatch configured with the options specified.

String

getEventHubName()

Gets the Event Hub name this client interacts with.

Mono<EventHubProperties>

getEventHubProperties()

Retrieves information about an Event Hub, including the number of partitions present and their identifiers.

String

getFullyQualifiedNamespace()

Gets the fully qualified Event Hubs namespace that the connection is associated with.

String

getIdentifier()

Gets the client identifier.

Flux<String>

getPartitionIds()

Retrieves the identifiers for the partitions of an Event Hub.

Mono<PartitionProperties>

getPartitionProperties(String partitionId)

Retrieves information about a specific partition for an Event Hub, including elements that describe the available events in the partition event stream.

Mono<Void>

send(EventDataBatch batch)

Sends the batch to the associated Event Hub.

Mono<Void>

send(Iterable<EventData> events)

Sends a set of events to the associated Event Hub using a batched approach.

Mono<Void>

send(Iterable<EventData> events, SendOptions options)

Sends a set of events to the associated Event Hub using a batched approach.

Methods inherited from java.lang.Object

Method Details

close

public void close()

Disposes of the EventHubProducerAsyncClient. If the client had a dedicated connection, the underlying connection is also closed.

createBatch

public Mono createBatch()

Creates an EventDataBatch that can fit as many events as the transport allows.

Returns:

A new EventDataBatch that can fit as many events as the transport allows.

createBatch

public Mono createBatch(CreateBatchOptions options)

Creates an EventDataBatch configured with the options specified.

Parameters:

options - A set of options used to configure the EventDataBatch.

Returns:

A new EventDataBatch that can fit as many events as the transport allows.

getEventHubName

public String getEventHubName()

Gets the Event Hub name this client interacts with.

Returns:

The Event Hub name this client interacts with.

getEventHubProperties

public Mono getEventHubProperties()

Retrieves information about an Event Hub, including the number of partitions present and their identifiers.

Returns:

The set of information for the Event Hub that this client is associated with.

getFullyQualifiedNamespace

public String getFullyQualifiedNamespace()

Gets the fully qualified Event Hubs namespace that the connection is associated with. This is likely similar to {yournamespace}.servicebus.windows.net.

Returns:

The fully qualified Event Hubs namespace that the connection is associated with.

getIdentifier

public String getIdentifier()

Gets the client identifier.

Returns:

The unique identifier string for current client.

getPartitionIds

public Flux getPartitionIds()

Retrieves the identifiers for the partitions of an Event Hub.

Returns:

A Flux of identifiers for the partitions of an Event Hub.

getPartitionProperties

public Mono getPartitionProperties(String partitionId)

Retrieves information about a specific partition for an Event Hub, including elements that describe the available events in the partition event stream.

Parameters:

partitionId - The unique identifier of a partition associated with the Event Hub.

Returns:

The set of information for the requested partition under the Event Hub this client is associated with.

send

public Mono send(EventDataBatch batch)

Sends the batch to the associated Event Hub.

Parameters:

batch - The batch to send to the service.

Returns:

A Mono that completes when the batch is pushed to the service.

send

public Mono send(Iterable events)

Sends a set of events to the associated Event Hub using a batched approach. If the size of events exceed the maximum size of a single batch, an exception will be triggered and the send will fail. By default, the message size is the max amount allowed on the link.

List<EventData> events = Arrays.asList(new EventData("maple"), new EventData("aspen"),
     new EventData("oak"));
 producer
     .send(events)
     .subscribe(unused -> { },
         error -> System.err.println("Error occurred while sending events:" + error),
         () -> System.out.println("Send complete."));

For more information regarding the maximum event size allowed, see Azure Event Hubs Quotas and Limits.

Parameters:

events - Events to send to the service.

Returns:

A Mono that completes when all events are pushed to the service.

send

public Mono send(Iterable events, SendOptions options)

Sends a set of events to the associated Event Hub using a batched approach. If the size of events exceed the maximum size of a single batch, an exception will be triggered and the send will fail. By default, the message size is the max amount allowed on the link.

List<EventData> events = Arrays.asList(new EventData("Melbourne"), new EventData("London"),
     new EventData("New York"));
 SendOptions sendOptions = new SendOptions().setPartitionKey("cities");
 producer
     .send(events, sendOptions)
     .subscribe(unused -> { },
         error -> System.err.println("Error occurred while sending events:" + error),
         () -> System.out.println("Send complete."));

For more information regarding the maximum event size allowed, see Azure Event Hubs Quotas and Limits.

Parameters:

events - Events to send to the service.
options - The set of options to consider when sending this batch.

Returns:

A Mono that completes when all events are pushed to the service.

Applies to