A custom BizTalk 2004 disassembling pipeline component (3)
The real work happens here. This is the implementation of IDisassemblerComponent. At this point, it is probably useful to review the definition of this interface at https://msdn.microsoft.com/library/en-us/sdk/htm/frlrfmicrosoftbiztalkcomponentinteropidisassemblercomponentmemberstopic.asp?frame=true. Basically, the messaging engine calls Disassemble() and then GetNext() until GetNext() returns null. One input message can then be disassembled into more than one message.
The approach here is to store all tables and their name/value pairs in an in-memory structure. Then, when GetNext() is called, we rebuild the XML document we need. Observe that we preserve the original message's context by copying its properties. This way, we claim that we actually received that XML message. If we did not copy the properties, we would get a “this message has no subscription” error at runtime.
/// <summary>
/// Returns the message produced from the tables collected by Disassemble().
/// All tables are combined into a single XML document; once that message has
/// been handed out, the internal table list is dropped so the next call
/// returns null.
/// </summary>
/// <param name="pContext">IPipelineContext: context for this pipeline.</param>
/// <returns>The rebuilt XML message, or null when there is nothing to return.</returns>
public IBaseMessage GetNext(IPipelineContext pContext)
{
    // Nothing to hand back unless at least one table was collected
    if ((tablesArray == null) || (tablesArray.Count < 1))
    {
        return null;
    }

    // The outgoing message that will carry the rebuilt XML
    IBaseMessage outMsg = pContext.GetMessageFactory().CreateMessage();

    // The XML is written into an in-memory stream through an XmlTextWriter
    MemoryStream xmlStream = new MemoryStream();
    XmlTextWriter writer = new XmlTextWriter(xmlStream, Encoding.UTF8);

    // Document layout:
    //   <Tables>
    //     <Table name="<name>">
    //       <Field name="<field_name>" value="<fieldvalue>" />
    //       [...]
    //     </Table>
    //     [...]
    //   </Tables>
    writer.Formatting = Formatting.Indented;
    writer.WriteStartDocument(true);
    writer.WriteStartElement("", "Tables", "");

    foreach (object item in tablesArray)
    {
        Table table = (Table) item;

        writer.WriteStartElement("", "Table", "");
        writer.WriteAttributeString("name", table.Name);

        // One <Field> element per name/value pair of this table
        foreach (object fieldKey in table.Keys)
        {
            writer.WriteStartElement("", "Field", "");
            writer.WriteAttributeString("name", fieldKey.ToString());
            object fieldValue = table[(string) fieldKey];
            writer.WriteAttributeString("value", fieldValue.ToString());
            writer.WriteEndElement();
        }

        writer.WriteEndElement();
    }

    // Close <Tables> and the document, then push everything into the stream
    writer.WriteEndElement();
    writer.WriteEndDocument();
    writer.Flush();

    // Rewind so the messaging engine reads the document from its beginning
    xmlStream.Seek(0, SeekOrigin.Begin);

    // The XML stream becomes the body part of the new message
    IBaseMessagePart bodyPart = pContext.GetMessageFactory().CreateMessagePart();
    bodyPart.Charset = "UTF-8";
    bodyPart.ContentType = "text/xml";
    bodyPart.Data = xmlStream;
    outMsg.AddPart("body", bodyPart, true);

    // Carry every property of the original context over to the new message,
    // keeping promoted properties promoted
    int propIndex = 0;
    while (propIndex < originalMsgContext.CountProperties)
    {
        string propName;
        string propNamespace;
        object propValue = originalMsgContext.ReadAt(propIndex, out propName, out propNamespace);

        bool wasPromoted = originalMsgContext.IsPromoted(propName, propNamespace);
        if (!wasPromoted)
        {
            outMsg.Context.Write(propName, propNamespace, propValue);
        }
        else
        {
            outMsg.Context.Promote(propName, propNamespace, propValue);
        }

        propIndex++;
    }

    // Register the new stream with the pipeline's resource tracker
    pContext.ResourceTracker.AddResource(xmlStream);

    // The message has been produced, so drop the internal table representation
    tablesArray = null;

    return outMsg;
}
Disassembling is shown below. Two important things here: we clone the original message's context, and we use the “SetErrorInfo()” method to report errors encountered while disassembling the message.
/// <summary>
/// Called once per incoming document by the Messaging engine.
/// Reads the body part line by line, collecting each table definition and the
/// name/value pairs that follow it into <c>tablesArray</c>. The context of the
/// incoming message is cloned into <c>originalMsgContext</c>. On a malformed
/// line the error is reported through <c>pInMsg.SetErrorInfo</c>,
/// <c>tablesArray</c> is set to null and parsing stops.
/// </summary>
/// <param name="pContext">IPipelineContext: context for this pipeline.</param>
/// <param name="pInMsg">IBaseMessage: Base message received.</param>
public void Disassemble(IPipelineContext pContext, IBaseMessage pInMsg)
{
    // Consider this document only if it has parts
    if (pInMsg.PartCount < 1)
    {
        return;
    }

    IBaseMessagePart bodyPart = pInMsg.BodyPart;

    // Clone the message context so we can associate it with the new message later
    originalMsgContext = PipelineUtil.CloneMessageContext(pInMsg.Context);

    // Nothing to parse without a body part
    if (bodyPart == null)
    {
        return;
    }

    Stream docStream = bodyPart.GetOriginalDataStream();
    if (docStream == null)
    {
        // The body part carries no data, so there is nothing to parse
        return;
    }

    // We got the original data stream (not a copy) so rewind it. Seeking a
    // stream that does not support it throws, so only rewind when possible.
    if (docStream.CanSeek)
    {
        docStream.Seek(0, SeekOrigin.Begin);
    }

    // Use a stream reader to facilitate reading line by line. The reader is
    // deliberately not disposed: that would close the message's own stream.
    StreamReader streamReader = new StreamReader(docStream);
    long lineNum = 1;

    // NOTE(review): assumes tablesArray is non-null on entry; its
    // initialization is not visible here - confirm.

    // Parse all tables and all values in between them
    string lineRead = streamReader.ReadLine();
    while (lineRead != null)
    {
        // Strip empty lines.
        // NOTE(review): a line is processed only when regExpBlankLine matches
        // it, so the pattern presumably matches non-blank lines - confirm
        // against its definition.
        if (regExpBlankLine.Match(lineRead).Success)
        {
            // Did we encounter a Table definition entry?
            Match tableMatch = regExpTableParser.Match(lineRead);
            if (tableMatch.Success)
            {
                // Create a new table structure
                tablesArray.Add(new Table(tableMatch.Groups["TableName"].Value));
            }
            else
            {
                // We are parsing a name/value pair for the current table
                Match fieldMatch = regExpLineParser.Match(lineRead);

                // A name/value pair is only valid once a table has been
                // opened; otherwise there is no "current table" to insert
                // into and the file is malformed.
                if (fieldMatch.Success && tablesArray.Count > 0)
                {
                    string fieldName = fieldMatch.Groups["FieldName"].Value;
                    // NOTE: The value field is not exactly what one might want
                    // for cases like "Status@C Completed". "C Completed" will be
                    // seen as the value while one might want to have "Completed".
                    // This could be fixed easily by changing the regular expression.
                    string fieldVal = fieldMatch.Groups["FieldValue"].Value;

                    // Insert into the current (most recently added) table
                    ((Table) tablesArray[tablesArray.Count - 1])[fieldName] = fieldVal;
                }
                else
                {
                    // Wrong file format: report error.
                    string errMsg = resMgr.GetString(wrongFlatFileRsrcID).Replace("%{LineNum}%",
                        lineNum.ToString());
                    pInMsg.SetErrorInfo(new Exception(errMsg));

                    // Discard what was parsed so far so GetNext() returns nothing
                    tablesArray = null;
                    break;
                }
            }
        }

        // Get the next line
        lineRead = streamReader.ReadLine();
        lineNum++;
    }
}
The IPersistPropertyBag implementation is shown below. Since we do not expose any properties, the implementation is essentially a no-operation.
/// <summary>
/// Initializes a new Property Bag.
/// Intentionally empty: this component exposes no properties, so there is
/// nothing to initialize.
/// </summary>
public void InitNew()
{
}
/// <summary>
/// Returns the ClassID of this component for usage from native code.
/// </summary>
/// <param name="classID">Guid: receives the GUID associated with this component's runtime type.</param>
public void GetClassID(out Guid classID)
{
// The class ID is the GUID associated with the component's own type
classID = GetType().GUID;
}
/// <summary>
/// Load data from a persisted object into a new object.
/// Intentionally empty: this component exposes no properties, so nothing is
/// read from the property bag.
/// </summary>
/// <param name="propertyBag">Property bag to load (not used).</param>
/// <param name="errorLog">Error log (not used).</param>
public void Load(IPropertyBag propertyBag, int errorLog)
{
}
/// <summary>
/// Saves properties to the property bag.
/// Intentionally empty: this component exposes no properties, so nothing is
/// written to the property bag.
/// </summary>
/// <param name="propertyBag">The property bag to manipulate (not used).</param>
/// <param name="clearDirty">Indicates whether the dirty state should be reset (not used).</param>
/// <param name="saveAllProperties">Should all properties be saved? (not used)</param>
public void Save(IPropertyBag propertyBag, bool clearDirty, bool saveAllProperties)
{
}
Comments
- Anonymous
July 06, 2004
The comment has been removed - Anonymous
July 08, 2004
The comment has been removed