Freigeben über


Gewusst wie: Hinzufügen und Entfernen von einzelnen Elementen zu bzw. aus einer BlockingCollection

Dieses Beispiel zeigt, wie Elemente einem BlockingCollection<T>-Element sowohl auf blockierende als auch auf nicht blockierende Weise hinzugefügt bzw. aus dem Element entfernt werden können. Weitere Informationen zu BlockingCollection<T> finden Sie unter Übersicht über BlockingCollection.

Ein Beispiel zum Aufzählen eines BlockingCollection<T>-Elements, bis es leer ist und keine Elemente mehr hinzugefügt werden können, finden Sie unter Gewusst wie: Entfernen von Elementen in einer BlockingCollection mit ForEach.

Beispiel

Dieses erste Beispiel beschreibt das Hinzufügen bzw. Entfernen von Elementen zur Blockierung der Vorgänge, sobald die Auflistung entweder vorübergehend leer (beim Entfernen), die maximale Kapazität erreicht (beim Hinzufügen) oder ein angegebenes Timeout verstrichen ist. Beachten Sie, dass die Blockierung für die maximale Kapazität nur aktiviert wird, wenn bei der Erstellung von BlockingCollection im Konstruktor eine maximale Kapazität angegeben wurde.

Option Strict On
Option Explicit On

Imports System.Collections.Concurrent
Imports System.Threading
Imports System.Threading.Tasks

Module SimpleBlocking

    Class Program
        Shared Sub Main()
            ' Increase or decrease this value as desired.
            Dim itemsToAdd As Integer = 500

            ' Preserve all the display output for Adds and Takes
            Console.SetBufferSize(80, (itemsToAdd * 2) + 3)

            ' A bounded collection. Increase, decrease, or remove the 
            ' maximum capacity argument to see how it impacts behavior.
            Dim numbers = New BlockingCollection(Of Integer)(50)

            ' A simple blocking consumer with no cancellation.
            Task.Factory.StartNew(Sub()
                                      Dim i As Integer = -1
                                      While numbers.IsCompleted = False
                                          Try
                                              i = numbers.Take()
                                          Catch ioe As InvalidOperationException
                                              Console.WriteLine("Adding was completed!")
                                              Exit While
                                          End Try
                                          Console.WriteLine("Take:{0} ", i)
                                          ' Simulate a slow consumer. This will cause
                                          ' collection to fill up fast and thus Adds wil block.
                                          Thread.SpinWait(100000)
                                      End While
                                      Console.WriteLine(vbCrLf & "No more items to take. Press the Enter key to exit.")
                                  End Sub)

            ' A simple blocking producer with no cancellation.
            Task.Factory.StartNew(Sub()
                                      For i As Integer = 0 To itemsToAdd
                                          numbers.Add(i)
                                          Console.WriteLine("Add:{0} Count={1}", i, numbers.Count)
                                      Next

                                      'See documentation for this method.
                                      numbers.CompleteAdding()
                                  End Sub)

            'Keep the console window open in debug mode.
            Console.ReadLine()
        End Sub
    End Class

End Module
namespace BCBlockingAccess
{
    using System;
    using System.Collections.Concurrent;


    class Program
    {
        static void Main(string[] args)
        {
            // Increase or decrease this value as desired.
            int itemsToAdd = 500;

            // Preserve all the display output for Adds and Takes
            Console.SetBufferSize(80, (itemsToAdd * 2) + 3);

            // A bounded collection. Increase, decrease, or remove the 
            // maximum capacity argument to see how it impacts behavior.
            BlockingCollection<int> numbers = new BlockingCollection<int>(50);


            // A simple blocking consumer with no cancellation.
            Task.Factory.StartNew(() =>
            {
                int i = -1;
                while (!numbers.IsCompleted)
                {
                    try
                    {
                        i = numbers.Take();
                    }
                    catch (InvalidOperationException)
                    {
                        Console.WriteLine("Adding was compeleted!");
                        break;
                    }
                    Console.WriteLine("Take:{0} ", i);

                    // Simulate a slow consumer. This will cause
                    // collection to fill up fast and thus Adds wil block.
                    Thread.SpinWait(100000);
                }

                Console.WriteLine("\r\nNo more items to take. Press the Enter key to exit.");
            });

            // A simple blocking producer with no cancellation.
            Task.Factory.StartNew(() =>
            {
                for (int i = 0; i < itemsToAdd; i++)
                {
                    numbers.Add(i);
                    Console.WriteLine("Add:{0} Count={1}", i, numbers.Count);
                }

                // See documentation for this method.
                numbers.CompleteAdding();
            });

            // Keep the console display open in debug mode.

            Console.ReadLine();
        }
    }
}

