Поделиться через


Large Message Transfer with WCF-Adapters Part 1

Introduction

This is the first in a series of articles that will introduce and explore a couple a patterns to transfer and process large messages using WCF-Adapters. Generally speaking, an end to end BizTalk solution adopts the FTP Adapter or one of its variations (SFTP/FTPS) when it needs to exchange large messages with external parties over the internet, while it uses the FILE adapter for moving large batch files over the intranet. Large input files can be debatched by a receive pipeline into smaller discrete messages. This technique removes the need to worry about a large message and enables processing to be parallelized. The main recommendation to bear in mind when dealing with large messages might seem obvious, but it is often overlooked by developers: you must avoid any operations that require the entire message to be loaded into memory and adopt a streaming approach, especially in those cases where the application can concurrently receive and process multiple large messages in the same time. BizTalk itself adopts a streaming approach whereby the message is read through the use of a stream, meaning that the entire message doesn’t have to be present in memory at any point in time. This streaming approach is used by all the built-in pipeline components such as the Flat File Disassembler, XML Disassembler, etc. Therefore, it’s extremely important that you adopt the same approach when designing and developing custom pipeline components, orchestrations or helper classes for processing large messages.

The problems associated with large messages can be divided into the following categories:

  • Out of memory errors: Certain types of message processing can require the entire message to be loaded into memory. BizTalk Server operates a streaming approach whereby the message is read through the use of a stream, meaning that the entire message doesn't have to be present in memory at any point in time. For example, BizTalk Server 2006 introduced the VirtualStream class in the Messaging Runtime and if the message size is over a certain, configurable threshold (1MB is the default), the document is written to a temporary file, thus keeping the memory usage flat during mapping. Of course, this swapping incurs performance overhead, but this prevents the host process from running out of memory. Any custom pipeline components, orchestrations, or helper classes should follow the same streaming approach and avoid using an XmlDocument to load the entire message into memory.
  • Performance problems for messages that are not loaded into memory: Messages that are not required to be loaded into memory are streamed to the MessageBox database using an .NET XmlReader object. While they are not subject to any size limitations, there are some important factors that impact how BizTalk Server processes messages that are streamed to the MessageBox database.

The original message size, message format, and type of message processing are the main factors that affect how BizTalk Server processes large messages.

  • Original message size: The size of the message received by BizTalk Server is the most visible indication of how large the message will be when it is processed by BizTalk Server. The original size of a message has a much greater impact on performance if the entire message is loaded into memory than if the message is streamed to the MessageBox database.
  • Message format: Messages are received into BizTalk Server in one of two primary formats: XML files or flat files.
    • XML files: In order for BizTalk Server to perform any processing on a message other than pass through routing, the message must be in the XML file format. If the files to be processed are received in XML format then they will retain their original size for the most part.
    • Flat files: Flat files must be parsed into an XML format before BizTalk Server can perform any processing (for instance message transformation) other than pass through routing. Parsing a flat file into an XML file can significantly increase the size of the file. Flat files do not contain XML tags with descriptive information about their data. By contrast, XML files wrap all of their data in descriptive XML tags. In some scenarios parsing can increase the size of a flat file by a factor of 10 or more, depending on how much descriptive data is contained in the XML tags for the file.
  • Type of message processing: In BizTalk Server, there are two types of message processing: Routing only and Mapping. The performance variables tied to the type of message processing that is performed are message size and whether the message is loaded into memory.
    • Routing: If BizTalk Server is only used only for routing messages based upon promoted properties, then the Message Agent uses the stream opened by the receive adapter to publish the message to the BizTalkMsgBoxDb database using the .NET XmlReader interface, and message parts are not individually loaded into memory. In this scenario, out of memory errors are not an issue and the primary consideration is the amount of time that is required to write very large messages (over 100 MB) into the BizTalkMsgBoxDb database. In fact, this operation can have a severe impact on overall performance, especially when the same BizTalk installation is used to host also low latency applications. The BizTalk Server development team has successfully tested the processing of messages up to 1 GB in size when performing routing only.
    • Mapping: Transforming a document with a map is a memory-intensive operation. When a document is transformed by a map, BizTalk Server passes the message stream to the .Net XslTransform class, which then loads the document into a .NET XPathDocument object for processing. Loading the document into the .NET XPathDocument can potentially expand the original file size in memory by a factor of 10 or more. This expansion may be more pronounced when mapping flat files because flat files must be parsed into XML before they can be transformed. Almost 4 months ago I wrote an article on how boosting message transformation using the XslCompiledTransform class in place of the XslTransform.

For more information about large message transfer and processing with BizTalk Server, you can read the following topics:

Problem Statement

In this article we’ll try to provide an answer to the following questions:

  • How can I use BizTalk and WCF to transfer a large message in a reliable and performant way?
  • What is the best Message Encoder (Binary, MTOM, Text) and Transfer Mode (Buffered, Streamed) that I can use to transmit a large message to a WCF Receive Location?

Besides, in this series of articles I’ll discuss the following topics:

  • How to use the InboundHeaders context property exposed by WCF Adapters to access the SOAP headers of incoming WCF messages.
  • How to use the TransferMode = Streamed.
  • How to process in a streaming fashion within a pipeline component.
  • How to transfer a large message in a transactional way using a client-side initiated transaction.
  • How to stage the incoming document to a folder or SQL.
  • How to use a table with a column of type “varbinary(max) FILESTREAM” to stage a large file on SQL Server 2008.
  • How to use the API of the Transaction NTFS (TxF) to write a large file to a shared folder in reliable, transactional way.
  • How to hook the distributed transaction used by the Message Agent within a pipeline component.
  • How to use a custom stream on the send port to wrap the original stream and execute closing tasks after message transmission.
  • How to initiate delivery notifications within a pipeline component and handle ACK/NACK messages within an orchestration.

WCF Message Encoders and Transfer Modes

