Delen via


Ronin Building Blocks - Circuit Breaker

I finished my retries building block, that should be good for basic handling of transient error conditions. When retries are used during throttling related to excessive load it creates the undesired side effect of increasing the load during the future retry attempt. This can result in prolonged throttling and in some cases unrecoverable situations. This sounds like a no win scenario but its not, the right answer is often to stop sending requests to the resource. This is where the circuit breaker pattern shines.

Much like the circuit breaker you find in your house today this pattern executes one set of logic, such as directing requests to the resource, when healthy and a second set of logic when unhealthy. Obviously the definition of healthy is contextual so there is no one size fits all solution. Some of the more common techniques for determining health I have seen include

  • < X consecutive failures in a row
  • >= X percent of requests  in a Y second sliding time period

After changing to the unhealthy operations mode test probes are sent at intervals to the healthy logic to monitor for recovery. A common way would be to do something like send a request every X seconds or Y requests. If the health probe is successful the next request will also go to the test logic to speed the recovery speed. The decision to return to a healthy mode of operation is contextual and has a similar algorithm to the ones outlined above.

The following terms are used for the remainder of the posting.

Primary State: when the breaker is executing it's healthy logic block

Secondary State: when the breaker is executing it's unhealthy logic block

Trip: the action of moving from Primary State to Secondary State

Reset: the action of moving from Secondary State to Primary State

Policy: an algorithm used to identify a healthy or unhealthy state (trip policy or reset policy).

Many circuit breaker policies will require temporal aspects in their algorithm which can be difficult to implement properly. In order to simplify the implementations they will use Reactive Extensions against an observable stream of execution records. The execution record is a simple container object that contains the results of an attempt to execute the breaker logic, whether it is primary, secondary or test. The container class is a simple POCO as seen next. By adding in properties to indicate an attempt ran long or required retries provides the infrastructure to build more complex and often better policy algorithms.

 namespace Ronin.CircuitBreakers
{
    #region Using Clauses
    using System;
    #endregion

    /// <summary>
    /// Represents the outcome of the execution of a logic block
    /// </summary>
    public sealed class ExecutionRecord
    {
        #region Constructors
        /// <summary>
        /// Initializes a new instance of the type
        /// </summary>
        /// <param name="timestamp">The timestamp that the action took place at</param>
        /// <param name="success">True if the execution block was successful</param>
        /// <param name="retryCount">The number of retries that occurred</param>
        /// <param name="longRunning">True if the execution took longer than expected</param>
        /// <param name="mode">The execution mode the logic was run in</param>
        internal ExecutionRecord(DateTimeOffset timestamp, bool success, int retryCount, bool longRunning, ExecutionMode mode)
        {
            Timestamp = timestamp;
            Success = success;
            RetryCount = retryCount;
            LongRunning = longRunning;
            Mode = mode;
        }
        #endregion
        #region Properties
        /// <summary>
        /// The timestamp that the action took place at
        /// </summary>
        public DateTimeOffset Timestamp { get; }

        /// <summary>
        /// True if the execution block was successful
        /// </summary>
        public bool Success { get; }

        /// <summary>
        /// The number of retries that occurred
        /// </summary>
        public int RetryCount { get; }

        /// <summary>
        /// True if the execution took longer than expected
        /// </summary>
        public bool LongRunning { get; }

        /// <summary>
        /// The execution mode the logic was run in
        /// </summary>
        public ExecutionMode Mode { get; }
        #endregion
        #region Execution Mode Enumeration
        /// <summary>
        /// The mode that the logic being reported on was executed in
        /// </summary>
        public enum ExecutionMode
        {
            /// <summary>
            /// The logic was primary mode
            /// </summary>
            Primary,

            /// <summary>
            /// The logic was secondary mode
            /// </summary>
            Secondary,

            /// <summary>
            /// The logic was test mode
            /// </summary>
            Test
        }
        #endregion
    }
}

The main class is the breaker class that has several internal classes and enumerations. One of the internal classes is the BreakerLogicManager that is responsible for publishing the results of every attempt to execute primary, secondary or test logic. This class manages the state of the breaker, switching between trip and reset policies as well as exposing the function and action methods used by the consumer.

 namespace Ronin.CircuitBreakers
{
    #region Using Clauses
    using System;
    using System.Collections.Generic;
    using System.Threading;
    using System.Threading.Tasks;
    using Parameters;
    using Core;
    using System.Linq;
    #endregion

    /// <summary>
    /// The core circuit breaker
    /// </summary>
    public class Breaker : IDisposable
    {
        #region Delegates
        /// <summary>
        /// Defines a block of code, primary or fallback to be executed as part of the circuit breaker pattern.
        /// </summary>
        /// <typeparam name="TResult">The resulting type of the method being executed.</typeparam>
        /// <param name="feedback">An object that can be set to indicate failures etc.</param>
        /// <param name="cancellationToken">A cancellation token exposed to the logic block.</param>
        public delegate void LogicFunctionBlock<TResult>(ExecutionFeedbackClass<TResult> feedback, CancellationToken cancellationToken = default(CancellationToken));

        /// <summary>
        /// Defines a block of code, primary or fallback to be executed as part of the circuit breaker pattern.
        /// </summary>
        /// <param name="feedback">An object that can be set to indicate failures etc.</param>
        /// <param name="cancellationToken">A cancellation token exposed to the logic block.</param>
        public delegate void LogicActionBlock(ExecutionFeedbackClass<object> feedback, CancellationToken cancellationToken = default(CancellationToken));

