发送适配器的交换模式

发送适配器是从 BizTalk 消息引擎传递的消息,要通过线路传输。 可以使用单向或双向消息交换模式发送这些消息。 处理这种双向消息交换模式的适配器称为 Solicit-Response 适配器。

阻止传输与非阻塞传输

消息引擎使用 IBTTransmitter.TransmitMessage 方法或 IBTTransmitterBatch.TransmitMessage 方法将消息传送到发送适配器,具体取决于适配器是否是批量感知的。 方法的两个版本都有一个布尔返回值,该值指示适配器如何传输消息。 如果适配器返回 true,则它在返回之前已完全发送消息。 在这种情况下,消息引擎代表适配器从 MessageBox 数据库中删除消息。 如果适配器返回 false,它将启动消息传输并在传输完成之前返回。 在这种情况下,适配器不仅负责从其应用程序队列中删除消息,还负责处理需要重试或暂停消息的传输失败。

返回的 false 适配器是一个非阻止调用,这意味着 TransmitMessage 实现代码不会阻止消息引擎的调用线程。 适配器只需将消息添加到准备传输的内存中队列,然后返回。 适配器应有自己的线程池,该池为内存中队列提供服务、传输消息,然后通知引擎传输结果。

消息引擎的线程通常比用于通过线路发送数据的线程更多的 CPU 限制。 混合这两种类型的线程会对性能产生负面影响。 非阻塞发送可实现这两种类型的线程的分离,并且与阻塞调用相较,可显著提高性能。

下图显示了适配器的线程池,这些线程池往往受 I/O 操作的约束。 BizTalk Server消息传送引擎的线程池更受 CPU 处理的约束。 通过保留两个不同的线程池和不混合相同类型的线程,系统可以更有效地运行。

显示适配器的线程池的示意图,该线程池通常受 I/O 操作的约束。

性能提示: 为获得最佳性能,发送适配器应是非阻止和批量感知的。 当 BizTalk 文件适配器从阻止和非批处理感知更改为非阻止和批处理感知时,实现了三倍的性能提升。

故障排除提示: 阻止传输可能会导致整个主机实例的性能下降。 如果适配器在 TransmitMessage 中过度阻塞,则会阻止引擎线程将消息传送到其他适配器。

非批处理发送

无法识别批处理的适配器应实现 IBTTransmitter ,详见 异步发送适配器的接口。 对于适配器需要传输的每条消息,消息引擎调用 IBTTransmitter.TransmitMessage。 下面的对象交互关系图详细介绍了传输消息的典型方法,其中包括以下步骤:

  1. 引擎将消息传送到适配器。

  2. 适配器将消息排入队列,以准备传输的内存中队列。

  3. 适配器线程池中的线程将消息从队列中取消排队,读取消息的配置,并通过网络传输消息。

  4. 适配器从引擎中获取新批。

  5. 适配器对批调用 DeleteMessage ,并传入刚刚传输的消息。

  6. 适配器在批处理上调用 Done

  7. 引擎处理批处理并从应用程序队列中删除消息。

  8. 引擎回调适配器以通知其 DeleteMessage 操作的结果。

    显示适配器从应用程序队列中删除单个消息的图像。

    前面的对象交互关系图显示了适配器从应用程序队列中删除单个消息。 理想情况下,适配器会批处理消息操作,而不是一次对单个消息进行操作。

批量发送

批量感知的适配器应实现 IBTBatchTransmitterIBTTransmitterBatch, 详见 发送适配器的接口。 当引擎具有要传输的适配器的消息时,引擎通过调用 IBTBatchTransmitter.GetBatch 从适配器获取新批。 适配器返回实现 IBTTransmitterBatch 的新批处理对象。 然后,引擎通过调用 IBTTransmitterBatch.BeginBatch 启动批处理。 此 API 具有一个 out 参数,该参数允许适配器指定它将在批处理上接受的最大消息数。 适配器可以选择返回 DTC 事务。 然后,引擎对要添加到批的每个传出消息调用 IBTTransmitterBatch.TransmitMessage 一次。 调用此的次数大于零,但小于或等于适配器指示的最大批大小。 将所有消息添加到批后,适配器将调用 IBTTransmitterBatch.Done。 此时,适配器通常会将批中的所有消息排入其内存队列。 适配器从一个或多个线程在其自己的线程池中传输消息,就像使用非批量感知适配器一样。 然后,适配器将传输结果通知引擎。