When I started to design this demo, my will was comparing the performance of the various message encoders and transfers modes provided out of the box by WCF and find the best combination to adopt when transferring large message over the network. WCF includes three types of encoding for SOAP messages:

  • Text Message Encoding: The text encoding represented by the TextMessageEncodingBindingElement is the most interoperable, but the least efficient encoder for XML messages. Web service or Web service client can generally understand textual XML. However, transmitting large blocks of binary data as text is not efficient.
  • Binary Message Encoding: The BinaryMessageEncodingBindingElement is the binding element that specified the .NET Binary Format for XML should be used for encoding messages, and has options to specify the character encoding and the SOAP and WS-Addressing version to be used. Binary encoding is most efficient but least interoperable of the encoding options.
  • MTOM Message Encoding: The MTOMMessageEncodingBindingElement represents the binding element that specifies the character encoding and message versioning and other settings used for messages using a Message Transmission Optimization Mechanism (MTOM) encoding. (MTOM) is an efficient technology for transmitting binary data in WCF messages. The MTOM encoder attempts to create a balance between efficiency and interoperability. The MTOM encoding transmits most XML in textual form, but optimizes large blocks of binary data by transmitting them as-is, without conversion to their base64 encoded format.

WCF transport channels support two modes for transferring messages in each direction:

  • Buffered: transmissions hold the entire message in a memory buffer until the transfer is complete. On the service side message processing cannot start until the entire message has been received.
  • Streamed: transmissions only buffer the message headers and expose the message body as a stream, from which smaller portions can be read at a time.

The TransferMode property exposed by transport protocol channel (e.g.HttpTransportBindingElement, TcpTransportBindingElement, etc.) and bindings (BasicHttpBinding, NetTcpBinding, etc.) allows to indicate whether messages are sent buffered or streamed. Streamed transfers can improve the scalability of a service by eliminating the need for large memory buffers. Whether changing the transfer mode actually improves scalability in practice depends on the size of the messages being transferred. Improvements in scalability should be most evident when large messages use streamed instead of buffered transfers. Therefore we’ll exploit the streamed transfer mode in for transferring large messages using BizTalk and WCF Adapters.
Note: By default, the HTTP, TCP/IP and Named Pipe transports use buffered message transfers. If you plan to exploit the streamed transfer mode when receiving a large message through a WCF Receive Location, you have to use the WCF-Custom/WCF-CustomIsolated Adapter. In fact, these latter allow developer to select  a certain binding (e.g. BasicHttpBinding, NetTcpBinding, NetNamedPipeBinding, etc.) and then change the value of its TransferMode property to Streamed. The other WCF Adapters (e.g. WCF-BasicHttp, WCF-NetTcp, etc.) do not provide this possibility.

For more information on the WCF Transfer Modes, you can read the following topic:

  • “TransferMode Enumeration” on MSDN.

Client Application

I created a WinForm application that is capable to upload multiple documents at the same time. The UI of the driver application allows to select a list of files to be sent in parallel to BizTalk and which service endpoint to use for this purpose. Six different WCF Receive Locations have been created, one for each possible combination of message encoders and transfer modes. Therefore, the configuration file of the client application contains a different service endpoint, one for each WCF Receive Location. When you push the Submit button on the main form, the click event handler method (btnSubmit_Click) creates a a list of calls and then using the ThreadPool it start an asynchronous thread for each file selected on the UI. At this point, if the selected service endpoint uses a transaction-enabled binding, the application will call the SubmitFileWithTxn method, otherwise the SubmitFileWithoutTxn method.

The code of the 2 functions is quite similar, the only difference between the two is that the SubmitFileWithTxn method initiates a transaction that will be flowed to the WCF Receive Location. Let’s analyze the code of this latter method. The SubmitFileWithTxn starts opening a FileStream to read the selected file from the disk and I wrap this latter with a custom stream called ProgressStream. As the name suggests, the main scope of this custom stream is to update a progress bar on the UI as the file is being transmitted. This feature is particularly interesting when we use a binding which TransferMode is equal to Streamed. The ProgressStream class exposes an event called ProgressChanged  that can be used to invoke a method that updates the progress bar. Since the progress bar control is owned by the main thread, while the application uses a ThreadPool worker thread to transmit the message to BizTalk, in order to change the value of the progress bar is necessary to call the Invoke method exposed by the ProgressForm object with a delegate. This technique enables the worker thread to ask the main thread to execute the delegate.
Then, the method creates an UploadMessage. If you look at the code of the class, you will note that I decorated the class with the MessageContractAttribute. Then I decorated all the properties representing transmission metadata like the name of the sender or the file size with the MessageHeaderAttribute attribute. This way I declare that this data will be transmitted as custom headers in the SOAP envelope.
Instead, the property called Data that will contain the content of the source file stream has been decorated with the MessageBodyMember attribute. Hence data will be transmitted within the body of the SOAP envelope.
Finally, the method creates a ChannelFactory using the IUploadMessageTxn interface as service contract. If you look at the code, you will note that this interface has been decorated with the TransactionFlowAttribute. In particular, the TransactionFlowOption.Mandatory value indicates that the service endpoint requires the client application to start and flow a distributed transaction.
For this reason, the SubmitFileWithTxn method invoke the underlying  WCF Receive Location within a transaction scope. The driver application allows to control the outcome of the transaction selecting one between the Commit or Abort option buttons on the UI. Selecting abort, you can simulate a transaction failure. This possibility is extremely useful when you configure the WCF receive location to use a transactional binding. In fact, in this case the Message Agent will use the transaction initiated by the client application to post a placeholder document to the BizTalkMsgBoxDb (as you will see in the remainder of the article, the original message will be staged in a folder or SQL database and replaced by a placeholder document which contains its location and metadata). Therefore, if the client application aborts the transaction, no message will be published to the BizTalkMsgBoxDb. Transferring a large message in the context of WS-AT or OleTransactions transaction greatly improves transmission reliability, but it has a severe impact on overall performance, so this technique is recommended only in those cases where resilience is the primary concern.
For sake of readability, I omitted most of the code and I reported below only the classes and methods that can be helpful to understand the application.