        /// <summary>
        /// Defines a block of code, primary or fallback to be executed as part of the circuit breaker pattern.
        /// </summary>
        /// <typeparam name="TResult">The resulting type of the method being executed.</typeparam>
        /// <param name="feedback">An object that can be set to indicate failures etc.</param>
        /// <param name="cancellationToken">A cancellation token exposed to the logic block.</param>
        public delegate Task LogicFunctionBlockAsync<TResult>(ExecutionFeedbackClass<TResult> feedback, CancellationToken cancellationToken = default(CancellationToken));

        /// <summary>
        /// Defines a block of code, primary or fallback to be executed as part of the circuit breaker pattern.
        /// </summary>
        /// <param name="feedback">An object that can be set to indicate failures etc.</param>
        /// <param name="cancellationToken">A cancellation token exposed to the logic block.</param>
        public delegate Task LogicActionBlockAsync(ExecutionFeedbackClass<object> feedback, CancellationToken cancellationToken = default(CancellationToken));
        #endregion
        #region Variables
        /// <summary>
        /// The number of times the instance has been initialized
        /// </summary>
        private int _initializationCount;

        /// <summary>
        /// A synchronization control used to manage lifecycle events
        /// </summary>
        private readonly SemaphoreSlim _lifecycleControl = new SemaphoreSlim(1);

        /// <summary>
        /// The breaker policy used to determine when to trip to the secondary logic
        /// </summary>
        private readonly BreakerTripPolicy _tripPolicy;

        /// <summary>
        /// The breaker policy used to determine when to reset to the primary logic
        /// </summary>
        private readonly BreakerResetPolicy _resetPolicy;

        /// <summary>
        /// The logic manager for the breaker that reports to observers
        /// </summary>
        private BreakerLogicManager _logicManager;

        /// <summary>
        /// A control object used to access transtion state change
        /// </summary>
        private readonly object _transitionControl = new object();

        /// <summary>
        /// The number of times the breaker has been disposed of
        /// </summary>
        private int _disposalCount;

        /// <summary>
        /// The number of times the tests have been successful in a row
        /// </summary>
        private int _testSuccessCount;

        /// <summary>
        /// True if the state of the breaker is being forced in a direction
        /// </summary>
        private bool _overrideState;

        /// <summary>
        /// True if the forced state is primary, false for secondary
        /// </summary>
        private bool _overrideWithPrimary;

        /// <summary>
        /// A synchronization object used to control access to the override
        /// </summary>
        private readonly object _overrideControl = new object();
        #endregion
        #region Constructors

        /// <summary>
        /// Initializes a new instance of the type
        /// </summary>
        /// <param name="tripPolicy">The breaker policy used to determine when to trip to the secondary logic</param>
        /// <param name="resetPolicy">The breaker policy used to determine when to reset to the primary logic</param>
        public Breaker(BreakerTripPolicy tripPolicy, BreakerResetPolicy resetPolicy)
        {
            Guard.NotNull(nameof(tripPolicy), tripPolicy);
            Guard.NotNull(nameof(resetPolicy), resetPolicy);

            State = 0;
            _tripPolicy = tripPolicy;
            _resetPolicy = resetPolicy;
        }

        #endregion
        #region Properties

        /// <summary>
        /// Returns true if the breaker is currently tripped
        /// </summary>
        public BreakerState State { get; private set; }

        #endregion
        #region Breaker Lifecycle Control Methods
        /// <summary>
        /// Initializes the breaker and will only be called one time. Throws an invalid operation exception ifi the instance has already been initialized
        /// </summary>
        /// <param name="timeout">The time to wait for each of the policies and the breaker to wait.</param>
        /// <param name="cancellationToken">The token to monitor and if cancelled attempt to abort operations.</param>
        /// <returns>True if the object is initialized successfully</returns>
        public async Task<bool> InitializeAsync(TimeSpan? timeout = null, CancellationToken? cancellationToken = null)
        {
            var success = false;

            try
            {
                if (
                    await
                        _lifecycleControl.WaitAsync(timeout ?? TimeSpan.FromSeconds(30),
                            cancellationToken ?? CancellationToken.None).ConfigureAwait(false))
                {
                    if (Interlocked.Increment(ref _initializationCount) == 1)
                    {
                        _logicManager = new BreakerLogicManager();

                        await
                            _tripPolicy.InitializePolicyAsync(this, _logicManager,
                                cancellationToken ?? CancellationToken.None).ConfigureAwait(false);
                        await
                            _resetPolicy.InitializePolicyAsync(this, _logicManager,
                                cancellationToken ?? CancellationToken.None).ConfigureAwait(false);

                        State = BreakerState.Initialized;
                        success = true;
                    }
                    else
                    {
                        throw new InvalidOperationException("Breaker already initialized.");
                    }
                }
            }
            finally
            {
                _lifecycleControl.Release();
            }

            return success;
        }