以下对象交互关系图演示了通过批处理发送适配器传输两条消息。

显示通过批处理发送适配器传输两条消息的示意图。

处理传输故障

下图演示了传输失败的建议语义。 这些只是建议,不会由消息引擎强制执行。 如果有正当理由这样做,则可以开发偏离这些准则的适配器,但在这种情况下应小心。 例如,一般情况下,适配器应始终在所有重试都用完后将消息移动到备份传输。

传输可能需要使用的重试次数比配置的次数更多更常见。 虽然这稍有不同,但被认为是可以接受的,因为传输层的复原能力正在提高。 通常,消息引擎公开的 API 旨在尽可能为适配器提供最大控制。 有了这种控制,责任就更高了。

显示传输故障处理过程的示意图。

适配器通过检查系统上下文属性 RetryCount 来确定消息上可用的重试次数。 适配器对每次重试尝试调用 一次重新提交 API,并传入要重新提交的消息。 与消息一起传递时间戳,指示引擎何时应将消息传递回适配器。 此值通常应为当前时间加上 RetryInterval 的值。 RetryInterval 是一个系统上下文属性,其单位为分钟。 消息上下文中的 RetryCountRetryInterval 都是在发送端口上配置的值。 考虑在多台计算机上部署同一 BizTalk 主机的实例的横向扩展部署。 如果消息在重试间隔到期后传递,则消息可能会传递到配置为运行这些实例的任何计算机上的任意一个主机实例。 因此,适配器不应保留与重试尝试使用的消息关联的任何状态,因为不能保证适配器的同一实例将在以后负责传输。

适配器应仅在重试计数小于或等于零后尝试将消息移动到备份传输。 如果没有为端口配置备份传输,则尝试将消息移动到备份传输将失败。 在这种情况下,应暂停消息。

以下代码片段演示了如何从消息上下文中确定重试计数和间隔,以及后续重新提交或移动到备份传输。

using Microsoft.XLANGs.BaseTypes;  
using Microsoft.BizTalk.Message.Interop;  
using Microsoft.BizTalk.TransportProxy.Interop;  
 …  
// RetryCount and RetyInterval are system context properties...  
private static readonly PropertyBase RetryCountProperty =   
 new BTS.RetryCount();  
private static readonly PropertyBase RetryIntervalProperty =   
 new BTS.RetryInterval();  

public void HandleRetry(IBaseMessage msg, IBTTransportBatch batch)  
{  
int retryCount = 0;  
int retryInterval = 0;  

// Get the RetryCount and RetryInterval off the msg ctx...  
 GetMessageRetryState(msg, out retryCount, out retryInterval);  

// If we have retries available resubmit, else move to   
 // backup transport...  
 if ( retryCount > 0 )  
batch.Resubmit(  
 msg, DateTime.Now.AddMinutes(retryInterval));  
else  
batch.MoveToNextTransport(msg);  
}  

public void GetMessageRetryState(  
 IBaseMessage msg,   
 out int retryCount,   
 out int retryInterval )  
{  
retryCount = 0;  
retryInterval = 0;  

object obj =  msg.Context.Read(  
RetryCountProperty.Name.Name,    
RetryCountProperty.Name.Namespace);   

if ( null != obj )  
retryCount = (int)obj;  

obj =  msg.Context.Read(  
RetryIntervalProperty.Name.Name,    
RetryIntervalProperty.Name.Namespace);   

if ( null != obj )  
retryInterval = (int)obj;  
}  

从 TransmitMessage 引发异常

如果适配器对任何 API IBTTransmitter.TransmitMessageIBTTransmitterBatch.TransmitMessageIBTTransmitterBatch.Done 引发异常,则引擎会将所涉及的消息传输视为传输失败,并针对消息采取相应的操作,如 如何处理适配器故障中所述。

对于批量感知发送适配器,在 TransmitMessage API 上引发异常会导致清除整个批处理,并针对该批中的所有消息执行默认传输失败操作。

要求-响应

双向发送适配器通常支持单向和双向传输。 发送适配器通过检查消息上下文中的 IsSolicitResponse 系统上下文属性来确定消息应以单向还是双向发送方式传输消息。

