Ronin Building Blocks – Retries

In most systems it is important to retry when an operation fails. Not every error is worth retrying, though: an “invalid password” error, for example, needs a configuration or other change before the call can succeed. Retrying when there is no chance of success is a fool’s game!

You are probably asking yourself why you would build retry logic when the various technologies already ship all sorts of retry mechanisms, Azure Storage being a great example. The answer is that the need to retry, although typically tied to a specific technology, may span technologies or be associated with business logic. I am of the strong opinion, as I assume many people are, that if a technology offers an appropriate retry mechanism you should use it first!

The first thing we need to do, as seen before, is to build the initial .NET Core library and clean up the solution. The project.json I will use targets the netstandard1.6 and net451 frameworks. The file can be seen below.

 {
  "version": "1.0.3-*",
  "name": "Ronin.Retries",
  "description": "A retry framework with common algorithms.",
  "copyright": "Chris Clayton 2016",
  "title": "Ronin Retries",
  "authors": [ "Chris Clayton" ],
  "language": "en-US",
  "buildOptions": {
    "optimize": true,
    "warningsAsErrors": true,
    "xmlDoc": true,
    "platform": "anycpu",
    "debugType": "portable"
  },
  "frameworks": {
    "netstandard1.6": { "dependencies": { "NETStandard.Library": "1.6.*" } },
    "net451": {}
  },
  "dependencies": {
    "Ronin.Parameters": "1.*"
  }
}

The project uses policies to determine when to retry and how long to delay between attempts. To keep this extensible, a policy is expressed as an interface, named IRetryPolicy for clarity.

    namespace Ronin.Retries
    {
        #region Using Clauses
        using System;
        #endregion

        /// <summary>
        /// A retry policy used to determine sleep times and number of retries.
        /// </summary>
        public interface IRetryPolicy
        {
            /// <summary>
            /// Determines if the retry policy should attempt another retry.
            /// </summary>
            /// <param name="currentAttempt">The current attempt number (starting at 1 for the first attempt).</param>
            /// <param name="lastException">The last exception that occurred or null if none has.</param>
            /// <returns>True if a retry should be performed.</returns>
            bool ShouldRetry(int currentAttempt, Exception lastException);

            /// <summary>
            /// Determines the delay before the next retry.
            /// </summary>
            /// <param name="currentAttempt">The current attempt number (starting at 1 for the first attempt).</param>
            /// <param name="lastException">The last exception that occurred or null if none has.</param>
            /// <returns>A TimeSpan indicating the sleep period before the next attempt.</returns>
            TimeSpan Delay(int currentAttempt, Exception lastException);
        }
    }

To make retries transparent, the retry policy can be observed, which makes it easier to log failures and retry exhaustion. I decided to build a base class that all the retry implementations will derive from. First, the data that is published to observers must be created.

    namespace Ronin.Retries
    {
        #region Using Clauses
        using System;
        #endregion

        /// <summary>
        /// A container for the data published to the retry observers
        /// </summary>
        public sealed class RetryData
        {
            #region Constructors
            /// <summary>
            /// Initializes a new instance of the RetryData type
            /// </summary>
            /// <param name="id">The id associated with the retry being executed to allow for it to be traced</param>
            /// <param name="timestamp">The time that the retry occurred at</param>
            /// <param name="attempt">The attempt number</param>
            /// <param name="last">True if there are no more retries.</param>
            /// <param name="cancelled">True if the method was cancelled.</param>
            internal RetryData(Guid id, DateTimeOffset timestamp, int attempt, bool last, bool cancelled)
            {
                Id = id;
                TimeStamp = timestamp;
                Attempt = attempt;
                Last = last;
                Cancelled = cancelled;
            }
            #endregion
            #region Properties
            /// <summary>
            /// The id associated with the retry being executed to allow for it to be traced
            /// </summary>
            public Guid Id { get; }

            /// <summary>
            /// The time that the retry occurred at
            /// </summary>
            public DateTimeOffset TimeStamp { get; }

            /// <summary>
            /// The attempt number
            /// </summary>
            public int Attempt { get; }

            /// <summary>
            /// True if there are no more retries.
            /// </summary>
            public bool Last { get; }

            /// <summary>
            /// True if the method was cancelled.
            /// </summary>
            public bool Cancelled { get; }
            #endregion
        }
    }

Now the observable implementation is created. This will be the base class all of my policies will derive from.

 namespace Ronin.Retries.Internal
 {
 #region Using Clauses
 using System;
 using System.Threading;
 using Parameters;
 using System.Collections.Concurrent;
 #endregion

    /// <summary>
 /// A retry policy that publishes observable events each time a retry is attempted.
 /// </summary>
 public abstract class ObservableRetryPolicy : IObservable<RetryData>, IRetryPolicy, IDisposable
 {
 #region Delegates
 /// <summary>
 /// Determines if the provided exception is transient
 /// </summary>
 /// <param name="exception">The exception that is being evaluated</param>
 /// <returns>True if the exception is transient.</returns>
 public delegate bool TransientDetector(Exception exception);
 #endregion
 #region Variables
 /// <summary>
 /// The number of times the object has been requested to dispose
 /// </summary>
 private int _disposalCount;

        /// <summary>
 /// Maintains a thread safe list of observers.
 /// </summary>
 private readonly ConcurrentDictionary<Guid, IObserver<RetryData>> _observers = new ConcurrentDictionary<Guid, IObserver<RetryData>>();
 #endregion
 #region Constructors
 /// <summary>
 /// Initializes a new instance of the observable retry policy
 /// </summary>
 /// <param name="maximumRetryCount">The maximum number of retries to be attempted</param>
 /// <param name="initialDelay">The duration that should delay before the first "delayed" retry in milliseconds (between 1 and 3600000 inclusive).</param>
 /// <param name="fastFirst">True if the first retry should be triggered immediately</param>
 /// <param name="detector">The transient detector to use or null if all exceptions are considered transient and will be retried.</param>
 protected ObservableRetryPolicy(int maximumRetryCount, int initialDelay, bool fastFirst, TransientDetector detector)
 {
 Guard.NotLessThan(nameof(maximumRetryCount), maximumRetryCount, 0);
 Guard.NotMoreThan(nameof(maximumRetryCount), maximumRetryCount, 1000);
 Guard.NotLessThan(nameof(initialDelay), initialDelay, 1);
 Guard.NotMoreThan(nameof(initialDelay), initialDelay, 3600000);

 MaximumRetryCount = maximumRetryCount;
 InitialDelay = initialDelay;
 FastFirst = fastFirst;
 Detector = detector ?? (exception => true);
 }
 #endregion
 #region Properties
 /// <summary>
 /// True if the retry should use an immediate retry on the first attempt with no delay
 /// </summary>
 protected bool FastFirst { get; }

        /// <summary>
 /// The maximum number of retries to be attempted
 /// </summary>
 protected int MaximumRetryCount { get; }

        /// <summary>
 /// The duration that should delay before the first "delayed" retry in milliseconds (between 1 and 3600000 inclusive).
 /// </summary>
 protected int InitialDelay { get; }

        /// <summary>
 /// The transient detector that is in use
 /// </summary>
 protected TransientDetector Detector { get; }
 #endregion
 #region Methods
 /// <summary>
 /// Determines if the retry policy should attempt another retry.
 /// </summary>
 /// <param name="retryNumber">The current retry number (starts at 1)</param>
 /// <param name="lastException">The last exception that occurred or null if none has.</param>
 /// <returns>True if a retry should be performed.</returns>
 public virtual bool ShouldRetry(int retryNumber, Exception lastException = null)
 {
 Guard.NotLessThan(nameof(retryNumber), retryNumber, 1);

            var retry = false;

            if (retryNumber <= MaximumRetryCount)
 {
 if (lastException == null || (Detector(lastException)))
 {
 retry = true;
 }
 }

            return retry;
 }

        /// <summary>
 /// Determines the delay before the next retry.
 /// </summary>
 /// <param name="currentAttempt">The current attempt number (starting at 1 for the first attempt).</param>
 /// <param name="lastException">The last exception that occurred or null if none has.</param>
 /// <returns>A TimeSpan indicating the sleep period before the next attempt.</returns>
 public abstract TimeSpan Delay(int currentAttempt, Exception lastException);

        /// <summary>
 /// Publishes a retry notification to all subscribed observers.
 /// </summary>
 /// <param name="attempt">The attempt number of the function call.</param>
 /// <param name="id">The id used to correlate the retries of the same logic block.</param>
 /// <param name="last">True if there are no more retries.</param>
 /// <param name="cancelled">True if the method was cancelled.</param>
 public virtual void NotifyOfRetry(int attempt, Guid id, bool last, bool cancelled)
 {
 try
 {
 var data = new RetryData(id, DateTimeOffset.UtcNow, attempt, last, cancelled);

                foreach (var observer in _observers.Values)
 {
 try
 {
 observer.OnNext(data);
 }
 catch (Exception)
 {
 // Suppress errors
 }
 }
 }
 catch (Exception)
 {
 // suppress error
 }
 }
 #endregion
 #region Disposal Pattern
 /// <summary>
 /// The IDisposable interfaces Dispose method.
 /// </summary>
 public void Dispose()
 {
 if (Interlocked.Increment(ref _disposalCount) == 1)
 {
 try
 {
 foreach (var observer in _observers.Values)
 {
 try
 {
 observer.OnCompleted();
 }
 catch (Exception)
 {
 // Suppress errors
 }
 }
 }
 catch (Exception)
 {
 // suppress error
 }
 }
 }

        /// <summary>
 /// Used by observers to subscribe to the stream.
 /// </summary>
 /// <param name="observer">The observer that is subscribing</param>
 /// <returns>An object that when disposed of will unsubscribe</returns>
 public IDisposable Subscribe(IObserver<RetryData> observer)
 {
 Guard.NotNull(nameof(observer), observer);

            var unsubscriber = new Unsubscriber(this);
 _observers.TryAdd(unsubscriber.Id, observer);

            return unsubscriber;
 }

        /// <summary>
 /// Called by the unsubscriber to remove the subscription from the list of observers
 /// </summary>
 /// <param name="id">The id to uniquely identify the observer</param>
 private void Unsubscribe(Guid id)
 {
 var attempt = 0;
 bool success;

            do
 {
 IObserver<RetryData> observer;
 success = _observers.TryRemove(id, out observer);
 } while (!success && ++attempt < 3);
 }
 #endregion
 #region Unsubscriber
 /// <summary>
 /// A type that is used for observers to unsubscribe
 /// </summary>
 public sealed class Unsubscriber : IDisposable
 {
 #region Variables
 /// <summary>
 /// Tracks the number of times that the Dispose() method has been called
 /// </summary>
 private int _disposalCount;

            /// <summary>
 /// The policy that the unsubscriber belongs to
 /// </summary>
 private readonly ObservableRetryPolicy _policy;
 #endregion
 #region Methods
 /// <summary>
 /// Initializes a new instance of the type
 /// </summary>
 /// <param name="policy">The policy that the unsubscriber belongs to</param>
 internal Unsubscriber(ObservableRetryPolicy policy)
 {
 _policy = policy;
 Id = Guid.NewGuid();
 }
 #endregion
 #region Properties
 /// <summary>
 /// The unique identifier used to identify the observer
 /// </summary>
 internal Guid Id { get; }
 #endregion
 #region Disposal
 /// <inheritdoc />
 public void Dispose()
 {
 if (Interlocked.Increment(ref _disposalCount) == 1)
 {
 _policy.Unsubscribe(Id);
 }
 }
 #endregion
 }
 #endregion
 }
 }
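
The TransientDetector delegate above accepts any method that takes an Exception and returns a bool. As a minimal sketch of what a caller might supply, the hypothetical TransientDetectors class below (not part of Ronin.Retries) treats timeouts and I/O failures as transient and authorization or argument errors as permanent. The classification itself is an assumption for illustration; the right list depends on the technology being called.

```csharp
using System;
using System.IO;

// Hypothetical helper, not part of Ronin.Retries.
public static class TransientDetectors
{
    // Returns true when retrying the failed call has a chance of succeeding.
    // The exception types chosen here are assumptions; a real detector would
    // likely be more selective about which I/O failures it retries.
    public static bool IsTransient(Exception exception)
    {
        // These need a configuration or code change, so retrying cannot help.
        if (exception is UnauthorizedAccessException || exception is ArgumentException)
        {
            return false;
        }

        // Timeouts and I/O failures are often caused by temporary conditions.
        return exception is TimeoutException || exception is IOException;
    }
}
```

Because the method matches the delegate signature, it could be passed as the detector argument of a policy constructor in place of null.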

One of the most common implementations is exponential back off with randomized timing. This implementation increases the delay between attempts exponentially: in the code below the base delay is the initial delay in milliseconds multiplied by 2 ^ (attempt number – 1), adjusted by a random offset. The reason this is the most common policy is that it mitigates the retry wave effect. To see why, consider a linear policy that is writing to a store that is currently throttling.
Suppose the standard write rate to the store is 1000 entities per second and the store throttles all the clients, which are configured with a 10 second linear retry policy. The throttling store will then have a load level of 2000 entities after the first 10 seconds, when the first retry occurs, and 3000 after the next. This can create a convoy effect where there may not be a way for the service to eventually recover.

    namespace Ronin.Retries.Policies
    {
        #region Using Clauses
        using System;
        using Parameters;
        using Internal;
        #endregion

        /// <summary>
        /// A retry policy that uses exponential back off but randomizes each step
        /// </summary>
        public class ExponentialRandomRetryPolicy : ObservableRetryPolicy
        {
            #region Variables
            /// <summary>
            /// The maximum number of milliseconds for randomization on each attempt
            /// </summary>
            private readonly ushort _maximumRandomMilliSeconds;

            /// <summary>
            /// A random number creation instance
            /// </summary>
            private readonly Random _randomGenerator;
            #endregion
            #region Constructors
            /// <summary>
            /// Initializes the retry policy
            /// </summary>
            /// <param name="maximumRandomMilliSeconds">The maximum number of milliseconds for randomization on each attempt</param>
            /// <param name="maximumRetryCount">The maximum number of retries to be attempted</param>
            /// <param name="initialDelay">The duration that should delay before the first "delayed" retry in milliseconds (between 1 and 3600000 inclusive).</param>
            /// <param name="fastFirst">True if the first retry should be triggered immediately</param>
            /// <param name="detector">The transient detector to use or null if all exceptions are considered transient and will be retried.</param>
            public ExponentialRandomRetryPolicy(ushort maximumRandomMilliSeconds, int maximumRetryCount, int initialDelay, bool fastFirst, TransientDetector detector = null) : base(maximumRetryCount, initialDelay, fastFirst, detector)
            {
                Guard.NotLessThan(nameof(maximumRandomMilliSeconds), maximumRandomMilliSeconds, 0);

                _maximumRandomMilliSeconds = maximumRandomMilliSeconds;
                _randomGenerator = new Random();
            }
            #endregion
            #region Methods
            /// <summary>
            /// Determines the delay before the next retry.
            /// </summary>
            /// <param name="currentAttempt">The current attempt number (starting at 1 for the first attempt).</param>
            /// <param name="lastException">The last exception that occurred or null if none has.</param>
            /// <returns>A TimeSpan indicating the sleep period before the next attempt.</returns>
            public override TimeSpan Delay(int currentAttempt, Exception lastException)
            {
                Guard.NotLessThan(nameof(currentAttempt), currentAttempt, 1);

                TimeSpan delay;

                if (currentAttempt == 1 && FastFirst)
                {
                    delay = TimeSpan.Zero;
                }
                else
                {
                    var exponent = (currentAttempt - 1) - (FastFirst ? 1 : 0);
                    int randomOffset;

                    // System.Random is not thread safe and a policy instance may be shared between callers
                    lock (_randomGenerator)
                    {
                        // The upper bound of Next is exclusive, so the offset lies in [-maximum, maximum)
                        randomOffset = _randomGenerator.Next(_maximumRandomMilliSeconds * -1, _maximumRandomMilliSeconds);
                    }

                    var delayMilliseconds = (InitialDelay * Math.Pow(2, exponent)) - randomOffset;

                    if (delayMilliseconds < 0) delayMilliseconds = 0;

                    // Large attempt numbers would otherwise overflow TimeSpan.FromMilliseconds and Task.Delay
                    if (delayMilliseconds > int.MaxValue) delayMilliseconds = int.MaxValue;

                    delay = TimeSpan.FromMilliseconds(delayMilliseconds);
                }

                return delay;
            }
            #endregion
        }
    }
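
To make the growth of the delay concrete, here is a minimal sketch that repeats only the base calculation from the Delay method above, ignoring the random offset so that the result is deterministic. The BackoffSchedule class and its method name are hypothetical and exist only for this illustration.

```csharp
using System;

// Hypothetical helper for illustration; it mirrors the base calculation of
// ExponentialRandomRetryPolicy.Delay without the random offset.
public static class BackoffSchedule
{
    // initialDelay is in milliseconds; currentAttempt starts at 1, as in the policy.
    public static double BaseDelayMilliseconds(int initialDelay, int currentAttempt, bool fastFirst)
    {
        // With fastFirst the first retry is immediate and the exponential
        // sequence starts one attempt later.
        if (currentAttempt == 1 && fastFirst)
        {
            return 0;
        }

        var exponent = (currentAttempt - 1) - (fastFirst ? 1 : 0);

        return initialDelay * Math.Pow(2, exponent);
    }
}
```

With an initial delay of 1000 milliseconds, attempts 1 to 4 give 1000, 2000, 4000 and 8000 milliseconds. With fastFirst set to true the same attempts give 0, 1000, 2000 and 4000 milliseconds.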

The final piece is the actual retry logic, which makes use of the policies and is what the consumer interacts with directly. The Retry class I implemented provides overloads for executing synchronous and asynchronous actions and functions.

 namespace Ronin.Retries
 {
 #region Using Clauses
 using System;
 using Internal;
 using System.Threading.Tasks;
 using System.Threading;
 #endregion

    /// <summary>
 /// The class that executes the logic with retries
 /// </summary>
 public static class Retry
 {
 #region Delegates
 /// <summary>
 /// The signature for a logic block that is a synchronous function
 /// </summary>
 /// <typeparam name="TResult">The result that will be returned</typeparam>
 /// <param name="id">The identifier of the logic block used to correlate retries</param>
 /// <param name="cancellationToken">The cancellation token that should be monitored for abort requests</param>
 /// <returns>The results of the logic block</returns>
 // ReSharper disable once TypeParameterCanBeVariant
 public delegate TResult LogicFunction<TResult>(Guid id, CancellationToken cancellationToken);

        /// <summary>
 /// The signature for a logic block that is a synchronous action
 /// </summary>
 /// <param name="id">The identifier of the logic block used to correlate retries</param>
 /// <param name="cancellationToken">The cancellation token that should be monitored for abort requests</param>
 public delegate void LogicAction(Guid id, CancellationToken cancellationToken);

        /// <summary>
 /// The signature for a logic block that is an asynchronous function
 /// </summary>
 /// <typeparam name="TResult">The result that will be returned</typeparam>
 /// <param name="id">The identifier of the logic block used to correlate retries</param>
 /// <param name="cancellationToken">The cancellation token that should be monitored for abort requests</param>
 /// <returns>The results of the logic block</returns>
 // ReSharper disable once TypeParameterCanBeVariant
 public delegate Task<TResult> LogicFunctionAsync<TResult>(Guid id, CancellationToken cancellationToken);

        /// <summary>
 /// The signature for a logic block that is an asynchronous action
 /// </summary>
 /// <param name="id">The identifier of the logic block used to correlate retries</param>
 /// <param name="cancellationToken">The cancellation token that should be monitored for abort requests</param>
 public delegate Task LogicActionAsync(Guid id, CancellationToken cancellationToken);
 #endregion
 #region Methods
 /// <summary>
 /// Retries an action if exceptions are raised or the return result is not considered valid
 /// </summary>
 /// <param name="logic">The function that is to be retried as appropriate</param>
 /// <param name="policy">The retry policy in use</param>
 /// <param name="id">The unique identifier used to correlate retries to logic blocks</param>
 /// <param name="cancellationToken">An optional cancellation token that is passed to the function and stops retries</param>
 public static void Execute(LogicAction logic, IRetryPolicy policy, Guid id = default(Guid), CancellationToken cancellationToken = default(CancellationToken))
 {
 Execute<object>((identifier, token) =>
 {
 logic(identifier, token);

                return null;
 }, policy, o => true, id, cancellationToken);
 }

        /// <summary>
 /// Retries a function if exceptions are raised or the return result is not considered valid
 /// </summary>
 /// <typeparam name="TResult">The return type of the logic block</typeparam>
 /// <param name="logic">The function that is to be retried as appropriate</param>
 /// <param name="policy">The retry policy in use</param>
 /// <param name="validation">An optional function that validates the return results of the logic block</param>
 /// <param name="id">The unique identifier used to correlate retries to logic blocks</param>
 /// <param name="cancellationToken">An optional cancellation token that is passed to the function and stops retries</param>
 /// <returns>The results of the logic block.</returns>
 public static TResult Execute<TResult>(LogicFunction<TResult> logic, IRetryPolicy policy, Func<TResult, bool> validation = null, Guid id = default(Guid), CancellationToken cancellationToken = default(CancellationToken))
 {
 var result = default(TResult);
 var attempt = 0;
 bool shouldRetry;
 var observablePolicy = policy as ObservableRetryPolicy;
 Exception lastException = null;

            do
 {
 try
 {
 if (attempt > 0)
 {
 Task.Delay(policy.Delay(attempt, lastException), cancellationToken).Wait(cancellationToken);
 observablePolicy?.NotifyOfRetry(attempt, id, false, false);
 }

                    result = logic(id, cancellationToken);

                    if (validation == null || validation(result))
 {
 shouldRetry = false;
 }
 else
 {
 if (cancellationToken.IsCancellationRequested)
 {
 shouldRetry = false;
 observablePolicy?.NotifyOfRetry(attempt, id, true, true);
 }
 else
 {
 shouldRetry = policy.ShouldRetry(attempt + 1, null);
 if (!shouldRetry) observablePolicy?.NotifyOfRetry(attempt, id, true, false);
 }
 }
 }
 catch (Exception e)
 {
 lastException = e;

                    if (cancellationToken.IsCancellationRequested)
 {
 shouldRetry = false;
 observablePolicy?.NotifyOfRetry(attempt, id, true, true);
 }
 else
 {
 shouldRetry = policy.ShouldRetry(attempt + 1, e);

                        if (!shouldRetry)
 {
 observablePolicy?.NotifyOfRetry(attempt, id, true, false);
 throw;
 }
 }
 }

                attempt++;
 } while (shouldRetry);

            return result;
 }

        /// <summary>
 /// Retries an action if exceptions are raised or the return result is not considered valid
 /// </summary>
 /// <param name="logic">The function that is to be retried as appropriate</param>
 /// <param name="policy">The retry policy in use</param>
 /// <param name="id">The unique identifier used to correlate retries to logic blocks</param>
 /// <param name="cancellationToken">An optional cancellation token that is passed to the function and stops retries</param>
 /// <returns>A task that represents the execution of the logic block, including any retries.</returns>
 public static Task ExecuteAsync(LogicActionAsync logic, IRetryPolicy policy, Guid id = default(Guid), CancellationToken cancellationToken = default(CancellationToken))
 {
 return ExecuteAsync<object>(async (identifier, token) =>
 {
 await logic(identifier, token).ConfigureAwait(false);

                return null;
 }, policy, o => true, id, cancellationToken);
 }

 /// <summary>
 /// Retries a function if exceptions are raised or the return result is not considered valid
 /// </summary>
 /// <typeparam name="TResult">The return type of the logic block</typeparam>
 /// <param name="logic">The function that is to be retried as appropriate</param>
 /// <param name="policy">The retry policy in use</param>
 /// <param name="validation">An optional function that validates the return results of the logic block</param>
 /// <param name="id">The unique identifier used to correlate retries to logic blocks</param>
 /// <param name="cancellationToken">An optional cancellation token that is passed to the function and stops retries</param>
 /// <returns>The results of the logic block.</returns>
 public static async Task<TResult> ExecuteAsync<TResult>(LogicFunctionAsync<TResult> logic, IRetryPolicy policy, Func<TResult, bool> validation = null, Guid id = default(Guid), CancellationToken cancellationToken = default(CancellationToken))
 {
 var result = default(TResult);
 var attempt = 0;
 bool shouldRetry;
 var observablePolicy = policy as ObservableRetryPolicy;
 Exception lastException = null;

            do
 {
 try
 {
 if (attempt > 0)
 {
 await Task.Delay(policy.Delay(attempt, lastException), cancellationToken).ConfigureAwait(false);
 observablePolicy?.NotifyOfRetry(attempt, id, false, false);
 }

                    result = await logic(id, cancellationToken).ConfigureAwait(false);

                    if (validation == null || validation(result))
 {
 shouldRetry = false;
 }
 else
 {
 if (cancellationToken.IsCancellationRequested)
 {
 shouldRetry = false;
 observablePolicy?.NotifyOfRetry(attempt, id, true, true);
 }
 else
 {
 shouldRetry = policy.ShouldRetry(attempt + 1, null);
 if (!shouldRetry) observablePolicy?.NotifyOfRetry(attempt, id, true, false);
 }
 }
 }
 catch (Exception e)
 {
 lastException = e;

                    if (cancellationToken.IsCancellationRequested)
 {
 shouldRetry = false;
 observablePolicy?.NotifyOfRetry(attempt, id, true, true);
 }
 else
 {
 shouldRetry = policy.ShouldRetry(attempt + 1, e);

                        if (!shouldRetry)
 {
 observablePolicy?.NotifyOfRetry(attempt, id, true, false);
 throw;
 }
 }
 }

                attempt++;
 } while (shouldRetry);

            return result;
 }

        #endregion
 }
 }

That wraps up this building block. It is used as follows (the LinearRetryPolicy in this snippet is a policy that is not listed in this post):

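    // Declarations assumed for illustration
    var maximumRetries = 3;
    var failureCount = 2;
    var logicCounter = 0;
    var resultingValue = 0;
    Exception exception = null;
    DateTime endTime;

    var policy = new LinearRetryPolicy(maximumRetries, 2000, false);
    var tokenSource = new CancellationTokenSource();

    try
    {
        resultingValue = Retry.Execute((id, token) =>
        {
            // Fail the first failureCount calls so that the policy has to retry
            if (Interlocked.Increment(ref logicCounter) <= failureCount)
            {
                throw new Exception("Oops");
            }

            return 1;
        }, policy, null, Guid.NewGuid(), tokenSource.Token);
    }
    catch (Exception e)
    {
        exception = e;
    }
    finally
    {
        endTime = DateTime.UtcNow;
    }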
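
Since the policies are observable, a natural companion to this usage is an observer that logs each retry. The sketch below is a hypothetical helper, not part of Ronin.Retries: a small generic IObserver implementation that forwards notifications to callbacks, so that it compiles on its own and works with any IObservable, including the policies in this post.

```csharp
using System;

// Hypothetical helper, not part of Ronin.Retries: forwards each notification
// to a callback so that the caller can log it however it likes.
public sealed class CallbackObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action _onCompleted;

    public CallbackObserver(Action<T> onNext, Action onCompleted = null)
    {
        if (onNext == null) throw new ArgumentNullException(nameof(onNext));

        _onNext = onNext;
        _onCompleted = onCompleted ?? (() => { });
    }

    // Called once per published notification.
    public void OnNext(T value)
    {
        _onNext(value);
    }

    // ObservableRetryPolicy as listed in this post never calls OnError, so it is ignored here.
    public void OnError(Exception error)
    {
    }

    // Called when the observable is disposed.
    public void OnCompleted()
    {
        _onCompleted();
    }
}
```

Assuming the policy derives from ObservableRetryPolicy, it could be attached with `policy.Subscribe(new CallbackObserver<RetryData>(data => Console.WriteLine(data.Attempt)))`. Disposing the object returned by Subscribe removes the subscription again.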
