Postupy: Přidávání a odebírání jednotlivých položek v BlockingCollection
Tento příklad ukazuje, jak přidávat položky do kolekce BlockingCollection<T> a odebírat je z ní blokujícím i neblokujícím způsobem. Další informace o typu BlockingCollection<T> naleznete v tématu BlockingCollection Overview.
Příklad, jak procházet kolekci BlockingCollection<T>, dokud není prázdná a nebudou do ní přidány žádné další prvky, naleznete v tématu Postupy: Použití příkazu ForEach k odebrání položek v BlockingCollection.
Příklad 1
Tento první příklad ukazuje, jak přidávat a odebírat položky tak, aby se operace zablokovaly, pokud je kolekce dočasně prázdná (při odebírání) nebo zaplněná na maximální kapacitu (při přidávání), nebo pokud uplynul zadaný časový limit. Všimněte si, že blokování při dosažení maximální kapacity je povoleno pouze tehdy, když byla kolekce BlockingCollection vytvořena s maximální kapacitou zadanou v konstruktoru.
using System;
using System.Collections.Concurrent;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Demonstrates blocking <c>Add</c> and <c>Take</c> calls on a bounded
/// <see cref="BlockingCollection{T}"/> shared by one producer task and one consumer task.
/// </summary>
class Program
{
    static void Main()
    {
        // Increase or decrease this value as desired.
        int itemsToAdd = 500;

        if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
        {
            // Preserve all the display output for Adds and Takes: two lines per
            // item plus a few status lines. The buffer is only ever grown.
            int columns = Math.Max(Console.BufferWidth, 80);
            int rows = Math.Max(Console.BufferHeight, itemsToAdd * 2 + 3);
            Console.SetBufferSize(columns, rows);
        }

        // A bounded collection. Increase, decrease, or remove the
        // maximum capacity argument to see how it impacts behavior.
        var numbers = new BlockingCollection<int>(50);

        // Consumer first, then producer; neither supports cancellation.
        Task.Run(() => Consume(numbers));
        Task.Run(() => Produce(numbers, itemsToAdd));

        // Keep the console display open in debug mode.
        Console.ReadLine();
    }

    /// <summary>
    /// A simple blocking consumer: takes items until the collection is
    /// marked complete and has been drained.
    /// </summary>
    /// <param name="source">The collection to take items from.</param>
    static void Consume(BlockingCollection<int> source)
    {
        while (true)
        {
            if (source.IsCompleted)
            {
                break;
            }

            int taken;
            try
            {
                // Blocks while the collection is empty.
                taken = source.Take();
            }
            catch (InvalidOperationException)
            {
                // Take throws when adding completed while we were waiting.
                Console.WriteLine("Adding was completed!");
                break;
            }

            Console.WriteLine("Take:{0} ", taken);

            // Simulate a slow consumer. This will cause the
            // collection to fill up fast and thus Adds will block.
            Thread.SpinWait(100000);
        }

        Console.WriteLine("\r\nNo more items to take. Press the Enter key to exit.");
    }

