Dela via


System.IO.Pipelines i .NET

System.IO.Pipelines är ett bibliotek som är utformat för att göra det enklare att utföra högpresterande I/O i .NET. Det är ett bibliotek som är inriktat på .NET Standard som fungerar på alla .NET-implementeringar.

Biblioteket är tillgängligt i Nuget-paketet System.IO.Pipelines .

Vilket problem löser System.IO.Pipelines?

Appar som parsar strömmande data består av exempelkod med många specialiserade och ovanliga kodflöden. Pannplåten och specialfallskoden är komplexa och svåra att underhålla.

System.IO.Pipelines har konstruerats för att:

  • Ha högpresterande parsning av strömmande data.
  • Minska kodkomplexiteten.

Följande kod är typisk för en TCP-server som tar emot radavgränsade meddelanden (avgränsade av '\n') från en klient:

async Task ProcessLinesAsync(NetworkStream stream)
{
    var buffer = new byte[1024];
    await stream.ReadAsync(buffer, 0, buffer.Length);

    // Process a single line from the buffer
    ProcessLine(buffer);
}

Föregående kod har flera problem:

  • Hela meddelandet (radslutet) kanske inte tas emot i ett enda anrop till ReadAsync.
  • Den ignorerar resultatet av stream.ReadAsync. stream.ReadAsync returnerar hur mycket data som lästes.
  • Den hanterar inte det fall där flera rader läss i ett enda ReadAsync anrop.
  • Den allokerar en byte matris med varje läsning.

För att åtgärda de föregående problemen krävs följande ändringar:

  • Buffring av inkommande data tills en ny rad hittas.

  • Parsa alla rader som returneras i bufferten.

  • Det är möjligt att linjen är större än 1 KB (1 024 byte). Koden måste ändra storlek på indatabufferten tills avgränsare hittas för att passa den fullständiga raden inuti bufferten.

    • Om bufferten ändras görs fler buffertkopior eftersom längre rader visas i indata.
    • För att minska slöseri med utrymme komprimerar du bufferten som används för att läsa linjer.
  • Överväg att använda buffertpooler för att undvika att allokera minne upprepade gånger.

  • Följande kod åtgärdar några av dessa problem:

async Task ProcessLinesAsync(NetworkStream stream)
{
    byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
    var bytesBuffered = 0;
    var bytesConsumed = 0;

    while (true)
    {
        // Calculate the amount of bytes remaining in the buffer.
        var bytesRemaining = buffer.Length - bytesBuffered;

        if (bytesRemaining == 0)
        {
            // Double the buffer size and copy the previously buffered data into the new buffer.
            var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
            Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
            // Return the old buffer to the pool.
            ArrayPool<byte>.Shared.Return(buffer);
            buffer = newBuffer;
            bytesRemaining = buffer.Length - bytesBuffered;
        }

        var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
        if (bytesRead == 0)
        {
            // EOF
            break;
        }

        // Keep track of the amount of buffered bytes.
        bytesBuffered += bytesRead;
        var linePosition = -1;

        do
        {
            // Look for a EOL in the buffered data.
            linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
                                         bytesBuffered - bytesConsumed);

            if (linePosition >= 0)
            {
                // Calculate the length of the line based on the offset.
                var lineLength = linePosition - bytesConsumed;

                // Process the line.
                ProcessLine(buffer, bytesConsumed, lineLength);

                // Move the bytesConsumed to skip past the line consumed (including \n).
                bytesConsumed += lineLength + 1;
            }
        }
        while (linePosition >= 0);
    }
}

Den tidigare koden är komplex och hanterar inte alla problem som identifierats. Nätverk med höga prestanda innebär vanligtvis att skriva komplex kod för att maximera prestanda. System.IO.Pipelines har utformats för att göra det enklare att skriva den här typen av kod.

Pipa

Klassen Pipe kan användas för att skapa ett PipeWriter/PipeReader par. Alla data som skrivs PipeWriter till är tillgängliga i PipeReader:

var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;

Grundläggande användning av rör