        /// <summary>
        /// Starts the primary breaker logic if it is initialized and not already running.
        /// </summary>
        /// <param name="timeout">The time to wait for each of the policies and the breaker to wait.</param>
        /// <param name="cancellationToken">The token to monitor and if cancelled attempt to abort operations.</param>
        /// <returns>True if the breaker is started successfully</returns>
        public async Task<bool> Start(TimeSpan? timeout = null, CancellationToken? cancellationToken = null)
        {
            var success = false;

            try
            {
                if (await _lifecycleControl.WaitAsync(timeout ?? TimeSpan.FromSeconds(30), cancellationToken ?? CancellationToken.None).ConfigureAwait(false))
                {
                    if (State == BreakerState.Initialized)
                    {
                        await _tripPolicy.StartMonitoringAsync(cancellationToken ?? CancellationToken.None).ConfigureAwait(false);
                        State = BreakerState.Primary;
                        success = true;
                    }
                    else
                    {
                        throw new InvalidOperationException("Invalid start state.");
                    }
                }
            }
            finally
            {
                _lifecycleControl.Release();
            }

            return success;
        }

        /// <summary>
        /// Stops the breaker from running any trip or reset logic.
        /// </summary>
        /// <param name="timeout">The time to wait for each of the policies and the breaker to wait.</param>
        /// <param name="cancellationToken">The token to monitor and if cancelled attempt to abort operations.</param>
        /// <returns>True if the breaker is stopped successfully</returns>
        public async Task<bool> Stop(TimeSpan? timeout = null, CancellationToken? cancellationToken = null)
        {
            var success = false;

            try
            {
                if (
                    await
                        _lifecycleControl.WaitAsync(timeout ?? TimeSpan.FromSeconds(30),
                            cancellationToken ?? CancellationToken.None).ConfigureAwait(false))
                {
                    if (State != BreakerState.Uninitialized && State != BreakerState.Initialized)
                    {
                        await
                            _tripPolicy.StopMonitoringAsync(cancellationToken ?? CancellationToken.None)
                                .ConfigureAwait(false);
                        await
                            _resetPolicy.StopMonitoringAsync(cancellationToken ?? CancellationToken.None)
                                .ConfigureAwait(false);

                        State = BreakerState.Initialized;
                        success = true;
                    }
                    else
                    {
                        throw new InvalidOperationException("Invalid stop state.");
                    }
                }
            }
            finally
            {
                _lifecycleControl.Release();
            }

            return success;
        }

        /// <summary>
        /// Overrides the state of the breaker, often used for maintenance mode.
        /// </summary>
        /// <param name="overrideState">True to override the state of the circuit breaker forcing processing to primary or secondary. False release the breaker to control the mode</param>
        /// <param name="primary">True if the primary logic is to be executed, false if secondary</param>
        public void SetStateOverride(bool overrideState, bool primary)
        {
            lock (_overrideControl)
            {
                _overrideWithPrimary = primary;
                _overrideState = overrideState;
            }
        }
        #endregion
        #region Execution Methods
        /// <summary>
        /// A method called to execute code using the circuit breaker
        /// </summary>
        /// <typeparam name="TResult">The return type of the logic block</typeparam>
        /// <param name="primaryLogic">The logic block to execute when the circuit breaker is in normal state</param>
        /// <param name="secondaryLogic">The logic block to execute when the circuit breaker is in tripped mode</param>
        /// <param name="cancellationToken">A cancellation token exposed to the logic block.</param>
        /// <returns>The results of the query</returns>
        public TResult Execute<TResult>(LogicFunctionBlock<TResult> primaryLogic, LogicFunctionBlock<TResult> secondaryLogic, CancellationToken cancellationToken = default(CancellationToken))
        {
            var startTime = DateTimeOffset.UtcNow;
            ExecutionFeedbackClass<TResult> feedback;
            ExecutionRecord.ExecutionMode operationsMode;

            if (GetExecutionMode(out operationsMode))
            {
                feedback = new ExecutionFeedbackClass<TResult>(operationsMode);

                try
                {
                    if (operationsMode == ExecutionRecord.ExecutionMode.Primary || operationsMode == ExecutionRecord.ExecutionMode.Test)
                    {
                        primaryLogic(feedback, cancellationToken);
                    }
                    else
                    {
                        secondaryLogic(feedback, cancellationToken);
                    }
                }
                finally
                {
                    if (operationsMode == ExecutionRecord.ExecutionMode.Test)
                    {
                        if (feedback.Success && !feedback.LongRunning && feedback.RetryCount == 0)
                        {
                            Interlocked.Increment(ref _testSuccessCount);
                        }
                        else
                        {
                            Interlocked.Exchange(ref _testSuccessCount, 0);
                        }
                    }

                    _logicManager.ReportFeedback(startTime, feedback);
                }
            }
            else
            {
                throw new InvalidOperationException("Breaker has not been started.");
            }

            return feedback.Result;
        }

        /// <summary>
        /// A method called to execute code using the circuit breaker
        /// </summary>
        /// <param name="primaryLogic">The logic block to execute when the circuit breaker is in normal state</param>
        /// <param name="secondaryLogic">The logic block to execute when the circuit breaker is in tripped mode</param>
        /// <param name="cancellationToken">A cancellation token exposed to the logic block.</param>
        /// <returns>The results of the query</returns>
        public void Execute(LogicActionBlock primaryLogic, LogicActionBlock secondaryLogic, CancellationToken cancellationToken = default(CancellationToken))
        {
            Execute<object>((feedback, token) => { primaryLogic(feedback, token); }, (feedback, token) => { secondaryLogic(feedback, token); }, cancellationToken);
        }

