你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

ServiceBusSenderAsyncClient 类

  • java.lang.Object
    • com.azure.messaging.servicebus.ServiceBusSenderAsyncClient

实现

public final class ServiceBusSenderAsyncClient
implements AutoCloseable

用于将消息发送到服务总线资源的 异步 客户端。

本文档中显示的示例使用名为 DefaultAzureCredential 的凭据对象进行身份验证,该对象适用于大多数方案,包括本地开发和生产环境。 此外,建议在生产环境中使用 托管标识 进行身份验证。 可以在 Azure 标识文档中找到有关不同身份验证方式及其相应凭据类型的详细信息。

示例:创建发送方的实例

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 ServiceBusSenderAsyncClient asyncSender = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .sender()
     .queueName(queueName)
     .buildAsyncClient();

 // When users are done with the sender, they should dispose of it.
 // Clients should be long-lived objects as they require resources
 // and time to establish a connection to the service.
 asyncSender.close();

示例:将消息发送到服务总线资源

// `subscribe` is a non-blocking call. The program will move onto the next line of code when it starts the
 // operation.  Users should use the callbacks on `subscribe` to understand the status of the send operation.
 asyncSender.createMessageBatch().flatMap(batch -> {
     batch.tryAddMessage(new ServiceBusMessage("test-1"));
     batch.tryAddMessage(new ServiceBusMessage("test-2"));
     return asyncSender.sendMessages(batch);
 }).subscribe(unused -> {
 }, error -> {
     System.err.println("Error occurred while sending batch:" + error);
 }, () -> {
     System.out.println("Send complete.");
 });

示例:使用大小限制 ServiceBusMessageBatch 的服务总线资源发送消息

Flux<ServiceBusMessage> telemetryMessages = Flux.just(firstMessage, secondMessage);

 // Setting `setMaximumSizeInBytes` when creating a batch, limits the size of that batch.
 // In this case, all the batches created with these options are limited to 256 bytes.
 CreateMessageBatchOptions options = new CreateMessageBatchOptions()
     .setMaximumSizeInBytes(256);
 AtomicReference<ServiceBusMessageBatch> currentBatch = new AtomicReference<>();

 // Sends the current batch if it is not null and not empty.  If the current batch is null, sets it.
 // Returns the batch to work with.
 Mono<ServiceBusMessageBatch> sendBatchAndGetCurrentBatchOperation = Mono.defer(() -> {
     ServiceBusMessageBatch batch = currentBatch.get();

     if (batch == null) {
         return asyncSender.createMessageBatch(options);
     }

     if (batch.getCount() > 0) {
         return asyncSender.sendMessages(batch).then(
             asyncSender.createMessageBatch(options)
                 .handle((ServiceBusMessageBatch newBatch, SynchronousSink<ServiceBusMessageBatch> sink) -> {
                     // Expect that the batch we just sent is the current one. If it is not, there's a race
                     // condition accessing currentBatch reference.
                     if (!currentBatch.compareAndSet(batch, newBatch)) {
                         sink.error(new IllegalStateException(
                             "Expected that the object in currentBatch was batch. But it is not."));
                     } else {
                         sink.next(newBatch);
                     }
                 }));
     } else {
         return Mono.just(batch);
     }
 });

 // The sample Flux contains two messages, but it could be an infinite stream of telemetry messages.
 Flux<Void> sendMessagesOperation = telemetryMessages.flatMap(message -> {
     return sendBatchAndGetCurrentBatchOperation.flatMap(batch -> {
         if (batch.tryAddMessage(message)) {
             return Mono.empty();
         } else {
             return sendBatchAndGetCurrentBatchOperation
                 .handle((ServiceBusMessageBatch newBatch, SynchronousSink<Void> sink) -> {
                     if (!newBatch.tryAddMessage(message)) {
                         sink.error(new IllegalArgumentException(
                             "Message is too large to fit in an empty batch."));
                     } else {
                         sink.complete();
                     }
                 });
         }
     });
 });

 // `subscribe` is a non-blocking call. The program will move onto the next line of code when it starts the
 // operation.  Users should use the callbacks on `subscribe` to understand the status of the send operation.
 Disposable disposable = sendMessagesOperation.then(sendBatchAndGetCurrentBatchOperation)
     .subscribe(batch -> {
         System.out.println("Last batch should be empty: " + batch.getCount());
     }, error -> {
         System.err.println("Error sending telemetry messages: " + error);
     }, () -> {
         System.out.println("Completed.");

         // Continue using the sender and finally, dispose of the sender.
         // Clients should be long-lived objects as they require resources
         // and time to establish a connection to the service.
         asyncSender.close();
     });

