排队的应用程序使用事务来确保消息的正确性以及传递的可靠性。 但是,事务成本高昂,可以大幅降低消息吞吐量。 提高消息吞吐量的一种方法是在单个事务中让应用程序读取和处理多个消息。 权衡在于性能与恢复:随着批处理中消息数量的增加,回滚事务时所需的恢复工作量也会增加。 需要注意的是,在事务中对消息进行批处理与会话是不同的。 会话是一组由单个应用程序处理并作为单个单元提交的相关消息。 通常在需要将一组相关消息一起处理时使用会话。 例如,网上购物网站。 批处理 用于处理多个不相关的消息,从而提高消息吞吐量。 有关会话的详细信息,请参阅 对会话中的排队消息进行分组。 批处理中的消息也由单个应用程序处理,并作为单个单元提交,但批处理中的消息之间可能没有任何关系。 在事务中批处理消息是一种优化,不会更改应用程序运行方式。
进入批处理模式
TransactedBatchingBehavior终结点行为控制批处理。 将此终结点行为添加到服务终结点会通知 Windows 通讯框架(WCF)在事务中对消息进行批处理。 并非所有消息都需要事务,因此只有需要事务的消息被放在批处理中,并且只有由标记为TransactionScopeRequired
= true
和TransactionAutoComplete
= true
的操作发送的消息才会被视为批处理。 如果服务合同上的所有操作都标记为 TransactionScopeRequired
= false
和 TransactionAutoComplete
= false
,则永远不会进入批处理模式。
提交事务
根据以下因素提交批处理事务:
MaxBatchSize
。 TransactedBatchingBehavior 行为的一个属性。 此属性确定放入批处理中的消息的最大数量。 一旦达到此数量,批处理便会被提交。 该值并不是一个严格的限制,在接收到这一数量的消息之前也可以提交批处理。Transaction Timeout
。 当事务的超时时间已经过去 80% 之后,系统会提交批处理,并且创建一个新的批处理。 这表示如果为事务完成所分配的时间只剩下 20% 或更少,系统便会提交批处理。TransactionScopeRequired
。 在处理一批消息时,如果 WCF 找到消息具有TransactionScopeRequired
=false
,它将提交该批次,并在收到第一条具有TransactionScopeRequired
=true
和TransactionAutoComplete
=true
的消息时重新打开一个新批次。如果队列中不再有其他消息,则即使尚未达到
MaxBatchSize
或事务的超时时间尚未过去 80%,系统也会提交当前的批处理。
离开批处理模式
如果批处理中的消息导致事务中止,则执行以下步骤:
回滚整个消息批处理。
消息会被逐条读取,直到读取的消息数量超过最大批处理大小的两倍为止。
重新输入批处理模式。
选择批处理的大小
批处理的大小依赖于应用程序。 经验方法是达到应用程序的最佳批大小的最佳方式。 请一定牢记,在选择批次大小时,要根据应用程序的实际部署模型来选择大小。 例如,在部署应用程序时,如果需要远程计算机上的 SQL Server 以及跨越队列和 SQL 服务器的事务,则最好通过运行此确切配置来确定批大小。
并发和批处理
若要提高吞吐量,还可以同时运行多个批处理。 通过设置ConcurrencyMode.Multiple
ServiceBehaviorAttribute
启用并发批处理。
服务限制 是一种服务行为,用于指示可以对服务进行的最大并发调用数。 与批处理一起使用时,这被解释为可以运行的并发批处理数。 如果未设置服务限制,WCF 将最大并发调用默认为 16。 因此,如果默认情况下添加了批处理行为,则最多可以同时激活 16 批。 最好根据您的容量调整服务的节流和批处理。 例如,如果队列中有100条消息,并且希望处理20条消息为一批次,将最大并发调用数设置为16是没有用的,因为根据吞吐量,可能会有16个事务处于活动状态,这类似于没有启用批处理。 因此,在精调性能时,要么不使用并发批处理,要么确保并发批处理具有正确的服务节流大小。
批处理和多个终结点
终结点由地址和协定组成。 可能有多个终结点共享同一绑定。 两个终结点可以共享同一个绑定,并侦听统一资源标识符 (URI),即队列地址。 如果两个终结点从同一个队列中进行读取,并且两个终结点都添加了事务处理批处理行为,则指定的批次大小可能会发生冲突。 可以通过在两种事务处理批处理行为之间,使用指定的最小批量来实现批处理,从而解决这个问题。 在此方案中,如果其中一个终结点未指定事务处理批处理,则两个终结点都不会使用批处理。
示例:
以下示例演示如何在配置文件中指定 TransactedBatchingBehavior
。
<behaviors>
<endpointBehaviors>
<behavior name="TransactedBatchingBehavior"
maxBatchSize="100" />
</endpointBehaviors>
</behaviors>
以下示例演示如何在代码中指定 TransactedBatchingBehavior 。
using (ServiceHost serviceHost = new ServiceHost(typeof(OrderProcessorService)))
{
ServiceEndpoint sep = ServiceHost.AddServiceEndpoint(typeof(IOrderProcessor), new NetMsmqBinding(), "net.msmq://localhost/private/ServiceModelSamplesTransacted");
sep.Behaviors.Add(new TransactedBatchingBehavior(100));
// Open the ServiceHost to create listeners and start listening for messages.
serviceHost.Open();
// The service can now be accessed.
Console.WriteLine("The service is ready.");
Console.WriteLine("Press <ENTER> to terminate service.");
Console.WriteLine();
Console.ReadLine();
// Close the ServiceHostB to shut down the service.
serviceHost.Close();
}