async Task ProcessLinesAsync(Socket socket)
{
    var pipe = new Pipe();
    Task writing = FillPipeAsync(socket, pipe.Writer);
    Task reading = ReadPipeAsync(pipe.Reader);

    await Task.WhenAll(reading, writing);
}

async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
    const int minimumBufferSize = 512;

    while (true)
    {
        // Allocate at least 512 bytes from the PipeWriter.
        Memory<byte> memory = writer.GetMemory(minimumBufferSize);
        try
        {
            int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
            if (bytesRead == 0)
            {
                break;
            }
            // Tell the PipeWriter how much was read from the Socket.
            writer.Advance(bytesRead);
        }
        catch (Exception ex)
        {
            LogError(ex);
            break;
        }

        // Make the data available to the PipeReader.
        FlushResult result = await writer.FlushAsync();

        if (result.IsCompleted)
        {
            break;
        }
    }

     // By completing PipeWriter, tell the PipeReader that there's no more data coming.
    await writer.CompleteAsync();
}

async Task ReadPipeAsync(PipeReader reader)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;

        while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
        {
            // Process the line.
            ProcessLine(line);
        }

        // Tell the PipeReader how much of the buffer has been consumed.
        reader.AdvanceTo(buffer.Start, buffer.End);

        // Stop reading if there's no more data coming.
        if (result.IsCompleted)
        {
            break;
        }
    }

    // Mark the PipeReader as complete.
    await reader.CompleteAsync();
}

bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
    // Look for a EOL in the buffer.
    SequencePosition? position = buffer.PositionOf((byte)'\n');

    if (position == null)
    {
        line = default;
        return false;
    }

    // Skip the line + the \n.
    line = buffer.Slice(0, position.Value);
    buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
    return true;
}

Det finns två loopar:

  • FillPipeAsync läser från Socket och skriver till PipeWriter.
  • ReadPipeAsync läser från PipeReader och parsar inkommande rader.

Det finns inga explicita buffertar allokerade. All bufferthantering delegeras till implementeringarna PipeReader och PipeWriter . Om du delegerar bufferthantering blir det enklare för användning av kod att fokusera enbart på affärslogik.

I den första loopen:

I den andra loopen PipeReader förbrukar den buffertar som skrivits av PipeWriter. Buffertarna kommer från socketen. Anropet till PipeReader.ReadAsync:

  • Returnerar en ReadResult som innehåller två viktiga informationsdelar:

    • De data som lästes i form av ReadOnlySequence<byte>.
    • Ett booleskt värde IsCompleted som anger om dataslutet (EOF) har nåtts.

Efter att ha hittat slutet av linjens avgränsare (EOL) och parsat linjen:

  • Logiken bearbetar bufferten för att hoppa över det som redan bearbetas.
  • PipeReader.AdvanceTo anropas för att berätta PipeReader hur mycket data som har förbrukats och undersökts.

Läsaren och skrivarslingorna slutar med att anropa Complete. Complete låter den underliggande Pipe frigöra det minne som den allokerade.

Backpressure och flödeskontroll

Helst fungerar läsning och parsning tillsammans:

  • Lästråden förbrukar data från nätverket och placerar dem i buffertar.
  • Parsningstråden ansvarar för att konstruera lämpliga datastrukturer.

Vanligtvis tar parsning längre tid än att bara kopiera datablock från nätverket:

  • Lästråden hamnar före parsningstråden.
  • Lästråden måste antingen sakta ned eller allokera mer minne för att lagra data för parsningstråden.

För optimala prestanda finns det en balans mellan frekventa pauser och allokering av mer minne.

För att lösa föregående problem har de Pipe två inställningarna för att styra dataflödet:

Diagram med ResumeWriterThreshold och PauseWriterThreshold

PipeWriter.FlushAsync:

  • Returnerar en ofullständig ValueTask<FlushResult> när mängden data i Pipe korsar PauseWriterThreshold.
  • Slutförs ValueTask<FlushResult> när den blir lägre än ResumeWriterThreshold.

