How to Develop BizTalk Custom Pipeline Components – Part2
In this series, I am covering custom pipeline component development in detail. Part 1 covered pipeline component types and walked through developing a general pipeline component with an example. I encourage you to read Part 1 first so that the series follows a logical order.
This part covers developing a disassemble pipeline component.
Developing Disassemble Pipeline Component
Disassemble components primarily do two tasks: split (disassemble) messages and promote properties.
A disassemble pipeline component implements the following interfaces:
a. IBaseComponent Interface
b. IComponentUI Interface
c. IDisassemblerComponent Interface
d. IPersistPropertyBag (optional; required only when pipeline design-time properties are to be defined)
IBaseComponent Interface
All pipeline components implement this interface. Its members supply basic information about the pipeline component.
Members | Usage
Description | Property. Used to specify a short description of the pipeline component. The description is visible on the pipeline properties page at design time.
Name | Property. Used to specify the name of the pipeline component. The name is visible on the pipeline properties page at design time.
Version | Property. Used to specify the version (for example 1.0.0.0) of the pipeline component. Visible on the pipeline properties page at design time.
IComponentUI Interface
All pipeline components must implement this interface. Its members provide design-time support.
Members | Usage
Icon | Property. Used to provide the icon associated with the pipeline component.
Validate | Method. Called by the pipeline designer before pipeline compilation to verify that all configuration properties are correctly set.
IDisassemblerComponent Interface
This is the core interface. The operations and logic specific to the disassemble stage are implemented here.
Members | Usage
Disassemble | Method. Used to split or break the incoming message document.
GetNext | Method. Returns the messages resulting from the Disassemble method execution.
IPersistPropertyBag
This interface provides members that save and load pipeline design-time properties using a property bag. It is implemented only when the pipeline component needs design-time properties.
Members | Usage
GetClassID | Method. Retrieves the component's globally unique identifying value.
InitNew | Method. Initializes any objects needed by the component to use the persisted properties.
Load | Method. Used to load a property from the property bag.
Save | Method. Used to save a property to the property bag.
Code Walkthrough with Sample
The scenario for the disassemble component is simple. We will write a component that breaks a message into a batch of multiple messages. The batch size can be configured at pipeline design time.
If the batch size is configured as 3 at pipeline design time and the incoming message is:
<ns0:sample xmlns:ns0='https://Demo.LoanRequest'>
<data1><name>name1</name></data1>
<data1><name>name2</name></data1>
<data1><name>name3</name></data1>
<data1><name>name4</name></data1>
<data1><name>name5</name></data1>
<data1><name>name6</name></data1>
</ns0:sample>
The component will break this message into 2 messages, each having 3 records:
<ns0:sample xmlns:ns0='https://Demo.LoanRequest'>
<data1><name>name1</name></data1>
<data1><name>name2</name></data1>
<data1><name>name3</name></data1>
</ns0:sample>
<ns0:sample xmlns:ns0='https://Demo.LoanRequest'>
<data1><name>name4</name></data1>
<data1><name>name5</name></data1>
<data1><name>name6</name></data1>
</ns0:sample>
VS 2005 is used for development.
1. Create a class library project, say MessageBatchPipelineCompoent.csproj. Once the project is created, rename Class1.cs to DisassemblePipeline.cs.
2. Configure a strong name key file in the project properties so that the assembly can be signed.
3. Add the following reference:
Microsoft.BizTalk.Pipeline.dll
The reference can be found in the folder “C:\Program Files\Microsoft BizTalk Server 2006”, depending on where BizTalk Server is installed.
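4. Add the following namespaces (System and System.Text are needed for Guid, IntPtr and StringBuilder in the code below).
using System;
using System.Text;
using System.Xml;
using System.IO;
using Microsoft.BizTalk.Message.Interop;
using Microsoft.BizTalk.Component.Interop;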
5. Implement the following interfaces:
IBaseComponent, IDisassemblerComponent, IComponentUI, IPersistPropertyBag
6. Add the following attributes to the pipeline component class:
[ComponentCategory(CategoryTypes.CATID_PipelineComponent)]
[ComponentCategory(CategoryTypes.CATID_DisassemblingParser)]
[System.Runtime.InteropServices.Guid("6118B8F0-8684-4ba2-87B4-8336D70BD4F7")]
public class DisassemblePipeline : IBaseComponent,
    IDisassemblerComponent,
    IComponentUI,
    IPersistPropertyBag
{
    //namespace of the BizTalk system properties; used when promoting MessageType in step 10
    private const string systemPropertiesNamespace = "http://schemas.microsoft.com/BizTalk/2003/system-properties";
    //holds the batched messages produced by Disassemble until GetNext hands them out
    private System.Collections.Queue qOutputMsgs = new System.Collections.Queue();
    //members from steps 7 to 10 go here
}
Disassemble components can only be used in the disassemble stage. The ComponentCategory attribute specifies the disassembling nature (remember stage affinity) of the component.
We will look at the qOutputMsgs variable later. It plays a very important role.
7. Implement members of IBaseComponent interface.
public string Description
{
    get
    {
        return "Component to batch (break) large message into multiple small messages";
    }
}
public string Name
{
    get
    {
        return "MessageBatchComponent";
    }
}
public string Version
{
    get
    {
        return "1.0.0.0";
    }
}
These properties are implemented to provide the description, name and version of the pipeline component.
8. Implement members of IComponentUI interface.
public IntPtr Icon
{
    get
    {
        return new System.IntPtr();
    }
}
public System.Collections.IEnumerator Validate(object projectSystem)
{
    return null;
}
Here I have not provided a design-time icon or any validation logic. In an actual implementation, you can add an icon to the pipeline component's resource file and return it from the Icon property. Similarly, you can verify the design-time configuration properties in the Validate method and return all inconsistencies as a collection of errors.
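As a rough illustration of the validation idea, here is a minimal sketch of a Validate method that reports a bad batch size. It is deliberately free of BizTalk references so that it compiles on its own: the class name BatchSizeValidationSketch and the error text are assumptions made for this example, and only the method shape (an enumerator of error strings, or null when there is nothing to report) follows the IComponentUI member shown above.

```csharp
using System;
using System.Collections;

// Hypothetical stand-in for the pipeline component; only validation is shown.
public class BatchSizeValidationSketch
{
    private int _batchSize;

    public int BatchSize
    {
        get { return _batchSize; }
        set { _batchSize = value; }
    }

    // Same signature as the Validate member above: collect one string per
    // problem and return an enumerator over them, or null if all is well.
    public IEnumerator Validate(object projectSystem)
    {
        ArrayList errors = new ArrayList();
        if (_batchSize < 1)
        {
            errors.Add("BatchSize must be 1 or greater.");
        }
        return errors.Count > 0 ? errors.GetEnumerator() : null;
    }
}
```

In the real component, the same check would sit inside the Validate method of the DisassemblePipeline class.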
9. Implement members of IPersistPropertyBag interface.
private int _BatchSize;
public int BatchSize
{
    get { return _BatchSize; }
    set { _BatchSize = value; }
}
public void GetClassID(out Guid classID)
{
    classID = new Guid("ACC3F15A-C389-4a5d-8F8E-2A951CDC4C19");
}
public void InitNew()
{
}
public void Load(IPropertyBag propertyBag, int errorLog)
{
    object val = null;
    try
    {
        propertyBag.Read("BatchSize", out val, 0);
    }
    catch (ArgumentException)
    {
        //assumption: the bag reports a property that was never saved this way
        //(for example on first use in the designer); fall back to the default
        val = null;
    }
    catch (Exception ex)
    {
        throw new ApplicationException("Error reading propertybag: " + ex.Message, ex);
    }
    if (val != null)
        _BatchSize = (int)val;
    else
        _BatchSize = 1;
}
public void Save(IPropertyBag propertyBag, bool clearDirty, bool saveAllProperties)
{
    object val = (object)BatchSize;
    propertyBag.Write("BatchSize", ref val);
}
We created the “BatchSize” property, which is used to configure the message batch size at design time.
“Load” reads the value from the property bag into the component property. “Save” writes the component property value to the property bag. “GetClassID” returns the component's unique identifying value as a GUID. “InitNew” is meant for initializing any objects the component needs in order to use the persisted properties. I have left it empty because this component does not need it.
10. Implement members of IDisassemblerComponent interface.
public void Disassemble(IPipelineContext pContext, IBaseMessage pInMsg)
{
    string originalDataString;
    try
    {
        //fetch original message
        Stream originalMessageStream = pInMsg.BodyPart.GetOriginalDataStream();
        byte[] bufferOriginalMessage = new byte[originalMessageStream.Length];
        originalMessageStream.Read(bufferOriginalMessage, 0, Convert.ToInt32(originalMessageStream.Length));
        //note: ASCII decoding turns any non-ASCII character into '?'
        originalDataString = System.Text.ASCIIEncoding.ASCII.GetString(bufferOriginalMessage);
    }
    catch (Exception ex)
    {
        //pass ex as the inner exception so the original stack trace is kept
        throw new ApplicationException(ex.Message, ex);
    }
    XmlDocument originalMessageDoc = new XmlDocument();
    StringBuilder messageString;
    try
    {
        //load original message
        originalMessageDoc.LoadXml(originalDataString);
        //fetch namespace and root element
        string namespaceURI = originalMessageDoc.DocumentElement.NamespaceURI;
        string rootElement = originalMessageDoc.DocumentElement.Name;
        //start batching messages
        //note: the batch root is rebuilt with an ns0 declaration, so this
        //assumes the incoming root element uses the ns0 prefix
        int counter = 0;
        messageString = new StringBuilder();
        messageString.Append("<" + rootElement + " xmlns:ns0='" + namespaceURI + "'>");
        foreach (XmlNode childNode in originalMessageDoc.DocumentElement.ChildNodes)
        {
            counter = counter + 1;
            if (counter > BatchSize)
            {
                //current batch is full: close it and queue it
                messageString.Append("</" + rootElement + ">");
                //Queue message
                CreateOutgoingMessage(pContext, messageString.ToString(), namespaceURI, rootElement);
                //start the next batch with the current record
                counter = 1;
                messageString.Remove(0, messageString.Length);
                messageString.Append("<" + rootElement + " xmlns:ns0='" + namespaceURI + "'>");
                messageString.Append(childNode.OuterXml);
            }
            else
            {
                messageString.Append(childNode.OuterXml);
            }
        }
        //queue the last (possibly partial) batch
        messageString.Append("</" + rootElement + ">");
        CreateOutgoingMessage(pContext, messageString.ToString(), namespaceURI, rootElement);
    }
    catch (Exception ex)
    {
        throw new ApplicationException(ex.Message, ex);
    }
    finally
    {
        messageString = null;
        originalMessageDoc = null;
    }
}
public IBaseMessage GetNext(IPipelineContext pContext)
{
    //hand out one queued message per call; null signals that there are no more
    if (qOutputMsgs.Count > 0)
        return (IBaseMessage)qOutputMsgs.Dequeue();
    else
        return null;
}
private void CreateOutgoingMessage(IPipelineContext pContext, String messageString, string namespaceURI, string rootElement)
{
    IBaseMessage outMsg;
    try
    {
        //create outgoing message
        outMsg = pContext.GetMessageFactory().CreateMessage();
        outMsg.AddPart("Body", pContext.GetMessageFactory().CreateMessagePart(), true);
        //systemPropertiesNamespace is a class-level constant holding
        //"http://schemas.microsoft.com/BizTalk/2003/system-properties"
        outMsg.Context.Promote("MessageType", systemPropertiesNamespace, namespaceURI + "#" + rootElement.Replace("ns0:", ""));
        //note: ASCII encoding writes any non-ASCII character as '?'
        byte[] bufferOoutgoingMessage = System.Text.ASCIIEncoding.ASCII.GetBytes(messageString);
        outMsg.BodyPart.Data = new MemoryStream(bufferOoutgoingMessage);
        qOutputMsgs.Enqueue(outMsg);
    }
    catch (Exception ex)
    {
        throw new ApplicationException("Error in queueing outgoing messages: " + ex.Message, ex);
    }
}
This implements the Disassemble and GetNext methods of the IDisassemblerComponent interface.
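One thing to be aware of in the sample: the body is read with a single Stream.Read call and decoded as ASCII, so non-ASCII characters are lost. A hedged alternative is to let StreamReader do both the reading and the decoding. The sketch below is standalone and is not part of the attached code; the helper name StreamTextSketch.ReadAllText is hypothetical, and UTF-8 as the fallback encoding is an assumption about the incoming data.

```csharp
using System;
using System.IO;
using System.Text;

public static class StreamTextSketch
{
    // Reads the whole stream as text. StreamReader keeps reading until the end
    // of the stream and honours a byte-order mark if one is present; without
    // one it decodes with the supplied encoding (UTF-8 here, an assumption).
    public static string ReadAllText(Stream input)
    {
        // The reader is intentionally not disposed, so the caller's stream stays open.
        StreamReader reader = new StreamReader(input, Encoding.UTF8, true);
        return reader.ReadToEnd();
    }
}
```

In the component, this could be called with the stream returned by GetOriginalDataStream. The bytes written to the outgoing body part would then need to use a matching encoding.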
The code flow is explained below.
Disassemble method flow
This method breaks a single incoming message into multiple messages. These messages are stored in the qOutputMsgs queue, which is then consumed by the GetNext method.
- First, read the original message data from the body part: … = pInMsg.BodyPart.GetOriginalDataStream();
- Convert the message data into a string: … = System.Text.ASCIIEncoding.ASCII.GetString(bufferOriginalMessage);
- Load the XML message and fetch the root element name and namespace from it.
originalMessageDoc.LoadXml(originalDataString);
string namespaceURI = originalMessageDoc.DocumentElement.NamespaceURI;
string rootElement = originalMessageDoc.DocumentElement.Name;
- Loop over the records in the XML message and, based on the batch size, build a message string of records.
messageString.Append("<" + rootElement + " xmlns:ns0='" + namespaceURI + "'>");
foreach (XmlNode childNode …
- Then pass the XML message (batch of records) to the CreateOutgoingMessage method.
CreateOutgoingMessage(pContext, messageString.ToString(), namespaceURI, rootElement);
- The CreateOutgoingMessage method creates a BizTalk-compatible message (IBaseMessage) from the batch message string and stores it in the qOutputMsgs queue. When creating the message, we also promote the message type property into the context, which is used for routing messages.
outMsg = pContext.GetMessageFactory().CreateMessage();
outMsg.AddPart("Body", pContext.GetMessageFactory().CreateMessagePart(), true);
…
outMsg.BodyPart.Data = new MemoryStream(bufferOoutgoingMessage);
qOutputMsgs.Enqueue(outMsg);
…
- That’s it.
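The batching loop can also be tried outside BizTalk. The following is a minimal standalone sketch, not the component code: instead of string concatenation it uses a shallow clone of the root element, so the root's prefix and namespace declarations are copied whatever prefix the incoming message uses. The name XmlBatchSketch.Split is an assumption made for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Xml;

public static class XmlBatchSketch
{
    // Splits the child elements of the root into documents of at most
    // batchSize records each and returns each batch as an XML string.
    public static List<string> Split(string xml, int batchSize)
    {
        if (batchSize < 1) throw new ArgumentOutOfRangeException("batchSize");

        XmlDocument source = new XmlDocument();
        source.LoadXml(xml);
        XmlElement root = source.DocumentElement;

        List<string> batches = new List<string>();
        XmlElement current = null;
        int count = 0;

        foreach (XmlNode child in root.ChildNodes)
        {
            if (child.NodeType != XmlNodeType.Element) continue; // skip comments etc.
            if (current == null)
            {
                // Shallow clone: root name, prefix and attributes, but no children.
                current = (XmlElement)root.CloneNode(false);
            }
            current.AppendChild(child.CloneNode(true));
            count++;
            if (count == batchSize)
            {
                batches.Add(current.OuterXml);
                current = null;
                count = 0;
            }
        }
        // Emit the last, possibly partial, batch.
        if (current != null) batches.Add(current.OuterXml);
        return batches;
    }
}
```

For the six-record sample message and a batch size of 3, Split returns two strings with three data1 records each. Inside the component, each string would be handed to CreateOutgoingMessage.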
GetNext method flow
The GetNext method returns the messages created by the Disassemble method, one per call. Each returned message is passed to the next pipeline stage or to the MessageBox, as configured. GetNext is called repeatedly for as long as it returns an IBaseMessage; the calls stop only when it returns null.
To implement this, each call to GetNext returns one batch message (IBaseMessage) from the queue. When the queue is empty, it returns null, which ends the repeated calls.
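The call pattern described above can be sketched without any BizTalk types. In this minimal example, plain strings stand in for IBaseMessage, and the names QueueDrainSketch and CallerSketch are assumptions made for illustration; the real caller is the pipeline runtime, not user code.

```csharp
using System;
using System.Collections.Generic;

// Stand-in for the component: Disassemble fills the queue, GetNext drains it.
public class QueueDrainSketch
{
    private readonly Queue<string> _pending = new Queue<string>();

    public void Disassemble(IEnumerable<string> batches)
    {
        foreach (string batch in batches)
        {
            _pending.Enqueue(batch);
        }
    }

    // Returns one queued item per call, or null once the queue is empty.
    public string GetNext()
    {
        return _pending.Count > 0 ? _pending.Dequeue() : null;
    }
}

// Stand-in for the caller: keeps asking until GetNext returns null.
public static class CallerSketch
{
    public static List<string> Drain(QueueDrainSketch component)
    {
        List<string> received = new List<string>();
        string next;
        while ((next = component.GetNext()) != null)
        {
            received.Add(next);
        }
        return received;
    }
}
```

Because a queue is first in, first out, the batches come back in the order in which Disassemble created them.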
11. Coding is complete. Build the project.
Deploy Component and Use
To deploy the pipeline component, copy MessageBatchPipelineCompoent.dll to “C:\Program Files\Microsoft BizTalk Server 2006\Pipeline Components”.
The pipeline component is now ready to use. In a BizTalk receive pipeline project, add this pipeline component DLL to the toolbox and then use it in the disassemble stage. You will find that you can set the value of the “BatchSize” property at pipeline design time.
Summary
This was a simple example to explain the disassemble pipeline component. More complex scenarios can be implemented in a similar fashion. I have attached the code as MessageBatchPipelineCompoent.zip for further reference.
MessageBatchPipelineCompoent.zip
Comments
Anonymous
January 17, 2007
Thanks! You gave me a head start with this article.
Anonymous
April 24, 2007
Hi,
I checked the subscription using the subscription viewer, and it is there: "http://Demo.LoanRequest#sample". But I still don't have any clue how to solve this. My XML schema file contains the following:
<?xml version="1.0" encoding="utf-16" ?>
<xs:schema xmlns:b="http://schemas.microsoft.com/BizTalk/2003" xmlns="http://Demo.LoanRequest" targetNamespace="http://Demo.LoanRequest" xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:annotation>
<xs:appinfo>
<b:schemaInfo root_reference="sample" xmlns:b="http://schemas.microsoft.com/BizTalk/2003" />
</xs:appinfo>
</xs:annotation>
<xs:element name="sample">
<xs:complexType>
<xs:sequence>
<xs:element maxOccurs="unbounded" form="unqualified" name="data1">
<xs:complexType>
<xs:sequence>
<xs:element form="unqualified" name="name" type="xs:string" />
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:schema>
I am using this schema in the orchestration message type (for receive and also send). Please reply.
Anonymous
April 30, 2007
Hi,
I guess your problem might be because of the send port. Try this: go to your send port properties, where you will find Filters. In Filters, try to assign the property BTS.ReceivePortName == "Your Receive Port name", with Group By set to And.
Hope this works.
Anonymous
May 16, 2007
Getting the same subscription error when I am trying to send the message. Did anyone solve this? I checked the subscription details and everything looks good.
Anonymous
June 21, 2007
Had the same subscription error as described above. Managed to resolve this by copying the original message context through to the outgoing messages in CreateOutgoingMessage:
...
outMsg = pContext.GetMessageFactory().CreateMessage();
outMsg.Context = pInMsg.Context;
outMsg.AddPart("Body", pContext.GetMessageFactory().CreateMessagePart(), true);
...
Had to add the incoming message as a parameter to this method. Once I'd done this, the batched messages started coming through to the send port.
Anonymous
February 14, 2008
Hi,
I want to use a custom send pipeline, and before the assemble stage I want to use a Mapper. Could you please let me know the steps to perform the same? Please reply by email.
Thanks in advance,
Akash
Anonymous
March 31, 2008
The published message could not be routed because no subscribers were found. This error occurs if the subscribing orchestration or send port has not been enlisted, or if some of the message properties necessary for subscription evaluation have not been promoted. Please use the Biztalk Administration console to troubleshoot this failure.
Prakash,
Apart from BTS.ReceivePortName = your receive port name, and BTS.MessageType = the message type which you are expecting. Example:
BTS.MessageType == http://PipelineTest.Mytest#RootTest
Anonymous
April 22, 2008
I have tried your sample, but I encounter an error like "Data at the root is invalid". My schema generates output like this:
<sample xmlns="http://Demo.LoanRequest">
<data1 xmlns=""> <name>name1</name> </data1>
<data1 xmlns=""> <name>name2</name> </data1>
<data1 xmlns=""> <name>name3</name> </data1>
<data1 xmlns=""> <name>name4</name> </data1>
<data1 xmlns=""> <name>name5</name> </data1>
<data1 xmlns=""> <name>name6</name> </data1>
<data1 xmlns=""> <name>name7</name> </data1>
<data1 xmlns=""> <name>name8</name> </data1>
<data1 xmlns=""> <name>name9</name> </data1>
</sample>
The sample input text file is like this:
name1
name2
name3
name4
name5
name6
name7
name8
name9
Any idea about the error? Appreciate your response.