Threads, meet Transactions...

Using multithreading and transactions in .NET

OK, this is a pretty simple idea, but one that I don't think people grok very well. At least, the code I keep running into doesn't suggest that its authors get ambient transactions in the least bit. What I am talking about here is how to reconcile transactions with multi-threaded application programming so you get the results you expect.

So what we're going to be dealing with in this example is the world's lamest database (but one that is sufficient to illustrate the point). Here's the DDL:
CREATE TABLE [dbo].[Test]( [ID] [uniqueidentifier] NOT NULL ) ON [PRIMARY]
As you can see, it's a single table with a single ID column that takes an inserted UUID. Why a UUID, you might ask? Because every insert gets a statistically unique value, there is no trickery at play here: each row you see came from exactly one insert.

Common Code (the setup)

I am listing this bit of "utility" code here so I can reference it in the following examples. Just assume I've copied the code into the appropriate place. :)
Note: DebuggerStepThrough is used intentionally, so the debugger steps over this helper while you walk through the examples.

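//Needs: using System; using System.Data.SqlClient; using System.Diagnostics; using System.Threading; using System.Transactions;
[DebuggerStepThrough()]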
private void ExecuteSql(String SQL)
{
  using (SqlConnection conn = new SqlConnection("server=localhost;database=test;Trusted_Connection=Yes;"))
  {
    using (SqlCommand cmd = new SqlCommand(SQL, conn))
    {
      conn.Open();
      cmd.ExecuteNonQuery();
    }
  }
}

The Linear Version

So we're going to start off with the following sequential form of the code, both to make it clear what we are doing and to provide a control for our experiment.

 try
{
  using (TransactionScope trans = new TransactionScope(TransactionScopeOption.RequiresNew))
  {
    ExecuteSql("INSERT INTO TEST VALUES ('" + Guid.NewGuid() + "')");
    ExecuteSql("INSERT INTO TEST VALUES ('" + Guid.NewGuid() + "')");

    trans.Complete();
  }
  Console.WriteLine("Done");
}
catch (Exception ex)
{
    Console.WriteLine(ex.ToString());
}

So when we execute that code and then check the database, we should see something similar to the following from this bit of SQL:
select Count(*) from test : 2
Exactly what we expected. If you want to confirm the rollback behavior, simply add another line inside the transaction scope that passes null to ExecuteSql. The command throws before trans.Complete() is reached, so the transaction rolls back and no rows are added.

 

Calling ExecuteSql on another thread

In this use case we will say we have 3 SQL statements that all must commit together but can totally execute in parallel (a great case for threads in general). So we end up with the following code:

 try
{
  using (TransactionScope trans = new TransactionScope(TransactionScopeOption.RequiresNew))
  {
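    //delegate() with an explicit empty parameter list: a bare "delegate" is
    //ambiguous between the ThreadStart and ParameterizedThreadStart overloads
    Thread t1 = new Thread(delegate()
      {
        //See the transaction is thread bound!
        Debug.Assert(Transaction.Current == null);
        ExecuteSql("INSERT INTO TEST VALUES ('" + Guid.NewGuid() + "')");
      });

    Thread t2 = new Thread(delegate()
      {
        ExecuteSql("INSERT INTO TEST VALUES ('" + Guid.NewGuid() + "')");
      });

    Thread t3 = new Thread(delegate()
      {
        //This one will generate an exception so we should rollback.
        //Caught here because an unhandled exception on a worker thread
        //would take down the whole process.
        try { ExecuteSql(null); }
        catch (Exception ex) { Console.WriteLine(ex.ToString()); }
      });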

    t1.Start();
    t2.Start();
    t3.Start();

    t1.Join();
    t2.Join();
    t3.Join();

    trans.Complete();
  }
  Console.WriteLine("Done");
}
catch (Exception ex)
{
  Console.WriteLine(ex.ToString());
}

So do we get our transactional semantics? Well, let's run the SQL and see. The answer is NO! Two records are still created, even though the third statement failed. Take a look at my first delegate implementation. You see how I am checking for the presence of an ambient transaction? Well, it's null because the ambient transaction is thread bound. The worker threads never enlisted in our transaction, so each insert committed on its own. So how do we get the behavior we want?
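If you want to see the thread-bound behavior without a database in the way, here is a minimal sketch. The class name and the workerSawTransaction flag are mine, and it assumes the default TransactionScope constructor (no async flow option). Per the assert in the example above, the parent should report an ambient transaction and the worker should not.

```csharp
using System;
using System.Threading;
using System.Transactions;

class AmbientTransactionDemo
{
    static void Main()
    {
        using (TransactionScope scope = new TransactionScope())
        {
            // On the thread that created the scope, the ambient transaction is set.
            Console.WriteLine("Parent has ambient transaction: " + (Transaction.Current != null));

            bool workerSawTransaction = true;
            Thread worker = new Thread(() =>
            {
                // Record what a brand new thread sees as its ambient transaction.
                workerSawTransaction = Transaction.Current != null;
            });
            worker.Start();
            worker.Join();

            Console.WriteLine("Worker has ambient transaction: " + workerSawTransaction);
            scope.Complete();
        }
    }
}
```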

 

DependentTransaction to the Rescue!

This little class embodies the concept of spawning off units of work that are logically grouped into one transaction. Its use is just neato, I do say. So let's rewrite our code to show it in action!

 try
{
  using (TransactionScope trans = new TransactionScope(TransactionScopeOption.RequiresNew))
  {
    //Look! Using the lambda form of the same delegate
    Thread t1 = new Thread(p =>
                   {
                     DependentTransaction dTrans = (DependentTransaction)p;
                     try
                     {
                       using (TransactionScope childTrans = new TransactionScope(dTrans))
                       {
                         ExecuteSql("INSERT INTO TEST VALUES ('" + Guid.NewGuid() + "')");
                         childTrans.Complete();
                       }
                       dTrans.Complete();
                     }
                     catch (Exception ex)
                     {
                       dTrans.Rollback(ex);
                     }
                   });
    Thread t2 = new Thread(delegate(Object p)
                   {
                     DependentTransaction dTrans = (DependentTransaction)p;
                     try
                     {
                       using (TransactionScope childTrans = new TransactionScope(dTrans))
                       {
                         ExecuteSql("INSERT INTO TEST VALUES ('" + Guid.NewGuid() + "')");
                         childTrans.Complete();
                       }
                       dTrans.Complete();
                     }
                     catch (Exception ex)
                     {
                       dTrans.Rollback(ex);
                     }
                   });
    Thread t3 = new Thread(delegate(Object p)
                   {
                     DependentTransaction dTrans = (DependentTransaction) p;
                     try
                     {
                       using (TransactionScope childTrans = new TransactionScope(dTrans))
                       {
                         ExecuteSql(null);
                         childTrans.Complete();
                       }
                       dTrans.Complete();
                     }
                     catch (Exception ex)
                     {
                       dTrans.Rollback(ex);
                     }
                   });

    t1.Start(Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete));
    t2.Start(Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete));
    t3.Start(Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete));

    t1.Join();
    t2.Join();
    t3.Join();

    trans.Complete();
  }
  Console.WriteLine("Done");
}
catch (Exception ex)
{
  Console.WriteLine(ex.ToString());
}

 

So what's going on here? Well, I am creating 3 threads as before, telling them to start, then waiting for them to finish before committing the transaction. What's different in this form is the use of the ParameterizedThreadStart delegate. This form simply accepts a single object and passes it to the method executing on the thread. We're expecting that this object will be an instance of the DependentTransaction class. The TransactionScope class has a great constructor overload that accepts a Transaction to use as the ambient transaction, and DependentTransaction derives from Transaction. From there you can simply use your TransactionScope just like normal.
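In case the ParameterizedThreadStart form is new to you, here is a tiny sketch of just that mechanism with a string standing in for the DependentTransaction. The class and variable names are made up for illustration.

```csharp
using System;
using System.Threading;

class ParameterizedStartDemo
{
    static void Main()
    {
        string received = null;

        // A one-parameter lambda binds to ParameterizedThreadStart.
        Thread worker = new Thread(state =>
        {
            // The argument arrives as object, so cast it back.
            received = (string)state;
        });

        // Whatever is passed to Start() is handed to the delegate.
        worker.Start("hello from parent");
        worker.Join();

        Console.WriteLine(received);
    }
}
```

This prints the string handed to Start(). The transaction examples above do the same thing, only the object being passed is a dependent clone.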

So how does one go about acquiring the DependentTransaction? Simply request one from the ambient transaction on the parent thread by calling Transaction.Current.DependentClone(), then pass it to the worker through the parameterized thread start, and there you go. Note that DependentClone() expects a DependentCloneOption enum value. The 2 values determine how the parent's commit behaves. With BlockCommitUntilComplete, the parent's commit (which here happens when the outer TransactionScope is disposed after Complete()) blocks until every dependent clone has called Complete() or Rollback(), or until the transaction times out. Note this means the calls to Join() are, strictly speaking, redundant for the commit itself. With RollbackIfNotComplete, the transaction is instead rolled back if the parent tries to commit before a dependent clone has called Complete(). This is good for hard time constraints in our execution.
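To compare the two options side by side, here is a rough sketch that needs no database. The Run helper and the 500 ms delay are my own inventions for illustration. Going by the documented semantics, I would expect the first call to wait for the worker and commit, and the second to abort because the parent commits before the clone is completed, but treat it as an experiment to run rather than a guarantee.

```csharp
using System;
using System.Threading;
using System.Transactions;

class DependentCloneOptionsDemo
{
    // Hypothetical helper: commits a scope whose worker thread completes
    // its dependent clone only after a short delay.
    static string Run(DependentCloneOption option)
    {
        try
        {
            using (TransactionScope scope = new TransactionScope())
            {
                DependentTransaction clone = Transaction.Current.DependentClone(option);

                Thread worker = new Thread(() =>
                {
                    Thread.Sleep(500); // simulate slow work
                    try { clone.Complete(); }
                    catch (TransactionException) { /* transaction already ended */ }
                    finally { clone.Dispose(); }
                });
                worker.Start();

                // Deliberately no Join(): the commit is attempted when the
                // scope is disposed at the end of this using block.
                scope.Complete();
            }
            return "committed";
        }
        catch (TransactionAbortedException)
        {
            return "aborted";
        }
    }

    static void Main()
    {
        Console.WriteLine("BlockCommitUntilComplete: " + Run(DependentCloneOption.BlockCommitUntilComplete));
        Console.WriteLine("RollbackIfNotComplete: " + Run(DependentCloneOption.RollbackIfNotComplete));
    }
}
```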

Caveats - from the docs

There are a few additional concurrency issues that you need to be aware of when using the DependentTransaction class:

  • If the worker thread rolls back the transaction but the parent tries to commit it, a TransactionAbortedException is thrown.
  • You should create a new dependent clone for each worker thread in the transaction. Do not pass the same dependent clone to multiple threads, because only one of them can call Complete on it.
  • If the worker thread spawns a new worker thread, make sure to create a dependent clone from the dependent clone and pass it to the new thread.
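The first caveat is easy to try for yourself. This is a minimal sketch, again without a database, where the worker simulates a failure by rolling back its clone. The class name and the exception message are assumptions for the demo.

```csharp
using System;
using System.Threading;
using System.Transactions;

class WorkerRollbackDemo
{
    static void Main()
    {
        try
        {
            using (TransactionScope scope = new TransactionScope())
            {
                // One clone per worker thread, as the second caveat advises.
                DependentTransaction clone =
                    Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete);

                Thread worker = new Thread(state =>
                {
                    using (DependentTransaction dTrans = (DependentTransaction)state)
                    {
                        // Simulate a failed unit of work by rolling back.
                        dTrans.Rollback(new InvalidOperationException("simulated failure"));
                    }
                });
                worker.Start(clone);
                worker.Join();

                // The parent still votes to commit; the commit attempt
                // happens when the scope is disposed.
                scope.Complete();
            }
            Console.WriteLine("Committed");
        }
        catch (TransactionAbortedException ex)
        {
            Console.WriteLine("Aborted: " + ex.Message);
        }
    }
}
```

For the third caveat, a worker that spawns its own worker would call DependentClone() on the clone it was handed and pass that new clone along, the same way the parent does here.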

Jimmy Zimms is currently in need of some red meat