다음을 통해 공유


방법: 동시성 수준을 제한하는 작업 스케줄러 만들기

드물기는 하지만 System.Threading.Tasks.TaskScheduler 클래스에서 파생되는 사용자 지정 작업 스케줄러를 만들어 수행 속도를 향상시킬 수도 있습니다. 스케줄러를 만든 다음 System.Threading.Tasks.ParallelOptions 클래스의 TaskScheduler 속성을 사용하여 Parallel.For 또는 Parallel.ForEach 메서드에서 이 스케줄러를 지정할 수 있습니다. Task 개체를 직접 사용하는 경우에는 입력 매개 변수로 TaskScheduler를 받아들이는 TaskFactory 생성자를 사용하거나 TaskScheduler 매개 변수를 받는 StartNew 오버로드 등의 다른 수단을 사용하여 사용자 지정 스케줄러를 지정할 수 있습니다.

사용자 지정 스케줄러는 엄격한 FIFO(선입선출) 실행 순서 같은 기본 스케줄러에서 제공하지 않는 기능을 얻기 위해 사용할 수도 있습니다. 다음 예제에서는 사용자 지정 작업 스케줄러를 만드는 방법을 보여 줍니다. 이 스케줄러를 사용하면 동시성 수준을 지정할 수 있습니다.

예제

다음 예제는 MSDN Code Gallery 웹 사이트의 Parallel Extensions Samples에서 발췌한 것입니다.