示例:将消息发送到已启用会话的队列

以下代码片段演示如何将消息发送到已启用 服务总线会话的 队列。 将属性设置为 setMessageId(String messageId) “greetings”会将消息发送到 ID 为“greetings”的服务总线会话。

// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 ServiceBusSenderAsyncClient sender = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, new DefaultAzureCredentialBuilder().build())
     .sender()
     .queueName(sessionEnabledQueueName)
     .buildAsyncClient();

 // Setting sessionId publishes that message to a specific session, in this case, "greeting".
 ServiceBusMessage message = new ServiceBusMessage("Hello world")
     .setSessionId("greetings");

 // `subscribe` is a non-blocking call. The program will move onto the next line of code when it starts the
 // operation.  Users should use the callbacks on `subscribe` to understand the status of the send operation.
 sender.sendMessage(message).subscribe(unused -> {
 }, error -> {
     System.err.println("Error occurred publishing batch: " + error);
 }, () -> {
     System.out.println("Send complete.");
 });

 // Continue using the sender and finally, dispose of the sender.
 // Clients should be long-lived objects as they require resources
 // and time to establish a connection to the service.
 sender.close();

方法摘要

修饰符和类型 方法和描述
Mono<Void> cancelScheduledMessage(long sequenceNumber)

取消计划消息的排队(如果尚未排队)。

Mono<Void> cancelScheduledMessages(Iterable<Long> sequenceNumbers)

取消已安排的消息排队(如果尚未排队)。

void close()

释放 ServiceBusSenderAsyncClient

Mono<Void> commitTransaction(ServiceBusTransactionContext transactionContext)

提交给定 ServiceBusTransactionContext的事务。

Mono<ServiceBusMessageBatch> createMessageBatch()

创建一个 ServiceBusMessageBatch 可以容纳传输允许的任意数量的消息的 。

Mono<ServiceBusMessageBatch> createMessageBatch(CreateMessageBatchOptions options)

ServiceBusMessageBatch创建配置了指定选项的 。

Mono<ServiceBusTransactionContext> createTransaction()

在服务总线上启动新事务。

String getEntityPath()

获取服务总线资源的名称。

String getFullyQualifiedNamespace()

获取完全限定的命名空间。

String getIdentifier()

获取 实例的 ServiceBusSenderAsyncClient标识符。

Mono<Void> rollbackTransaction(ServiceBusTransactionContext transactionContext)

回滚给定 ServiceBusTransactionContext的事务。

Mono<Long> scheduleMessage(ServiceBusMessage message, OffsetDateTime scheduledEnqueueTime)

将计划消息发送到此发件人连接到Azure 服务总线实体。

Mono<Long> scheduleMessage(ServiceBusMessage message, OffsetDateTime scheduledEnqueueTime, ServiceBusTransactionContext transactionContext)

将计划消息发送到此发件人连接到Azure 服务总线实体。

Flux<Long> scheduleMessages(Iterable<ServiceBusMessage> messages, OffsetDateTime scheduledEnqueueTime)

将一批计划消息发送到此发件人连接到Azure 服务总线实体。

Flux<Long> scheduleMessages(Iterable<ServiceBusMessage> messages, OffsetDateTime scheduledEnqueueTime, ServiceBusTransactionContext transactionContext)

