Udostępnij za pośrednictwem


Biblioteka System.Threading.Channels

System.Threading.Channels Przestrzeń nazw udostępnia zestaw struktur danych synchronizacji do przekazywania danych między producentami a konsumentami asynchronicznie. Biblioteka jest przeznaczona dla platformy .NET Standard i działa we wszystkich implementacjach platformy .NET.

Ta biblioteka jest dostępna w pakiecie NuGet System.Threading.Channels . Jeśli jednak używasz platformy .NET Core 3.0 lub nowszej, pakiet jest dołączany jako część struktury.

Model programowania koncepcyjnego producenta/konsumenta

Kanały to implementacja modelu programowania koncepcyjnego producenta/konsumenta. W tym modelu programowania producenci asynchronicznie tworzą dane, a konsumenci asynchronicznie zużywają te dane. Innymi słowy, ten model przekazuje dane z jednej strony do innej za pośrednictwem kolejki pierwszy na wejściu (FIFO). Spróbuj myśleć o kanałach, tak jak w przypadku innych typowych typów kolekcji ogólnych, takich jak List<T>. Podstawową różnicą jest to, że ta kolekcja zarządza synchronizacją i udostępnia różne modele zużycia za pośrednictwem opcji tworzenia fabryki. Te opcje kontrolują zachowanie kanałów, takie jak liczba elementów, które mogą być przechowywane, i co się stanie w przypadku osiągnięcia tego limitu, lub czy kanał jest uzyskiwany przez wielu producentów lub wielu odbiorców jednocześnie.

Strategie ograniczenia

W zależności od sposobu Channel<T> tworzenia elementu jego czytelnik i składnik zapisywania zachowują się inaczej.

Aby utworzyć kanał, który określa maksymalną pojemność, wywołaj metodę Channel.CreateBounded. Aby utworzyć kanał używany przez dowolną liczbę czytelników i autorów jednocześnie, wywołaj metodę Channel.CreateUnbounded. Każda strategia ograniczenia uwidacznia różne opcje zdefiniowane przez twórcę lub BoundedChannelOptions UnboundedChannelOptions odpowiednio.

Uwaga

Niezależnie od strategii ograniczenia kanał zawsze będzie zgłaszać ChannelClosedException wartość po jej zamknięciu.

Niezwiązane kanały

Aby utworzyć niezwiązany kanał, wywołaj jedno z Channel.CreateUnbounded przeciążeń:

var channel = Channel.CreateUnbounded<T>();

Podczas tworzenia niezwiązanego kanału domyślnie kanał może być używany przez dowolną liczbę czytelników i składników zapisywania jednocześnie. Alternatywnie można określić zachowanie niezdefiniowane podczas tworzenia niezwiązanego kanału, udostępniając UnboundedChannelOptions wystąpienie. Pojemność kanału jest niezwiązana, a wszystkie operacje zapisu są wykonywane synchronicznie. Aby uzyskać więcej przykładów, zobacz Niezwiązane wzorce tworzenia.

Ograniczone kanały

Aby utworzyć ograniczony kanał, wywołaj jedno z Channel.CreateBounded przeciążeń:

var channel = Channel.CreateBounded<T>(7);

Powyższy kod tworzy kanał, który ma maksymalną pojemność 7 elementów. Podczas tworzenia ograniczonego kanału kanał jest powiązany z maksymalną pojemnością. Po osiągnięciu ograniczenia domyślne zachowanie polega na tym, że kanał asynchronicznie blokuje producenta do momentu udostępnienia miejsca. To zachowanie można skonfigurować, określając opcję podczas tworzenia kanału. Ograniczone kanały można tworzyć z dowolną wartością pojemności większą niż zero. Aby zapoznać się z innymi przykładami, zobacz Powiązane wzorce tworzenia.

Zachowanie trybu pełnego

W przypadku korzystania z ograniczonego kanału można określić zachowanie, do którego jest przestrzegany kanał po osiągnięciu skonfigurowanej granicy. W poniższej tabeli wymieniono zachowania trybu pełnego dla każdej BoundedChannelFullMode wartości:

Wartość Zachowanie
BoundedChannelFullMode.Wait Jest to wartość domyślna. Wywołania oczekujące WriteAsync na dostępność miejsca w celu ukończenia operacji zapisu. Wywołania do natychmiastowego powrotu TryWrite false .
BoundedChannelFullMode.DropNewest Usuwa i ignoruje najnowszy element w kanale w celu zapisania miejsca na element.
BoundedChannelFullMode.DropOldest Usuwa i ignoruje najstarszy element w kanale, aby zrobić miejsce na zapisywany element.
BoundedChannelFullMode.DropWrite Pomiń zapisywany element.

Ważne

Za każdym razem, gdy produkuje się Channel<TWrite,TRead>.Writer szybciej niż może Channel<TWrite,TRead>.Reader zużyć, scenarzysta kanału doświadcza presji zwrotnej.

Interfejsy API producenta

