Freigeben über


Writing a resource manager that supports promotable transactions (or Promotable Single Phase Enlistment aka PSPE) in System.Transactions

A key feature that targets performance in System.Transactions is the Promotable Single Phase Enlistment. It allows a durable resource manager (RM) to host and "own" a transaction that can later be promoted to a distributed transaction (or MSDTC transaction) if necessary. This specific resource manager usually has its own internal non distributed transactions and it needs to support changing those transactions to distributed transactions at runtime.

The overview of steps involved in PSPE is:
1. The RM proxy enlists for PSPE using Transaction.EnlistPromotableSinglePhase. If the enlistment succeeds, the RM usually creates its internal transaction and associates it with the System.Transactions transaction. The notification are later sent to the RM proxy using the IPromotableSinglePhaseNotification interface
2. If the System.Transactions transaction never requires a promotion (see previous post https://blogs.msdn.com/florinlazar/archive/2005/05/12/416805.aspx) then, when the transaction is committed, the RM proxy will receive a SinglePhaseCommit, at which point it can commit the internal transaction that was initially created
3. If the System.Transactions transaction needs to be promoted (to support multiple RMs for instance), then System.Transactions will ask the RM to promote the transaction to a distributed transaction. The RM will have to promote the internal transaction to an MSDTC transaction, and associate it with the work already done. Later, when System.Transactions will commit its transaction, it will send a SinglePhaseCommit notification to the RM proxy and the RM will have to commit the distributed transaction that it created during promotion.

Please note that at step 1, I had an "if the PSPE enlistment succeeds" condition. This means that PSPE is not always allowed by System.Transactions. The cases when this can happen are: 1) the transaction is already a distributed transaction or 2) another RM has already done a PSPE enlistment. If the PSPE enlistment fails, the RM will have to follow the regular rules for enlistment (marshal the transaction to the RM and enlist using DurableEnlist).

Let's look at code now. I will have a client and a server communicating using remoting:

<client.cs>

 using System;
using System.Runtime.Remoting;
using System.Runtime.Remoting.Channels;
using System.Runtime.Remoting.Channels.Tcp;
using System.Transactions;

namespace PSPEClient
{
    class Client
    {
        static void Main(string[] args)
        {
            using (TransactionScope ts = new TransactionScope())
            {
                /*section1begin
                DurableRM durableRM = new DurableRM();
                durableRM.OpenConnection();
                durableRM.DoWork();
                /*section1end*/
                
                DatabaseProxy dbProxy = new DatabaseProxy();
                dbProxy.OpenConnection();
                dbProxy.DoWork();
                
                /*section2begin
                DurableRM durableRM = new DurableRM();
                durableRM.OpenConnection();
                durableRM.DoWork();
                /*section2end*/
                
                ts.Complete();
            }
            System.Console.ReadLine();
        }
        
        class DatabaseProxy
        {
            private PSPEServer.PSPEDatabaseServer db;
            private InternalRM internalRM;
            
            public void OpenConnection()
            {
                System.Console.WriteLine("DatabaseProxy.OpenConnection");
                // connecting to the remote database
                TcpChannel tcpChannel = new TcpChannel();
                ChannelServices.RegisterChannel(tcpChannel);
                this.db = (PSPEServer.PSPEDatabaseServer)Activator.GetObject(
                    typeof(PSPEServer.PSPEDatabaseServer), "tcp://localhost:8085/MyDatabase");
                if (null == db)
                {
                    System.Console.WriteLine("Cannot connect to the server");
                }
                else
                {
                    System.Console.WriteLine("Internal tx id:" + db.Connect());
                }
                
                // enlisting in the transaction
                if (null != this.internalRM)
                {
                    throw new System.Exception("we don't support multiple connections, this is just a sample");
                }
                this.internalRM = new InternalRM(db);
                this.internalRM.Enlist();
            }
            
            public void DoWork()
            {
                System.Console.WriteLine("DatabaseProxy.DoWork");
                db.DoWork();
            }
            