    /// <summary>
    /// A simple blocking producer: adds the integers 0..count-1 and then
    /// marks the collection as complete for adding.
    /// </summary>
    /// <param name="target">The collection to add items to.</param>
    /// <param name="count">How many items to add.</param>
    static void Produce(BlockingCollection<int> target, int count)
    {
        int next = 0;
        while (next < count)
        {
            // Blocks while the collection is at its bounded capacity.
            target.Add(next);
            Console.WriteLine("Add:{0} Count={1}", next, target.Count);
            next++;
        }

        // Signals consumers that no further items will arrive.
        target.CompleteAdding();
    }
}
Option Strict On
Option Explicit On
Imports System.Collections.Concurrent
Imports System.Threading
Imports System.Threading.Tasks
''' <summary>
''' Demonstrates blocking Add and Take calls on a bounded BlockingCollection
''' shared by one producer task and one consumer task.
''' </summary>
Module SimpleBlocking
    Class Program
        Shared Sub Main()
            ' Increase or decrease this value as desired.
            Dim itemsToAdd As Integer = 500

            ' Preserve all the display output for Adds and Takes.
            ' Console.SetBufferSize is only supported on Windows and throws when
            ' asked for a buffer smaller than the window, so the buffer is only grown.
            If System.Runtime.InteropServices.RuntimeInformation.IsOSPlatform(
                    System.Runtime.InteropServices.OSPlatform.Windows) Then
                Dim width As Integer = Math.Max(Console.BufferWidth, 80)
                Dim height As Integer = Math.Max(Console.BufferHeight, (itemsToAdd * 2) + 3)
                Console.SetBufferSize(width, height)
            End If

            ' A bounded collection. Increase, decrease, or remove the
            ' maximum capacity argument to see how it impacts behavior.
            Dim numbers = New BlockingCollection(Of Integer)(50)

            ' A simple blocking consumer with no cancellation.
            Task.Run(Sub()
                         Dim i As Integer = -1
                         While Not numbers.IsCompleted
                             Try
                                 ' Blocks while the collection is empty.
                                 i = numbers.Take()
                             Catch ex As InvalidOperationException
                                 ' Take throws when adding completed while we were waiting.
                                 Console.WriteLine("Adding was completed!")
                                 Exit While
                             End Try
                             Console.WriteLine("Take:{0} ", i)
                             ' Simulate a slow consumer. This will cause the
                             ' collection to fill up fast and thus Adds will block.
                             Thread.SpinWait(100000)
                         End While
                         Console.WriteLine(vbCrLf & "No more items to take. Press the Enter key to exit.")
                     End Sub)

            ' A simple blocking producer with no cancellation.
            Task.Run(Sub()
                         ' For...To is inclusive, so stop at itemsToAdd - 1
                         ' to add exactly itemsToAdd items.
                         For i As Integer = 0 To itemsToAdd - 1
                             ' Blocks while the collection is at its bounded capacity.
                             numbers.Add(i)
                             Console.WriteLine("Add:{0} Count={1}", i, numbers.Count)
                         Next
                         ' Signals consumers that no further items will arrive.
                         numbers.CompleteAdding()
                     End Sub)

            ' Keep the console window open in debug mode.
            Console.ReadLine()
        End Sub
    End Class
End Module
Příklad 2
Tento druhý příklad ukazuje, jak přidávat a odebírat položky tak, aby operace neblokovaly. Pokud není k dispozici žádná položka, byla dosažena maximální kapacita ohraničené kolekce nebo uplynul časový limit, vrátí operace TryAdd nebo TryTake hodnotu false. To umožňuje vláknu po nějakou dobu vykonávat jinou užitečnou práci a později to zkusit znovu – buď načíst novou položku, nebo znovu přidat tutéž položku, kterou se dříve přidat nepodařilo. Program také ukazuje, jak implementovat zrušení při přístupu ke kolekci BlockingCollection<T>.
using System;
using System.Collections.Concurrent;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Demonstrates non-blocking <c>TryAdd</c> and <c>TryTake</c> calls on a bounded
/// <see cref="BlockingCollection{T}"/>, with cooperative cancellation
/// triggered by pressing the 'c' key.
/// </summary>
class ProgramWithCancellation
{
    // Total number of items the producer adds before completing the collection.
    const int inputs = 2000;

    static void Main()
    {
        // The token source for issuing the cancellation request.
        var cts = new CancellationTokenSource();

        // A blocking collection that can hold no more than 100 items at a time.
        var numberCollection = new BlockingCollection<int>(100);

        if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
        {
            int width = Math.Max(Console.BufferWidth, 80);
            int height = Math.Max(Console.BufferHeight, 8000);

            // Preserve all the display output for Adds and Takes
            Console.SetBufferSize(width, height);
        }

        // The simplest UI thread ever invented.
        Task.Run(() =>
        {
            if (Console.ReadKey(true).KeyChar == 'c')
            {
                try
                {
                    cts.Cancel();
                }
                catch (ObjectDisposedException)
                {
                    // The key arrived after both tasks finished and Main
                    // disposed the token source; nothing is left to cancel.
                }
            }
        });

        // Start one producer and one consumer.
        Task t1 = Task.Run(() => NonBlockingConsumer(numberCollection, cts.Token));
        Task t2 = Task.Run(() => NonBlockingProducer(numberCollection, cts.Token));

        // Wait for the tasks to complete execution
        Task.WaitAll(t1, t2);

        // Both tasks are done, so nothing else uses these resources.
        cts.Dispose();
        numberCollection.Dispose();

        Console.WriteLine("Press the Enter key to exit.");
        Console.ReadLine();
    }