Två värden används för att förhindra snabb cykling, vilket kan inträffa om ett värde används.

Exempel

// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);

PipeScheduler

Vanligtvis när du använder async och återupptas asynkron kod på antingen en TaskScheduler eller den aktuella SynchronizationContextawait.

När du gör I/O är det viktigt att ha detaljerad kontroll över var I/O utförs. Med den här kontrollen kan du dra nytta av CPU-cacheminnen på ett effektivt sätt. Effektiv cachelagring är avgörande för högpresterande appar som webbservrar. PipeScheduler ger kontroll över var asynkrona motringningar körs. Som standard:

  • SynchronizationContext Strömmen används.
  • Om det inte finns använder SynchronizationContextden trådpoolen för att köra återanrop.
public static void Main(string[] args)
{
    var writeScheduler = new SingleThreadPipeScheduler();
    var readScheduler = new SingleThreadPipeScheduler();

    // Tell the Pipe what schedulers to use and disable the SynchronizationContext.
    var options = new PipeOptions(readerScheduler: readScheduler,
                                  writerScheduler: writeScheduler,
                                  useSynchronizationContext: false);
    var pipe = new Pipe(options);
}

// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
    private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
     new BlockingCollection<(Action<object> Action, object State)>();
    private readonly Thread _thread;

    public SingleThreadPipeScheduler()
    {
        _thread = new Thread(DoWork);
        _thread.Start();
    }

    private void DoWork()
    {
        foreach (var item in _queue.GetConsumingEnumerable())
        {
            item.Action(item.State);
        }
    }

    public override void Schedule(Action<object?> action, object? state)
    {
        if (state is not null)
        {
            _queue.Add((action, state));
        }
        // else log the fact that _queue.Add was not called.
    }
}

PipeScheduler.ThreadPool är implementeringen PipeScheduler som köar återanrop till trådpoolen. PipeScheduler.ThreadPool är standard och det bästa valet. PipeScheduler.Inline kan orsaka oavsiktliga konsekvenser som dödlägen.

Röråterställning

Det är ofta effektivt att återanvända objektet Pipe . Om du vill återställa röret anropar du PipeReader Reset när både PipeReader och PipeWriter är klara.

PipeReader

PipeReader hanterar minnet åt anroparen. Ring PipeReader.AdvanceTo alltid efter att ha ringt PipeReader.ReadAsync. Detta meddelar PipeReader när anroparen är klar med minnet så att det kan spåras. Den ReadOnlySequence<byte> returnerade från PipeReader.ReadAsync är endast giltig tills anropet till PipeReader.AdvanceTo. Det är olagligt att använda ReadOnlySequence<byte> efter att ha anropat PipeReader.AdvanceTo.

PipeReader.AdvanceTo tar två SequencePosition argument:

  • Det första argumentet avgör hur mycket minne som förbrukades.
  • Det andra argumentet avgör hur mycket av bufferten som observerades.

Att markera data som förbrukade innebär att röret kan returnera minnet till den underliggande buffertpoolen. Om data markeras som observerade styrs vad nästa anrop till PipeReader.ReadAsync gör. Att markera allt som observerats innebär att nästa anrop till PipeReader.ReadAsync inte returneras förrän mer data har skrivits till röret. Alla andra värden gör nästa anrop för att PipeReader.ReadAsync returnera omedelbart med observerade och oobserverade data, men inte data som redan har förbrukats.

Läsa scenarier för strömmande data

Det finns ett par typiska mönster som uppstår när du försöker läsa strömmande data:

  • Med en dataström parsar du ett enda meddelande.
  • Med en dataström parsar du alla tillgängliga meddelanden.

I följande exempel används TryParseLines metoden för att parsa meddelanden från en ReadOnlySequence<byte>. TryParseLines parsar ett enda meddelande och uppdaterar indatabufferten för att trimma det parsade meddelandet från bufferten. TryParseLines är inte en del av .NET, det är en användarskriven metod som används i följande avsnitt.

bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);

Läsa ett enda meddelande