            class InternalRM : IPromotableSinglePhaseNotification
            {
                #region IPromotableSinglePhaseNotification Members
                
                // This member will be called during the call to EnlistPromotableSinglePhase
                // The RM will usually allocate its internal transaction state here
                public void Initialize()
                {
                    System.Console.WriteLine("InternalRM.Initialize");
                }
                
                // This method will be called if the RM should Rollback the
                // transaction.  Note that this method will be called even if
                // the transaction has been promoted to a distributed transaction.
                public void Rollback(SinglePhaseEnlistment singlePhaseEnlistment)
                {
                    System.Console.WriteLine("InternalRM.Rollback");
                    db.RollbackWork();
                    singlePhaseEnlistment.Aborted();
                }
                
                // This method will be called when the RM should Commit the
                // transaction.  Note that this method will be called even if
                // the transaction has actually been promoted to a distributed
                // transaction.
                public void SinglePhaseCommit(SinglePhaseEnlistment singlePhaseEnlistment)
                {
                    System.Console.WriteLine("InternalRM.SinglePhaseCommit");
                    db.CommitWork();
                    singlePhaseEnlistment.Committed();
                }
                
                #endregion
            
                #region ITransactionPromoter Members
                
                // This method will be called if System.Transactions
                // determines that the transaction actually needs the support of
                // a fully distributed transaction manager.  The return value of
                // this method is a promoted representation of the transaction
                // usually in the form of transmitter/receiver propagation token
                public byte[] Promote()
                {
                    System.Console.WriteLine("InternalRm.Promote");
                    return db.Promote();
                }
                
                #endregion
            
                private PSPEServer.PSPEDatabaseServer db;
                
                public InternalRM(PSPEServer.PSPEDatabaseServer db)
                {
                    this.db = db;
                }
                
                public void Enlist()
                {
                    System.Console.WriteLine("InternalRM.Enlist");
                    if (null != Transaction.Current)
                    {
                        if (!Transaction.Current.EnlistPromotableSinglePhase(this))
                        {
                            System.Console.WriteLine("PSPE failed, doing regular Enlist");
                            // PSPE failed; we need to use the regular enlistment
                            db.Enlist(TransactionInterop.GetTransmitterPropagationToken(Transaction.Current));
                        }
                    }
                }
            }
        }
     
        class DurableRM : IEnlistmentNotification
        {
            
            #region IEnlistmentNotification Members
            
            public void Commit(Enlistment enlistment)
            {
                System.Console.WriteLine("DurableRM.Commit");
                enlistment.Done();
            }
            
            public void InDoubt(Enlistment enlistment)
            {
                System.Console.WriteLine("DurableRM.InDoubt");
                throw new Exception("The method or operation is not implemented.");
            }
            
            public void Prepare(PreparingEnlistment preparingEnlistment)
            {
                System.Console.WriteLine("DurableRM.Prepare");
                // first a durable RM will log preparingEnlistment.RecoveryInformation(), but this is just a sample
                preparingEnlistment.Prepared();
            }
            
            public void Rollback(Enlistment enlistment)
            {
                System.Console.WriteLine("DurableRM.Rollback");
                enlistment.Done();
            }
            
            #endregion
            
            public void OpenConnection()
            {
                System.Console.WriteLine("DurableRM.OpenConnection and enlist durable");
                if (null != Transaction.Current)
                {
                    Transaction.Current.EnlistDurable(Guid.NewGuid(), this, EnlistmentOptions.None);
                }
            }
            public void DoWork()
            {
                System.Console.WriteLine("DurableRM - DoWork");
            }
        }
    }
}

</client.cs>
<server.cs>

 using System;
using System.Runtime.Remoting;
using System.Runtime.Remoting.Channels;
using System.Runtime.Remoting.Channels.Tcp;
using System.Transactions;
using System.Diagnostics;
 