        /// <summary>
        /// A method called to execute code using the circuit breaker
        /// </summary>
        /// <typeparam name="TResult">The return type of the logic block</typeparam>
        /// <param name="primaryLogic">The logic block to execute when the circuit breaker is in normal state</param>
        /// <param name="secondaryLogic">The logic block to execute when the circuit breaker is in tripped mode</param>
        /// <param name="cancellationToken">A cancellation token exposed to the logic block.</param>
        /// <returns>The results of the query</returns>
        public async Task<TResult> ExecuteAsync<TResult>(LogicFunctionBlockAsync<TResult> primaryLogic, LogicFunctionBlockAsync<TResult> secondaryLogic, CancellationToken cancellationToken = default(CancellationToken))
        {
            var startTime = DateTimeOffset.UtcNow;
            ExecutionFeedbackClass<TResult> feedback;
            ExecutionRecord.ExecutionMode operationsMode;

            if (GetExecutionMode(out operationsMode))
            {
                feedback = new ExecutionFeedbackClass<TResult>(operationsMode);

                try
                {
                    if (operationsMode == ExecutionRecord.ExecutionMode.Primary || operationsMode == ExecutionRecord.ExecutionMode.Test)
                    {
                        await primaryLogic(feedback, cancellationToken).ConfigureAwait(false);
                    }
                    else
                    {
                        await secondaryLogic(feedback, cancellationToken).ConfigureAwait(false);
                    }
                }
                finally
                {
                    if (operationsMode == ExecutionRecord.ExecutionMode.Test)
                    {
                        if (feedback.Success && !feedback.LongRunning && feedback.RetryCount == 0)
                        {
                            Interlocked.Increment(ref _testSuccessCount);
                        }
                        else
                        {
                            Interlocked.Exchange(ref _testSuccessCount, 0);
                        }
                    }

                    _logicManager.ReportFeedback(startTime, feedback);
                }
            }
            else
            {
                throw new InvalidOperationException("Breaker has not been started.");
            }

            return feedback.Result;
        }

        /// <summary>
        /// A method called to execute code using the circuit breaker
        /// </summary>
        /// <param name="primaryLogic">The logic block to execute when the circuit breaker is in normal state</param>
        /// <param name="secondaryLogic">The logic block to execute when the circuit breaker is in tripped mode</param>
        /// <param name="cancellationToken">A cancellation token exposed to the logic block.</param>
        /// <returns>The results of the query</returns>
        public Task ExecuteAsync(LogicActionBlockAsync primaryLogic, LogicActionBlockAsync secondaryLogic, CancellationToken cancellationToken = default(CancellationToken))
        {
            return ExecuteAsync<object>((feedback, token) => primaryLogic(feedback, cancellationToken), (feedback, token) => secondaryLogic(feedback, cancellationToken), cancellationToken);
        }

        /// <summary>
        /// A method that returns the execution mode that should be used.
        /// </summary>
        /// <returns>The execution mode of the logic</returns>
        private bool GetExecutionMode(out ExecutionRecord.ExecutionMode mode)
        {
            var currentState = State;
            var isStarted = currentState == BreakerState.Primary || currentState == BreakerState.Secondary;

            if (_overrideState)
            {
                mode = _overrideWithPrimary ? ExecutionRecord.ExecutionMode.Primary : ExecutionRecord.ExecutionMode.Secondary;
            }
            else if (currentState == BreakerState.Primary)
            {
                mode = ExecutionRecord.ExecutionMode.Primary;
            }
            else if (_testSuccessCount > 0 || _resetPolicy.ShouldTest())
            {
                mode = ExecutionRecord.ExecutionMode.Test;
            }
            else
            {
                mode = ExecutionRecord.ExecutionMode.Secondary;
            }

            return isStarted;
        }
        #endregion
        #region Callbacks
        /// <summary>
        /// A method that is called by the policies when a transition occurs.
        /// </summary>
        /// <param name="id">The id of the policy calling the method, must not be null</param>
        internal void TransitionNotification(Guid id)
        {
            lock (_transitionControl)
            {
                if (id == _tripPolicy.Id)
                {
                    State = BreakerState.Secondary;
                    _tripPolicy.StopMonitoringAsync(CancellationToken.None).GetAwaiter().GetResult();
                    _resetPolicy.StartMonitoringAsync(CancellationToken.None).GetAwaiter().GetResult();
                }
                else
                {
                    State = BreakerState.Primary;
                    _resetPolicy.StopMonitoringAsync(CancellationToken.None).GetAwaiter().GetResult();
                    _tripPolicy.StartMonitoringAsync(CancellationToken.None).GetAwaiter().GetResult();
                }
            }
        }

        /// <inheritdocs />
        public void Dispose()
        {
            if (Interlocked.Increment(ref _disposalCount) == 1)
            {
                _lifecycleControl.Dispose();

                _tripPolicy?.Dispose();
                _resetPolicy?.Dispose();
            }
        }
        #endregion
        #region Breaker State Enumeration
        /// <summary>
        /// States that the breaker can be in
        /// </summary>
        public enum BreakerState
        {
            /// <summary>
            /// The breaker has not been initialized yet
            /// </summary>
            Uninitialized = 0,

            /// <summary>
            /// The breaker is intialized but has not yet been s
            /// </summary>
            Initialized = 1,

            /// <summary>
            /// The breaker is in primary mode
            /// </summary>
            Primary = 2,