MainForm class

 #region Copyright//-------------------------------------------------// Author:  Paolo Salvatori// Email:   paolos@microsoft.com// History: 2009-01-14 Created//-------------------------------------------------#endregion#region Using Directivesusing System;using System.IO;using System.Collections.Generic;using System.Configuration;using System.ComponentModel;using System.Transactions;using System.Data;using System.Drawing;using System.Text;using System.Xml;using System.Xml.XPath;using System.Threading;using System.Windows.Forms;using System.ServiceModel;using System.ServiceModel.Channels;using System.ServiceModel.Configuration;#endregionnamespace Microsoft.BizTalk.CAT.Samples.SSISIntegration.Client{    public partial class MainForm : Form    {        ...        private void btnSubmit_Click(object sender, EventArgs e)        {            try            {                if (shellView.SelectedItems != null &&                    shellView.SelectedItems.Length > 0)                {                    FileInfo fileInfo = null;                    List<CallParameters> callList = new List<CallParameters>();                    for (int i = 0; i < shellView.SelectedItems.Length; i++)                    {                        if (File.Exists(shellView.SelectedItems[i].FileSystemPath))                        {                            fileInfo = new FileInfo(shellView.SelectedItems[i].FileSystemPath);                            callList.Add(new CallParameters(i,                                                            (int)fileInfo.Length,                                                            shellView.SelectedItems[i].FileSystemPath,                                                            cboEndpoint.Text));                        }                        else                        {                            WriteToLog(string.Format(FileDoesNotExist, shellView.SelectedItems[i].FileSystemPath));                        }                    }                    count = callList.Count;                    completed = 0;                    progressForm = new ProgressForm(callList);                    progressForm.Show();                    for (int i = 0; i < shellView.SelectedItems.Length; i++)                    {                        ThreadPool.QueueUserWorkItem(new WaitCallback(SubmitFile), new StartState(callList[i], this));                    }                }                else                {                    WriteToLog(NoFilesSelected);                }            }            catch (Exception ex)            {                WriteToLog(ex.Message);            }        }        private void SubmitFile(Object stateInfo)        {            StartState state = stateInfo as StartState;            if (TxnBinding.ContainsKey(state.Parameters.Endpoint) &&                TxnBinding[state.Parameters.Endpoint])            {                SubmitFileWithTxn(state);            }            else            {                SubmitFileWithoutTxn(state);            }        }        private void SubmitFileWithTxn(StartState startState)        {            ChannelFactory<IUploadMessageTxn> channelFactory = null;            try            {                if (startState != null &&                    startState.Parameters != null &&                    !string.IsNullOrEmpty(startState.Parameters.Endpoint) &&                    !string.IsNullOrEmpty(startState.Parameters.Filename) &&                    File.Exists(startState.Parameters.Filename))                {                    FileInfo fileInfo = new FileInfo(startState.Parameters.Filename);                    using (FileStream fileStream = File.Open(startState.Parameters.Filename,                                                              FileMode.Open,                                                              FileAccess.Read,                                                              FileShare.Read))                    {                        ProgressStream progressStream = new ProgressStream(startState.Parameters.Index, fileStream);                        progressStream.ProgressChanged +=                                  new EventHandler<ProgressChangedEventArgs>(progressForm.ProgressStream_ProgressChanged);                        UploadMessage uploadMessage = new UploadMessage(Guid.NewGuid().ToString(),                                                                        txtSender.Text,                                                                        fileInfo.Name,                                                                        DateTime.Now,                                                                        fileInfo.Length,                                                                        progressStream);                        channelFactory = new ChannelFactory<IUploadMessageTxn>(startState.Parameters.Endpoint);                        channelFactory.Endpoint.Contract.SessionMode = SessionMode.Allowed;                        IUploadMessageTxn channel = channelFactory.CreateChannel();                        DateTime start = DateTime.Now;                        bool committed = true;                        string transactionId = string.Empty;                        using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required))                        {                            channel.UploadMessageTxn(uploadMessage);                            transactionId = Transaction.Current.TransactionInformation.DistributedIdentifier.ToString();                            WriteToLog(string.Format(TransactionIdFormat, transactionId));                            if (commitRadioButton.Checked)                            {                                scope.Complete();                            }                            else                            {                                committed = false;                                Thread.Sleep(200);                            }                        }                        DateTime end = DateTime.Now;                        TimeSpan time = end.Subtract(start);                        channelFactory.Close();                        double seconds = time.TotalSeconds;                        WriteToLog(string.Format(MessageSuccessfullyUploaded, fileInfo.Name,                                                  startState.Parameters.Endpoint, seconds));                        if (committed)                        {                            WriteToLog(string.Format(TransactionCommittedFormat, transactionId));                        }                        else                        {                            WriteToLog(string.Format(TransactionAbortedFormat, transactionId));                        }                    }                }                else                {                    WriteToLog(string.Format(FileDoesNotExist, startState.Parameters.Filename));                }            }            catch (FaultException ex)            {                WriteToLog(ex.Message);                if (channelFactory != null)                {                    channelFactory.Abort();                }            }            catch (CommunicationException ex)            {                WriteToLog(ex.Message);                if (channelFactory != null)                {                    channelFactory.Abort();                }            }            catch (TimeoutException ex)            {                WriteToLog(ex.Message);                if (channelFactory != null)                {                    channelFactory.Abort();                }            }            catch (Exception ex)            {                WriteToLog(ex.Message);                if (channelFactory != null)                {                    channelFactory.Abort();                }            }            finally            {                try                {                    if (startState.Form != null)                    {                        startState.Form.ThreadTerminated();                    }                    if (channelFactory != null)                    {                        channelFactory.Close();                    }                }                catch (Exception)                {                }            }        }        ...    }}