Funkcja producenta jest uwidaczniona w pliku Channel<TWrite,TRead>.Writer. Interfejsy API producenta i oczekiwane zachowanie zostały szczegółowo opisane w poniższej tabeli:

interfejs API Oczekiwane zachowanie
ChannelWriter<T>.Complete Oznacza kanał jako ukończony, co oznacza, że do niego nie są zapisywane żadne elementy.
ChannelWriter<T>.TryComplete Próbuje oznaczyć kanał jako ukończony, co oznacza, że do kanału nie są zapisywane żadne dane.
ChannelWriter<T>.TryWrite Próbuje zapisać określony element w kanale. Jeśli jest używany z niezwiązanym kanałem, zawsze zwraca to wartość true , chyba że moduł zapisywania kanału sygnalizuje ukończenie przy użyciu ChannelWriter<T>.Completeelementu , lub ChannelWriter<T>.TryComplete.
ChannelWriter<T>.WaitToWriteAsync Zwraca wartość ValueTask<TResult> , która kończy się, gdy miejsce jest dostępne do zapisania elementu.
ChannelWriter<T>.WriteAsync Asynchronicznie zapisuje element w kanale.

Interfejsy API przeznaczone dla klientów

Funkcja konsumenta jest uwidaczniona w programie Channel<TWrite,TRead>.Reader. Interfejsy API konsumentów i oczekiwane zachowanie zostały szczegółowo opisane w poniższej tabeli:

interfejs API Oczekiwane zachowanie
ChannelReader<T>.ReadAllAsync Tworzy obiekt IAsyncEnumerable<T> , który umożliwia odczytywanie wszystkich danych z kanału.
ChannelReader<T>.ReadAsync Asynchronicznie odczytuje element z kanału.
ChannelReader<T>.TryPeek Próbuje zajrzeć do elementu z kanału.
ChannelReader<T>.TryRead Próbuje odczytać element z kanału.
ChannelReader<T>.WaitToReadAsync Zwraca wartość ValueTask<TResult> , która kończy się, gdy dane są dostępne do odczytu.

Typowe wzorce użycia

Istnieje kilka wzorców użycia kanałów. Interfejs API został zaprojektowany tak, aby był prosty, spójny i tak elastyczny, jak to możliwe. Wszystkie metody asynchroniczne zwracają metodę ValueTask (lub ValueTask<bool>), która reprezentuje lekką operację asynchroniczną, która może uniknąć przydzielania, jeśli operacja zakończy się synchronicznie, a potencjalnie nawet asynchronicznie. Ponadto interfejs API jest przeznaczony do komponowania, w tym, że twórca kanału składa obietnice dotyczące zamierzonego użycia. Po utworzeniu kanału z określonymi parametrami wewnętrzna implementacja może działać wydajniej znając te obietnice.

Wzorce tworzenia

Wyobraź sobie, że tworzysz rozwiązanie producenta/konsumenta dla globalnego systemu pozycjonowania (GPS). Chcesz śledzić współrzędne urządzenia w czasie. Przykładowy obiekt współrzędnych może wyglądać następująco:

/// <summary>
/// A representation of a device's coordinates, 
/// which includes latitude and longitude.
/// </summary>
/// <param name="DeviceId">A unique device identifier.</param>
/// <param name="Latitude">The latitude of the device.</param>
/// <param name="Longitude">The longitude of the device.</param>
public readonly record struct Coordinates(
    Guid DeviceId,
    double Latitude,
    double Longitude);

Niezwiązane wzorce tworzenia

Jednym z typowych wzorców użycia jest utworzenie domyślnego niezwiązanego kanału:

var channel = Channel.CreateUnbounded<Coordinates>();

Załóżmy jednak, że chcesz utworzyć niezwiązany kanał z wieloma producentami i konsumentami:

var channel = Channel.CreateUnbounded<Coordinates>(
    new UnboundedChannelOptions
    {
        SingleWriter = false,
        SingleReader = false,
        AllowSynchronousContinuations = true
    });

W tym przypadku wszystkie zapisy są synchroniczne, nawet WriteAsync. Jest to spowodowane tym, że niezwiązany kanał zawsze ma dostępny pokój na zapis skutecznie natychmiast. Jednak w przypadku AllowSynchronousContinuations ustawienia na truewartość zapisy mogą kończyć się pracą skojarzoną z czytelnikiem przez wykonanie ich kontynuacji. Nie ma to wpływu na synchronizację operacji.

Powiązane wzorce tworzenia

W przypadku ograniczonych kanałów możliwość konfigurowania kanału powinna być znana konsumentowi, aby zapewnić odpowiednie zużycie. Oznacza to, że użytkownik powinien wiedzieć, jakie zachowanie ma kanał po osiągnięciu skonfigurowanej granicy. Przyjrzyjmy się niektórym typowym wzorcom tworzenia powiązanych.

Najprostszym sposobem utworzenia ograniczonego kanału jest określenie pojemności:

var channel = Channel.CreateBounded<Coordinates>(1);