namespace PSPEServer
{
    class Server
    {
        static void Main(string[] args)
        {
            TcpChannel tcpChannel = new TcpChannel(8085);
            ChannelServices.RegisterChannel(tcpChannel);
            RemotingConfiguration.RegisterWellKnownServiceType(Type.GetType("PSPEServer.PSPEDatabaseServer"),
                "MyDatabase", WellKnownObjectMode.Singleton);
            System.Console.WriteLine("Server running...");
            System.Console.ReadLine();
        }
    }
 
    public class PSPEDatabaseServer : MarshalByRefObject
    {
        private int internalTxID = 0;
        private CommittableTransaction tx;
        private InternalServerRM internalServerRM;

        public int Connect()
        {
            System.Console.WriteLine("client connected");
            return ++internalTxID;
        }

        public void DoWork()
        {
            System.Console.WriteLine("PSPEDBServer.DoWork");
        }

        public byte[] Promote()
        {
            System.Console.WriteLine("PSPEDBServer.Promote");
            this.tx = new CommittableTransaction();
            Debug.Assert(this.internalServerRM == null);
            // the following statement will cause the transaction to be promoted to MSDTC
            byte[] txToken = TransactionInterop.GetTransmitterPropagationToken(this.tx);
            Enlist(txToken);
            return txToken;
        }

        public void CommitWork()
        {
            System.Console.WriteLine("PSPEDBServer.CommitWork");
            if (tx != null)
            {
                // we have a distributed transaction, and so we have to commit it
                tx.Commit();
            }
            else
            {
                // we only have an internal tx
                System.Console.WriteLine("committing internal tx:" + internalTxID);
            }
        }

        public void RollbackWork()
        {
            System.Console.WriteLine("PSPEDBServer.RollbackWork");
            if (tx != null)
            {
                // we have a distributed transaction, and so we have to rollback it
                tx.Rollback();
            }
            else
            {
                // we only have an internal tx
                System.Console.WriteLine("aborting internal tx:" + internalTxID);
            }        
        }

        public void Enlist(byte[] txToken)
        {
            System.Console.WriteLine("PSPEDBServer.Enlist");
            this.internalServerRM = new InternalServerRM();
            this.internalServerRM.Enlist(txToken);
        }

        private class InternalServerRM : ISinglePhaseNotification
        {

            #region ISinglePhaseNotification Members

            public void SinglePhaseCommit(SinglePhaseEnlistment singlePhaseEnlistment)
            {
                System.Console.WriteLine("InternalServerRM.SPC");
                singlePhaseEnlistment.Committed();
            }

            #endregion

            #region IEnlistmentNotification Members

            public void Commit(Enlistment enlistment)
            {
                System.Console.WriteLine("InternalServerRM.Commit");
                enlistment.Done();
            }

            public void InDoubt(Enlistment enlistment)
            {
                System.Console.WriteLine("InternalServerRM.InDoubt");
                throw new Exception("The method or operation is not implemented.");
            }

            public void Prepare(PreparingEnlistment preparingEnlistment)
            {
                System.Console.WriteLine("InternalServerRM.Prepare");
                // first a durable RM will log preparingEnlistment.RecoveryInformation(), but this is just a sample
                preparingEnlistment.Prepared();
            }

            public void Rollback(Enlistment enlistment)
            {
                System.Console.WriteLine("InternalServerRM.Rollback");
                enlistment.Done();
            }

            #endregion

            private Guid rmGuid = new Guid("{B14FF9BB-8419-4dbc-A78C-3C1453D60AC4}");

            public void Enlist(byte[] txToken)
            {
                System.Console.WriteLine("InternalServerRM.Enlist");
                TransactionInterop.GetTransactionFromTransmitterPropagationToken(txToken).EnlistDurable(
                    this.rmGuid, this, EnlistmentOptions.None);
            }
        }
    }
}

</server.cs>

If we run the code, with only the PSPE enlistment, System.Transactions is managing the transaction and MSDTC is never involved; we get the following outputs:
 
<client>
DatabaseProxy.OpenConnection
Internal tx id:1
InternalRM.Enlist
InternalRM.Initialize
DatabaseProxy.DoWork
InternalRM.SinglePhaseCommit
</client>
 