ProgressStream class

 #region Copyright//-------------------------------------------------// Author:  Paolo Salvatori// Email:   paolos@microsoft.com// History: 2009-06-30 Created//-------------------------------------------------#endregion#region Using Directivesusing System;using System.IO;#endregionnamespace Microsoft.BizTalk.CAT.Samples.SSISIntegration.Client{    public class ProgressStream : Stream    {        #region Private Fields        private Stream stream;        private int index;        private long bytesRead;        private long totalLength;         #endregion        #region Public Handler        public event EventHandler<ProgressChangedEventArgs> ProgressChanged;         #endregion        #region Public Constructor        public ProgressStream(int index, Stream file)        {            this.index = index;            this.stream = file;            this.totalLength = file.Length;            this.bytesRead = 0;        }         #endregion        #region Public Properties        public override bool CanRead        {            get            {                return this.stream.CanRead;            }        }        public override bool CanSeek        {            get            {                return this.stream.CanSeek;            }        }        public override bool CanWrite        {            get            {                return this.stream.CanWrite;            }        }        public override void Flush()         {            this.stream.Flush();        }        public override long Length        {            get             {                 return this.stream.Length;             }        }        public override long Position        {            get             {                 return this.stream.Position;             }            set             {                this.stream.Position = value;             }        }         #endregion        #region Public Methods        public override int Read(byte[] buffer, int offset, int count)        {            int result = stream.Read(buffer, offset, count);            bytesRead += result;            if (ProgressChanged != null)            {                try                {                    ProgressChanged(this, new ProgressChangedEventArgs(index, bytesRead, totalLength));                }                catch (Exception)                {                    ProgressChanged = null;                }            }            return result;        }        public override long Seek(long offset, SeekOrigin origin)        {            return this.stream.Seek(offset, origin);        }        public override void SetLength(long value)        {            this.stream.SetLength(value);        }        public override void Write(byte[] buffer, int offset, int count)        {            this.stream.Write(buffer, offset, count);        }         #endregion    }    public class ProgressChangedEventArgs : EventArgs    {        #region Private Fields        private int index;        private long bytesRead;        private long totalLength;         #endregion        #region Public Constructor        public ProgressChangedEventArgs(int index, long bytesRead, long totalLength)        {            this.index = index;            this.bytesRead = bytesRead;            this.totalLength = totalLength;        }         #endregion        #region Public properties        public int Index        {            get            {                return this.index;            }            set            {                this.index = value;            }        }        public long BytesRead        {            get            {                return this.bytesRead;            }            set            {                this.bytesRead = value;            }        }        public long TotalLength        {            get            {                return this.totalLength;            }            set            {                this.totalLength = value;            }        }        #endregion    }}

IUploadMessageTxn interface

 #region Copyright//-------------------------------------------------// Author:  Paolo Salvatori// Email:   paolos@microsoft.com// History: 2009-01-14 Created//-------------------------------------------------#endregion#region Using Directivesusing System;using System.IO;using System.ServiceModel;#endregionnamespace Microsoft.BizTalk.CAT.Samples.SSISIntegration.Client{    [ServiceContract(Namespace = "https://microsoft.biztalk.cat/10/samples/ssisintegration/uploadservice")]    public interface IUploadMessageTxn    {        #region Contract Operations                [OperationContract(Action = "UploadMessageTxn", ReplyAction = "*")]        [TransactionFlow(TransactionFlowOption.Mandatory)]        [XmlSerializerFormat()]        void UploadMessageTxn(UploadMessage message);         #endregion    }}

UploadMessage class

 #region Copyright//-------------------------------------------------// Author:  Paolo Salvatori// Email:   paolos@microsoft.com// History: 2009-01-14 Created//-------------------------------------------------#endregion#region Using Directivesusing System;using System.IO;using System.ServiceModel;using System.Net.Security;#endregionnamespace Microsoft.BizTalk.CAT.Samples.SSISIntegration.Client{    [MessageContract]    public class UploadMessage    {        #region Private Fields        private string id;        private string sender;        private string filename;        private DateTime dateTime;        private long size;        private Stream data;        #endregion        #region Public Constructors        public UploadMessage()        {            this.id = null;            this.sender = null;            this.filename = null;            this.dateTime = DateTime.Now;            this.size= 0;            this.data = null;        }        public UploadMessage(string id,                             string sender,                             string filename,                             DateTime dateTime,                             long size,                             Stream data)        {            this.id = id;            this.sender = sender;            this.filename = filename;            this.dateTime = dateTime;            this.size = size;            this.data = data;        }        #endregion        #region Public Properties        [MessageHeader(Name = "id",                       Namespace = "https://microsoft.biztalk.cat/10/samples/ssisintegration/uploadservice",                       ProtectionLevel = ProtectionLevel.None)]        public string Id        {            get            {                return this.id;            }            set            {                this.id = value;            }        }        [MessageHeader(Name = "sender",                       Namespace = "https://microsoft.biztalk.cat/10/samples/ssisintegration/uploadservice",                       ProtectionLevel = ProtectionLevel.None)]        public string Sender        {            get            {                return this.sender;            }            set            {                this.sender = value;            }        }        [MessageHeader(Name = "filename",                       Namespace = "https://microsoft.biztalk.cat/10/samples/ssisintegration/uploadservice",                       ProtectionLevel = ProtectionLevel.None)]        public string Filename        {            get            {                return this.filename;            }            set            {                this.filename = value;            }        }        [MessageHeader(Name = "dateTime",                       Namespace = "https://microsoft.biztalk.cat/10/samples/ssisintegration/uploadservice",                       ProtectionLevel = ProtectionLevel.None)]        public DateTime DateTime        {            get            {                return this.dateTime;            }            set            {                this.dateTime = value;            }        }        [MessageHeader(Name = "size",                       Namespace = "https://microsoft.biztalk.cat/10/samples/ssisintegration/uploadservice",                       ProtectionLevel = ProtectionLevel.None)]        public long Size        {            get            {                return this.size;            }            set            {                this.size = value;            }        }        [MessageBodyMember(Name = "data",                           Namespace = "https://microsoft.biztalk.cat/10/samples/ssisintegration/uploadservice",                           ProtectionLevel = ProtectionLevel.None)]        public Stream Data        {            get            {                return this.data;            }            set            {                this.data = value;            }        }         #endregion    }}

