

Service Bus: BeginSend is no magic async pixie dust

I just got off the call with a customer and had a bit of a déjà vu from a meeting at the beginning of the week, so it looks like the misconception I'll explain here is a bit more common than I expected.

In both cases, the folks I talked to had roughly the equivalent of the following code in their app:

var qc = factory.CreateQueueClient(…);
for( int i = 0; i < 1000; i++ )
{
… create message …
qc.BeginSend( msg, null, null );
}
qc.Close();

In both cases, the complaint was that messages were lost and strange exceptions occurred in the logs – which is because, well, this doesn't do what they thought it does.

BeginSend in the Service Bus APIs, just like the Begin methods in other networking APIs or BeginWrite on the file system, doesn't actually do the work that is requested. It puts a job into a job queue: the job queue of the I/O thread scheduler.

That means that once the code reaches qc.Close(), and if you have also been mighty lucky, a few messages may indeed have been sent, but the remaining messages still sit in that job queue, scheduled against an object that the code just forced to close. As a result, every send operation that is queued but hasn't run yet will throw, because it tries to send on a disposed object. Those messages fail and are lost inside the sender's process.

What's worse is that such code stuffs a queue that is both out of the app's control and out of the app's sight, and all the arguments (which can be pretty big when we talk about messages) dangle on those jobs, filling up memory. Also, since the app doesn't call EndSend(), it doesn't pick up whatever exceptions the Send operation may raise, and flies completely blind. If there is an EndXXX method for an async operation, you _must_ call that method even if it doesn't return any values, because it may well throw you back what went wrong.
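
To make the EndSend point concrete, here is a minimal sketch. FakeQueueClient is a made-up stand-in I'm using so the snippet runs without the Service Bus SDK; it is not the real QueueClient, and the "bad" message that fails is purely an assumption for illustration. The part that matters is the callback: it calls EndSend, and that is the only place the failure surfaces.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a queue client; NOT the real Service Bus type.
class FakeQueueClient
{
    private volatile bool closed;

    public IAsyncResult BeginSend(string msg, AsyncCallback callback, object state)
    {
        var tcs = new TaskCompletionSource<bool>(state);
        // The work is only queued here; it runs later on a pool thread.
        Task.Run(() =>
        {
            try
            {
                if (closed) throw new ObjectDisposedException("FakeQueueClient");
                if (msg == "bad") throw new InvalidOperationException("send failed");
                tcs.SetResult(true);
            }
            catch (Exception ex) { tcs.SetException(ex); }
            if (callback != null) callback(tcs.Task);
        });
        return tcs.Task; // Task implements IAsyncResult
    }

    // Blocks until the send is done and rethrows whatever went wrong.
    public void EndSend(IAsyncResult ar) { ((Task<bool>)ar).GetAwaiter().GetResult(); }

    public void Close() { closed = true; }
}

static class Program
{
    static void Main()
    {
        var qc = new FakeQueueClient();
        var errors = new ConcurrentQueue<Exception>();
        var messages = new[] { "a", "bad", "c" };
        var pending = new CountdownEvent(messages.Length);

        foreach (var msg in messages)
        {
            qc.BeginSend(msg, ar =>
            {
                try { qc.EndSend(ar); }                        // exceptions show up here
                catch (Exception ex) { errors.Enqueue(ex); }
                finally { pending.Signal(); }
            }, null);
        }

        pending.Wait();  // close only after every send has completed
        qc.Close();
        Console.WriteLine("failed sends: " + errors.Count); // prints "failed sends: 1"
    }
}
```

Drop the EndSend call from the callback and the program still runs to the end, but the error count stays at zero and the failed message vanishes without a trace.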

So how should you do it? Don't throw messages blindly into the job queue. It's ok to queue up a few to make sure there's a job in the queue as another one completes (which is just slightly trickier than what I want to illustrate here), but generally you should make subsequent sends depend on previous sends completing. In .NET 4.5 with async/await that's a lot easier now:

var qc = factory.CreateQueueClient(…);
for( int i = 0; i < 1000; i++ )
{
… create message …
await Task.Factory.FromAsync(qc.BeginSend, qc.EndSend, msg, null );
}
qc.Close();

Keep in mind that the primary goal of async I/O is to not waste threads and to not lose time through excessive thread switching while threads hang on I/O operations. It doesn't magically make the I/O faster per se. We achieve that in the above example because the compiler breaks up that code into distinct methods, and the loop continues on an I/O thread callback once the Send operation has completed.
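
For the "slightly trickier" variant where a few sends are kept in flight, one possible shape is a small gate in front of the send. This is only a sketch under assumptions: FakeQueueClient is again a made-up stand-in with a simulated delay (not the real Service Bus client), and SendAllAsync, SendOneAsync and the limit of 4 are names and numbers I picked for illustration. A SemaphoreSlim caps how many sends are outstanding, and every send still goes through EndSend via FromAsync.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a queue client; NOT the real Service Bus type.
class FakeQueueClient
{
    private readonly object sync = new object();
    private int inFlight;
    public int MaxInFlight; // highest number of overlapping sends observed
    public int Sent;        // number of completed sends

    public IAsyncResult BeginSend(string msg, AsyncCallback callback, object state)
    {
        var tcs = new TaskCompletionSource<bool>(state);
        lock (sync) { inFlight++; if (inFlight > MaxInFlight) MaxInFlight = inFlight; }
        Task.Delay(5).ContinueWith(_ =>   // simulated network latency
        {
            lock (sync) { inFlight--; Sent++; }
            tcs.SetResult(true);
            if (callback != null) callback(tcs.Task);
        });
        return tcs.Task;
    }

    public void EndSend(IAsyncResult ar) { ((Task<bool>)ar).GetAwaiter().GetResult(); }
}

static class Program
{
    // Sends one message and frees its slot in the gate when done, success or not.
    static async Task SendOneAsync(FakeQueueClient qc, string msg, SemaphoreSlim gate)
    {
        try { await Task.Factory.FromAsync(qc.BeginSend, qc.EndSend, msg, null); }
        finally { gate.Release(); }
    }

    // Keeps at most maxInFlight sends outstanding at any time.
    public static async Task SendAllAsync(FakeQueueClient qc, IEnumerable<string> messages, int maxInFlight)
    {
        var gate = new SemaphoreSlim(maxInFlight);
        var sends = new List<Task>();
        foreach (var msg in messages)
        {
            await gate.WaitAsync();              // wait for a free slot
            sends.Add(SendOneAsync(qc, msg, gate));
        }
        await Task.WhenAll(sends);               // rethrows a failure from any send
    }

    static void Main()
    {
        var qc = new FakeQueueClient();
        var messages = Enumerable.Range(0, 50).Select(i => "msg " + i);
        SendAllAsync(qc, messages, 4).GetAwaiter().GetResult();
        // Only now would it be safe to close the client.
        Console.WriteLine("sent=" + qc.Sent + ", max in flight=" + qc.MaxInFlight);
    }
}
```

The loop itself never has more than the chosen number of jobs queued, so the arguments hanging off pending jobs stay bounded, and closing the client after SendAllAsync returns can't pull the rug out from under a send that hasn't run yet.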

Summary:

  1. Don't stuff the I/O scheduler queue with loads of blind calls to BeginXXX without considering how the work gets done and completed, and that it can actually fail.
  2. Always call EndXXX, and think about how many operations you want to have in flight and what happens to the objects that are attached to the in-flight jobs.

Comments

  • Anonymous
    October 20, 2012
    Putting the await inside the loop will perform the sends one at a time (albeit concurrently with the caller). To perform all the sends concurrently with each other one could use the task APIs directly: var sendTasks = theMessages.Select(m => Task.Factory.FromAsync(qc.BeginSend, qc.EndSend, m, null)).ToArray(); await Task.Factory.ContinueWhenAll(sendTasks, _ => { qc.Close(); });

  • Anonymous
    October 21, 2012
    Thanks for the comment, rjcox. Sadly, your counterexample misses the majority of the points I make here. First, your expression "theMessages.Select(m => Task.Factory.FromAsync(qc.BeginSend, qc.EndSend, m, null)).ToArray();" loads up the I/O threadpool in the same unfortunate fashion as the loop I have in my first example. Second, there is no such thing as a "concurrent send" on something that's inherently serial such as a network connection. I actually wrote in the post that await causes the sends to be dependent and that you could indeed load a few sends into the threadpool queue so that the next send job is immediately available when the singleton gate on the sending client becomes available for the next message, but that will also make the code a lot more complex.