Följande kod läser ett enda meddelande från en PipeReader och returnerar det till anroparen.

async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader,
 CancellationToken cancellationToken = default)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> buffer = result.Buffer;

        // In the event that no message is parsed successfully, mark consumed
        // as nothing and examined as the entire buffer.
        SequencePosition consumed = buffer.Start;
        SequencePosition examined = buffer.End;

        try
        {
            if (TryParseLines(ref buffer, out Message message))
            {
                // A single message was successfully parsed so mark the start of the
                // parsed buffer as consumed. TryParseLines trims the buffer to
                // point to the data after the message was parsed.
                consumed = buffer.Start;

                // Examined is marked the same as consumed here, so the next call
                // to ReadSingleMessageAsync will process the next message if there's
                // one.
                examined = consumed;

                return message;
            }

            // There's no more data to be processed.
            if (result.IsCompleted)
            {
                if (buffer.Length > 0)
                {
                    // The message is incomplete and there's no more data to process.
                    throw new InvalidDataException("Incomplete message.");
                }

                break;
            }
        }
        finally
        {
            reader.AdvanceTo(consumed, examined);
        }
    }

    return null;
}

Koden ovan:

  • Parsar ett enda meddelande.
  • Uppdaterar den förbrukade SequencePosition och undersökta SequencePosition så att den pekar på början av den trimmade indatabufferten.

De två SequencePosition argumenten uppdateras eftersom TryParseLines det tolkade meddelandet tas bort från indatabufferten. När du parsar ett enda meddelande från bufferten bör den undersökta positionen vanligtvis vara något av följande:

  • Slutet av meddelandet.
  • Slutet på den mottagna bufferten om inget meddelande hittades.

Det enskilda meddelandefallet har störst risk för fel. Om du skickar fel värden som ska undersökas kan det leda till ett undantag från minnet eller en oändlig loop. Mer information finns i avsnittet Om vanliga problem med PipeReader i den här artikeln.

Läsa flera meddelanden

Följande kod läser alla meddelanden från en och anropar ProcessMessageAsync var och enPipeReader.