In diesem zweiten Beispiel wird veranschaulicht, wie Sie Elemente hinzufügen bzw. entfernen, damit die Vorgänge nicht blockiert werden. Wenn kein Element vorhanden ist bzw. wenn die maximale Kapazität in einer begrenzten Auflistung erreicht wurde oder das Timeout verstrichen ist, wird vom TryAdd()-Vorgang oder TryTake()-Vorgang "false" zurückgegeben. Dies ermöglicht dem Thread, vorübergehend eine andere wichtige Aufgabe zu erledigen. Etwas später wird der Thread entweder ein neues Element abrufen oder das Element hinzufügen, das zuvor nicht hinzugefügt werden konnte. Das Programm veranschaulicht auch, wie für den Zugriff auf BlockingCollection<T> ein Abbruch implementiert wird.

Option Strict On
Option Explicit On
Imports System.Collections.Concurrent
Imports System.Threading
Imports System.Threading.Tasks
Module NonBlockingBC


    Class NonBlockingAccess
        Shared inputs As Integer
        Shared Sub Main()
            ' The token source for issuing the cancelation request.
            Dim cts As CancellationTokenSource = New CancellationTokenSource()

            ' A blocking collection that can hold no more than 100 items at a time.
            Dim numberCollection As BlockingCollection(Of Integer) = New BlockingCollection(Of Integer)(100)

            ' Set console buffer to hold our prodigious output.
            Console.SetBufferSize(80, 2000)

            ' The simplest UI thread ever invented.
            Task.Factory.StartNew(Sub()
                                      If Console.ReadKey.KeyChar() = "c"c Then
                                          cts.Cancel()
                                      End If
                                  End Sub)
            ' Start one producer and one consumer.
            Task.Factory.StartNew(Sub() NonBlockingConsumer(numberCollection, cts.Token))
            Task.Factory.StartNew(Sub() NonBlockingProducer(numberCollection, cts.Token))


            Console.WriteLine("Press the Enter key to exit.")
            Console.ReadLine()
        End Sub

        Shared Sub NonBlockingConsumer(ByVal bc As BlockingCollection(Of Integer), ByVal ct As CancellationToken)

            ' IsCompleted is equivalent to (IsAddingCompleted And Count = 0)
            While bc.IsCompleted = False
                Dim nextItem As Integer = 0
                Try
                    If bc.TryTake(nextItem, 0, ct) Then
                        Console.WriteLine("  Take Blocked.")
                    Else
                        Console.WriteLine(" Take: {0}", nextItem)
                    End If
                Catch ex As OperationCanceledException
                    Console.WriteLine("Taking canceled.")
                    Exit While
                End Try
                'Slow down consumer just a little to cause
                ' collection to fill up faster, and lead to "AddBlocked"
                Thread.SpinWait(500000)
            End While

            Console.WriteLine(vbCrLf & "No more items to take. Press the Enter key to exit.")
        End Sub

        Shared Sub NonBlockingProducer(ByVal bc As BlockingCollection(Of Integer), ByVal ct As CancellationToken)
            Dim itemToAdd As Integer = 0
            Dim success As Boolean = False

            Do While itemToAdd < inputs
                'Cancellation causes OCE. We know how to handle it.
                Try
                    success = bc.TryAdd(itemToAdd, 2, ct)
                Catch ex As OperationCanceledException
                    Console.WriteLine("Add loop canceled.")

                    ' Let other threads know we're done in case
                    ' they aren't monitoring the cancellation token.
                    bc.CompleteAdding()
                    Exit Do
                End Try

                If success = True Then
                    Console.WriteLine(" Add:{0}", itemToAdd)
                    itemToAdd = itemToAdd + 1
                Else
                    Console.Write("  AddBlocked:{0} Count = {1}", itemToAdd.ToString(), bc.Count)

                    ' Don't increment nextItem. Try again on next iteration
                    ' Do something else useful instead.
                    UpdateProgress(itemToAdd)
                End If
            Loop
        End Sub

        Shared Sub UpdateProgress(ByVal i As Integer)
            Dim percent As Double = (CType(i, Double) / inputs) * 100
            Console.WriteLine("Percent complete: {0}", percent)
        End Sub
    End Class

