发送适配器的交换模式

发送适配器是从 BizTalk 消息引擎传递的消息,通过网络传输。 可以使用单向或双向消息交换模式发送这些消息。 处理此双向消息交换模式的适配器称为Solicit-Response适配器。

阻止与非阻塞传输

消息传送引擎使用 IBTTransmitter.TransmitMessage 方法或 IBTTransmitterBatch.TransmitMessage 方法将消息传送到发送适配器,具体取决于适配器是否具有批感知性。 这两个版本的方法都有一个布尔返回值,该值指示适配器如何传输消息。 如果适配器返回 true,则它在返回之前已完全发送消息。 在这种情况下,消息引擎代表适配器从 MessageBox 数据库中删除消息。 如果适配器返回 false,它将启动消息传输,并在传输完成之前返回。 在这种情况下,适配器不仅负责从其应用程序队列中删除消息,还负责处理需要重试消息进行传输或挂起的传输失败。

返回 false 的适配器是一个非阻止调用,这意味着 TransmitMessage 实现代码不会阻止消息引擎的调用线程。 适配器只需将消息添加到内存中队列,即可传输,然后返回。 适配器应有自己的线程池,用于服务内存中队列、传输消息,然后通知引擎传输结果。

消息引擎的线程通常比用于通过网络发送数据的线程更多的 CPU 绑定。 混合这两种类型的线程会对性能产生负面影响。 非阻塞发送可实现这两种类型的线程的分离,并且对阻塞调用产生显著的性能改进。

下图显示了适配器的线程池,这些线程池可能由 I/O 操作绑定。 BizTalk Server消息引擎的线程池更受 CPU 处理的约束。 通过保留两个不同的线程池,并且不混合同一类型的线程,系统可以更高效地运行。

Diagram that shows the adapter's thread pool which can tend to be bound by I/O operations.

性能提示: 为了获得最佳性能,发送适配器应为非阻止和批感知。 当 BizTalk 文件适配器从阻止和非批处理感知更改为非阻止和批感知时,实现了三倍的性能提升。

故障排除提示: 阻止传输可能会导致整个主机实例的性能下降。 如果适配器在 TransmitMessage 中出现过多阻塞,则会阻止引擎线程将消息传送到其他适配器。

非批处理发送

无法批量识别的适配器应实现 IBTTransmitter ,如 异步发送适配器接口中所述。 对于适配器需要传输消息引擎调用 IBTTransmitter.TransmitMessage 的每个消息。 下面的对象交互关系图详细介绍了传输消息的典型方法,其中包括以下步骤:

  1. 引擎将消息传送到适配器。

  2. 适配器将消息排队到内存中队列,以便传输。

  3. 适配器线程池中的线程会取消队列中的消息排队,读取消息的配置,并通过网络传输消息。

  4. 适配器从引擎中获取新批。

  5. 适配器在批处理上调用 DeleteMessage ,并传入刚刚传输的消息。

  6. 适配器在批处理上调用 “完成 ”。

  7. 引擎处理批处理并从应用程序队列中删除消息。

  8. 引擎调用适配器以通知适配器 DeleteMessage 操作的结果。

    Image that shows the adapter deleting a single message from the application queue.

    前面的对象交互图显示了从应用程序队列中删除单个消息的适配器。 理想情况下,适配器会批处理消息操作,而不是一次对单个消息进行操作。

批处理发送

批量感知的适配器应实现 IBTBatchTransmitterIBTTransmitterBatch ,如 发送适配器接口中所述。 当引擎有用于适配器传输的消息时,引擎通过调用 IBTBatchTransmitter.GetBatch 从适配器获取新批。 适配器返回实现 IBTTransmitterBatch 的新批处理对象。 然后,引擎通过调用 IBTTransmitterBatch.BeginBatch 启动批处理。 此 API 具有一个 out 参数,该参数允许适配器指定要在批处理上接受的最大消息数。 适配器可以选择返回 DTC 事务。 然后,引擎调用 IBTTransmitterBatch.TransmitMessage 一次,以便将每个传出消息添加到批处理中。 调用此项的次数大于零,但小于或等于适配器指示的批的最大大小。 将所有消息添加到批处理后,适配器将调用 IBTTransmitterBatch.Done。 此时,适配器通常会将批处理中的所有消息排队到其内存中队列。 适配器从其自己的线程池中的线程或线程传输消息,就像非批处理感知适配器一样。 然后,适配器会通知引擎传输结果。