App.Config

 <?xml version="1.0" encoding="utf-8" ?><configuration>    <system.diagnostics>        <sources>            <source name="System.ServiceModel.MessageLogging" switchValue="Warning, ActivityTracing">                <listeners>                    <add type="System.Diagnostics.DefaultTraceListener" name="Default">                        <filter type="" />                    </add>                    <add name="ServiceModelMessageLoggingListener">                        <filter type="" />                    </add>                </listeners>            </source>        </sources>        <sharedListeners>            <add initializeData="C:\ssisintegration_client.svclog"                 type="System.Diagnostics.XmlWriterTraceListener, System, Version=2.0.0.0, Culture=neutral,                        PublicKeyToken=b77a5c561934e089"                 name="ServiceModelMessageLoggingListener"                 traceOutputOptions="Timestamp">                <filter type="" />            </add>        </sharedListeners>    </system.diagnostics>    <system.serviceModel>        <diagnostics>            <messageLogging logEntireMessage="false"                            logMalformedMessages="false"                            logMessagesAtServiceLevel="false"                            logMessagesAtTransportLevel="false"                            maxSizeOfMessageToLog="1000000" />        </diagnostics>        <bindings>            <customBinding>                <binding name="netTcpMtomStreamedBinding"                          receiveTimeout="01:00:00"                          sendTimeout="01:00:00">                    <mtomMessageEncoding maxBufferSize="1048576"                                          maxReadPoolSize="1024"                                         maxWritePoolSize="1024">                        <readerQuotas maxDepth="1048576"                                       maxStringContentLength="1048576"                                       maxArrayLength="104857600"                                       maxBytesPerRead="1048576"                                       maxNameTableCharCount="1048576" />                    </mtomMessageEncoding>                    <tcpTransport maxBufferPoolSize="1048576"                                   maxReceivedMessageSize="104857600"                                   connectionBufferSize="1048576"                                   maxBufferSize="1048576"                                   maxPendingConnections="200"                                   maxPendingAccepts="200"                                   transferMode="Streamed"                                   listenBacklog="200">                        <connectionPoolSettings maxOutboundConnectionsPerEndpoint="200" />                    </tcpTransport>                </binding>                <binding name="netTcpBinaryStreamedBinding"                         receiveTimeout="01:00:00"                         sendTimeout="01:00:00">                    <binaryMessageEncoding maxSessionSize="1048576">                        <readerQuotas maxDepth="1048576"                                      maxStringContentLength="1048576"                                      maxArrayLength="104857600"                                      maxBytesPerRead="1048576"                                      maxNameTableCharCount="1048576" />                    </binaryMessageEncoding>                    <tcpTransport maxBufferPoolSize="1048576"                                  maxReceivedMessageSize="104857600"                                  connectionBufferSize="1048576"                                  maxBufferSize="1048576"                                  maxPendingConnections="200"                                  maxPendingAccepts="200"                                  transferMode="Streamed"                                  listenBacklog="200">                        <connectionPoolSettings maxOutboundConnectionsPerEndpoint="200" />                    </tcpTransport>                </binding>                <binding name="netTcpTextStreamedBinding"                         receiveTimeout="01:00:00"                         sendTimeout="01:00:00">                    <textMessageEncoding maxWritePoolSize="64">                        <readerQuotas maxDepth="1048576"                                      maxStringContentLength="1048576"                                      maxArrayLength="104857600"                                      maxBytesPerRead="1048576"                                      maxNameTableCharCount="1048576" />                    </textMessageEncoding>                    <tcpTransport maxBufferPoolSize="1048576"                                  maxReceivedMessageSize="104857600"                                  connectionBufferSize="1048576"                                  maxBufferSize="1048576"                                  maxPendingConnections="200"                                  maxPendingAccepts="200"                                  transferMode="Streamed"                                  listenBacklog="200">                        <connectionPoolSettings maxOutboundConnectionsPerEndpoint="200" />                    </tcpTransport>                </binding>                <binding name="netTcpMtomBufferedBinding"                         receiveTimeout="01:00:00"                         sendTimeout="01:00:00">                    <mtomMessageEncoding maxBufferSize="1048576">                        <readerQuotas maxDepth="1048576"                                      maxStringContentLength="1048576"                                      maxArrayLength="104857600"                                      maxBytesPerRead="1048576"                                      maxNameTableCharCount="1048576" />                    </mtomMessageEncoding>                    <tcpTransport maxBufferPoolSize="1048576"                                   maxReceivedMessageSize="1048576"                                   connectionBufferSize="1048576"                                  maxBufferSize="104857600"                                   maxPendingConnections="200"                                   maxPendingAccepts="200"                                   transferMode="Buffered"                                   listenBacklog="200">                        <connectionPoolSettings maxOutboundConnectionsPerEndpoint="200" />                    </tcpTransport>                </binding>                <binding name="netTcpBinaryBufferedBinding"                         receiveTimeout="01:00:00"                         sendTimeout="01:00:00">                    <binaryMessageEncoding maxSessionSize="1048576">                        <readerQuotas maxDepth="1048576"                                      maxStringContentLength="1048576"                                      maxArrayLength="104857600"                                      maxBytesPerRead="1048576"                                      maxNameTableCharCount="1048576" />                    </binaryMessageEncoding>                    <tcpTransport maxBufferPoolSize="1048576"                                   maxReceivedMessageSize="104857600"                                   connectionBufferSize="1048576"                                  maxBufferSize="104857600"                                   maxPendingConnections="200"                                   maxPendingAccepts="200"                                   transferMode="Buffered"                                   listenBacklog="200">                        <connectionPoolSettings maxOutboundConnectionsPerEndpoint="200" />                    </tcpTransport>                </binding>                <binding name="netTcpTextBufferedBinding"                         receiveTimeout="01:00:00"                         sendTimeout="01:00:00">                    <textMessageEncoding maxWritePoolSize="64">                        <readerQuotas maxDepth="1048576"                                      maxStringContentLength="1048576"                                      maxArrayLength="104857600"                                      maxBytesPerRead="1048576"                                      maxNameTableCharCount="1048576" />                    </textMessageEncoding>                    <tcpTransport maxBufferPoolSize="1048576"                                   maxReceivedMessageSize="104857600"                                  connectionBufferSize="1048576"                                  maxBufferSize="104857600"                                   maxPendingConnections="200"                                   maxPendingAccepts="200"                                   transferMode="Buffered"                                   listenBacklog="200">                        <connectionPoolSettings maxOutboundConnectionsPerEndpoint="200" />                    </tcpTransport>                </binding>                <binding name="netTcpMtomStreamedTxnBinding"                         receiveTimeout="01:00:00"                         sendTimeout="01:00:00">                    <transactionFlow transactionProtocol="OleTransactions"/>                    <mtomMessageEncoding maxBufferSize="1048576"                                         maxReadPoolSize="1024"                                         maxWritePoolSize="1024">                        <readerQuotas maxDepth="1048576"                                      maxStringContentLength="1048576"                                      maxArrayLength="104857600"                                      maxBytesPerRead="1048576"                                      maxNameTableCharCount="1048576" />                    </mtomMessageEncoding>                    <tcpTransport maxBufferPoolSize="1048576"                                  maxReceivedMessageSize="104857600"                                  connectionBufferSize="1048576"                                  maxBufferSize="1048576"                                  maxPendingConnections="200"                                  maxPendingAccepts="200"                                  transferMode="Streamed"                                  listenBacklog="200">                        <connectionPoolSettings maxOutboundConnectionsPerEndpoint="200" />                    </tcpTransport>                </binding>            </customBinding>          <netNamedPipeBinding>            <binding name="netNamedPipeBinding"                      transferMode="Streamed"                      maxBufferPoolSize="1048576"                      maxBufferSize="1048576"                      maxConnections="200"                      maxReceivedMessageSize="104857600">              <security mode="None" />            </binding>          </netNamedPipeBinding>        </bindings>        <client>            <endpoint address="net.tcp://localhost:11550/ssisintegration"                      binding="customBinding"                      bindingConfiguration="netTcpMtomStreamedBinding"                      contract="Microsoft.BizTalk.CAT.Samples.SSISIntegration.Client.IUploadMessage"                      name="netTcpMtomStreamedEndpoint" />            <endpoint address="net.tcp://localhost:11551/ssisintegration"                      binding="customBinding"                      bindingConfiguration="netTcpBinaryStreamedBinding"                      contract="Microsoft.BizTalk.CAT.Samples.SSISIntegration.Client.IUploadMessage"                      name="netTcpBinaryStreamedEndpoint" />            <endpoint address="net.tcp://localhost:11552/ssisintegration"                      binding="customBinding"                      bindingConfiguration="netTcpTextStreamedBinding"                      contract="Microsoft.BizTalk.CAT.Samples.SSISIntegration.Client.IUploadMessage"                      name="netTcpTextStreamedEndpoint" />            <endpoint address="net.tcp://localhost:11553/ssisintegration"                      binding="customBinding"                      bindingConfiguration="netTcpMtomBufferedBinding"                      contract="Microsoft.BizTalk.CAT.Samples.SSISIntegration.Client.IUploadMessage"                      name="netTcpMtomBufferedEndpoint" />            <endpoint address="net.tcp://localhost:11554/ssisintegration"                      binding="customBinding"                      bindingConfiguration="netTcpBinaryBufferedBinding"                      contract="Microsoft.BizTalk.CAT.Samples.SSISIntegration.Client.IUploadMessage"                      name="netTcpBinaryBufferedEndpoint" />            <endpoint address="net.tcp://localhost:11555/ssisintegration"                      binding="customBinding"                      bindingConfiguration="netTcpTextBufferedBinding"                      contract="Microsoft.BizTalk.CAT.Samples.SSISIntegration.Client.IUploadMessage"                      name="netTcpTextBufferedEndpoint" />            <endpoint address="net.tcp://localhost:11556/ssisintegration"                      binding="customBinding"                      bindingConfiguration="netTcpMtomStreamedTxnBinding"                      contract="Microsoft.BizTalk.CAT.Samples.SSISIntegration.Client.IUploadMessageTxn"                      name="netTcpMtomStreamedTxnEndpoint" />      <endpoint address="net.tcp://localhost:11557/ssisintegration"                      binding="customBinding"                      bindingConfiguration="netTcpBinaryStreamedBinding"                      contract="Microsoft.BizTalk.CAT.Samples.SSISIntegration.Client.IUploadMessage"                      name="netTcpSynchronousSSISPackageEndpoint" />      <endpoint address="net.tcp://localhost:11558/ssisintegration"                      binding="customBinding"                      bindingConfiguration="netTcpBinaryStreamedBinding"                      contract="Microsoft.BizTalk.CAT.Samples.SSISIntegration.Client.IUploadMessage"                      name="netTcpAsynchronousSSISPackageEndpoint" />      <endpoint address="net.pipe://localhost/ssiintegration"                      binding="netNamedPipeBinding"                      bindingConfiguration="netNamedPipeBinding"                      contract="Microsoft.BizTalk.CAT.Samples.SSISIntegration.Client.IUploadMessage"                      name="netNamedPipeStreamedEndpoint" />            <endpoint address="net.tcp://localhost:11557/ssisintegration"                      binding="customBinding"                      bindingConfiguration="netTcpMtomStreamedBinding"                      contract="Microsoft.BizTalk.CAT.Samples.SSISIntegration.Client.IUploadMessage"                      name="directServiceEndpoint" />        </client>    </system.serviceModel></configuration>