Imports System
Imports System.Collections.Generic
Imports System.Linq
Imports System.Threading
Imports System.Threading.Tasks
Module Module2


    ''' <summary>
    ''' Demo entry point: runs a single counting task through a scheduler that is
    ''' limited to one concurrent worker, then waits for a key press.
    ''' </summary>
    Sub Main()

        ' A scheduler limited to one worker, wrapped in a factory so that every
        ' task started through the factory is queued to that scheduler.
        Dim singleWorkerScheduler As New LimitedConcurrencyLevelTaskScheduler(1)
        Dim limitedFactory As New TaskFactory(singleWorkerScheduler)
        Dim tokenSource As New CancellationTokenSource()

        ' The work item: print every number together with the id of the thread that printed it.
        Dim printNumbers As Action =
            Sub()
                Dim current As Integer = 1
                While current <= 50000
                    Console.Write("{0} on thread {1}.   ", current, Thread.CurrentThread.ManagedThreadId)
                    current += 1
                End While
            End Sub

        ' Start the work on the custom scheduler, associating it with the token.
        Dim printTask As Task = limitedFactory.StartNew(printNumbers, tokenSource.Token)

        ' The task runs in the background; keep the process alive until a key is pressed.
        Console.WriteLine("Press any key to exit.")
        Console.ReadKey()

    End Sub


    ''' <summary>
    ''' Provides a task scheduler that ensures a maximum concurrency level while
    ''' running on top of the ThreadPool.
    ''' </summary>
    ''' <remarks>
    ''' Queued tasks are stored in a linked list (appended at the tail, taken from the
    ''' head) and are drained by at most <c>maxDegreeOfParallelism</c> ThreadPool work items.
    ''' </remarks>
    Public Class LimitedConcurrencyLevelTaskScheduler
        Inherits TaskScheduler

        ''' <summary>
        ''' The scheduler whose queue the current thread is draining, or Nothing when the
        ''' current thread is not running one of this class's worker delegates.
        ''' </summary>
        ''' <remarks>
        ''' This is per-thread state shared by all instances, so it records WHICH scheduler
        ''' the thread works for. A plain Boolean would let a worker of one scheduler inline
        ''' tasks of another scheduler and exceed that scheduler's concurrency limit.
        ''' </remarks>
        <ThreadStatic()>
        Private Shared _currentThreadScheduler As LimitedConcurrencyLevelTaskScheduler
        ''' <summary>The list of tasks to be executed.</summary>
        Private ReadOnly _tasks As LinkedList(Of Task) = New LinkedList(Of Task)() ' protected by lock(_tasks)
        ''' <summary>The maximum concurrency level allowed by this scheduler.</summary>
        Private ReadOnly _maxDegreeOfParallelism As Integer
        ''' <summary>The number of worker delegates that are currently queued to the ThreadPool or running.</summary>
        Private _delegatesQueuedOrRunning As Integer = 0 ' protected by lock(_tasks)

        ''' <summary>
        ''' Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the
        ''' specified degree of parallelism.
        ''' </summary>
        ''' <param name="maxDegreeOfParallelism">The maximum degree of parallelism provided by this scheduler.</param>
        ''' <exception cref="ArgumentOutOfRangeException">
        ''' <paramref name="maxDegreeOfParallelism"/> is less than 1.
        ''' </exception>
        Public Sub New(ByVal maxDegreeOfParallelism As Integer)

            If (maxDegreeOfParallelism < 1) Then
                Throw New ArgumentOutOfRangeException("maxDegreeOfParallelism")
            End If
            _maxDegreeOfParallelism = maxDegreeOfParallelism

        End Sub

        ''' <summary>Queues a task to the scheduler.</summary>
        ''' <param name="t">The task to be queued.</param>
        Protected Overrides Sub QueueTask(ByVal t As Task)

            ' Add the task to the list of tasks to be processed.  If there aren't enough
            ' delegates currently queued or running to process tasks, schedule another.
            SyncLock (_tasks)

                _tasks.AddLast(t)
                If (_delegatesQueuedOrRunning < _maxDegreeOfParallelism) Then

                    _delegatesQueuedOrRunning = _delegatesQueuedOrRunning + 1
                    NotifyThreadPoolOfPendingWork()
                End If
            End SyncLock
        End Sub

        ''' <summary>
        ''' Informs the ThreadPool that there's work to be executed for this scheduler.
        ''' </summary>
        Private Sub NotifyThreadPoolOfPendingWork()

            ThreadPool.UnsafeQueueUserWorkItem(Sub()
                                                   ' Note that the current thread is now processing work items
                                                   ' for THIS scheduler. This is what enables inlining of this
                                                   ' scheduler's tasks into this thread.
                                                   _currentThreadScheduler = Me
                                                   Try

                                                       ' Process all available items in the queue.
                                                       While (True)
                                                           Dim item As Task
                                                           SyncLock (_tasks)
                                                               ' When there are no more items to be processed,
                                                               ' note that this worker is done, and get out.
                                                               If (_tasks.Count = 0) Then
                                                                   _delegatesQueuedOrRunning = _delegatesQueuedOrRunning - 1
                                                                   Exit While
                                                               End If

                                                               ' Get the next item from the head of the queue.
                                                               item = _tasks.First.Value
                                                               _tasks.RemoveFirst()
                                                           End SyncLock

                                                           ' Execute the task outside the lock so that other
                                                           ' threads can queue and dequeue while it runs.
                                                           MyBase.TryExecuteTask(item)

                                                       End While
                                                   Finally
                                                       ' The thread goes back to the pool; it no longer works for us.
                                                       _currentThreadScheduler = Nothing
                                                   End Try
                                               End Sub,
                                          Nothing)
        End Sub

        ''' <summary>Attempts to execute the specified task on the current thread.</summary>
        ''' <param name="t">The task to be executed.</param>
        ''' <param name="taskWasPreviouslyQueued">
        ''' True when the task may already be in this scheduler's queue, in which case it
        ''' must be removed from the queue before it is run inline.
        ''' </param>
        ''' <returns>Whether the task could be executed on the current thread.</returns>
        Protected Overrides Function TryExecuteTaskInline(ByVal t As Task, ByVal taskWasPreviouslyQueued As Boolean) As Boolean

            ' Inlining is only supported on a thread that is already draining THIS
            ' scheduler's queue; such a thread is already counted against the limit.
            If (_currentThreadScheduler IsNot Me) Then
                Return False
            End If

            ' A previously queued task is only run inline if it could actually be removed
            ' from the queue. If it is no longer there, a worker has already taken it.
            If (taskWasPreviouslyQueued AndAlso Not TryDequeue(t)) Then
                Return False
            End If

            ' Try to run the task.
            Return MyBase.TryExecuteTask(t)
        End Function

        ''' <summary>Attempts to remove a previously scheduled task from the scheduler.</summary>
        ''' <param name="t">The task to be removed.</param>
        ''' <returns>Whether the task could be found and removed.</returns>
        Protected Overrides Function TryDequeue(ByVal t As Task) As Boolean

            SyncLock (_tasks)
                Return _tasks.Remove(t)
            End SyncLock

        End Function

        ''' <summary>Gets the maximum concurrency level supported by this scheduler.</summary>
        Public Overrides ReadOnly Property MaximumConcurrencyLevel As Integer

            Get
                Return _maxDegreeOfParallelism
            End Get
        End Property

        ''' <summary>Gets an enumerable of the tasks currently scheduled on this scheduler.</summary>
        ''' <returns>A snapshot (array copy) of the tasks currently queued.</returns>
        ''' <exception cref="NotSupportedException">
        ''' The queue lock could not be acquired immediately.
        ''' </exception>
        Protected Overrides Function GetScheduledTasks() As IEnumerable(Of Task)

            Dim lockTaken As Boolean = False
            Try

                ' Never block here: take the lock only if it is free right now.
                Monitor.TryEnter(_tasks, lockTaken)
                If (lockTaken) Then
                    Return _tasks.ToArray()
                Else
                    Throw New NotSupportedException()
                End If
            Finally

                If (lockTaken) Then
                    Monitor.Exit(_tasks)
                End If
            End Try
        End Function
    End Class
End Module
namespace System.Threading.Tasks.Schedulers
{

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;

    /// <summary>
    /// Demo program: runs a single counting task through a scheduler that is
    /// limited to one concurrent worker.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Starts the counting task on the custom scheduler and then blocks until a key is pressed.
        /// </summary>
        static void Main()
        {
            // Every task started through this factory is queued to the single-worker scheduler.
            var singleWorkerScheduler = new LimitedConcurrencyLevelTaskScheduler(1);
            var limitedFactory = new TaskFactory(singleWorkerScheduler);

            // The work item: print each number together with the id of the thread that printed it.
            Action printNumbers = () =>
            {
                int current = 0;
                while (current < 500)
                {
                    Console.Write("{0} on thread {1}", current, Thread.CurrentThread.ManagedThreadId);
                    current++;
                }
            };

            limitedFactory.StartNew(printNumbers);

            // The task runs in the background; keep the process alive until a key is pressed.
            Console.ReadKey();
        }
    }

    /// <summary>
    /// Provides a task scheduler that ensures a maximum concurrency level while
    /// running on top of the ThreadPool.
    /// </summary>
    /// <remarks>
    /// Queued tasks are stored in a linked list (appended at the tail, taken from the
    /// head) and are drained by at most <c>maxDegreeOfParallelism</c> ThreadPool work items.
    /// </remarks>
    public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
    {
        /// <summary>
        /// The scheduler whose queue the current thread is draining, or null when the
        /// current thread is not running one of this class's worker delegates.
        /// </summary>
        /// <remarks>
        /// This is per-thread state shared by all instances, so it records WHICH scheduler
        /// the thread works for. A plain bool would let a worker of one scheduler inline
        /// tasks of another scheduler and exceed that scheduler's concurrency limit.
        /// </remarks>
        [ThreadStatic]
        private static LimitedConcurrencyLevelTaskScheduler _currentThreadScheduler;
        /// <summary>The list of tasks to be executed.</summary>
        private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks)
        /// <summary>The maximum concurrency level allowed by this scheduler.</summary>
        private readonly int _maxDegreeOfParallelism;
        /// <summary>The number of worker delegates that are currently queued to the ThreadPool or running.</summary>
        private int _delegatesQueuedOrRunning = 0; // protected by lock(_tasks)

        /// <summary>
        /// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the
        /// specified degree of parallelism.
        /// </summary>
        /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism provided by this scheduler.</param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="maxDegreeOfParallelism"/> is less than 1.
        /// </exception>
        public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
        {
            if (maxDegreeOfParallelism < 1)
            {
                throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
            }

            _maxDegreeOfParallelism = maxDegreeOfParallelism;
        }

        /// <summary>Queues a task to the scheduler.</summary>
        /// <param name="task">The task to be queued.</param>
        protected sealed override void QueueTask(Task task)
        {
            // Add the task to the list of tasks to be processed.  If there aren't enough
            // delegates currently queued or running to process tasks, schedule another.
            lock (_tasks)
            {
                _tasks.AddLast(task);
                if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
                {
                    ++_delegatesQueuedOrRunning;
                    NotifyThreadPoolOfPendingWork();
                }
            }
        }

        /// <summary>
        /// Informs the ThreadPool that there's work to be executed for this scheduler.
        /// </summary>
        private void NotifyThreadPoolOfPendingWork()
        {
            ThreadPool.UnsafeQueueUserWorkItem(_ =>
            {
                // Note that the current thread is now processing work items for THIS
                // scheduler. This is what enables inlining of this scheduler's tasks
                // into this thread.
                _currentThreadScheduler = this;
                try
                {
                    // Process all available items in the queue.
                    while (true)
                    {
                        Task item;
                        lock (_tasks)
                        {
                            // When there are no more items to be processed,
                            // note that this worker is done, and get out.
                            if (_tasks.Count == 0)
                            {
                                --_delegatesQueuedOrRunning;
                                break;
                            }

                            // Get the next item from the head of the queue.
                            item = _tasks.First.Value;
                            _tasks.RemoveFirst();
                        }

                        // Execute the task outside the lock so that other threads
                        // can queue and dequeue while it runs.
                        base.TryExecuteTask(item);
                    }
                }
                finally
                {
                    // The thread goes back to the pool; it no longer works for us.
                    _currentThreadScheduler = null;
                }
            }, null);
        }

        /// <summary>Attempts to execute the specified task on the current thread.</summary>
        /// <param name="task">The task to be executed.</param>
        /// <param name="taskWasPreviouslyQueued">
        /// True when the task may already be in this scheduler's queue, in which case it
        /// must be removed from the queue before it is run inline.
        /// </param>
        /// <returns>Whether the task could be executed on the current thread.</returns>
        protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            // Inlining is only supported on a thread that is already draining THIS
            // scheduler's queue; such a thread is already counted against the limit.
            if (!ReferenceEquals(_currentThreadScheduler, this))
            {
                return false;
            }

            // A previously queued task is only run inline if it could actually be removed
            // from the queue. If it is no longer there, a worker has already taken it.
            if (taskWasPreviouslyQueued && !TryDequeue(task))
            {
                return false;
            }

            // Try to run the task.
            return base.TryExecuteTask(task);
        }

        /// <summary>Attempts to remove a previously scheduled task from the scheduler.</summary>
        /// <param name="task">The task to be removed.</param>
        /// <returns>Whether the task could be found and removed.</returns>
        protected sealed override bool TryDequeue(Task task)
        {
            lock (_tasks)
            {
                return _tasks.Remove(task);
            }
        }

        /// <summary>Gets the maximum concurrency level supported by this scheduler.</summary>
        public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } }

        /// <summary>Gets an enumerable of the tasks currently scheduled on this scheduler.</summary>
        /// <returns>A snapshot (array copy) of the tasks currently queued.</returns>
        /// <exception cref="NotSupportedException">
        /// The queue lock could not be acquired immediately.
        /// </exception>
        protected sealed override IEnumerable<Task> GetScheduledTasks()
        {
            bool lockTaken = false;
            try
            {
                // Never block here: take the lock only if it is free right now.
                Monitor.TryEnter(_tasks, ref lockTaken);
                if (lockTaken)
                {
                    return _tasks.ToArray();
                }

                throw new NotSupportedException();
            }
            finally
            {
                if (lockTaken)
                {
                    Monitor.Exit(_tasks);
                }
            }
        }
    }
}

참고 항목

개념

작업 스케줄러