以下对象交互图演示了通过批处理发送适配器传输两条消息。

Diagram that shows the transmission of two messages by a batched send adapter.

处理传输失败

下图演示了用于传输失败的建议语义。 这些只是建议,不会由消息引擎强制执行。 如果这样做有有效原因,则可以开发一个适配器,使其偏离这些准则,但在这种情况下应小心。 例如,一般情况下,适配器应始终将所有重试用尽后将消息移动到备份传输。

更常见的是传输可能需要使用比配置的重试次数更多。 虽然这略有不同,但被认为是可以接受的,因为传输层的复原能力正在增加。 通常,消息引擎公开的 API 旨在尽可能为适配器提供最大控制。 通过这种控制,责任级别更高。

Diagram that shows the process for handling transmission failures.

适配器通过检查系统上下文属性 RetryCount 来确定消息上可用的重试次数。 适配器每次重试尝试调用 一次 Resubmit API,并传入要重新提交的消息。 连同消息一起,它会传递时间戳,指示引擎何时应将消息传递回适配器。 此值通常应为当前时间加上 RetryInterval 的值。 RetryInterval 是一个系统上下文属性,其单位为分钟数。 消息上下文中的 RetryCountRetryInterval 都是在发送端口上配置的值。 请考虑横向扩展部署,其中包含在多台计算机上部署的同一 BizTalk 主机的实例。 如果消息在重试间隔过期后传递,则消息可能会传递到任何配置为运行的计算机上的任一主机实例。 因此,适配器不应保留与重试尝试中使用的消息关联的任何状态,因为不能保证适配器的同一实例将在以后负责传输。

适配器应仅在重试计数小于或等于零后尝试将消息移动到备份传输。 如果没有为端口配置备份传输,则尝试将消息移动到备份传输将失败。 在这种情况下,应挂起消息。

以下代码片段演示了如何确定消息上下文中的重试计数和间隔,以及后续重新提交或移动到备份传输。

using Microsoft.XLANGs.BaseTypes;  
using Microsoft.BizTalk.Message.Interop;  
using Microsoft.BizTalk.TransportProxy.Interop;  
 …  
// RetryCount and RetyInterval are system context properties...  
private static readonly PropertyBase RetryCountProperty =   
 new BTS.RetryCount();  
private static readonly PropertyBase RetryIntervalProperty =   
 new BTS.RetryInterval();  

public void HandleRetry(IBaseMessage msg, IBTTransportBatch batch)  
{  
int retryCount = 0;  
int retryInterval = 0;  

// Get the RetryCount and RetryInterval off the msg ctx...  
 GetMessageRetryState(msg, out retryCount, out retryInterval);  

// If we have retries available resubmit, else move to   
 // backup transport...  
 if ( retryCount > 0 )  
batch.Resubmit(  
 msg, DateTime.Now.AddMinutes(retryInterval));  
else  
batch.MoveToNextTransport(msg);  
}  

public void GetMessageRetryState(  
 IBaseMessage msg,   
 out int retryCount,   
 out int retryInterval )  
{  
retryCount = 0;  
retryInterval = 0;  

object obj =  msg.Context.Read(  
RetryCountProperty.Name.Name,    
RetryCountProperty.Name.Namespace);   

if ( null != obj )  
retryCount = (int)obj;  

obj =  msg.Context.Read(  
RetryIntervalProperty.Name.Name,    
RetryIntervalProperty.Name.Namespace);   

if ( null != obj )  
retryInterval = (int)obj;  
}  

从 TransmitMessage 引发异常

如果适配器对任何 API IBTTransmitter.TransmitMessage、IBTTransmitterBatch.TransmitMessageIBTTransmitterBatch.Done 引发异常,则引擎会将涉及的消息的传输视为传输失败,并为消息采取适当的操作,如如何处理适配器故障中所述。

对于批感知发送适配器,在 TransmitMessage API 上引发异常会导致清除整个批处理,并针对该批中的所有消息执行默认传输失败操作。

要求-响应