            /// <summary>
            /// The breaker has been tripped and is running secondary logic
            /// </summary>
            Secondary = 3
        }
        #endregion
        #region Logic Manager Class
        /// <summary>
        /// A class that is responsible for executing the logic and recording the results
        /// </summary>
        public sealed class BreakerLogicManager : IObservable<ExecutionRecord>
        {
            #region Variables
            /// <summary>
            /// A list of all subscribed observers.
            /// </summary>
            private readonly Dictionary<Guid, IObserver<ExecutionRecord>> _observers = new Dictionary<Guid, IObserver<ExecutionRecord>>();
            #endregion
            #region Observable Methods
            /// <inheritdocs />
            public IDisposable Subscribe(IObserver<ExecutionRecord> observer)
            {
                var unsubscriber = new BreakerLogicUnsubscriber(this);

                _observers.Add(unsubscriber.SubscriberId, observer);

                return unsubscriber;
            }

            /// <summary>
            /// Removes the subscriber
            /// </summary>
            /// <param name="subscriberId"></param>
            private void Unsubscribe(Guid subscriberId)
            {
                _observers.Remove(subscriberId);
            }
            #endregion
            #region Reporter Methods
            /// <summary>
            /// A method called to report the results of a block of logic being executed
            /// </summary>
            /// <param name="timestamp">The time the action started at</param>
            /// <param name="feedback">The feedback information</param>
            internal void ReportFeedback(DateTimeOffset timestamp, IFeedbackContainer feedback)
            {
                var observerList = _observers.Values.ToArray();

                var record = new ExecutionRecord(timestamp, feedback.Success, feedback.RetryCount, feedback.LongRunning, feedback.Mode);

                foreach (var observer in observerList)
                {
                    observer.OnNext(record);
                }
            }
            #endregion
            #region Breaker Logic Unsubscriber Class
            /// <summary>
            /// An object that when disposed of unsubscribes from an observable
            /// </summary>
            private sealed class BreakerLogicUnsubscriber : IDisposable
            {
                /// <summary>
                /// The id of the subscriber
                /// </summary>
                internal readonly Guid SubscriberId = Guid.NewGuid();

                /// <summary>
                /// The breaker that is subscribed to
                /// </summary>
                private readonly BreakerLogicManager _logicManager;

                /// <summary>
                /// Instantiates a new instance of the type.
                /// </summary>
                /// <param name="logicManager">The breaker logic manager that is subscribed to.</param>
                internal BreakerLogicUnsubscriber(BreakerLogicManager logicManager)
                {
                    _logicManager = logicManager;
                }

                /// <inheritdocs />
                public void Dispose()
                {
                    _logicManager.Unsubscribe(SubscriberId);
                }
            }

            #endregion
            #region Feedback Container Interface
            /// <summary>
            /// An interface implemented by objects that contain the feedback information to report on
            /// </summary>
            internal interface IFeedbackContainer
            {
                /// <summary>
                /// Set to true if logic executes successfully
                /// </summary>
                bool Success { get; set; }

                /// <summary>
                /// True to indicate the logic ran long
                /// </summary>
                bool LongRunning { get; set; }

                /// <summary>
                /// The number of retries required to make the logic successful
                /// </summary>
                int RetryCount { get; set; }

                /// <summary>
                /// The mode that the feedback was created for
                /// </summary>
                ExecutionRecord.ExecutionMode Mode { get; }
            }
            #endregion
        }
        #endregion
        #region Execution Feedback Class
        /// <summary>
        /// A container method to return results
        /// </summary>
        public sealed class ExecutionFeedbackClass<TResult> : BreakerLogicManager.IFeedbackContainer
        {
            /// <summary>
            /// Initializes a new instance of the type
            /// </summary>
            /// <param name="mode">The mode that the feedback was for</param>
            internal ExecutionFeedbackClass(ExecutionRecord.ExecutionMode mode)
            {
                Mode = mode;
            }

            /// <summary>
            /// Set to true if logic executes successfully
            /// </summary>
            public bool Success { get; set; }

            /// <summary>
            /// True to indicate the logic ran long
            /// </summary>
            public bool LongRunning { get; set; }

            /// <summary>
            /// The number of retries required to make the logic successful
            /// </summary>
            public int RetryCount { get; set; }

            /// <summary>
            /// The results of the query
            /// </summary>
            public TResult Result { get; set; }

            /// <summary>
            /// The mode that the feedback was for
            /// </summary>
            public ExecutionRecord.ExecutionMode Mode { get; set; }
        }
        #endregion
    }
}

That wraps up the majority of the patterns code. If I stop here there is a full framework but no logic to trip or reset. As mentioned earlier this is implemented as a set of extensible policies. The reality is that the majority of policies will contain a set of common actions and attributes making it a great opportunity for a base class. The following code is the base policy class setting up the derived classes to easily implement Reactive Extension query variations.

 namespace Ronin.CircuitBreakers.Core
{
    #region Using Clauses
    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using Parameters;
    #endregion

    /// <summary>
    /// The core of the breaker policies. A breaker policy instance should only ever be associated with a single breaker. Do not reuse them.
    /// </summary>
    public abstract class BreakerPolicy : IDisposable
    {
        #region Variables
        /// <summary>
        /// A synchronization object used to manage the transitions
        /// </summary>
        private readonly SemaphoreSlim _lifecycleControl = new SemaphoreSlim(1);

        /// <summary>
        /// The number of times the instance has been disposed of
        /// </summary>
        private int _disposalCount;

        /// <summary>
        /// The stream currently being observed
        /// </summary>
        private IObservable<DateTimeOffset> _observable;