以下代码片段对此进行了演示:

private bool portIsTwoWay = false;  
private static readonly PropertyBase IsSolicitResponseProperty= new BTS.IsSolicitResponse();  

...  

 // Port is one way or two way...  
 object obj =  this.message.Context.Read(  
 IsSolicitResponseProperty.Name.Name,    
 IsSolicitResponseProperty.Name.Namespace);   

if ( null != obj )  
 this.portIsTwoWay = (bool)obj;  

在请求-响应传输期间,适配器传输请求消息。 完成此操作后,它会提交关联的响应,并告知消息引擎从 MessageBox 数据库中删除原始请求消息。 从应用程序队列中删除消息的操作应在与提交响应消息相同的传输代理批处理中执行。 这可确保删除和提交响应的原子性。 为了完全实现原子性,适配器应使用 DTC 事务,在该事务中,请求消息传输到事务感知资源、提交响应消息和删除请求消息均在同一 DTC 事务的上下文中。 与往常一样,我们建议使用非阻止发送来传输请求消息。

以下代码片段演示双向发送main方面。 当引擎调用 IBTTransmitter.TransmitMessage 时,适配器会将要传输到内存中队列的消息排入队列。 适配器返回 false 以指示它正在执行非阻止发送。 适配器的线程池 (WorkerThreadThunk) 为内存中队列提供服务,并将消息取消排队以将其传递给帮助程序方法。 此方法负责发送请求消息和接收响应消息。 (此帮助程序方法的实现超出了本主题的范围。) 响应消息提交到引擎中,请求消息将从应用程序队列中删除。

using System.Collections;  
using Microsoft.XLANGs.BaseTypes;  
using Microsoft.BizTalk.Message.Interop;  
using Microsoft.BizTalk.TransportProxy.Interop;  

     static private Queue _transmitQueue = new Queue();  
  static private IBTTransportProxy _transportProxy = null;   
// IBTTransmitter...  
 public bool TransmitMessage(IBaseMessage msg)  
{  
// Add message to the transmit queue...  
lock(_transmitQueue.SyncRoot)  
 {  
_transmitQueue.Enqueue(msg);  
 }  

 return false;  
}  

 // Threadpool worker thread...   
private void WorkerThreadThunk()  
{  
try  
{  
 IBaseMessage solicitMsg = null;  

 lock(_transmitQueue.SyncRoot)  
 {  
 solicitMsg =   
 (IBaseMessage)_transmitQueue.Dequeue();  
}  

 IBaseMessage responseMsg = SendSolicitResponse(   
 solicitMsg );  
Callback cb = new Callback();  

IBTTransportBatch batch = _transportProxy.GetBatch(  
 cb, null);  
batch.SubmitResponseMessage( solicitMsg, responseMsg );  
batch.DeleteMessage( solicitMsg );  
batch.Done(null);  
}  
catch(Exception)  
{  
// Handle failure....  
}  
}  

static private IBaseMessage SendSolicitResponse(  
 IBaseMessage solicitMsg )  
{  
// Helper method to send solicit message and receive   
 // response message...  
IBaseMessage responseMsg = null;  
return responseMsg;  
}  

动态发送

动态发送端口没有与之关联的适配器配置。 相反,它们将处理程序配置用于适配器在动态端口上传输消息所需的任何默认属性。 例如,HTTP 适配器可能需要使用代理,并且需要提供凭据。 可以在适配器在运行时缓存的处理程序配置中指定用户名、密码和端口。

要使引擎确定要发送动态消息的传输, OutboundTransportLocation 会以适配器的别名作为前缀。 适配器可以在安装时向 BizTalk Server 注册一个或多个别名。 引擎在运行时分析 OutboundTransportLocation 以查找匹配项。 适配器负责处理带有或不带前缀别名的 OutboundTransportLocation 。 下表显示了为现成的 BizTalk 适配器注册的别名的一些示例。

适配器别名 适配器 OutboundTransportLocation 示例
HTTP:// HTTP http://www. MyCompany.com/bar
HTTPS:// HTTP https://www. MyCompany.com/bar
mailto: SMTP mailto:A.User@MyCompany.com
文件:// 文件 FILE://C:\ MyCompany \%MessageID%.xml