async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync(cancellationToken);
            ReadOnlySequence<byte> buffer = result.Buffer;

            try
            {
                // Process all messages from the buffer, modifying the input buffer on each
                // iteration.
                while (TryParseLines(ref buffer, out Message message))
                {
                    await ProcessMessageAsync(message);
                }

                // There's no more data to be processed.
                if (result.IsCompleted)
                {
                    if (buffer.Length > 0)
                    {
                        // The message is incomplete and there's no more data to process.
                        throw new InvalidDataException("Incomplete message.");
                    }
                    break;
                }
            }
            finally
            {
                // Since all messages in the buffer are being processed, you can use the
                // remaining buffer's Start and End position to determine consumed and examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

Annullering

PipeReader.ReadAsync:

  • Har stöd för att skicka en CancellationToken.
  • Utlöser ett OperationCanceledException om det CancellationToken avbryts medan det finns en väntande läsning.
  • Stöder ett sätt att avbryta den aktuella läsåtgärden via PipeReader.CancelPendingRead, vilket undviker att skapa ett undantag. Anrop PipeReader.CancelPendingRead gör att det aktuella eller nästa anropet till PipeReader.ReadAsync returnerar ett ReadResult med IsCanceled inställt på true. Detta kan vara användbart för att stoppa den befintliga läsloopen på ett icke-destruktivt och icke-exceptionellt sätt.
private PipeReader reader;

public MyConnection(PipeReader reader)
{
    this.reader = reader;
}

public void Abort()
{
    // Cancel the pending read so the process loop ends without an exception.
    reader.CancelPendingRead();
}

public async Task ProcessMessagesAsync()
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync();
            ReadOnlySequence<byte> buffer = result.Buffer;

            try
            {
                if (result.IsCanceled)
                {
                    // The read was canceled. You can quit without reading the existing data.
                    break;
                }

                // Process all messages from the buffer, modifying the input buffer on each
                // iteration.
                while (TryParseLines(ref buffer, out Message message))
                {
                    await ProcessMessageAsync(message);
                }

                // There's no more data to be processed.
                if (result.IsCompleted)
                {
                    break;
                }
            }
            finally
            {
                // Since all messages in the buffer are being processed, you can use the
                // remaining buffer's Start and End position to determine consumed and examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

Vanliga problem med PipeReader

  • Om du skickar fel värden till eller examined kan det leda till consumed att du läser redan lästa data.

  • Att skicka buffer.End som undersökt kan resultera i:

    • Data som har stoppats
    • Eventuellt ett undantag från slut på minne (OOM) om data inte används. Till exempel PipeReader.AdvanceTo(position, buffer.End) när du bearbetar ett enda meddelande i taget från bufferten.
  • Om du skickar fel värden till consumed eller examined kan det resultera i en oändlig loop. Om buffer.Start det till exempel PipeReader.AdvanceTo(buffer.Start) inte har ändrats kommer nästa anrop att PipeReader.ReadAsync returneras omedelbart innan nya data tas emot.

  • Om fel värden skickas till consumed eller examined kan det resultera i oändlig buffring (eventuell OOM).

  • När du använder efteranropet ReadOnlySequence<byte> PipeReader.AdvanceTo kan det leda till minnesskada (använd efter kostnadsfritt).

  • Om du inte anropar PipeReader.Complete/CompleteAsync kan det leda till en minnesläcka.

  • Att kontrollera ReadResult.IsCompleted och avsluta läslogik innan bufferten bearbetas resulterar i dataförlust. Loopavslutsvillkoret ska baseras på ReadResult.Buffer.IsEmpty och ReadResult.IsCompleted. Om du gör detta felaktigt kan det resultera i en oändlig loop.

Problematisk kod

Dataförlust

ReadResult Kan returnera det sista datasegmentet när IsCompleted är inställt på true. Att inte läsa dessa data innan läsloopen avslutas resulterar i dataförlust.

Varning

Använd INTE följande kod. Med det här exemplet resulterar det i dataförlust, låsning, säkerhetsproblem och bör INTE kopieras. Följande exempel finns för att förklara PipeReader Vanliga problem.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> dataLossBuffer = result.Buffer;

    if (result.IsCompleted)
        break;

    Process(ref dataLossBuffer, out Message message);

    reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}

Varning

Använd INTE föregående kod. Med det här exemplet resulterar det i dataförlust, låsning, säkerhetsproblem och bör INTE kopieras. Föregående exempel tillhandahålls för att förklara Vanliga problem med PipeReader.

Oändlig loop

Följande logik kan resultera i en oändlig loop om Result.IsCompleted är true men det finns aldrig ett fullständigt meddelande i bufferten.

Varning

Använd INTE följande kod. Med det här exemplet resulterar det i dataförlust, låsning, säkerhetsproblem och bör INTE kopieras. Följande exempel finns för att förklara PipeReader Vanliga problem.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
    if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
        break;

    Process(ref infiniteLoopBuffer, out Message message);

    reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}

Varning

Använd INTE föregående kod. Med det här exemplet resulterar det i dataförlust, låsning, säkerhetsproblem och bör INTE kopieras. Föregående exempel tillhandahålls för att förklara Vanliga problem med PipeReader.

Här är en annan kod med samma problem. Den söker efter en buffert som inte är tom innan du kontrollerar ReadResult.IsCompleted. Eftersom den finns i en else ifloopar den för alltid om det aldrig finns ett fullständigt meddelande i bufferten.

Varning

Använd INTE följande kod. Med det här exemplet resulterar det i dataförlust, låsning, säkerhetsproblem och bör INTE kopieras. Följande exempel finns för att förklara PipeReader Vanliga problem.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;

    if (!infiniteLoopBuffer.IsEmpty)
        Process(ref infiniteLoopBuffer, out Message message);

    else if (result.IsCompleted)
        break;

    reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}

Varning