        /// <summary>
        /// The object to dispose of to stop receiving notifications from the observed stream
        /// </summary>
        private IDisposable _unsubscriber;
        #endregion
        #region Constructors
        /// <summary>
        /// Initializes a new instance of the type
        /// </summary>
        protected BreakerPolicy()
        {
            Id = Guid.NewGuid();
        }
        #endregion
        #region Properties
        /// <summary>
        /// The instance of the breaker that the policy is associated with. This is only available after the initialize has been called.
        /// </summary>
        protected Breaker Breaker { get; private set; }

        /// <summary>
        /// The instance that will provide the stream that the policy is monitoring
        /// </summary>
        protected IObservable<ExecutionRecord> Reporter { get; private set; }

        /// <summary>
        /// True if the policy has already had initialize called on it.
        /// </summary>
        protected bool IsInitialized { get; private set; }

        /// <summary>
        /// True if the policy is currently monitoring for transition events
        /// </summary>
        protected bool IsMonitoring { get; private set; }

        /// <summary>
        /// The last time the transition occurred at
        /// </summary>
        public DateTimeOffset LastTransitionTime => DateTimeOffset.MinValue;

        /// <summary>
        /// An object to pass to the breaker when it notifies of state transition.
        /// </summary>
        public Guid Id { get; }
        #endregion
        #region Methods
        /// <summary>
        /// A method that is called by the breaker to initialize the associated policies. This can only be called once enforcing the fact that only the first breaker will be used
        /// by the policy.
        /// </summary>
        /// <param name="breaker">The breaker that the policy is associated with.</param>
        /// <param name="reporter">The object reporting on logic execution</param>
        /// <param name="cancellationToken">A token to be monitored for abort requests.</param>
        internal async Task InitializePolicyAsync(Breaker breaker, IObservable<ExecutionRecord> reporter, CancellationToken cancellationToken)
        {
            Guard.NotNull(nameof(breaker), breaker);
            Guard.NotNull(nameof(reporter), reporter);

            try
            {
                await _lifecycleControl.WaitAsync(cancellationToken).ConfigureAwait(false);

                if (!IsInitialized)
                {
                    Breaker = breaker;
                    Reporter = reporter;

                    await InitializeAsync(cancellationToken).ConfigureAwait(false);
                    IsInitialized = true;
                }
            }
            finally
            {
                _lifecycleControl.Release();
            }
        }

        /// <summary>
        /// Provides the breaker with an opportunity to initialize before use. This will only be called one time.
        /// </summary>
        /// <param name="cancellationToken">A token to be monitored for abort requests.</param>
        protected virtual Task InitializeAsync(CancellationToken cancellationToken)
        {
            return Task.FromResult<object>(null);
        }

        /// <summary>
        /// Starts the policies monitoring
        /// </summary>
        /// <param name="cancellationToken">A token to be monitored for abort requests.</param>
        public async Task StartMonitoringAsync(CancellationToken cancellationToken)
        {
            try
            {
                await _lifecycleControl.WaitAsync(cancellationToken).ConfigureAwait(false);

                _unsubscriber?.Dispose();

                if (!IsMonitoring)
                {
                    _observable = GetQuery();
                    _unsubscriber = _observable.Subscribe(OnNext);
                    IsMonitoring = true;
                }
            }
            finally
            {
                _lifecycleControl.Release();
            }
        }

        /// <summary>
        /// Stops the policies monitoring
        /// </summary>
        /// <param name="cancellationToken">A token to be monitored for abort requests.</param>
        public async Task StopMonitoringAsync(CancellationToken cancellationToken)
        {
            try
            {
                await _lifecycleControl.WaitAsync(cancellationToken).ConfigureAwait(false);

                if (IsMonitoring)
                {
                    _unsubscriber?.Dispose();
                    IsMonitoring = false;
                }
            }
            finally
            {
                _lifecycleControl.Release();
            }
        }
        #endregion
        #region Observation Methods
        /// <summary>
        /// A handler that is notified when the breaker policy is triggered
        /// </summary>
        /// <param name="dateTimeOffset">The date and or time the trigger occurred at</param>
        private void OnNext(DateTimeOffset dateTimeOffset)
        {
            if (IsMonitoring) Breaker.TransitionNotification(Id);
        }
        #endregion
        #region Abstract Methods
        /// <summary>
        /// Returns the observable query that is monitored by the breaker to see if it trips
        /// </summary>
        /// <returns>The observable query</returns>
        protected abstract IObservable<DateTimeOffset> GetQuery();
        #endregion
        #region Disposal
        /// <inheritdocs />
        public void Dispose()
        {
            if (Interlocked.Increment(ref _disposalCount) == 1)
            {
                _lifecycleControl.Dispose();
            }
        }
        #endregion
    }
}

Trip and reset policies are very similar logic but used in opposing health modes. A basic trip policy base class

 namespace Ronin.CircuitBreakers.Core
{
    /// <summary>
    /// A breaker policy that is used for trip logic
    /// </summary>
    public abstract class BreakerTripPolicy : BreakerPolicy
    {
        /// <summary>
        /// Initializes a new instance of the type
        /// </summary>
        protected BreakerTripPolicy()
        {
        }
    }
}

and now for the reset base class

 namespace Ronin.CircuitBreakers.Core
{
    #region Using Clauses
    using System.Threading;
    using Parameters;
    #endregion