For more information on how using the streamed transfer mode along with WCF, you can read the following topic:

  • “How to: Enable Streaming” article on MSDN.

BizTalk Application

In order to compare the performance of the various message encoders and transfer modes when transferring large messages over the network, I created six different WCF-Custom Receive Locations, one for each possible combination of these properties:

  1. SSISIntegration.WCF-Custom.NetTcp.Mtom.Streamed.ReceiveLocation: TransferMode = Streamed, Message Encoder = MTOM.
  2. SSISIntegration.WCF-Custom.NetTcp.Binary.Streamed.ReceiveLocation: TransferMode = Streamed, Message Encoder = Binary.
  3. SSISIntegration.WCF-Custom.NetTcp.Text.Streamed.ReceiveLocation: TransferMode = Streamed, Message Encoder = Text.
  4. SSISIntegration.WCF-Custom.NetTcp.Mtom.Buffered.ReceiveLocation: TransferMode = Buffered, Message Encoder = MTOM.
  5. SSISIntegration.WCF-Custom.NetTcp.Binary.Buffered.ReceiveLocation: TransferMode = Buffered, Message Encoder = Binary.
  6. SSISIntegration.WCF-Custom.NetTcp.Text.Buffered.ReceiveLocation: TransferMode = Buffered, Message Encoder = Text.