    /// <summary>
    /// Polls the collection with a zero timeout until it is completed and
    /// empty, or until cancellation is requested.
    /// </summary>
    /// <param name="bc">The collection to take items from.</param>
    /// <param name="ct">Token that cancels the take loop.</param>
    static void NonBlockingConsumer(BlockingCollection<int> bc, CancellationToken ct)
    {
        // IsCompleted == (IsAddingCompleted && Count == 0)
        while (!bc.IsCompleted)
        {
            int nextItem = 0;
            try
            {
                // Timeout 0: return immediately instead of waiting for an item.
                if (!bc.TryTake(out nextItem, 0, ct))
                {
                    Console.WriteLine(" Take Blocked");
                }
                else
                {
                    Console.WriteLine(" Take:{0}", nextItem);
                }
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("Taking canceled.");
                break;
            }

            // Slow down consumer just a little to cause
            // collection to fill up faster, and lead to "AddBlocked"
            Thread.SpinWait(500000);
        }

        Console.WriteLine("\r\nNo more items to take.");
    }

    /// <summary>
    /// Adds the integers 0..inputs-1 using a short timeout, retrying the same
    /// item when the collection is full, then marks the collection complete.
    /// </summary>
    /// <param name="bc">The collection to add items to.</param>
    /// <param name="ct">Token that cancels the add loop.</param>
    static void NonBlockingProducer(BlockingCollection<int> bc, CancellationToken ct)
    {
        int itemToAdd = 0;
        bool success = false;

        do
        {
            // Cancellation causes OCE. We know how to handle it.
            try
            {
                // A shorter timeout causes more failures.
                success = bc.TryAdd(itemToAdd, 2, ct);
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("Add loop canceled.");
                // CompleteAdding below lets other threads know we're done
                // in case they aren't monitoring the cancellation token.
                break;
            }

            if (success)
            {
                Console.WriteLine(" Add:{0}", itemToAdd);
                itemToAdd++;
            }
            else
            {
                // Write (not WriteLine): UpdateProgress finishes the line.
                Console.Write(" AddBlocked:{0} Count = {1}", itemToAdd, bc.Count);
                // Don't increment itemToAdd. Try again on next iteration.
                // Do something else useful instead.
                UpdateProgress(itemToAdd);
            }
        } while (itemToAdd < inputs);

        // Reached on normal completion and on cancellation.
        // No lock required here because only one producer.
        bc.CompleteAdding();
    }