Använd INTE föregående kod. Med det här exemplet resulterar det i dataförlust, låsning, säkerhetsproblem och bör INTE kopieras. Föregående exempel tillhandahålls för att förklara Vanliga problem med PipeReader.

Program som inte svarar

Villkorslöst anrop PipeReader.AdvanceTo med buffer.End i examined positionen kan leda till att programmet inte svarar när ett enda meddelande parsas. Nästa anrop till PipeReader.AdvanceTo returnerar inte förrän:

  • Mer data skrivs till röret.
  • Och de nya data har inte undersökts tidigare.

Varning

Använd INTE följande kod. Med det här exemplet resulterar det i dataförlust, låsning, säkerhetsproblem och bör INTE kopieras. Följande exempel finns för att förklara PipeReader Vanliga problem.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> hangBuffer = result.Buffer;

    Process(ref hangBuffer, out Message message);

    if (result.IsCompleted)
        break;

    reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);

    if (message != null)
        return message;
}

Varning

Använd INTE föregående kod. Med det här exemplet resulterar det i dataförlust, låsning, säkerhetsproblem och bör INTE kopieras. Föregående exempel tillhandahålls för att förklara Vanliga problem med PipeReader.

Slut på minne (OOM)

Med följande villkor fortsätter följande kod buffring tills en OutOfMemoryException inträffar:

  • Det finns ingen maximal meddelandestorlek.
  • De data som returneras från PipeReader gör inte ett fullständigt meddelande. Det gör till exempel inte ett fullständigt meddelande eftersom den andra sidan skriver ett stort meddelande (till exempel ett 4 GB-meddelande).

Varning

Använd INTE följande kod. Med det här exemplet resulterar det i dataförlust, låsning, säkerhetsproblem och bör INTE kopieras. Följande exempel finns för att förklara PipeReader Vanliga problem.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;

    Process(ref thisCouldOutOfMemory, out Message message);

    if (result.IsCompleted)
        break;

    reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);

    if (message != null)
        return message;
}

Varning

Använd INTE föregående kod. Med det här exemplet resulterar det i dataförlust, låsning, säkerhetsproblem och bör INTE kopieras. Föregående exempel tillhandahålls för att förklara Vanliga problem med PipeReader.

Minnesskada

När du skriver hjälppersonal som läser bufferten bör alla returnerade nyttolaster kopieras innan du anropar Advance. I följande exempel returneras minne som Pipe har tagits bort och kan återanvända det för nästa åtgärd (läsa/skriva).

Varning

Använd INTE följande kod. Med det här exemplet resulterar det i dataförlust, låsning, säkerhetsproblem och bör INTE kopieras. Följande exempel finns för att förklara PipeReader Vanliga problem.

public class Message
{
    public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
    Environment.FailFast("This code is terrible, don't use it!");
    Message message = null;

    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> buffer = result.Buffer;

        ReadHeader(ref buffer, out int length);

        if (length <= buffer.Length)
        {
            message = new Message
            {
                // Slice the payload from the existing buffer
                CorruptedPayload = buffer.Slice(0, length)
            };

            buffer = buffer.Slice(length);
        }

        if (result.IsCompleted)
            break;

        reader.AdvanceTo(buffer.Start, buffer.End);

        if (message != null)
        {
            // This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
            // was captured.
            break;
        }
    }

    return message;
}

Varning

Använd INTE föregående kod. Med det här exemplet resulterar det i dataförlust, låsning, säkerhetsproblem och bör INTE kopieras. Föregående exempel tillhandahålls för att förklara Vanliga problem med PipeReader.

PipeWriter

Hanterar PipeWriter buffertar för att skriva åt anroparen. PipeWriter implementerar IBufferWriter<byte>. IBufferWriter<byte> gör det möjligt att få åtkomst till buffertar för att utföra skrivningar utan extra buffertkopior.

async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
    // Request at least 5 bytes from the PipeWriter.
    Memory<byte> memory = writer.GetMemory(5);

    // Write directly into the buffer.
    int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);

    // Tell the writer how many bytes were written.
    writer.Advance(written);

    await writer.FlushAsync(cancellationToken);
}