    /// <summary>
    /// A breaker policy that is used for reset logic
    /// </summary>
    public abstract class BreakerResetPolicy : BreakerPolicy
    {
        /// <summary>
        /// Indicates how often to test the primary logic for reset
        /// </summary>
        private readonly int _testEvery;

        /// <summary>
        /// Counts the number of times the logic has executed
        /// </summary>
        private long _executionCounter;

        /// <summary>
        /// Initializes a new instance of the type
        /// </summary>
        /// <param name="testEvery">Indicates how often to test the primary logic for reset</param>
        protected BreakerResetPolicy(int testEvery)
        {
            Guard.NotLessThan(nameof(testEvery), testEvery, 1);

            _testEvery = testEvery;
        }

        /// <summary>
        /// Returns true if the logic should execute a test
        /// </summary>
        /// <returns>True if the logic should execute a test</returns>
        internal bool ShouldTest()
        {
            var executionNumber = Interlocked.Increment(ref _executionCounter);

            return executionNumber > 0 && executionNumber < int.MaxValue && (executionNumber % _testEvery) == 0;
        }
    }
}

With the base policy classes outlined and all of the framework in place we can create our first complete trip policy. This first policy looks for a number of consecutive failures to trip. As an example if 10 failures occur in a row with no successes the breaker will trip.

 namespace Ronin.CircuitBreakers.TripPolicies
{
    #region Using Clauses
    using System;
    using System.Linq;
    using Parameters;
    using System.Reactive.Linq;
    using Core;
    #endregion

    /// <summary>
    /// A breaker trip policy that completes when the specified number of consecutive failures occur in a row
    /// </summary>
    public class ConsecutiveFailureTripPolicy : BreakerTripPolicy
    {
        #region Variables
        /// <summary>
        /// The number of successes in a row that must occur to reset
        /// </summary>
        private readonly int _consecutiveSuccesses;

        /// <summary>
        /// True if long running are counted as success
        /// </summary>
        private readonly bool _includeLongRunning;

        /// <summary>
        /// True if execution requiring retries counts as success
        /// </summary>
        private readonly bool _includeRetried;
        #endregion
        #region Constructors
        /// <summary>
        /// Initializes a new instance of the type
        /// </summary>
        /// <param name="consecutiveSuccesses">The number of failures in a row that must occur to trip</param>
        /// <param name="includeLongRunning">True if long running are counted as success</param>
        /// <param name="includeRetried">True if execution requiring retries counts as success</param>
        public ConsecutiveFailureTripPolicy(int consecutiveSuccesses, bool includeLongRunning, bool includeRetried) 
        {
            Guard.NotLessThan(nameof(consecutiveSuccesses), consecutiveSuccesses, 1);

            _consecutiveSuccesses = consecutiveSuccesses;
            _includeLongRunning = includeLongRunning;
            _includeRetried = includeRetried;
        }
        #endregion
        #region Methods
        /// <inheritdocs />
        protected override IObservable<DateTimeOffset> GetQuery()
        {
            return from resultWindow in Reporter.Where(record => record.Mode == ExecutionRecord.ExecutionMode.Primary).Buffer(_consecutiveSuccesses, 1).Timestamp()
                   let isEntireWindowSuccess = resultWindow.Value.Count(element => !element.Success || (element.LongRunning && !_includeLongRunning) ||
                   element.RetryCount > 0 || !_includeRetried) == resultWindow.Value.Count
                   where isEntireWindowSuccess
                   select resultWindow.Timestamp;
        }
        #endregion
    }
}

Another example of a trip policy would be to have one that trips if the success percent is below a defined amount in a defined period of time. If the success to failure ratio drops below 80 percent in a 1 minute sliding window trip.

 namespace Ronin.CircuitBreakers.TripPolicies
{
    #region Using Clauses
    using System;
    using System.Linq;
    using Parameters;
    using System.Reactive.Linq;
    using Core;
    #endregion

    /// <summary>
    /// A breaker trip policy that completes when the specified number of consecutive failures occur in a row
    /// </summary>
    public class SuccessPercentTripPolicy : BreakerTripPolicy
    {
        #region Variables
        /// <summary>
        /// The size of the monitoring (sliding) window that the percent must be above the minimum for
        /// </summary>
        private readonly TimeSpan _monitoringWindowsSize;

        /// <summary>
        /// The minimum number of records that must be seen in the window to consider the test valid
        /// </summary>
        private readonly int _minimumWindowsCount;

        /// <summary>
        /// The percent that if it falls below of successful it fails.
        /// </summary>
        private readonly double _minimumSuccessPercent;

        /// <summary>
        /// True if long running are counted as success
        /// </summary>
        private readonly bool _includeLongRunning;