End Module
namespace BCNonBlockingWithCancellation
{
    using System;
    using System.Collections.Concurrent;

    class ProgramWithCancellation
    {

        static int inputs = 2000;
        static void Main(string[] args)
        {
            // The token source for issuing the cancelation request.
            CancellationTokenSource cts = new CancellationTokenSource();

            // A blocking collection that can hold no more than 100 items at a time.
            BlockingCollection<int> numberCollection = new BlockingCollection<int>(100);

            // Set console buffer to hold our prodigious output.
            Console.SetBufferSize(80, 2000);

            // The simplest UI thread ever invented.
            Task.Factory.StartNew(() =>
                {
                    if (Console.ReadKey().KeyChar == 'c')
                        cts.Cancel();
                });

            // Start one producer and one consumer.
            Task.Factory.StartNew(() => NonBlockingConsumer(numberCollection, cts.Token));
            Task.Factory.StartNew(() => NonBlockingProducer(numberCollection, cts.Token));


            Console.WriteLine("Press the Enter key to exit.");
            Console.ReadLine();
        }

        static void NonBlockingConsumer(BlockingCollection<int> bc, CancellationToken ct)
        {
            // IsCompleted == (IsAddingCompleted && Count == 0)
            while (!bc.IsCompleted)
            {
                int nextItem = 0;
                try
                {
                    if (!bc.TryTake(out nextItem, 0, ct))
                    {
                        Console.WriteLine(" Take Blocked");
                    }
                    else
                        Console.WriteLine(" Take:{0}", nextItem);
                }

                catch (OperationCanceledException)
                {
                    Console.WriteLine("Taking canceled.");
                    break;
                }

                // Slow down consumer just a little to cause
                // collection to fill up faster, and lead to "AddBlocked"
                Thread.SpinWait(500000);
            }

            Console.WriteLine("\r\nNo more items to take. Press the Enter key to exit.");
        }

        static void NonBlockingProducer(BlockingCollection<int> bc, CancellationToken ct)
        {
            int itemToAdd = 0;
            bool success = false;

            do
            {
                // Cancellation causes OCE. We know how to handle it.
                try
                {
                    // A shorter timeout causes more failures.
                    success = bc.TryAdd(itemToAdd, 2, ct);
                }
                catch (OperationCanceledException)
                {
                    Console.WriteLine("Add loop canceled.");
                    // Let other threads know we're done in case
                    // they aren't monitoring the cancellation token.
                    bc.CompleteAdding();
                    break;
                }

                if (success)
                {
                    Console.WriteLine(" Add:{0}", itemToAdd);
                    itemToAdd++;
                }
                else
                {
                    Console.Write(" AddBlocked:{0} Count = {1}", itemToAdd.ToString(), bc.Count);
                    // Don't increment nextItem. Try again on next iteration.

                    //Do something else useful instead.
                    UpdateProgress(itemToAdd);
                }

            } while (itemToAdd < inputs);

            // No lock required here because only one producer.
            bc.CompleteAdding();
        }

        static void UpdateProgress(int i)
        {
            double percent = ((double)i / inputs) * 100;
            Console.WriteLine("Percent complete: {0}", percent);
        }
    }
}

Siehe auch

Referenz

System.Collections.Concurrent

Konzepte

Übersicht über BlockingCollection