<server>
Server running...
client connected
PSPEDBServer.DoWork
PSPEDBServer.CommitWork
committing internal tx:1
</server>
 

If we enlist a durable RM after the PSPE enlistment (uncomment section 2), the transaction is promoted to MSDTC when the durable enlistment is done and the outputs are:
 
<client>
DatabaseProxy.OpenConnection
Internal tx id:1
InternalRM.Enlist
InternalRM.Initialize
DatabaseProxy.DoWork
DurableRM.OpenConnection and enlist durable
InternalRm.Promote
DurableRM - DoWork
InternalRM.SinglePhaseCommit
DurableRM.Prepare
DurableRM.Commit
</client>
 
<server>
Server running...
client connected
PSPEDBServer.DoWork
PSPEDBServer.Promote
PSPEDBServer.Enlist
InternalServerRM.Enlist
PSPEDBServer.CommitWork
InternalServerRM.Prepare
InternalServerRM.Commit
</server>
 

And finally, if we enlist a durable RM before the PSPE enlistment (uncomment section 1), the PSPE enlistment will fail and thus the RM will have to follow the regular enlistment procedures. The outputs in this case are:

<client>
DurableRM.OpenConnection and enlist durable
DurableRM - DoWork
DatabaseProxy.OpenConnection
Internal tx id:1
InternalRM.Enlist
PSPE failed, doing regular Enlist
DatabaseProxy.DoWork
DurableRM.Prepare
DurableRM.Commit
</client>
 
<server>
Server running...
client connected
PSPEDBServer.Enlist
InternalServerRM.Enlist
PSPEDBServer.DoWork
InternalServerRM.Prepare
InternalServerRM.Commit
</server>
 
NOTE: the goal of my sample was to show the use of PSPE. For simplicity, I ignored dealing with the recovery (I will follow up with another post where I will target Durable Enlistments and recovery).

Comments

  • Anonymous
    May 18, 2005
    FANTASTIC !!! SIMPLY AWESOME !!! :-)

  • Anonymous
    May 19, 2005
    In the first senario(uncomment section 1,2),
    You mean DatabaseProxy.DoWork method call is
    Transactional?
    I don't know why...

  • Anonymous
    May 19, 2005
    I made a mistake.
    Senario 1 means comment section 1 and 2.

  • Anonymous
    May 20, 2005
    To: arnold

    DatabaseProxy.DoWork is transactional in all 3 scenarios. For scenario 1, on way in which you can see this by adding another Volatile enlistment.

  • Anonymous
    April 05, 2006
    Following links will help you to write resource manager by using SYSTX classes
    &amp;nbsp;
    http://msdn2.microsoft.com/en-us/library/ms229975(VS.80).aspx...

  • Anonymous
    July 04, 2006
    Great article!

  • Anonymous
    July 22, 2006
    Florin, have you written the preparingEnlistment.RecoveryInformation() sample/blog?

  • Anonymous
    October 01, 2006
    Hi Florin,I see the remoting endpoint (PSPEDatabase) is a singleton and the PSPEDatabase class is not thread safe since the member-level variables "internalServerRM" and "tx" are not protected. Obviously, changing the endpoint from Singleton to SingleCall won't work. What is your recommendation for this same sample implemented using WCF?Thanks,Michael

  • Anonymous
    October 03, 2006
    To: Michael For a recovery sample, see my posts at http://forums.microsoft.com/MSDN/ShowPost.aspx?PostID=607513&SiteID=1

  • Anonymous
    October 03, 2006
    To: Michael Primeaux I've used remoting because it was the simplest way for this sample to communicate between two processes. You can use anything you want to communicate between the client and server.

  • Anonymous
    February 10, 2007
    Remember Phase 0 that I described in my previous posts: http://blogs.msdn.com/florinlazar/archive/2006/01/29/msdtc-the-magic-of-phase-zero-phase0-or-when-using-2pc-transactions-is-not-enough.aspx