双向发送适配器通常支持单向和双向传输。 发送适配器通过检查消息上下文中的 IsSolicitResponse 系统上下文属性来确定消息是否应作为单向或双向发送传输。

以下代码片段对此进行了演示:

private bool portIsTwoWay = false;  
private static readonly PropertyBase IsSolicitResponseProperty= new BTS.IsSolicitResponse();  

...  

 // Port is one way or two way...  
 object obj =  this.message.Context.Read(  
 IsSolicitResponseProperty.Name.Name,    
 IsSolicitResponseProperty.Name.Namespace);   

if ( null != obj )  
 this.portIsTwoWay = (bool)obj;  

在请求响应传输期间,适配器传输请求消息。 完成后,它会提交关联的响应,并告知消息引擎从 MessageBox 数据库中删除原始请求消息。 应在与提交响应消息相同的传输代理批处理中执行从应用程序队列中删除消息的操作。 这可确保删除和提交响应的原子性。 对于完整的原子性,适配器应使用 DTC 事务,这样,请求消息将传输到事务感知资源、提交响应消息和删除请求消息都位于同一 DTC 事务的上下文中。 与往常一样,我们建议使用非阻止发送传输请求消息。

以下代码片段演示双向发送的主要方面。 当引擎调用 IBTTransmitter.TransmitMessage 时,适配器会将消息排队以传输到内存中队列。 适配器返回 false 以指示它正在执行非阻塞发送。 适配器的线程池 (WorkerThreadThunk) 内存中队列服务,并取消消息排队,以将其移交给帮助程序方法。 此方法负责发送请求消息和接收响应消息。 (此帮助程序方法的实现超出了本主题的范围。) 响应消息提交到引擎中,并从应用程序队列中删除请求消息。

using System.Collections;  
using Microsoft.XLANGs.BaseTypes;  
using Microsoft.BizTalk.Message.Interop;  
using Microsoft.BizTalk.TransportProxy.Interop;  

     static private Queue _transmitQueue = new Queue();  
  static private IBTTransportProxy _transportProxy = null;   
// IBTTransmitter...  
 public bool TransmitMessage(IBaseMessage msg)  
{  
// Add message to the transmit queue...  
lock(_transmitQueue.SyncRoot)  
 {  
_transmitQueue.Enqueue(msg);  
 }  

 return false;  
}  

 // Threadpool worker thread...   
private void WorkerThreadThunk()  
{  
try  
{  
 IBaseMessage solicitMsg = null;  

 lock(_transmitQueue.SyncRoot)  
 {  
 solicitMsg =   
 (IBaseMessage)_transmitQueue.Dequeue();  
}  

 IBaseMessage responseMsg = SendSolicitResponse(   
 solicitMsg );  
Callback cb = new Callback();  

IBTTransportBatch batch = _transportProxy.GetBatch(  
 cb, null);  
batch.SubmitResponseMessage( solicitMsg, responseMsg );  
batch.DeleteMessage( solicitMsg );  
batch.Done(null);  
}  
catch(Exception)  
{  
// Handle failure....  
}  
}  

static private IBaseMessage SendSolicitResponse(  
 IBaseMessage solicitMsg )  
{  
// Helper method to send solicit message and receive   
 // response message...  
IBaseMessage responseMsg = null;  
return responseMsg;  
}  

动态发送

动态发送端口没有与之关联的适配器配置。 而是对适配器需要在动态端口上传输消息的任何默认属性使用处理程序配置。 例如,HTTP 适配器可能需要使用代理,并且需要提供凭据。 可以在适配器运行时缓存的处理程序配置中指定用户名、密码和端口。

为使引擎确定要发送动态消息的传输, OutboundTransportLocation 的前缀为适配器的别名。 适配器可以在安装时向BizTalk Server注册一个或多个别名。 引擎在运行时分析 OutboundTransportLocation 以查找匹配项。 适配器负责使用或不使用前面附加的别名来处理 OutboundTransportLocation 。 下表显示了为现用 BizTalk 适配器注册的一些别名示例。

适配器别名 适配器 OutboundTransportLocation 示例
HTTP:// HTTP http://www. MyCompany.com/bar
HTTPS:// HTTP https://www. MyCompany.com/bar
mailto: SMTP mailto:A.User@MyCompany.com
FILE:// FILE FILE://C:\MyCompany \%MessageID%.xml