Each of the above Receive Locations use a CustomBinding and the TcpTransportBindingElement. Let’s compare the Binding configuration of the Receive Locations #2 and #5. They both use the Binary message encoder but they are configured to use a different transfer mode, respectively streamed and buffered. If you look at the WCF-Custom Adapter configuration of the SSISIntegration.WCF-Custom.NetTcp.Binary.Streamed.ReceiveLocation receive location (see the picture below), you will note that:

  • The value of the MaxReceivedMessageSize property has been increased to almost 1 GB to handle large messages.
  • The value of the MaxBufferSize property has been set to 1 MB. You can eventually arrange a larger memory buffer.
  • The value of the TransferMode property has been set to Streamed.
  • I increased the value of several properties like MaxPendingAccepts (default value = 1), MaxPendingConnections (default value = 10), ListenBacklog (default value = 10), MaxOutboundConnectionsPerEndpoint (default value = 10). 200 is probably too high, but it’s important to note here that when using the WCF-Custom/WCF-CustomIsolated Adapter in conjunction with the CustomBinding, you have can fine tune the value of all the properties exposed by the individual binding elements which compose your binding.

WCF-CustomBinaryStreamed

By contrast, the SSISIntegration.WCF-Custom.NetTcp.Binary.Buffered.ReceiveLocation receive location has been  configured as follows:

  • The value of the MaxReceivedMessageSize property has been increased to almost 1 GB to handle large messages.

  • The value of the MaxBufferSize property is identical to the value of the MaxReceivedMessageSize. When using the Buffered transfer mode, the incoming message must be entirely received in a memory buffer before message processing can start. Therefore, the value of the MaxBufferSize property must necessarily be equal to the value of the MaxReceivedMessageSize property. If you try to specify a different value for the 2 properties, the WCF-Custom Receive Location will be immediately disabled and the following error message will be written in the Application Log:

    The Messaging Engine failed to add a receive location "<Receive Location Name>" with URL "<Receive Location URL>" to the adapter "WCF-Custom". Reason: "System.ArgumentException: For TransferMode.Buffered, MaxReceivedMessageSize and MaxBufferSize must be the same value.

  • The value of the TransferMode property has been set to Streamed.

  • The value of other performance related has been tuned up.

WCF-CustomBinaryBuffered

All the above Receive Locations have been configured to use the ServiceThrottlingBehavior. The default value of the 3 properties exposed by the service behavior have been increased from the default value to 200. Indeed, as I explained in my article called Customizing and Extending the BizTalk WCF Adapters, all incoming messages to a certain WCF Receive Location are received and processed by a singleton instance of the BizTalkServiceInstance class. Thus, the value you specify for the MaxConcurrentInstances property is irrelevant as a single service instance will be used for each WCF Receive Location. Also the value you assign to the MaxConcurrentSessions  property is no relevant if you don’t use a session-aware binding. As a consequence, the only property that you should properly set is the MaxConcurrentCalls that specifies the maximum number of messages concurrently processed by the WCF Receive location.

 Behavior

If we enable WCF tracing on the client application and we try to send a file to BizTalk, we can note that the SOAP message sent by the client application to BizTalk has the following format (the namespace of custom headers have been removed for simplicity).

  
 <s:Envelope xmlns:s="https://www.w3.org/2003/05/soap-envelope" xmlns:a="https://www.w3.org/2005/08/addressing">    <s:Header>        <a:Action s:mustUnderstand="1">UploadMessage</a:Action>        <h:dateTime>2010-05-17T17:14:47.0058023+02:00</h:dateTime>        <h:filename>config.xml</h:filename>        <h:id>93204315-aeee-48d0-8922-09ef521c27f2</h:id>        <h:sender>Paolo Salvatori</h:sender>        <h:size>3617</h:size>        <a:MessageID>urn:uuid:d8f87456-cef5-4302-b760-f1765483e4f1</a:MessageID>        <a:ReplyTo>        <a:Address>https://www.w3.org/2005/08/addressing/anonymous</a:Address>        </a:ReplyTo>        <a:To s:mustUnderstand="1">net.tcp://localhost:11553/ssisintegration</a:To>    </s:Header>    <s:Body>        <UploadMessage xmlns="https://microsoft.biztalk.cat/10/samples/ssisintegration/uploadservice">            <data>...</data>        </UploadMessage>    </s:Body></s:Envelope>

As a consequence, in order to extract data from the Body element, it’s necessary to properly configure the Inbound BizTalk message body section of all the WCF Receive Locations:

  • The expression “/*[local-name()='UploadMessage' and namespace-uri()='<targetNamespace’']/*[local-name()='data' and namespace-uri()='targetNamespace']”  must be assigned to the Body path expression field.
  • Base64 must be selected as Node encoding.

Messages

Scenario

To avoid publishing the incoming large message to the MessageBox, I created a custom receive pipeline and in particular a custom pipeline component called StageFilePipeComponent that receives and stage the incoming file a folder or to a custom SQL database using a streaming approach and finally replace the  original file with a placeholder document which contains the location of the staged file and its metadata. When published to the MessageBox, the placeholder document is consumed by a Send Port and processed by a custom send pipeline that reads the location of the original message and substitutes the placeholder document with this latter. Besides, the custom pipeline running within the Receive Location promotes the AckRequired context property. This will cause the send port to generate ACK and NACK messages when the adapter reports a message as successfully delivered or in the case of a NACK, when the adapter has failed and a message will be suspended. These messages are special and will only be published to the MessageBox, if there is someone subscribing to them. In my case, ACK and NACK messages are consumed by a special orchestration which role is to remove the original message from the staging repository (Folder, SQL Server) once the original message has been successfully transmitted by the send port or to eventually resubmit a suspended resumable message when the send port runs out of retries. In the next article of the series, I’ll show the code of the pipeline components and orchestration. For now I will only describe the data flow of the demo and discuss the results in terms of performance:

Non Transactional Case

NonTransactionalDemo 

  1. The user selects one or multiple files on the UI of the driver application, then selects a service endpoint that corresponds to a given WCF Receive Location and finally click the Submit button. For simplicity, let’s assume  that the user chose to send a single file. This latter is received by one of the six WCF Receive Locations.
  2. The custom StageFileReceivePipeline reads the message metadata from the custom headers using the InboundHeaders context property and then writes the original file to a staging repository that can be a folder or a table in custom SQL database which contains a varbinary(max) FILSTREAM column. Besides, the pipeline promotes the AckRequired context property to enable delivery notifications and substitutes the original file with a small placeholder document which contains the metadata and actual location of the original message.
  3. The placeholder document is published to the BizTalkMsgBoxDb by the Message Agent.
  4. The inbound request is consumed by a FILE Send Port.
  5. The RestoreFileSendPipeline reads the content of the placeholder document and replaces this latter with the original message. In particular, the RestoreFilePipelineComponent pipeline assign a different type of stream to the outbound IBaseMessage depending on the location of the original message:
    • Folder: the component assign a FileStream object to the Data property of the message body part.
    • SQL Server: the component opens a SqlFileStream to read the content of the original file from SQL Server and wraps this object within a WrapperStream object. The pipeline component opens a connection and initiate a SQL transaction to initiate the SqlFileStream. The content of this latter is not meant to be read by the pipeline component, but  by the send adapter, therefore the connection and transaction need to remain open. The scope of the WrapperStream object is committing the transaction and closing the connection when the adapter has successfully transmitted the message. In fact, when the adapter has finished processing, the BizTalk messaging runtime invokes the Close method of the WrapperStream object which contains the code for committing the transaction and closing the connection.
  6. The WCF-Custom Send Port  write the original message to the target folder (Out).
  7. The BizTalk Messaging Runtime generates an ACK/NACK message depending on whether the transmission was successful or not.
  8. The ACK/NACK message is consumed by a special orchestration called NotificationHandler. This latter executes a different action depending on the type of the inbound document:
    • ACK: the orchestration deletes the original file from the staging repository.
    • NACK: the orchestration invokes a BRE policy to decide how to proceed. Depending on the information contained in the policy, the orchestration can execute one of the following actions:
      • Terminate the suspended service instance.
      • Resume the suspended service instance.

Results

The two tables below report the performance data in terms of latency and memory consumption that I measured during my tests. The size of messages used during tests was around 50 MB. The following findings emerge from the analysis of the figures:

  1. Binary Encoder > Mtom Encoder > Text Encoder, where “>”  stands for faster.
  2. TransferMode= Streamed is faster than the TransferMode= Buffered (the default) when transmitting large messages.
  3. If the WCF Receive Location uses the TransferMode= Streamed the Working Set of the BizTalk process hosting the Receive Location remains pretty low when receiving multiple large messages at the same time.
  4. If the WCF Receive Location uses the TransferMode= Buffered the Working Set of the BizTalk process hosting the Receive Location grows almost linearly with the number of concurrent incoming messages.

Tables

Conclusions

This article introduced a powerful technique to transfer and handle large files and provided a comparison in terms of performance and memory consumption between the message encoders and transfer modes supplied by WCF. In the next article, I’ll provide more details on the solution implementation and I’ll explain how to configure a WCF-Custom Receive Location and the StageFileReceivePipeline to stage the incoming file to the file system (using the Transactional NTFS or TxF) or to SQL Server in a transactional way using the same transaction utilized by the Message Agent to post the placeholder document to the MessageBox. In the meantime, you can download the code from my site in 2 flavors:

  • LargeMessageTransmission: this is the original version with no SSIS integration.
  • SSISIntegration: this is a sort of experimental version where the StageFilePipelineComponent has been extended to allow the possibility to synchronously or asynchronously invoke a SSIS Package against the staged file. To this purpose, I extended the Helpers library with a class called PackageHelper that provides the possibility to call an SSIS Package from a custom component.

As always, please provide feedbacks, problems and suggestions.