    /// <summary>Prints how far the producer has progressed, as a percentage of <c>inputs</c>.</summary>
    /// <param name="i">Number of items added so far.</param>
    static void UpdateProgress(int i)
    {
        double percent = ((double)i / inputs) * 100;
        Console.WriteLine("Percent complete: {0}", percent);
    }
}
Option Strict On
Option Explicit On
Imports System.Collections.Concurrent
Imports System.Threading
Imports System.Threading.Tasks
''' <summary>
''' Demonstrates non-blocking TryAdd and TryTake calls on a bounded
''' BlockingCollection, with cooperative cancellation triggered by the "c" key.
''' </summary>
Class NonBlockingAccess
    ' Total number of items the producer adds before completing the collection.
    Private Const inputs As Integer = 2000

    Shared Sub Main()
        ' The token source for issuing the cancellation request.
        Dim cts As New CancellationTokenSource()

        ' A blocking collection that can hold no more than 100 items at a time.
        Dim numberCollection As BlockingCollection(Of Integer) = New BlockingCollection(Of Integer)(100)

        ' Set console buffer to hold our prodigious output.
        ' Console.SetBufferSize is only supported on Windows and throws when
        ' asked for a buffer smaller than the window, so the buffer is only grown.
        If System.Runtime.InteropServices.RuntimeInformation.IsOSPlatform(
                System.Runtime.InteropServices.OSPlatform.Windows) Then
            Dim width As Integer = Math.Max(Console.BufferWidth, 80)
            Dim height As Integer = Math.Max(Console.BufferHeight, 8000)
            Console.SetBufferSize(width, height)
        End If

        ' The simplest UI thread ever invented.
        Task.Run(Sub()
                     ' ReadKey(True) keeps the key press out of the program output.
                     If Console.ReadKey(True).KeyChar = "c"c Then
                         Try
                             cts.Cancel()
                         Catch ex As ObjectDisposedException
                             ' The key arrived after both tasks finished and Main
                             ' disposed the token source; nothing is left to cancel.
                         End Try
                     End If
                 End Sub)

        ' Start one producer and one consumer.
        Dim t1 As Task = Task.Run(Sub() NonBlockingConsumer(numberCollection, cts.Token))
        Dim t2 As Task = Task.Run(Sub() NonBlockingProducer(numberCollection, cts.Token))

        ' Wait for the tasks to complete execution
        Task.WaitAll(t1, t2)

        ' Both tasks are done, so nothing else uses these resources.
        cts.Dispose()
        numberCollection.Dispose()

        Console.WriteLine("Press the Enter key to exit.")
        Console.ReadLine()
    End Sub

    ''' <summary>
    ''' Polls the collection with a zero timeout until it is completed and
    ''' empty, or until cancellation is requested.
    ''' </summary>
    ''' <param name="bc">The collection to take items from.</param>
    ''' <param name="ct">Token that cancels the take loop.</param>
    Shared Sub NonBlockingConsumer(ByVal bc As BlockingCollection(Of Integer), ByVal ct As CancellationToken)
        ' IsCompleted is equivalent to (IsAddingCompleted And Count = 0)
        While Not bc.IsCompleted
            Dim nextItem As Integer = 0
            Try
                ' Timeout 0: return immediately instead of waiting for an item.
                If Not bc.TryTake(nextItem, 0, ct) Then
                    Console.WriteLine(" Take Blocked.")
                Else
                    Console.WriteLine(" Take: {0}", nextItem)
                End If
            Catch ex As OperationCanceledException
                Console.WriteLine("Taking canceled.")
                Exit While
            End Try

            ' Slow down consumer just a little to cause
            ' collection to fill up faster, and lead to "AddBlocked"
            Thread.SpinWait(500000)
        End While

        Console.WriteLine(vbCrLf & "No more items to take.")
    End Sub

    ''' <summary>
    ''' Adds the integers 0..inputs-1 using a short timeout, retrying the same
    ''' item when the collection is full, then marks the collection complete.
    ''' </summary>
    ''' <param name="bc">The collection to add items to.</param>
    ''' <param name="ct">Token that cancels the add loop.</param>
    Shared Sub NonBlockingProducer(ByVal bc As BlockingCollection(Of Integer), ByVal ct As CancellationToken)
        Dim itemToAdd As Integer = 0
        Dim success As Boolean = False

        Do
            ' Cancellation causes OCE. We know how to handle it.
            Try
                ' A shorter timeout causes more failures.
                success = bc.TryAdd(itemToAdd, 2, ct)
            Catch ex As OperationCanceledException
                Console.WriteLine("Add loop canceled.")
                ' CompleteAdding below lets other threads know we're done
                ' in case they aren't monitoring the cancellation token.
                Exit Do
            End Try

            If success Then
                Console.WriteLine(" Add:{0}", itemToAdd)
                itemToAdd += 1
            Else
                ' Write (not WriteLine): UpdateProgress finishes the line.
                Console.Write(" AddBlocked:{0} Count = {1}", itemToAdd, bc.Count)
                ' Don't increment itemToAdd. Try again on next iteration.
                ' Do something else useful instead.
                UpdateProgress(itemToAdd)
            End If
        Loop While itemToAdd < inputs

        ' Reached on normal completion and on cancellation.
        ' No lock required here because only one producer.
        bc.CompleteAdding()
    End Sub

    ''' <summary>Prints how far the producer has progressed, as a percentage of inputs.</summary>
    ''' <param name="i">Number of items added so far.</param>
    Shared Sub UpdateProgress(ByVal i As Integer)
        Dim percent As Double = (CDbl(i) / inputs) * 100
        Console.WriteLine("Percent complete: {0}", percent)
    End Sub
End Class