将计划消息发送到此发件人连接到Azure 服务总线实体。

Mono<Void> sendMessage(ServiceBusMessage message)

将消息发送到服务总线队列或主题。

Mono<Void> sendMessage(ServiceBusMessage message, ServiceBusTransactionContext transactionContext)

将消息发送到服务总线队列或主题。

Mono<Void> sendMessages(ServiceBusMessageBatch batch)

将消息批发送到此发件人连接到Azure 服务总线实体。

Mono<Void> sendMessages(ServiceBusMessageBatch batch, ServiceBusTransactionContext transactionContext)

将消息批发送到此发件人连接到Azure 服务总线实体。

Mono<Void> sendMessages(Iterable<ServiceBusMessage> messages)

使用批处理方法将一组消息发送到服务总线队列或主题。

Mono<Void> sendMessages(Iterable<ServiceBusMessage> messages, ServiceBusTransactionContext transactionContext)

使用批处理方法将一组消息发送到服务总线队列或主题。

方法继承自 java.lang.Object

方法详细信息

cancelScheduledMessage

public Mono cancelScheduledMessage(long sequenceNumber)

取消计划消息的排队(如果尚未排队)。

Parameters:

sequenceNumber - 要取消的计划消息的 。

Returns:

Mono 服务总线资源上完成此操作的 。

cancelScheduledMessages

public Mono cancelScheduledMessages(Iterable sequenceNumbers)

取消已安排的消息排队(如果尚未排队)。

Parameters:

sequenceNumbers - 要取消的计划消息的 。

Returns:

Mono 服务总线资源上完成此操作的 。

close

public void close()

释放 ServiceBusSenderAsyncClient。 如果客户端具有专用连接,则基础连接也会关闭。

commitTransaction

public Mono commitTransaction(ServiceBusTransactionContext transactionContext)

提交给定 ServiceBusTransactionContext的事务。 这会调用服务总线。

Parameters:

transactionContext - 要提交。

Returns:

Mono 服务总线资源上完成此操作的 。

createMessageBatch

public Mono createMessageBatch()

创建一个 ServiceBusMessageBatch 可以容纳传输允许的任意数量的消息的 。

Returns:

一个 ServiceBusMessageBatch ,它可容纳传输允许的任意数量的消息。

createMessageBatch

public Mono createMessageBatch(CreateMessageBatchOptions options)

ServiceBusMessageBatch创建配置了指定选项的 。

Parameters:

options - 一组用于配置 的选项 ServiceBusMessageBatch

Returns:

使用给定选项配置的新 ServiceBusMessageBatch

createTransaction

public Mono createTransaction()

在服务总线上启动新事务。 ServiceBusTransactionContext应随需要在此事务中的所有操作一起ServiceBusReceivedMessage传递 。

Returns:

getEntityPath

public String getEntityPath()

获取服务总线资源的名称。

Returns:

服务总线资源的名称。

getFullyQualifiedNamespace

public String getFullyQualifiedNamespace()

获取完全限定的命名空间。

Returns:

完全限定的命名空间。

getIdentifier

public String getIdentifier()

获取 实例的 ServiceBusSenderAsyncClient标识符。

Returns:

可以标识 实例的 ServiceBusSenderAsyncClient标识符。

rollbackTransaction

public Mono rollbackTransaction(ServiceBusTransactionContext transactionContext)

回滚给定 ServiceBusTransactionContext的事务。 这会调用服务总线。

Parameters:

transactionContext - 要回滚的事务。

Returns:

Mono 服务总线资源上完成此操作的 。

scheduleMessage

public Mono scheduleMessage(ServiceBusMessage message, OffsetDateTime scheduledEnqueueTime)

将计划消息发送到此发件人连接到Azure 服务总线实体。 计划的消息已排队,并且仅在计划的排队时间提供给接收方。

Parameters:

message - 要发送到服务总线队列的消息。
scheduledEnqueueTime - OffsetDateTime,消息应出现在服务总线队列或主题中。

Returns:

