你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
ServiceBusSenderAsyncClient 类
- java.
lang. Object - com.
azure. messaging. servicebus. ServiceBusSenderAsyncClient
- com.
实现
public final class ServiceBusSenderAsyncClient
implements AutoCloseable
用于将消息发送到服务总线资源的 异步 客户端。
本文档中显示的示例使用名为 DefaultAzureCredential 的凭据对象进行身份验证,该对象适用于大多数方案,包括本地开发和生产环境。 此外,建议在生产环境中使用 托管标识 进行身份验证。 可以在 Azure 标识文档中找到有关不同身份验证方式及其相应凭据类型的详细信息。
示例:创建发送方的实例
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusSenderAsyncClient asyncSender = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, credential)
.sender()
.queueName(queueName)
.buildAsyncClient();
// When users are done with the sender, they should dispose of it.
// Clients should be long-lived objects as they require resources
// and time to establish a connection to the service.
asyncSender.close();
示例:将消息发送到服务总线资源
// `subscribe` is a non-blocking call. The program will move onto the next line of code when it starts the
// operation. Users should use the callbacks on `subscribe` to understand the status of the send operation.
asyncSender.createMessageBatch().flatMap(batch -> {
batch.tryAddMessage(new ServiceBusMessage("test-1"));
batch.tryAddMessage(new ServiceBusMessage("test-2"));
return asyncSender.sendMessages(batch);
}).subscribe(unused -> {
}, error -> {
System.err.println("Error occurred while sending batch:" + error);
}, () -> {
System.out.println("Send complete.");
});
示例:使用大小限制 ServiceBusMessageBatch 的服务总线资源发送消息
Flux<ServiceBusMessage> telemetryMessages = Flux.just(firstMessage, secondMessage);
// Setting `setMaximumSizeInBytes` when creating a batch, limits the size of that batch.
// In this case, all the batches created with these options are limited to 256 bytes.
CreateMessageBatchOptions options = new CreateMessageBatchOptions()
.setMaximumSizeInBytes(256);
AtomicReference<ServiceBusMessageBatch> currentBatch = new AtomicReference<>();
// Sends the current batch if it is not null and not empty. If the current batch is null, sets it.
// Returns the batch to work with.
Mono<ServiceBusMessageBatch> sendBatchAndGetCurrentBatchOperation = Mono.defer(() -> {
ServiceBusMessageBatch batch = currentBatch.get();
if (batch == null) {
return asyncSender.createMessageBatch(options);
}
if (batch.getCount() > 0) {
return asyncSender.sendMessages(batch).then(
asyncSender.createMessageBatch(options)
.handle((ServiceBusMessageBatch newBatch, SynchronousSink<ServiceBusMessageBatch> sink) -> {
// Expect that the batch we just sent is the current one. If it is not, there's a race
// condition accessing currentBatch reference.
if (!currentBatch.compareAndSet(batch, newBatch)) {
sink.error(new IllegalStateException(
"Expected that the object in currentBatch was batch. But it is not."));
} else {
sink.next(newBatch);
}
}));
} else {
return Mono.just(batch);
}
});
// The sample Flux contains two messages, but it could be an infinite stream of telemetry messages.
Flux<Void> sendMessagesOperation = telemetryMessages.flatMap(message -> {
return sendBatchAndGetCurrentBatchOperation.flatMap(batch -> {
if (batch.tryAddMessage(message)) {
return Mono.empty();
} else {
return sendBatchAndGetCurrentBatchOperation
.handle((ServiceBusMessageBatch newBatch, SynchronousSink<Void> sink) -> {
if (!newBatch.tryAddMessage(message)) {
sink.error(new IllegalArgumentException(
"Message is too large to fit in an empty batch."));
} else {
sink.complete();
}
});
}
});
});
// `subscribe` is a non-blocking call. The program will move onto the next line of code when it starts the
// operation. Users should use the callbacks on `subscribe` to understand the status of the send operation.
Disposable disposable = sendMessagesOperation.then(sendBatchAndGetCurrentBatchOperation)
.subscribe(batch -> {
System.out.println("Last batch should be empty: " + batch.getCount());
}, error -> {
System.err.println("Error sending telemetry messages: " + error);
}, () -> {
System.out.println("Completed.");
// Continue using the sender and finally, dispose of the sender.
// Clients should be long-lived objects as they require resources
// and time to establish a connection to the service.
asyncSender.close();
});
示例:将消息发送到已启用会话的队列
以下代码片段演示如何将消息发送到已启用 服务总线会话的 队列。 将属性设置为 setMessageId(String messageId) “greetings”会将消息发送到 ID 为“greetings”的服务总线会话。
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusSenderAsyncClient sender = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, new DefaultAzureCredentialBuilder().build())
.sender()
.queueName(sessionEnabledQueueName)
.buildAsyncClient();
// Setting sessionId publishes that message to a specific session, in this case, "greeting".
ServiceBusMessage message = new ServiceBusMessage("Hello world")
.setSessionId("greetings");
// `subscribe` is a non-blocking call. The program will move onto the next line of code when it starts the
// operation. Users should use the callbacks on `subscribe` to understand the status of the send operation.
sender.sendMessage(message).subscribe(unused -> {
}, error -> {
System.err.println("Error occurred publishing batch: " + error);
}, () -> {
System.out.println("Send complete.");
});
// Continue using the sender and finally, dispose of the sender.
// Clients should be long-lived objects as they require resources
// and time to establish a connection to the service.
sender.close();
方法摘要
方法继承自 java.lang.Object
方法详细信息
cancelScheduledMessage
public Mono
取消计划消息的排队(如果尚未排队)。
Parameters:
Returns:
cancelScheduledMessages
public Mono
取消已安排的消息排队(如果尚未排队)。
Parameters:
Returns:
close
public void close()
释放 ServiceBusSenderAsyncClient。 如果客户端具有专用连接,则基础连接也会关闭。
commitTransaction
public Mono
提交给定 ServiceBusTransactionContext的事务。 这会调用服务总线。
Parameters:
Returns:
createMessageBatch
public Mono
创建一个 ServiceBusMessageBatch 可以容纳传输允许的任意数量的消息的 。
Returns:
createMessageBatch
public Mono
ServiceBusMessageBatch创建配置了指定选项的 。
Parameters:
Returns:
createTransaction
public Mono
在服务总线上启动新事务。 ServiceBusTransactionContext应随需要在此事务中的所有操作一起ServiceBusReceivedMessage传递 。
Returns:
getEntityPath
public String getEntityPath()
获取服务总线资源的名称。
Returns:
getFullyQualifiedNamespace
public String getFullyQualifiedNamespace()
获取完全限定的命名空间。
Returns:
getIdentifier
public String getIdentifier()
获取 实例的 ServiceBusSenderAsyncClient标识符。
Returns:
rollbackTransaction
public Mono
回滚给定 ServiceBusTransactionContext的事务。 这会调用服务总线。
Parameters:
Returns:
scheduleMessage
public Mono
将计划消息发送到此发件人连接到Azure 服务总线实体。 计划的消息已排队,并且仅在计划的排队时间提供给接收方。
Parameters:
Returns:
scheduleMessage
public Mono
将计划消息发送到此发件人连接到Azure 服务总线实体。 计划的消息已排队,并且仅在计划的排队时间提供给接收方。
Parameters:
Returns:
scheduleMessages
public Flux
将一批计划消息发送到此发件人连接到Azure 服务总线实体。 计划的消息已排队,并且仅在计划的排队时间提供给接收方。
Parameters:
Returns:
scheduleMessages
public Flux
将计划消息发送到此发件人连接到Azure 服务总线实体。 计划的消息已排队,并且仅在计划的排队时间提供给接收方。
Parameters:
Returns:
sendMessage
public Mono
将消息发送到服务总线队列或主题。
Parameters:
Returns:
sendMessage
public Mono
将消息发送到服务总线队列或主题。
Parameters:
Returns:
sendMessages
public Mono
将消息批发送到此发件人连接到Azure 服务总线实体。
Parameters:
Returns:
sendMessages
public Mono
将消息批发送到此发件人连接到Azure 服务总线实体。
Parameters:
Returns:
sendMessages
public Mono
使用批处理方法将一组消息发送到服务总线队列或主题。 如果消息的大小超过单个批的最大大小,将触发异常,并且发送将失败。 默认情况下,消息大小是链接上允许的最大数量。
Parameters:
Returns:
sendMessages
public Mono
使用批处理方法将一组消息发送到服务总线队列或主题。 如果消息的大小超过单个批的最大大小,将触发异常,并且发送将失败。 默认情况下,消息大小是链接上允许的最大数量。
Parameters:
Returns: