Managed Threading: QueueUserWorkItem with Callback

Hi There,

I am Syam Pinnaka, a developer on the IAM services team at Microsoft.

Managed threading in .NET provides many ways to run tasks on separate threads and so gain the advantages of multithreading. One way to run work on a separate thread is “ThreadPool.QueueUserWorkItem”, which has two overloads. The first takes a WaitCallback, a delegate for the method to execute on a ThreadPool thread. The second takes a WaitCallback along with a state object, which can carry the input the ThreadPool thread needs. The first overload works fine if everything the method needs is available in the method itself or in the classes it has access to. In a more complex scenario you may need to hand the ThreadPool thread some custom information, such as a callback to invoke after the thread completes its task. This post provides the information and example code for that scenario: passing custom information, including a callback reference, to a ThreadPool thread, which then uses the callback to notify the client when its work is complete.
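
To make the difference between the two overloads concrete, here is a minimal sketch that queues one work item through each and records the state each callback receives. The names QueueOverloadsDemo and RunBoth are made up for illustration, and CountdownEvent assumes .NET 4.0 or later; it is only there so the caller waits for both work items.

```csharp
using System;
using System.Threading;

// Hypothetical demo class, not part of the post's downloader.
public static class QueueOverloadsDemo
{
    // Queues one work item per overload and returns the state each callback saw.
    public static object[] RunBoth(object stateForSecond)
    {
        object[] seen = new object[2];

        // Thread pool threads are background threads, so wait for both
        // callbacks before returning (CountdownEvent assumes .NET 4.0+).
        using (CountdownEvent done = new CountdownEvent(2))
        {
            // Overload 1: callback only. The callback receives null as its state.
            ThreadPool.QueueUserWorkItem(delegate(object state)
            {
                seen[0] = state;
                done.Signal();
            });

            // Overload 2: callback plus a state object that is handed to the callback.
            ThreadPool.QueueUserWorkItem(delegate(object state)
            {
                seen[1] = state;
                done.Signal();
            }, stateForSecond);

            done.Wait();
        }

        return seen;
    }

    public static void Main()
    {
        object[] seen = RunBoth("http://example.com/");
        Console.WriteLine("First overload state is null: " + (seen[0] == null));
        Console.WriteLine("Second overload state: " + seen[1]);
    }
}
```

The state parameter is typed as object, so anything richer than a single value has to be wrapped in a type of your own and cast back inside the callback.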

Let's assume that we would like to handle a series of web page download requests using ThreadPool threads, and that each ThreadPool thread should notify the client when it is done downloading its page. Let's see how to handle this scenario in an example.

Here is a simple class which downloads a web page given its Uri.

 

    class WebPageDownloader
    {
        public WebPageDownloader() { }

        public string PageDownload(Uri pageUri)
        {
            WebRequest request = null;
            HttpWebResponse response = null;
            Stream dataStream = null;
            StreamReader reader = null;
            string responseFromServer = string.Empty;
            try
            {
                request = WebRequest.Create(pageUri);
                request.Credentials = CredentialCache.DefaultCredentials;
                response = (HttpWebResponse)request.GetResponse();
                // Get the stream containing content returned by the server.
                dataStream = response.GetResponseStream();
                // Open the stream using a StreamReader for easy access.
                reader = new StreamReader(dataStream);
                // Read the content.
                responseFromServer = reader.ReadToEnd();
            }
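            finally
            {
                // Any of these can still be null if an earlier step threw.
                if (reader != null) reader.Close();
                if (dataStream != null) dataStream.Close();
                if (response != null) response.Close();
            }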

            return responseFromServer;
        }
    }

To make this asynchronous and execute “PageDownload” on a separate thread, we can queue it with ThreadPool.QueueUserWorkItem(new WaitCallback(PageDownload)). A WaitCallback, however, must point to a method that takes a single object parameter and returns void, and we also want the new thread to notify the client when the download is complete through a callback passed in to it. That is two pieces of information we now need to pass in: 1) the page Uri to download and 2) the callback to notify the client. This leads us to change the PageDownload method to take a custom object that holds both the Uri and the callback. The return type changes from string to void, because the callback now returns the downloaded web page. Our new PageDownload method looks like this.

        private void PageDownload(object state)
        {
            Uri pageUri = ((PageDownloadInfo)state).uri;
            EndDownloadCallback endDownloadCallback = ((PageDownloadInfo)state).endDownloadCallback;

            if (endDownloadCallback == null)
                throw new ArgumentNullException("endDownloadCallback");

            WebRequest request = null;
            HttpWebResponse response = null;
            Stream dataStream = null;
            StreamReader reader = null;
            string responseFromServer = string.Empty;
            try
            {
                request = WebRequest.Create(pageUri);
                request.Credentials = CredentialCache.DefaultCredentials;
                response = (HttpWebResponse)request.GetResponse();
                // Get the stream containing content returned by the server.
                dataStream = response.GetResponseStream();
                // Open the stream using a StreamReader for easy access.
                reader = new StreamReader(dataStream);
                // Read the content.
                responseFromServer = reader.ReadToEnd();
            }
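            finally
            {
                // Any of these can still be null if an earlier step threw.
                if (reader != null) reader.Close();
                if (dataStream != null) dataStream.Close();
                if (response != null) response.Close();
            }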

            endDownloadCallback(responseFromServer);
        }

Note that the code above uses a new type called PageDownloadInfo to pass in the required input information. PageDownloadInfo looks like this.

    /// <summary>
    /// Class to contain information about page to download.
    /// </summary>
    internal class PageDownloadInfo
    {
        public Uri uri;
        /// <summary>
        /// Callback to use after the download completes.
        /// </summary>
        public EndDownloadCallback endDownloadCallback;

        public PageDownloadInfo(Uri pageUri, EndDownloadCallback callback)
        {
            uri = pageUri;
            endDownloadCallback = callback;
        }
    }

We also need two other methods: one to queue the download on a ThreadPool thread so the page downloads asynchronously, and one to receive the callback. They look something like this.

        public void StartDownloadPageAsync(Uri pageUri)
        {
            PageDownloadInfo pageInfo = new PageDownloadInfo(pageUri, new EndDownloadCallback(EndDownloadPage));
            ThreadPool.QueueUserWorkItem(new WaitCallback(PageDownload), pageInfo);
        }

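        public void EndDownloadPage(string page)
        {
            PageDownloadedEventArgs downloadedPage = new PageDownloadedEventArgs();
            downloadedPage.Page = page;
            // Copy the delegate first: the event is null when no client has subscribed.
            EventHandler<PageDownloadedEventArgs> handler = pageDownloaded;
            if (handler != null)
                handler(this, downloadedPage);
        }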

“StartDownloadPageAsync” is the async call initiator. It queues the PageDownload method, together with pageInfo, to the thread pool using ThreadPool.QueueUserWorkItem. A ThreadPool thread then runs PageDownload, which downloads the web page and calls EndDownloadPage through the callback provided in pageInfo.

Note that EndDownloadPage in turn notifies its clients about the downloaded page by raising the pageDownloaded event. pageDownloaded is a custom event created to deliver the downloaded web page to the client. Because the callback runs on the ThreadPool thread, clients receive the event on that thread too.

    /// <summary>
    /// Event arguments to use with page downloaded event.
    /// </summary>
    internal class PageDownloadedEventArgs : EventArgs
    {
        public string Page { get; set; }
    }


        /// <summary>
        /// Event to fire to send the download-complete notification to clients.
        /// </summary>
        public event EventHandler<PageDownloadedEventArgs> pageDownloaded;

 

With all the above plumbing in place, we now have a fully functional page download class that can process incoming requests asynchronously. Clients can subscribe to the pageDownloaded event and call StartDownloadPageAsync to start downloading a page, then get notified when it is done.
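
From the client's side, the subscribe-then-start flow might look like the sketch below. To keep it self-contained and runnable without network access, it uses a hypothetical stand-in called FakeDownloader that has the same event and StartDownloadPageAsync shape as the class in this post but takes its fetch step as a delegate. PageEventArgs, ClientDemo and DownloadAndWait are likewise made-up names for illustration.

```csharp
using System;
using System.Threading;

// Hypothetical event arguments, mirroring the post's PageDownloadedEventArgs.
public class PageEventArgs : EventArgs
{
    public string Page { get; set; }
}

// Hypothetical stand-in for the post's WebPageDownloader. The fetch step is
// injected so the sketch does not need a real web request.
public class FakeDownloader
{
    private readonly Func<Uri, string> fetch;

    public event EventHandler<PageEventArgs> pageDownloaded;

    public FakeDownloader(Func<Uri, string> fetch)
    {
        this.fetch = fetch;
    }

    public void StartDownloadPageAsync(Uri pageUri)
    {
        // The Uri travels to the thread pool thread as the state object.
        ThreadPool.QueueUserWorkItem(delegate(object state)
        {
            string page = fetch((Uri)state);
            EventHandler<PageEventArgs> handler = pageDownloaded;
            if (handler != null)
                handler(this, new PageEventArgs { Page = page });
        }, pageUri);
    }
}

public static class ClientDemo
{
    // Subscribes to the event, starts one download, and blocks until the event fires.
    public static string DownloadAndWait(FakeDownloader downloader, Uri uri)
    {
        string result = null;
        using (ManualResetEvent finished = new ManualResetEvent(false))
        {
            // This handler runs on the thread pool thread, not on the caller's thread.
            EventHandler<PageEventArgs> onDownloaded = delegate(object sender, PageEventArgs e)
            {
                result = e.Page;
                finished.Set();
            };

            downloader.pageDownloaded += onDownloaded;
            try
            {
                downloader.StartDownloadPageAsync(uri);
                finished.WaitOne();
            }
            finally
            {
                downloader.pageDownloaded -= onDownloaded;
            }
        }
        return result;
    }

    public static void Main()
    {
        FakeDownloader downloader = new FakeDownloader(
            delegate(Uri u) { return "<html>" + u.Host + "</html>"; });
        Console.WriteLine(DownloadAndWait(downloader, new Uri("http://example.com/")));
    }
}
```

Blocking on a ManualResetEvent is only for the demo; a real client would usually carry on with other work and react when the event arrives. Also note that in this sketch an exception thrown by the fetch step goes unhandled on the thread pool thread, so production code would want to catch it there and report it to the client.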

The complete code is below. Enjoy the full power of managed threading. Happy coding!

using System;
using System.IO;
using System.Net;
using System.Threading;

namespace AsyncExercises
{
    /// <summary>
    /// Declare the delegate to use when download completes.
    /// </summary>
    /// <param name="page"></param>
    public delegate void EndDownloadCallback(string page);

    /// <summary>
    /// Event arguments to use with page downloaded event.
    /// </summary>
    internal class PageDownloadedEventArgs : EventArgs
    {
        public string Page { get; set; }
    }

    /// <summary>
    /// Class to contain information about page to download.
    /// </summary>
    internal class PageDownloadInfo
    {
        public Uri uri;
        /// <summary>
        /// Callback to use after the download completes.
        /// </summary>
        public EndDownloadCallback endDownloadCallback;

        public PageDownloadInfo(Uri pageUri, EndDownloadCallback callback)
        {
            uri = pageUri;
            endDownloadCallback = callback;
        }
    }

    /// <summary>
    /// Class that downloads a web page and returns the contents as a string.
    /// </summary>
    class WebPageDownloader
    {
        /// <summary>
        /// Event to fire to send the download-complete notification to clients.
        /// </summary>
        public event EventHandler<PageDownloadedEventArgs> pageDownloaded;

        /// <summary>
        /// Constructor
        /// </summary>
        public WebPageDownloader()
        {
        }

        /// <summary>
        /// Async download start. This queues the page download task to the thread pool.
        /// </summary>
        /// <param name="pageUri"></param>
        public void StartDownloadPageAsync(Uri pageUri)
        {
            PageDownloadInfo pageInfo = new PageDownloadInfo(pageUri, new EndDownloadCallback(EndDownloadPage));    
            ThreadPool.QueueUserWorkItem(new WaitCallback(PageDownload), pageInfo);
        }

        /// <summary>
        /// Method that will be called by the thread pool thread when download completes.
        /// </summary>
        /// <param name="page"></param>
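        public void EndDownloadPage(string page)
        {
            PageDownloadedEventArgs downloadedPage = new PageDownloadedEventArgs();
            downloadedPage.Page = page;
            // Copy the delegate first: the event is null when no client has subscribed.
            EventHandler<PageDownloadedEventArgs> handler = pageDownloaded;
            if (handler != null)
                handler(this, downloadedPage);
        }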

        /// <summary>
        /// Method which downloads the web page given its Uri and returns the downloaded page using the callback provided.
        /// </summary>
        /// <param name="state"></param>
        private void PageDownload(object state)
        {
            Uri pageUri = ((PageDownloadInfo)state).uri;
            EndDownloadCallback endDownloadCallback = ((PageDownloadInfo)state).endDownloadCallback; 

            if (endDownloadCallback == null)
                throw new ArgumentNullException("endDownloadCallback");

            WebRequest request = null;
            HttpWebResponse response = null;
            Stream dataStream = null;
            StreamReader reader = null;
            string responseFromServer = string.Empty;
            try
            {
                request = WebRequest.Create(pageUri);
                request.Credentials = CredentialCache.DefaultCredentials;
                response = (HttpWebResponse)request.GetResponse();
                // Get the stream containing content returned by the server.
                dataStream = response.GetResponseStream();
                // Open the stream using a StreamReader for easy access.
                reader = new StreamReader(dataStream);
                // Read the content.
                responseFromServer = reader.ReadToEnd();
            }
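            finally
            {
                // Any of these can still be null if an earlier step threw.
                if (reader != null) reader.Close();
                if (dataStream != null) dataStream.Close();
                if (response != null) response.Close();
            }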

            endDownloadCallback(responseFromServer);
        }
    }
}