Föregående kod:

  • Begär en buffert på minst 5 byte från PipeWriter att använda GetMemory.
  • Skriver byte för ASCII-strängen "Hello" till den returnerade Memory<byte>.
  • Anrop Advance för att ange hur många byte som skrevs till bufferten.
  • Tömer PipeWriter, som skickar byte till den underliggande enheten.

Den tidigare skrivmetoden använder buffertar som tillhandahålls av PipeWriter. Det kunde också ha använt PipeWriter.WriteAsync, vilket:

  • Kopierar den befintliga bufferten PipeWritertill .
  • Anropar GetSpan, Advance efter behov och anropar FlushAsync.
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
    byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");

    // Write helloBytes to the writer, there's no need to call Advance here
    // (Write does that).
    await writer.WriteAsync(helloBytes, cancellationToken);
}

Annullering

FlushAsync har stöd för att skicka en CancellationToken. Om du skickar ett CancellationToken resultat i ett OperationCanceledException om token avbryts medan en tömning väntar. PipeWriter.FlushAsync stöder ett sätt att avbryta den aktuella tömningsåtgärden via PipeWriter.CancelPendingFlush utan att skapa ett undantag. Anrop PipeWriter.CancelPendingFlush gör att det aktuella eller nästa anropet till PipeWriter.FlushAsync eller PipeWriter.WriteAsync returnerar ett FlushResult med IsCanceled inställt på true. Detta kan vara användbart för att stoppa den resulterande tömningen på ett icke-destruktivt och icke-exceptionellt sätt.

Vanliga problem med PipeWriter

  • GetSpan och GetMemory returnera en buffert med minst den begärda mängden minne. Anta inte exakta buffertstorlekar.
  • Det finns ingen garanti för att efterföljande anrop returnerar samma buffert eller buffert i samma storlek.
  • En ny buffert måste begäras efter anropet Advance för att kunna fortsätta skriva mer data. Det går inte att skriva till den tidigare hämtade bufferten.
  • Det är inte säkert att ringa GetMemory eller GetSpan när det finns ett ofullständigt samtal till FlushAsync .
  • Att anropa Complete eller CompleteAsync när det finns oupplösade data kan leda till minnesskada.

Tips för att använda PipeReader och PipeWriter

Följande tips hjälper dig att använda klasserna System.IO.Pipelines :

  • Slutför alltid PipeReader och PipeWriter, inklusive ett undantag där det är tillämpligt.
  • Ring PipeReader.AdvanceTo alltid efter att ha ringt PipeReader.ReadAsync.
  • Skriv regelbundet await PipeWriter.FlushAsync och kontrollera FlushResult.IsCompletedalltid . Avbryt skrivning om IsCompleted är true, eftersom det indikerar att läsaren har slutförts och inte längre bryr sig om vad som skrivs.
  • Anropa PipeWriter.FlushAsync när du har skrivit något som du vill PipeReader ha åtkomst till.
  • Anropa FlushAsync inte om läsaren inte kan starta förrän FlushAsync den är klar, eftersom det kan orsaka ett dödläge.
  • Se till att endast en kontext "äger" en PipeReader eller PipeWriter eller kommer åt dem. Dessa typer är inte trådsäkra.
  • Få aldrig åtkomst till en ReadResult.Buffer när du har anropat AdvanceTo eller slutfört PipeReader.

IDuplexPipe

IDuplexPipe är ett kontrakt för typer som stöder både läsning och skrivning. Till exempel skulle en nätverksanslutning representeras av en IDuplexPipe.

Till skillnad från Pipe, som innehåller en PipeReader och en PipeWriter, IDuplexPipe representerar en enda sida av en fullständig duplex-anslutning. Det innebär att det som skrivs till PipeWriter inte kommer att läsas från PipeReader.

Strömmar

När du läser eller skriver dataström läser du vanligtvis data med en de-serialiserare och skriver data med en serialiserare. De flesta av dessa läs- och skrivström-API:er har en Stream parameter. För att göra det enklare att integrera med dessa befintliga API:er PipeReader och PipeWriter exponera en AsStream metod. AsStream returnerar en Stream implementering runt PipeReader eller PipeWriter.