可用于取消消息计划的计划消息的序列号。

scheduleMessage

public Mono scheduleMessage(ServiceBusMessage message, OffsetDateTime scheduledEnqueueTime, ServiceBusTransactionContext transactionContext)

将计划消息发送到此发件人连接到Azure 服务总线实体。 计划的消息已排队,并且仅在计划的排队时间提供给接收方。

Parameters:

message - 要发送到服务总线队列的消息。
scheduledEnqueueTime - OffsetDateTime,消息应出现在服务总线队列或主题中。
transactionContext - 在发送到服务总线之前,对消息进行设置。

Returns:

可用于取消消息计划的计划消息的序列号。

scheduleMessages

public Flux scheduleMessages(Iterable messages, OffsetDateTime scheduledEnqueueTime)

将一批计划消息发送到此发件人连接到Azure 服务总线实体。 计划的消息已排队,并且仅在计划的排队时间提供给接收方。

Parameters:

messages - 要发送到服务总线队列或主题的消息。
scheduledEnqueueTime - OffsetDateTime,消息应出现在服务总线队列或主题中。

Returns:

可用于取消消息的计划消息的序列号。

scheduleMessages

public Flux scheduleMessages(Iterable messages, OffsetDateTime scheduledEnqueueTime, ServiceBusTransactionContext transactionContext)

将计划消息发送到此发件人连接到Azure 服务总线实体。 计划的消息已排队,并且仅在计划的排队时间提供给接收方。

Parameters:

messages - 要发送到服务总线队列的消息。
scheduledEnqueueTime - OffsetDateTime,消息应出现在服务总线队列或主题中。
transactionContext - 要与操作关联的事务。

Returns:

可用于取消消息的计划消息的序列号。

sendMessage

public Mono sendMessage(ServiceBusMessage message)

将消息发送到服务总线队列或主题。

Parameters:

message - 要发送到服务总线队列或主题的消息。

Returns:

Mono 服务总线资源上完成此操作。

sendMessage

public Mono sendMessage(ServiceBusMessage message, ServiceBusTransactionContext transactionContext)

将消息发送到服务总线队列或主题。

Parameters:

message - 要发送到服务总线队列或主题的消息。
transactionContext - 在发送到服务总线之前,对批处理消息进行设置。

Returns:

Mono 服务总线资源上完成此操作。

sendMessages

public Mono sendMessages(ServiceBusMessageBatch batch)

将消息批发送到此发件人连接到Azure 服务总线实体。

Parameters:

batch - 的消息,允许客户端为一批消息发送允许的最大大小。

Returns:

Mono 服务总线资源上完成此操作。

sendMessages

public Mono sendMessages(ServiceBusMessageBatch batch, ServiceBusTransactionContext transactionContext)

将消息批发送到此发件人连接到Azure 服务总线实体。

Parameters:

batch - 的消息,允许客户端为一批消息发送允许的最大大小。
transactionContext - 在发送到服务总线之前,对批处理消息进行设置。

Returns:

Mono 服务总线资源上完成此操作。

sendMessages

public Mono sendMessages(Iterable messages)

使用批处理方法将一组消息发送到服务总线队列或主题。 如果消息的大小超过单个批的最大大小,将触发异常,并且发送将失败。 默认情况下,消息大小是链接上允许的最大数量。

Parameters:

messages - 要发送到服务总线队列或主题的消息。

Returns:

Mono 所有消息都已发送到服务总线资源时完成的 。

sendMessages

public Mono sendMessages(Iterable messages, ServiceBusTransactionContext transactionContext)

使用批处理方法将一组消息发送到服务总线队列或主题。 如果消息的大小超过单个批的最大大小,将触发异常,并且发送将失败。 默认情况下,消息大小是链接上允许的最大数量。

Parameters:

messages - 要发送到服务总线队列或主题的消息。
transactionContext - 在发送到服务总线之前,对批处理消息进行设置。

Returns:

Mono 所有消息都已发送到服务总线资源时完成的 。

适用于