EventHubProducerAsyncClient Class
- java.lang.Object
- com.azure.messaging.eventhubs.EventHubProducerAsyncClient
public class EventHubProducerAsyncClient
implements Closeable
An asynchronous producer responsible for transmitting EventData to a specific Event Hub, grouped together in batches. Depending on the CreateBatchOptions specified when creating an EventDataBatch, the events are either routed automatically to an available partition or sent to a specific partition.
Automatic routing of events to partitions is recommended when:
- The sending of events needs to be highly available.
- The event data should be evenly distributed among all available partitions.
If no partition id is specified, the following rules are used for automatically selecting one:
- Distribute the events equally amongst all available partitions using a round-robin approach.
- If a partition becomes unavailable, the Event Hubs service will automatically detect it and forward the message to another available partition.
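The round-robin rule above can be pictured with a small, self-contained model. This is only a sketch of the idea and not how the Event Hubs service is implemented: the class `RoundRobinPartitionPicker` and its method `nextPartition` are hypothetical names introduced here, and failover to another partition when one becomes unavailable is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of round-robin partition selection.
// It is NOT part of the Azure SDK and does not reflect the service's internals.
class RoundRobinPartitionPicker {
    private final List<String> partitionIds;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinPartitionPicker(List<String> partitionIds) {
        if (partitionIds.isEmpty()) {
            throw new IllegalArgumentException("At least one partition id is required.");
        }
        this.partitionIds = new ArrayList<>(partitionIds);
    }

    // Returns the next partition id, cycling through all partitions in order.
    String nextPartition() {
        // floorMod keeps the index non-negative even if the counter overflows.
        int index = Math.floorMod(counter.getAndIncrement(), partitionIds.size());
        return partitionIds.get(index);
    }

    public static void main(String[] args) {
        RoundRobinPartitionPicker picker =
            new RoundRobinPartitionPicker(Arrays.asList("0", "1", "2"));
        for (int i = 0; i < 6; i++) {
            System.out.println("batch " + i + " -> partition " + picker.nextPartition());
        }
    }
}
```

With three partitions, consecutive batches cycle through each partition in turn, which is what spreads the load evenly.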
Create a producer and publish events to any partition
// The required parameter is a way to authenticate with Event Hubs using credentials.
// The connectionString provides a way to authenticate with Event Hub.
EventHubProducerAsyncClient producer = new EventHubClientBuilder()
    .connectionString(
        "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}",
        "event-hub-name")
    .buildAsyncProducerClient();
// Creating a batch without any options set allows automatic routing of events to any partition.
producer.createBatch().flatMap(batch -> {
    batch.tryAdd(new EventData("test-event-1"));
    batch.tryAdd(new EventData("test-event-2"));
    return producer.send(batch);
}).subscribe(unused -> { },
    error -> System.err.println("Error occurred while sending batch:" + error),
    () -> System.out.println("Send complete."));
Publish events to partition "foo"
// Creating a batch with partitionId set will route all events in that batch to partition `foo`.
CreateBatchOptions options = new CreateBatchOptions().setPartitionId("foo");
producer.createBatch(options).flatMap(batch -> {
    batch.tryAdd(new EventData("test-event-1"));
    batch.tryAdd(new EventData("test-event-2"));
    return producer.send(batch);
}).subscribe(unused -> { },
    error -> System.err.println("Error occurred while sending batch:" + error),
    () -> System.out.println("Send complete."));
Publish events to the same partition, grouped together using partition key
// Creating a batch with partitionKey set will tell the service to hash the partitionKey and decide which
// partition to send the events to. Events with the same partitionKey are always routed to the same partition.
CreateBatchOptions options = new CreateBatchOptions().setPartitionKey("bread");
producer.createBatch(options).flatMap(batch -> {
    batch.tryAdd(new EventData("sourdough"));
    batch.tryAdd(new EventData("rye"));
    return producer.send(batch);
}).subscribe(unused -> { },
    error -> System.err.println("Error occurred while sending batch:" + error),
    () -> System.out.println("Send complete."));
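Why a partition key keeps related events together can be shown with a minimal model of hash-based routing. The sketch below is an assumption-laden illustration: `PartitionKeyRouter` and `partitionFor` are hypothetical names, and `String.hashCode()` merely stands in for whatever hash function the Event Hubs service actually applies, so the partition it computes will not match the one the service chooses.

```java
// Hypothetical model of partition-key routing.
// The Event Hubs service uses its own hashing; String.hashCode() is only a stand-in.
class PartitionKeyRouter {
    private final int partitionCount;

    PartitionKeyRouter(int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive.");
        }
        this.partitionCount = partitionCount;
    }

    // Maps a partition key to a partition index in the range [0, partitionCount).
    // The mapping is deterministic, so equal keys always yield the same index.
    int partitionFor(String partitionKey) {
        return Math.floorMod(partitionKey.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        PartitionKeyRouter router = new PartitionKeyRouter(4);
        System.out.println("bread -> partition " + router.partitionFor("bread"));
        System.out.println("bread -> partition " + router.partitionFor("bread"));
        System.out.println("cities -> partition " + router.partitionFor("cities"));
    }
}
```

Because the mapping depends only on the key and the partition count, every batch created with the same key lands on the same partition, while different keys may or may not share one.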
Publish events using a size-limited EventDataBatch
Flux<EventData> telemetryEvents = Flux.just(firstEvent, secondEvent);
// Setting `setMaximumSizeInBytes` when creating a batch limits the size of that batch.
// In this case, all the batches created with these options are limited to 256 bytes.
CreateBatchOptions options = new CreateBatchOptions().setMaximumSizeInBytes(256);
AtomicReference<EventDataBatch> currentBatch = new AtomicReference<>(
    producer.createBatch(options).block());
// The sample Flux contains two events, but it could be an infinite stream of telemetry events.
telemetryEvents.flatMap(event -> {
    final EventDataBatch batch = currentBatch.get();
    if (batch.tryAdd(event)) {
        return Mono.empty();
    }
    return Mono.when(
        producer.send(batch),
        producer.createBatch(options).map(newBatch -> {
            currentBatch.set(newBatch);
            // Add the event that did not fit in the previous batch.
            if (!newBatch.tryAdd(event)) {
                throw Exceptions.propagate(new IllegalArgumentException(
                    "Event was too large to fit in an empty batch. Max size: " + newBatch.getMaxSizeInBytes()));
            }
            return newBatch;
        }));
}).then()
    .doFinally(signal -> {
        final EventDataBatch batch = currentBatch.getAndSet(null);
        if (batch != null && batch.getCount() > 0) {
            producer.send(batch).block();
        }
    });
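The control flow of the size-limited sample (try to add, send the full batch, start a new one, then flush whatever remains) is easier to follow in a synchronous model without Reactor. The sketch below is a simplified stand-in under stated assumptions: `BatchPacker` and `SizeLimitedBatch` are hypothetical classes, events are plain strings, and size is counted as UTF-8 payload bytes only, whereas a real EventDataBatch also accounts for protocol overhead. "Sending" is modeled by collecting each completed batch into a list.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical synchronous model of the batching loop; not part of the Azure SDK.
class BatchPacker {
    // Packs events into batches no larger than maxSizeInBytes, preserving order.
    static List<List<String>> pack(List<String> events, int maxSizeInBytes) {
        List<List<String>> sentBatches = new ArrayList<>();
        SizeLimitedBatch current = new SizeLimitedBatch(maxSizeInBytes);
        for (String event : events) {
            if (current.tryAdd(event)) {
                continue;
            }
            // The batch is full: "send" it and start a new one.
            if (current.getCount() > 0) {
                sentBatches.add(current.getEvents());
            }
            current = new SizeLimitedBatch(maxSizeInBytes);
            // Add the event that did not fit in the previous batch.
            if (!current.tryAdd(event)) {
                throw new IllegalArgumentException(
                    "Event was too large to fit in an empty batch. Max size: " + maxSizeInBytes);
            }
        }
        // Flush the last, partially filled batch.
        if (current.getCount() > 0) {
            sentBatches.add(current.getEvents());
        }
        return sentBatches;
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        events.add("aaaa");
        events.add("bbbb");
        events.add("cccc");
        events.add("dd");
        System.out.println(pack(events, 10));
    }
}

// Simplified stand-in for EventDataBatch that counts UTF-8 payload bytes only.
class SizeLimitedBatch {
    private final int maxSizeInBytes;
    private final List<String> events = new ArrayList<>();
    private int sizeInBytes;

    SizeLimitedBatch(int maxSizeInBytes) {
        this.maxSizeInBytes = maxSizeInBytes;
    }

    // Adds the event if it fits; returns false and leaves the batch unchanged otherwise.
    boolean tryAdd(String event) {
        int eventSize = event.getBytes(StandardCharsets.UTF_8).length;
        if (sizeInBytes + eventSize > maxSizeInBytes) {
            return false;
        }
        events.add(event);
        sizeInBytes += eventSize;
        return true;
    }

    int getCount() {
        return events.size();
    }

    List<String> getEvents() {
        return new ArrayList<>(events);
    }
}
```

The final flush plays the role of the `doFinally` step in the sample: without it, events left in a batch that never filled up would not be sent.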
Method Summary
Modifier and Type | Method and Description |
---|---|
void | close() Disposes of the EventHubProducerAsyncClient. |
Mono<EventDataBatch> | createBatch() Creates an EventDataBatch that can fit as many events as the transport allows. |
Mono<EventDataBatch> | createBatch(CreateBatchOptions options) Creates an EventDataBatch configured with the options specified. |
String | getEventHubName() Gets the Event Hub name this client interacts with. |
Mono<EventHubProperties> | getEventHubProperties() Retrieves information about an Event Hub, including the number of partitions present and their identifiers. |
String | getFullyQualifiedNamespace() Gets the fully qualified Event Hubs namespace that the connection is associated with. |
String | getIdentifier() Gets the client identifier. |
Flux<String> | getPartitionIds() Retrieves the identifiers for the partitions of an Event Hub. |
Mono<PartitionProperties> | getPartitionProperties(String partitionId) Retrieves information about a specific partition for an Event Hub, including elements that describe the available events in the partition event stream. |
Mono<Void> | send(EventDataBatch batch) Sends the batch to the associated Event Hub. |
Mono<Void> | send(Iterable<EventData> events) Sends a set of events to the associated Event Hub using a batched approach. |
Mono<Void> | send(Iterable<EventData> events, SendOptions options) Sends a set of events to the associated Event Hub using a batched approach. |
Method Details
close
public void close()
Disposes of the EventHubProducerAsyncClient. If the client had a dedicated connection, the underlying connection is also closed.
createBatch
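public Mono<EventDataBatch> createBatch()
Creates an EventDataBatch that can fit as many events as the transport allows.
Returns:
A Mono that emits a new EventDataBatch that can fit as many events as the transport allows.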
createBatch
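public Mono<EventDataBatch> createBatch(CreateBatchOptions options)
Creates an EventDataBatch configured with the options specified.
Parameters:
options - The options used to configure the EventDataBatch.
Returns:
A Mono that emits a new EventDataBatch configured with the specified options.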
getEventHubName
public String getEventHubName()
Gets the Event Hub name this client interacts with.
Returns:
The name of the Event Hub this client interacts with.
getEventHubProperties
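public Mono<EventHubProperties> getEventHubProperties()
Retrieves information about an Event Hub, including the number of partitions present and their identifiers.
Returns:
A Mono that emits the EventHubProperties describing the Event Hub.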
getFullyQualifiedNamespace
public String getFullyQualifiedNamespace()
Gets the fully qualified Event Hubs namespace that the connection is associated with. This is likely similar to {yournamespace}.servicebus.windows.net.
Returns:
The fully qualified Event Hubs namespace that the connection is associated with.
getIdentifier
public String getIdentifier()
Gets the client identifier.
Returns:
The client identifier.
getPartitionIds
public Flux<String> getPartitionIds()
Retrieves the identifiers for the partitions of an Event Hub.
Returns:
A Flux that emits the identifier of each partition of the Event Hub.
getPartitionProperties
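public Mono<PartitionProperties> getPartitionProperties(String partitionId)
Retrieves information about a specific partition for an Event Hub, including elements that describe the available events in the partition event stream.
Parameters:
partitionId - The identifier of the partition to retrieve information about.
Returns:
A Mono that emits the PartitionProperties describing the requested partition.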
send
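public Mono<Void> send(EventDataBatch batch)
Sends the batch to the associated Event Hub.
Parameters:
batch - The batch to send to the Event Hub.
Returns:
A Mono that completes when the batch has been sent.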
send
public Mono<Void> send(Iterable<EventData> events)
Sends a set of events to the associated Event Hub using a batched approach. If the total size of the events exceeds the maximum size of a single batch, an exception is triggered and the send fails. By default, the maximum message size is the largest size allowed on the link.
List<EventData> events = Arrays.asList(new EventData("maple"), new EventData("aspen"),
    new EventData("oak"));
producer
    .send(events)
    .subscribe(unused -> { },
        error -> System.err.println("Error occurred while sending events:" + error),
        () -> System.out.println("Send complete."));
For more information regarding the maximum event size allowed, see Azure Event Hubs Quotas and Limits.
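Parameters:
events - The events to send to the Event Hub.
Returns:
A Mono that completes when all of the events have been sent.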
send
public Mono<Void> send(Iterable<EventData> events, SendOptions options)
Sends a set of events to the associated Event Hub using a batched approach. If the total size of the events exceeds the maximum size of a single batch, an exception is triggered and the send fails. By default, the maximum message size is the largest size allowed on the link.
List<EventData> events = Arrays.asList(new EventData("Melbourne"), new EventData("London"),
    new EventData("New York"));
SendOptions sendOptions = new SendOptions().setPartitionKey("cities");
producer
    .send(events, sendOptions)
    .subscribe(unused -> { },
        error -> System.err.println("Error occurred while sending events:" + error),
        () -> System.out.println("Send complete."));
For more information regarding the maximum event size allowed, see Azure Event Hubs Quotas and Limits.
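Parameters:
events - The events to send to the Event Hub.
options - The options to use when sending the events.
Returns:
A Mono that completes when all of the events have been sent.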