        /// <summary>
        /// True if execution requiring retries counts as success
        /// </summary>
        private readonly bool _includeRetried;
        #endregion
        #region Constructors
        /// <summary>
        /// Initializes a new instance of the type
        /// </summary>
        /// <param name="monitoringWindowsSize">The size of the monitoring (sliding) window that the percent must be above the minimum for</param>
        /// <param name="minimumWindowsCount">True if long running are counted as success</param>
        /// <param name="minimumSuccessPercent">True if execution requiring retries counts as success</param>
        /// <param name="includeLongRunning">True if long running are counted as success</param>
        /// <param name="includeRetried">True if execution requiring retries counts as success</param>
        public SuccessPercentTripPolicy(TimeSpan monitoringWindowsSize, int minimumWindowsCount, double minimumSuccessPercent, bool includeLongRunning, bool includeRetried)
        {
            Guard.NotLessThan(nameof(minimumWindowsCount), minimumWindowsCount, 1);
            Guard.NotLessThan(nameof(minimumSuccessPercent), (int)minimumSuccessPercent, 0);
            Guard.NotMoreThan(nameof(minimumSuccessPercent), (int)minimumSuccessPercent, 100);
            Guard.NotLessThan(nameof(monitoringWindowsSize), (int)monitoringWindowsSize.TotalSeconds, 1);
            Guard.NotMoreThan(nameof(monitoringWindowsSize), (int)monitoringWindowsSize.TotalSeconds, 86400);

            _monitoringWindowsSize = monitoringWindowsSize;
            _minimumSuccessPercent = minimumSuccessPercent;
            _minimumWindowsCount = minimumWindowsCount;
            _includeLongRunning = includeLongRunning;
            _includeRetried = includeRetried;
        }
        #endregion
        #region Methods
        /// <inheritdocs />
        protected override IObservable<DateTimeOffset> GetQuery()
        {
            return from resultWindow in Reporter.Where(record => record.Mode == ExecutionRecord.ExecutionMode.Primary).Buffer(_monitoringWindowsSize, TimeSpan.FromSeconds(1)).Timestamp()
                   let windowSuccess = resultWindow.Value.Count(element => element.Success && (!element.LongRunning || _includeLongRunning) && (element.RetryCount == 0 || _includeRetried) && element.Timestamp.Add(_monitoringWindowsSize) >= DateTime.UtcNow)
                   let windowCount = resultWindow.Value.Count(element => element.Timestamp.Add(_monitoringWindowsSize) >= DateTime.UtcNow)
                   let windowRatio = (double)windowSuccess / (double)windowCount
                   where windowRatio < _minimumSuccessPercent && windowCount >= _minimumWindowsCount
                   select resultWindow.Timestamp;
        }
        #endregion
    }
}

Now for the first fully functioning reset policy. This policy will look for a defined number of consecutive successes before the breaker is reset.

 namespace Ronin.CircuitBreakers.ResetPolicies
{
    #region Using Clauses
    using System;
    using System.Linq;
    using Parameters;
    using System.Reactive.Linq;
    using Core;
    #endregion

    /// <summary>
    /// A breaker reset policy that completes when the specified number of consecutive successful executions occur in a row
    /// </summary>
    public class ConsecutiveSuccessResetPolicy : BreakerResetPolicy
    {
        #region Variables
        /// <summary>
        /// The number of successes in a row that must occur to reset
        /// </summary>
        private readonly int _consecutiveSuccesses;

        /// <summary>
        /// True if long running are counted as success
        /// </summary>
        private readonly bool _includeLongRunning;

        /// <summary>
        /// True if execution requiring retries counts as success
        /// </summary>
        private readonly bool _includeRetried;
        #endregion
        #region Constructors

        /// <summary>
        /// Initializes a new instance of the type
        /// </summary>
        /// <param name="consecutiveSuccesses">The number of successes in a row that must occur to reset</param>
        /// <param name="includeLongRunning">True if long running are counted as success</param>
        /// <param name="includeRetried">True if execution requiring retries counts as success</param>
        /// <param name="testEvery">Indicates how often to test the primary logic for reset</param>
        public ConsecutiveSuccessResetPolicy(int testEvery, int consecutiveSuccesses, bool includeLongRunning, bool includeRetried) : base(testEvery)
        {
            Guard.NotLessThan(nameof(consecutiveSuccesses), consecutiveSuccesses, 1);

            _consecutiveSuccesses = consecutiveSuccesses;
            _includeLongRunning = includeLongRunning;
            _includeRetried = includeRetried;
        }
        #endregion
        #region Methods
        /// <inheritdocs />
        protected override IObservable<DateTimeOffset> GetQuery()
        {
            return from resultWindow in Reporter.Where(record => record.Mode == ExecutionRecord.ExecutionMode.Test).Buffer(_consecutiveSuccesses, 1).Timestamp()
                   let isEntireWindowSuccess = resultWindow.Value.Count(element => element.Success && (!element.LongRunning || _includeLongRunning) &&
                   element.RetryCount == 0 || _includeRetried) == resultWindow.Value.Count
                   where isEntireWindowSuccess
                   select resultWindow.Timestamp;
        }
        #endregion
    }
}

This wraps up the framework and classes associated with my circuit breaker implementation. With the logic written and in place let's see how to use it.

             var tripPolicy = new ConsecutiveFailureTripPolicy(5, true, true);
            var resetPolicy = new ConsecutiveSuccessResetPolicy(10, 5, true, false);

            var breaker = new Breaker(tripPolicy, resetPolicy);
            await breaker.InitializeAsync().ConfigureAwait(false);
            await breaker.Start().ConfigureAwait(false);

            breaker.Execute<int>((feedback, token) =>
            {
                // Perform my logic
                feedback.Success = true;
                feedback.Result = 4;
            },
            (feedback, token) =>
            {
                // Perform secondary logic
                feedback.Success = true;
                feedback.Result = 10;
            }, CancellationToken.None);

            await breaker.Stop().ConfigureAwait(false);

            breaker.Dispose();
            tripPolicy.Dispose();
            resetPolicy.Dispose();

<< Previous     Next >>