为接收处理批处理消息
Batch 回调
接收适配器提交到消息引擎的批处理将异步处理。 因此,适配器需要一种机制来将回调绑定到适配器中的某个状态,以便可以通知它成功或失败并执行任何必要的清理操作。 回调语义很灵活,因此适配器可以使用一种或三种方法的组合。 它们是:
所有回调都是在实现 IBTBatchCallBack 的同一对象实例上进行的。
请求新批处理时传递到引擎中的 Cookie 用于将回调与适配器中的状态相关联。
每个实现 IBTBatchCallBack 的批处理都有不同的回调对象。 此处,每个对象都有自己的私有状态。
处理批处理后,适配器在 实现 IBTBatchCallBack.BatchComplete 时被调用回适配器。 批的总体状态由第一个参数 HRESULT 状态指示。 如果此值大于或等于零,则批处理成功,即引擎拥有数据的所有权,适配器可以自由地从线路中删除该数据。 负状态表示批处理失败:批处理中的操作均未成功,适配器负责处理失败。
如果批处理失败,适配器需要知道操作失败的项。 例如,假设适配器从磁盘读取 20 个文件,并使用单个批处理将它们提交到BizTalk Server。 如果第 10 个文件已损坏,适配器需要暂停该文件并重新提交剩余的 19 个文件。 此信息在第二个和第三个参数中可供适配器使用,
opCount
(类型为 short 和operationStatus
(类型为 BTBatchOperationStatus[)的操作计数) 。
注意
在单个批处理对象上,不应多次提交消息。 在同一批上多次提交同一消息对象将导致引擎错误。
操作计数指示批处理中有多少类型的操作 (BTBatchOperationStatus 数组) 的大小。 操作状态数组中的每个元素对应于给定的操作类型。 通过使用 BTBatchOperationStatus 数组,适配器可以通过查看 BTBatchOperationStatus.MessageStatus 数组中表示失败的 HRESULT 值来确定给定操作中的哪个项失败。 在上述方案中,适配器将创建一个新批处理,其中包含 19 个消息提交和一个消息挂起。
以下代码片段演示适配器如何通过其传输代理从引擎请求新批,并使用 Cookie 方法将单个消息提交到引擎中。 消息引擎在完成处理整批提交的消息后,调用 BatchComplete 方法作为批处理回调。
using Microsoft.BizTalk.TransportProxy.Interop;
using Microsoft.BizTalk.Message.Interop;
public class MyAdapter :
IBTTransport,
IBTTransportConfig,
IBTTransportControl,
IPersistPropertyBag,
IBaseComponent,
IBTBatchCallBack
{
private IBTTransportProxy _tp;
public void BatchComplete(
Int32 status,
Int16 opCount,
BTBatchOperationStatus[] operationStatus,
System.Object callbackCookie)
{
// Use cookie to correlate callback with work done,
// in this example the batch is to submit a single
// file the name of which will be in the
// callbackCookie
string fileName = (string)callbackCookie;
if ( status >= 0 )
// DeleteFile from disc
File.Delete(fileName);
else
// Rename file to fileName.bad
File.Move(fileName, fileName + ".bad");
}
private void SubmitMessage(
IBaseMessage msg,
string fileName)
{
// Note: Pass in the filename as the cookie
IBTTransportBatch batch =
_tp.GetBatch(this, (object)fileName);
// Add msg to batch for submitting
batch.SubmitMessage(msg);
// Process this batch
batch.Done(null);
}
}
BizTalk Server SDK 包括以下适配器的示例:文件、HTTP、MSMQ 和事务适配器。 所有这些适配器都基于名为 BaseAdapter 的通用构建基块构建。 BaseAdapter 版本 1.0.1 包含所有相关代码,用于分析操作状态并重新生成要提交的新批处理。
争用条件
解决错误和确定提交的批的最终结果这两个任务似乎很简单,但实际上它们依赖于来自不同线程的信息:
适配器根据BizTalk Server传递到适配器的 BatchComplete 回调方法的信息来处理错误。 此回调在适配器的线程上执行。
DTCCommitConfirm 是 IBTDTCCommitConfirm 对象上的一种方法。 IBTTransportBatch::D一次调用将返回 IBTDTCCommitConfirm 对象的实例。 此实例与 IBTTransportBatch::D一次 调用位于同一线程上,这与适配器的回调线程不同。
对于适配器对 IBTTransportBatch::D消息 传送引擎对单独线程中的回调方法 BatchComplete 进行相应的调用,以报告批处理提交的结果。 在 BatchComplete 中,适配器需要根据批处理传递还是失败提交或回滚事务。 在任一情况下,适配器都应调用 DTCCommitConfirm 来报告事务的状态。
可能存在争用条件,因为适配器的 BatchComplete 实现可以假定执行 BatchComplete 时,IBTTransportBatch::D one 返回的 IBTDTCCommitConfirm 对象始终可用。 但是,可以在单独的消息引擎线程中调用 BatchComplete ,甚至在 IBTTransportBatch::D one 返回之前。 当适配器尝试访问 IBTDTCCommitConfirm 对象作为 BatchComplete 实现的一部分时,可能存在访问冲突,因为该调用线程不再存在。 使用以下解决方案来避免这种情况。
在以下示例中,使用 事件解决了该问题。 在这里,通过使用事件的属性来访问接口指针。 get 始终等待集完成,然后再继续。
protected IBTDTCCommitConfirm CommitConfirm
{
set
{
this.commitConfirm = value;
this.commitConfirmEvent.Set();
}
get
{
this.commitConfirmEvent.WaitOne();
return this.commitConfirm;
}
}
protected IBTDTCCommitConfirm commitConfirm = null;
private ManualResetEvent commitConfirmEvent = new ManualResetEvent(false);
批处理状态代码
整体批处理状态代码指示批处理的结果。 操作状态提供单个邮件项的提交状态代码。 批处理可能由于各种原因而失败。 可能存在与安全相关的故障,或者在关闭引擎时可能已提交消息。 (通常,当引擎关闭时,它不接受任何新工作,但允许完成进程内工作。) 下表指示在批处理状态或操作状态中返回的一些常见 HRESULT 值。 它还指示这些代码是成功代码还是失败代码。 如何处理适配器故障中更全面地介绍了如何正确处理这些代码。
BTTransportProxy) 类中定义的代码 ( | 成功/失败代码 | 说明 |
---|---|---|
BTS_S_EPM_SECURITY_CHECK_FAILED | 成功 | 端口配置为执行安全检查并删除身份验证失败的消息。 适配器不应暂停返回此状态代码的批处理。 |
BTS_S_EPM_MESSAGE_SUSPENDED | 成功 | 指示一个或多个消息已挂起,并且引擎拥有数据的所有权。 这是一个成功代码,消息引擎已接受消息提交。 |
E_BTS_URL_DISALLOWED | 失败 | 提交的邮件包含无效的 InboundTransportLocation。 |
批处理操作
下表详细介绍了适配器用于将工作添加到给定批处理的 IBTTransportBatch 对象的成员方法和操作。
方法名称 | 操作类型 | 说明 |
---|---|---|
SubmitMessage | 提交 | 将消息提交到引擎中。 |
SubmitResponseMessage | 提交 | 将响应消息提交到引擎中。 这是请求-响应对中的响应消息。 |
DeleteMessage | 删除 | 删除适配器已使用非阻止发送通过网络成功传输的消息。 使用阻止发送的适配器不需要调用此方法,因为如果适配器从 TransmitMesssage 返回true ,消息引擎将代表适配器将其删除。 |
MoveToSuspendQ | MoveToSuspendQ | 将消息移动到挂起的队列中。 |
Resubmit | 提交 | 请求在以后重试消息以便传输。 这通常在传输尝试失败后调用。 |
MoveToNextTransport | MoveToNextTransport | 请求使用其备份传输来传输消息。 通常在所有重试都用完后调用。 如果没有备份传输,此方法将失败 |
SubmitRequestMessage | SubmitRequest | 提交请求消息。 这是请求-响应对中的请求消息。 |
CancelRequestForResponse | CancelRequestForResponse | 通知引擎适配器不再希望等待请求-响应对中的响应消息。 |
Clear | NA | 清除批处理中的所有工作。 |
已完成 | NA | 将一批消息提交到引擎进行处理。 |
Batch 管理
批处理在消息引擎的本机代码中实现。 因此,使用托管代码编写的适配器应在批处理完成后释放批处理的运行时可调用包装器 (RCW) 。 这是使用 Marshal.ReleaseComObject API 在托管代码中完成的。 请务必记住,应在 while 循环中调用此 API,直到返回的引用计数达到零。
BizTalk Server批处理中同步处理消息。 许多批处理可以同时处理,这可以通过调整应用程序域中的批大小来为适配器中的一些优化提供机会。 例如,可以 (处理 FTP 站点上的所有文件,直到) 达到某个限制。 对于 SAP,可以将单个流处理成大量消息,然后作为批提交。
适配器用于将操作传递回BizTalk Server的批处理不需要与BizTalk Server提供给适配器的消息列表完全对应。 换句话说,在执行事务性发送时,必须将重新提交操作 MoveToNextTransport 和 MoveToSuspendQ 拆分为单独的批。 许多适配器将已为多个终结点提交的一批消息分为单独的消息列表,以供进一步处理。
关键是,除了与BizTalk Server中的消息批处理关联的事务) 规则之外,没有规则 (。 批处理只是适配器在BizTalk Server中分块消息的一种特定于实现的方式。
适配器可以批处理来自多个较小批的消息,BizTalk Server已将其提供给更大的批,以便对BizTalk Server做出响应。 这可能是处理大量非常小消息的事务适配器的重大优化。 它将“每条消息的事务总数”比率降至最低。
通常BizTalk Server生成 5 到 10 条消息的发送端批。 如果这些消息非常小,则适配器在将事务性批量删除提交回BizTalk Server之前,最多可以批处理 100 条消息或更多。 这样的优化不容易实现;必须确保消息永远不会卡在适配器内存中,无休止地等待达到某个阈值。
批处理的重要性可以从 MQSeries 等高吞吐量适配器BizTalk Server的性能数字中看出。 在这些适配器中,接收消息的频率至少是发送消息的两倍。 默认情况下,接收适配器使用 100 条消息的批处理,发送适配器使用给定的默认BizTalk Server批处理。
事务性批处理
在编写创建事务对象的适配器并将其交给BizTalk Server时,需要负责编写执行以下操作的代码:
确定批处理操作的最终结果:提交或结束事务。 这可能取决于此事务范围内不依赖BizTalk Server的其他事务分支,例如写入消息队列 (MSMQ) 队列或事务数据库操作。
通过调用 IBTDTCCommitConfirm.DTCCommitConfirm 方法通知BizTalk Server最终结果。
true
返回表示事务已成功提交;返回false
表示事务失败和回滚。适配器必须通知BizTalk Server事务的最终结果,以维护其内部跟踪数据。 适配器通过调用 DTCCommitConfirm 通知BizTalk Server结果。 如果适配器不这样做,则会发生严重的内存泄漏,MSDTC 事务可能会超时并失败,尽管其操作成功完成。