Stream-exempel

PipeReader och PipeWriter instanser kan skapas med hjälp av statiska Create metoder givet ett Stream objekt och valfria motsvarande alternativ för skapande.

Tillåt StreamPipeReaderOptions kontroll över skapandet av instansen PipeReader med följande parametrar:

Tillåt StreamPipeWriterOptions kontroll över skapandet av instansen PipeWriter med följande parametrar:

Viktigt!

När du skapar PipeReader och PipeWriter instanser med hjälp av Create metoderna måste du överväga objektets Stream livslängd. Om du behöver åtkomst till dataströmmen när läsaren eller skrivaren är klar med den måste du ange LeaveOpen flaggan till true för skapandealternativen. Annars stängs strömmen.

Följande kod visar hur du skapar och PipeWriter instanser med hjälp Create av PipeReader metoderna från en dataström.

using System.Buffers;
using System.IO.Pipelines;
using System.Text;

class Program
{
    static async Task Main()
    {
        using var stream = File.OpenRead("lorem-ipsum.txt");

        var reader = PipeReader.Create(stream);
        var writer = PipeWriter.Create(
            Console.OpenStandardOutput(), 
            new StreamPipeWriterOptions(leaveOpen: true));

        WriteUserCancellationPrompt();

        var processMessagesTask = ProcessMessagesAsync(reader, writer);
        var userCanceled = false;
        var cancelProcessingTask = Task.Run(() =>
        {
            while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
            {
                WriteUserCancellationPrompt();
            }

            userCanceled = true;

            // No exceptions thrown
            reader.CancelPendingRead();
            writer.CancelPendingFlush();
        });

        await Task.WhenAny(cancelProcessingTask, processMessagesTask);

        Console.WriteLine(
            $"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
    }

    static void WriteUserCancellationPrompt() =>
        Console.WriteLine("Press 'C' to cancel processing...\n");

    static async Task ProcessMessagesAsync(
        PipeReader reader,
        PipeWriter writer)
    {
        try
        {
            while (true)
            {
                ReadResult readResult = await reader.ReadAsync();
                ReadOnlySequence<byte> buffer = readResult.Buffer;

                try
                {
                    if (readResult.IsCanceled)
                    {
                        break;
                    }

                    if (TryParseLines(ref buffer, out string message))
                    {
                        FlushResult flushResult =
                            await WriteMessagesAsync(writer, message);

                        if (flushResult.IsCanceled || flushResult.IsCompleted)
                        {
                            break;
                        }
                    }

                    if (readResult.IsCompleted)
                    {
                        if (!buffer.IsEmpty)
                        {
                            throw new InvalidDataException("Incomplete message.");
                        }
                        break;
                    }
                }
                finally
                {
                    reader.AdvanceTo(buffer.Start, buffer.End);
                }
            }
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine(ex);
        }
        finally
        {
            await reader.CompleteAsync();
            await writer.CompleteAsync();
        }
    }

    static bool TryParseLines(
        ref ReadOnlySequence<byte> buffer,
        out string message)
    {
        SequencePosition? position;
        StringBuilder outputMessage = new();

        while(true)
        {
            position = buffer.PositionOf((byte)'\n');

            if (!position.HasValue)
                break;

            outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
                        .AppendLine();

            buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
        };

        message = outputMessage.ToString();
        return message.Length != 0;
    }

    static ValueTask<FlushResult> WriteMessagesAsync(
        PipeWriter writer,
        string message) =>
        writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}

Programmet använder en StreamReader för att läsa lorem-ipsum.txt-filen som en ström, och den måste avslutas med en tom rad. FileStream skickas till PipeReader.Create, som instansierar ett PipeReader objekt. Konsolprogrammet skickar sedan sin standardutdataström till PipeWriter.Create med hjälp av Console.OpenStandardOutput(). Exemplet stöder annullering.