Powyższy kod tworzy ograniczony kanał z maksymalną pojemnością 1. Dostępne są inne opcje, niektóre opcje są takie same jak kanał bez ruchu, a inne są specyficzne dla niezwiązanych kanałów:

var channel = Channel.CreateBounded<Coordinates>(
    new BoundedChannelOptions(1_000)
    {
        SingleWriter = true,
        SingleReader = false,
        AllowSynchronousContinuations = false,
        FullMode = BoundedChannelFullMode.DropWrite
    });

W poprzednim kodzie kanał jest tworzony jako ograniczony kanał, który jest ograniczony do 1000 elementów z jednym zapisem, ale wielu czytelników. Jego zachowanie w trybie pełnym jest zdefiniowane jako DropWrite, co oznacza, że odrzuca element zapisywany, jeśli kanał jest pełny.

Aby obserwować elementy, które są porzucane podczas korzystania z ograniczonych kanałów, zarejestruj itemDropped wywołanie zwrotne:

var channel = Channel.CreateBounded(
    new BoundedChannelOptions(10)
    {
        AllowSynchronousContinuations = true,
        FullMode = BoundedChannelFullMode.DropOldest
    },
    static void (Coordinates dropped) =>
        Console.WriteLine($"Coordinates dropped: {dropped}"));

Za każdym razem, gdy kanał jest pełny i dodawany jest nowy element, itemDropped wywołanie zwrotne jest wywoływane. W tym przykładzie podany wywołanie zwrotne zapisuje element w konsoli, ale możesz wykonać dowolną inną akcję.

Wzorce producenta

Załóżmy, że producent w tym scenariuszu zapisuje nowe współrzędne w kanale. Producent może to zrobić, wywołując polecenie TryWrite:

static void ProduceWithWhileAndTryWrite(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }
    }
}

Poprzedni kod producenta:

  • Channel<Coordinates>.Writer Akceptuje element (ChannelWriter<Coordinates>) jako argument wraz z początkowym Coordinateselementem .
  • Definiuje pętlę warunkową while , która próbuje przenieść współrzędne przy użyciu polecenia TryWrite.

Alternatywny producent może użyć WriteAsync metody :

static async ValueTask ProduceWithWhileWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        await writer.WriteAsync(
            item: coordinates = coordinates with
            {
                Latitude = coordinates.Latitude + .5,
                Longitude = coordinates.Longitude + 1
            });
    }

    writer.Complete();
}

Channel<Coordinates>.Writer Ponownie element jest używany w while pętli. Jednak tym razem wywoływana WriteAsync jest metoda . Metoda będzie kontynuowana dopiero po zapisaniu współrzędnych. Po zakończeniu while pętli zostanie wykonane wywołanie Complete , które sygnalizuje, że żadne dane nie są zapisywane w kanale.

Innym wzorcem producenta jest użycie WaitToWriteAsync metody , rozważ następujący kod:

static async ValueTask ProduceWithWaitToWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 } &&
        await writer.WaitToWriteAsync())
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }

        await Task.Delay(TimeSpan.FromMilliseconds(10));
    }

    writer.Complete();
}

W ramach warunkowego whilewynik WaitToWriteAsync wywołania służy do określenia, czy kontynuować pętlę.

Wzorce konsumentów

Istnieje kilka typowych wzorców odbiorców kanału. Gdy kanał nigdy się nie kończy, co oznacza, że generuje dane na czas nieokreślony, konsument może używać while (true) pętli i odczytywać dane, gdy staną się dostępne:

static async ValueTask ConsumeWithWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (true)
    {
        // May throw ChannelClosedException if
        // the parent channel's writer signals complete.
        Coordinates coordinates = await reader.ReadAsync();
        Console.WriteLine(coordinates);
    }
}

Uwaga

Ten kod zgłosi wyjątek w przypadku zamknięcia kanału.

Alternatywny konsument może uniknąć tego problemu, używając zagnieżdżonej pętli while, jak pokazano w poniższym kodzie:

static async ValueTask ConsumeWithNestedWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (await reader.WaitToReadAsync())
    {
        while (reader.TryRead(out Coordinates coordinates))
        {
            Console.WriteLine(coordinates);
        }
    }
}

W poprzednim kodzie użytkownik czeka na odczytanie danych. Gdy dane będą dostępne, użytkownik próbuje go odczytać. Pętle te nadal oceniają, dopóki producent kanału nie będzie miał już danych do odczytania. Z tego względu, gdy producent jest znany, że ma skończonej liczby produktów, które produkuje i sygnalizuje ukończenie, konsument może używać await foreach semantyki do iteracji nad przedmiotami:

static async ValueTask ConsumeWithAwaitForeachAsync(
    ChannelReader<Coordinates> reader)
{
    await foreach (Coordinates coordinates in reader.ReadAllAsync())
    {
        Console.WriteLine(coordinates);
    }
}

Powyższy kod używa ReadAllAsync metody do odczytywania wszystkich współrzędnych z kanału.

Zobacz też