BizTalk Custom Receive Pipeline Component to Batch Messages
Introduction:
Pipelines in BizTalk perform specific processing on incoming and outgoing messages. Each pipeline is divided into stages, and each stage hosts stage-specific components that carry out particular tasks.
“Disassemble” is the second stage of the receive pipeline. It is primarily used to disassemble incoming messages, validate them against a schema, and promote properties. To disassemble (break up) a message, it uses an envelope schema and splits the message record by record. It has no provision for splitting a message into sets of records (batches).
To break messages up according to a configurable batch size, we need to write a custom receive pipeline component. For example:
If the batch size is configured as 5 and the incoming message is:
<ns0:sample xmlns:ns0='https://MGSIBREDemo.LoanRequest'>
<data1><name>name1</name></data1>
<data1><name>name2</name></data1>
<data1><name>name3</name></data1>
<data1><name>name4</name></data1>
<data1><name>name5</name></data1>
<data1><name>name6</name></data1>
<data1><name>name7</name></data1>
<data1><name>name8</name></data1>
<data1><name>name9</name></data1>
<data1><name>name10</name></data1>
</ns0:sample>
The custom receive pipeline component will break this message into 2 messages, each having 5 records:
<ns0:sample xmlns:ns0='https://MGSIBREDemo.LoanRequest'>
<data1><name>name1</name></data1>
<data1><name>name2</name></data1>
<data1><name>name3</name></data1>
<data1><name>name4</name></data1>
<data1><name>name5</name></data1>
</ns0:sample>
<ns0:sample xmlns:ns0='https://MGSIBREDemo.LoanRequest'>
<data1><name>name6</name></data1>
<data1><name>name7</name></data1>
<data1><name>name8</name></data1>
<data1><name>name9</name></data1>
<data1><name>name10</name></data1>
</ns0:sample>
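The splitting rule illustrated above can be tried outside BizTalk. The following is a minimal standalone sketch, not the component's own implementation: it builds each batch with XmlDocument.ImportNode instead of string concatenation, and the names BatchSplitter and Split are hypothetical, chosen only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Xml;

// Hypothetical helper for illustration; it has no dependency on BizTalk.
public static class BatchSplitter
{
    // Splits the child elements of the root into documents of at most batchSize records.
    public static List<string> Split(string xml, int batchSize)
    {
        if (batchSize < 1) throw new ArgumentOutOfRangeException("batchSize");

        XmlDocument source = new XmlDocument();
        source.LoadXml(xml);

        List<string> batches = new List<string>();
        XmlDocument current = null;

        foreach (XmlNode child in source.DocumentElement.ChildNodes)
        {
            // Only elements count as records; comments and similar nodes are skipped.
            if (child.NodeType != XmlNodeType.Element) continue;

            if (current == null)
            {
                current = new XmlDocument();
                // A shallow import copies the root element and its attributes
                // (including xmlns declarations) but none of its children.
                current.AppendChild(current.ImportNode(source.DocumentElement, false));
            }

            current.DocumentElement.AppendChild(current.ImportNode(child, true));

            // Close the batch as soon as it is full.
            if (current.DocumentElement.ChildNodes.Count == batchSize)
            {
                batches.Add(current.OuterXml);
                current = null;
            }
        }

        // Emit the last, partially filled batch if there is one.
        if (current != null) batches.Add(current.OuterXml);
        return batches;
    }
}
```

Calling BatchSplitter.Split with the 10-record sample message and a batch size of 5 should return two documents, each with the original root element and five data1 records. Unlike the component listed later, this sketch emits nothing for a message with no records.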
Sample Code:
The following high-level steps and code snippet (.NET 2.0 based) show how to write such a pipeline component:
1. Create a class library project.
2. Add a reference to Microsoft.BizTalk.Pipeline.dll.
3. Rename the default Class1.cs to DisassemblePipeline.cs.
4. Use the following code:
using System;
using System.Collections.Generic;
using System.Text;
using System.IO;
using System.Xml;
using System.ComponentModel;
using Microsoft.BizTalk.Component.Interop;
using Microsoft.BizTalk.Message.Interop;
namespace MessageBatchPipelineCompoent
{
[ComponentCategory(CategoryTypes.CATID_PipelineComponent)]
[ComponentCategory(CategoryTypes.CATID_DisassemblingParser)]
[System.Runtime.InteropServices.Guid("6118B8F0-8684-4ba2-87B4-8336D70BD4F7")]
public class DisassemblePipeline : IBaseComponent,
IDisassemblerComponent,
IComponentUI,
IPersistPropertyBag
{
//Used to hold disassembled messages
private System.Collections.Queue qOutputMsgs = new System.Collections.Queue();
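//The BizTalk system properties namespace uses the http scheme; it must match exactly for MessageType to be promoted as a system property
private string systemPropertiesNamespace = @"http://schemas.microsoft.com/BizTalk/2003/system-properties";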
/// <summary>
/// Batch size used to batch records
/// </summary>
private int _BatchSize;
public int BatchSize
{
get { return _BatchSize; }
set { _BatchSize = value; }
}
/// <summary>
/// Default constructor
/// </summary>
public DisassemblePipeline()
{
}
/// <summary>
/// Description of pipeline
/// </summary>
public string Description
{
get
{
return "Component to batch (break) large message into multiple small messages";
}
}
/// <summary>
/// Name of pipeline
/// </summary>
public string Name
{
get
{
return "MessageBatchComponent";
}
}
/// <summary>
/// Pipeline version
/// </summary>
public string Version
{
get
{
return "1.0.0.0";
}
}
/// <summary>
/// Returns collection of errors
/// </summary>
public System.Collections.IEnumerator Validate(object projectSystem)
{
return null;
}
/// <summary>
/// Returns icon of pipeline
/// </summary>
public System.IntPtr Icon
{
get
{
return new System.IntPtr();
}
}
/// <summary>
/// Class GUID
/// </summary>
public void GetClassID(out Guid classID)
{
classID = new Guid("ACC3F15A-C389-4a5d-8F8E-2A951CDC4C19");
}
/// <summary>
/// InitNew
/// </summary>
public void InitNew()
{
}
/// <summary>
/// Load property from property bag
/// </summary>
public void Load(IPropertyBag propertyBag, int errorLog)
{
object val = null;
try
{
propertyBag.Read("BatchSize", out val, 0);
}
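catch (ArgumentException)
{
//property has not been saved to the bag yet; fall through to the default below
}
catch (Exception ex)
{
throw new ApplicationException("Error reading propertybag: " + ex.Message);
}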
if (val != null)
_BatchSize = (int)val;
else
_BatchSize = 1;
}
/// <summary>
/// Write property to property bag
/// </summary>
public void Save(IPropertyBag propertyBag, bool clearDirty, bool saveAllProperties)
{
object val = (object)BatchSize;
propertyBag.Write("BatchSize", ref val);
}
/// <summary>
/// Disassembles (breaks) message into small messages as per batch size
/// </summary>
public void Disassemble(IPipelineContext pContext, IBaseMessage pInMsg)
{
string originalDataString;
try
{
//fetch original message
Stream originalMessageStream = pInMsg.BodyPart.GetOriginalDataStream();
//StreamReader reads to the end of the stream and honors a byte order mark; UTF-8 is assumed when none is present
StreamReader originalMessageReader = new StreamReader(originalMessageStream, Encoding.UTF8, true);
originalDataString = originalMessageReader.ReadToEnd();
}
catch (Exception ex)
{
throw new ApplicationException("Error in reading original message: " + ex.Message);
}
XmlDocument originalMessageDoc = new XmlDocument();
StringBuilder messageString;
try
{
//load original message
originalMessageDoc.LoadXml(originalDataString);
//fetch namespace and root element
string namespaceURI = originalMessageDoc.DocumentElement.NamespaceURI;
string rootElement = originalMessageDoc.DocumentElement.Name;
//declare the namespace under the root's own prefix, or as the default namespace when the root has no prefix
string prefix = originalMessageDoc.DocumentElement.Prefix;
string rootOpenTag = "<" + rootElement + (prefix.Length > 0 ? " xmlns:" + prefix : " xmlns") + "='" + namespaceURI + "'>";
//start batching messages
int counter = 0;
messageString = new StringBuilder();
messageString.Append(rootOpenTag);
foreach (XmlNode childNode in originalMessageDoc.DocumentElement.ChildNodes)
{
counter = counter + 1;
if (counter > BatchSize)
{
messageString.Append("</" + rootElement + ">");
//Queue message
CreateOutgoingMessage(pContext, messageString.ToString(), namespaceURI, rootElement);
counter = 1;
messageString.Remove(0, messageString.Length);
messageString.Append(rootOpenTag);
messageString.Append(childNode.OuterXml);
}
else
{
messageString.Append(childNode.OuterXml);
}
}
messageString.Append("</" + rootElement + ">");
CreateOutgoingMessage(pContext, messageString.ToString(), namespaceURI, rootElement);
}
catch (Exception ex)
{
throw new ApplicationException("Error in writing outgoing messages: " + ex.Message);
}
finally
{
messageString = null;
originalMessageDoc = null;
}
}
/// <summary>
/// Used to pass output messages to next stage
/// </summary>
public IBaseMessage GetNext(IPipelineContext pContext)
{
if (qOutputMsgs.Count > 0)
return (IBaseMessage)qOutputMsgs.Dequeue();
else
return null;
}
/// <summary>
/// Queue outgoing messages
/// </summary>
private void CreateOutgoingMessage(IPipelineContext pContext, String messageString, string namespaceURI, string rootElement)
{
IBaseMessage outMsg;
try
{
//create outgoing message
outMsg = pContext.GetMessageFactory().CreateMessage();
outMsg.AddPart("Body", pContext.GetMessageFactory().CreateMessagePart(), true);
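//message type is namespace#rootLocalName; strip whatever prefix the root element carries, not only ns0
outMsg.Context.Promote("MessageType", systemPropertiesNamespace, namespaceURI + "#" + rootElement.Substring(rootElement.IndexOf(':') + 1));
//encode as UTF-8 so that non-ASCII characters are not replaced with '?'
byte[] bufferOoutgoingMessage = System.Text.Encoding.UTF8.GetBytes(messageString);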
outMsg.BodyPart.Data = new MemoryStream(bufferOoutgoingMessage);
qOutputMsgs.Enqueue(outMsg);
}
catch (Exception ex)
{
throw new ApplicationException("Error in queueing outgoing messages: " + ex.Message);
}
}
}
}
5. Sign the assembly and then build the project.
6. Copy MessageBatchPipelineCompoent.dll to C:\Program Files\Microsoft BizTalk Server 2006\Pipeline Components.
7. The pipeline component is now ready to use. In a BizTalk pipeline project, add this pipeline component DLL to the toolbox and then use it in the Disassemble stage.
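Once the component sits in the Disassemble stage, it is driven through the two IDisassemblerComponent methods: Disassemble is called once with the incoming message, and GetNext is then called repeatedly until it returns null. The sketch below imitates that calling pattern without any BizTalk assemblies. FakeDisassembler and PipelineDriver are hypothetical stand-ins, and plain strings play the role of IBaseMessage.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a disassembler component; strings replace IBaseMessage.
public class FakeDisassembler
{
    private readonly Queue<string> output = new Queue<string>();

    // Groups the records into batches and queues one output "message" per batch.
    public void Disassemble(IList<string> records, int batchSize)
    {
        if (batchSize < 1) throw new ArgumentOutOfRangeException("batchSize");

        for (int i = 0; i < records.Count; i += batchSize)
        {
            int count = Math.Min(batchSize, records.Count - i);
            string[] batch = new string[count];
            for (int j = 0; j < count; j++) batch[j] = records[i + j];
            output.Enqueue(string.Join(",", batch));
        }
    }

    // Hands out queued messages one at a time; null signals that nothing is left.
    public string GetNext()
    {
        return output.Count > 0 ? output.Dequeue() : null;
    }
}

public static class PipelineDriver
{
    // Mimics the caller: a single Disassemble call, then GetNext until null.
    public static List<string> Run(FakeDisassembler component, IList<string> records, int batchSize)
    {
        component.Disassemble(records, batchSize);

        List<string> published = new List<string>();
        string message;
        while ((message = component.GetNext()) != null)
        {
            published.Add(message);
        }
        return published;
    }
}
```

The queue is what lets one incoming message turn into many outgoing ones: everything is produced up front in Disassemble and handed out later, one message per GetNext call.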
Usage Scenario
Sometimes in BizTalk orchestrations or messaging, an incoming message carries a very large set of records, and the business process handles each record individually, one by one. Publishing such a large record set to the MessageBox and then processing it consumes a lot of resources, because the orchestration takes a long time to work through every record.
Using this custom pipeline component, a huge message can be broken into multiple messages with fewer records each. Once these are published to the MessageBox, orchestrations can run in parallel and process several of the smaller messages at the same time. The result is better performance and shorter processing time.
If the processing host is scaled out, the smaller messages are processed on different machines, which gives more scalable and faster execution.
Comments
- Anonymous
December 10, 2006
Why don't you use envelopes?
- Anonymous
December 16, 2010
Getting the error below after deployment, when a message is received at the receive port: "The published message could not be routed because no subscribers were found. This error occurs if the subscribing orchestration or send port has not been enlisted, or if some of the message properties necessary for subscription evaluation have not been promoted. Please use the Biztalk Administration console to troubleshoot this failure." Resolution: added a filter condition on the send port, BTS.MessageType = bauschlomb.biztalk.bts_debatch_pipeline.debatchschema. But it does not work when receiving and sending through an orchestration.