System.IO.Pipelines in .NET
System.IO.Pipelines is een bibliotheek die is ontworpen om het eenvoudiger te maken om I/O met hoge prestaties in .NET uit te voeren. Het is een bibliotheek die is gericht op .NET Standard die werkt op alle .NET-implementaties.
De bibliotheek is beschikbaar in het Nuget-pakket System.IO.Pipelines .
Welk probleem lost System.IO.Pipelines op
Apps die streaminggegevens parseren, bestaan uit standaardcode met veel gespecialiseerde en ongebruikelijke codestromen. De standaard- en speciale casecode is complex en moeilijk te onderhouden.
System.IO.Pipelines
is ontworpen voor:
- Streaminggegevens met hoge prestaties parseren.
- Verminder de complexiteit van code.
De volgende code is gebruikelijk voor een TCP-server die berichten met regelscheidingstekens ontvangt (gescheiden door) '\n'
van een client:
async Task ProcessLinesAsync(NetworkStream stream)
{
var buffer = new byte[1024];
await stream.ReadAsync(buffer, 0, buffer.Length);
// Process a single line from the buffer
ProcessLine(buffer);
}
De voorgaande code heeft verschillende problemen:
- Het hele bericht (einde van regel) wordt mogelijk niet ontvangen in één aanroep naar
ReadAsync
. - Het negeert het resultaat van
stream.ReadAsync
.stream.ReadAsync
retourneert hoeveel gegevens zijn gelezen. - Het behandelt niet het geval waarbij meerdere regels in één
ReadAsync
aanroep worden gelezen. - Er wordt een
byte
matrix met elke leesbewerking toegewezen.
Om de voorgaande problemen op te lossen, zijn de volgende wijzigingen vereist:
Buffer de binnenkomende gegevens totdat er een nieuwe regel wordt gevonden.
Parseert alle regels die in de buffer worden geretourneerd.
Het is mogelijk dat de lijn groter is dan 1 kB (1024 bytes). De code moet het formaat van de invoerbuffer wijzigen totdat het scheidingsteken wordt gevonden om de volledige lijn in de buffer te kunnen aanpassen.
- Als het formaat van de buffer wordt gewijzigd, worden er meer bufferkopieën gemaakt naarmate er langere regels in de invoer worden weergegeven.
- Om verspilde ruimte te verminderen, compergeer de buffer die wordt gebruikt voor leeslijnen.
Overweeg het gebruik van bufferpooling om het toewijzen van geheugen herhaaldelijk te voorkomen.
Met de volgende code worden enkele van deze problemen opgelost:
async Task ProcessLinesAsync(NetworkStream stream)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
var bytesBuffered = 0;
var bytesConsumed = 0;
while (true)
{
// Calculate the amount of bytes remaining in the buffer.
var bytesRemaining = buffer.Length - bytesBuffered;
if (bytesRemaining == 0)
{
// Double the buffer size and copy the previously buffered data into the new buffer.
var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
// Return the old buffer to the pool.
ArrayPool<byte>.Shared.Return(buffer);
buffer = newBuffer;
bytesRemaining = buffer.Length - bytesBuffered;
}
var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
if (bytesRead == 0)
{
// EOF
break;
}
// Keep track of the amount of buffered bytes.
bytesBuffered += bytesRead;
var linePosition = -1;
do
{
// Look for a EOL in the buffered data.
linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
bytesBuffered - bytesConsumed);
if (linePosition >= 0)
{
// Calculate the length of the line based on the offset.
var lineLength = linePosition - bytesConsumed;
// Process the line.
ProcessLine(buffer, bytesConsumed, lineLength);
// Move the bytesConsumed to skip past the line consumed (including \n).
bytesConsumed += lineLength + 1;
}
}
while (linePosition >= 0);
}
}
De vorige code is complex en heeft geen betrekking op alle geïdentificeerde problemen. Netwerken met hoge prestaties betekenen meestal het schrijven van complexe code om de prestaties te maximaliseren. System.IO.Pipelines
is ontworpen om het schrijven van dit type code eenvoudiger te maken.
Pijp
De Pipe klasse kan worden gebruikt om een PipeWriter/PipeReader
paar te maken. Alle gegevens die in de PipeWriter
gegevens zijn geschreven, zijn beschikbaar in het PipeReader
volgende:
var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;
Basisgebruik van pijpen
async Task ProcessLinesAsync(Socket socket)
{
var pipe = new Pipe();
Task writing = FillPipeAsync(socket, pipe.Writer);
Task reading = ReadPipeAsync(pipe.Reader);
await Task.WhenAll(reading, writing);
}
async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
const int minimumBufferSize = 512;
while (true)
{
// Allocate at least 512 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
try
{
int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
if (bytesRead == 0)
{
break;
}
// Tell the PipeWriter how much was read from the Socket.
writer.Advance(bytesRead);
}
catch (Exception ex)
{
LogError(ex);
break;
}
// Make the data available to the PipeReader.
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
// By completing PipeWriter, tell the PipeReader that there's no more data coming.
await writer.CompleteAsync();
}
async Task ReadPipeAsync(PipeReader reader)
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
{
// Process the line.
ProcessLine(line);
}
// Tell the PipeReader how much of the buffer has been consumed.
reader.AdvanceTo(buffer.Start, buffer.End);
// Stop reading if there's no more data coming.
if (result.IsCompleted)
{
break;
}
}
// Mark the PipeReader as complete.
await reader.CompleteAsync();
}
bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
// Look for a EOL in the buffer.
SequencePosition? position = buffer.PositionOf((byte)'\n');
if (position == null)
{
line = default;
return false;
}
// Skip the line + the \n.
line = buffer.Slice(0, position.Value);
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
return true;
}
Er zijn twee lussen:
FillPipeAsync
leest van deSocket
en schrijft naar dePipeWriter
.ReadPipeAsync
leest van dePipeReader
binnenkomende regels en parseert deze.
Er zijn geen expliciete buffers toegewezen. Alle bufferbeheer wordt gedelegeerd aan de PipeReader
en PipeWriter
implementaties. Het delegeren van bufferbeheer maakt het eenvoudiger om code te gebruiken om zich alleen te richten op de bedrijfslogica.
In de eerste lus:
- PipeWriter.GetMemory(Int32) wordt aangeroepen om geheugen op te halen van de onderliggende schrijver.
- PipeWriter.Advance(Int32) wordt aangeroepen om te zien
PipeWriter
hoeveel gegevens naar de buffer zijn geschreven. - PipeWriter.FlushAsync wordt aangeroepen om de gegevens beschikbaar te maken voor de
PipeReader
.
In de tweede lus verbruikt de PipeReader
buffers die zijn geschreven door PipeWriter
. De buffers zijn afkomstig van de socket. De oproep naar PipeReader.ReadAsync
:
Retourneert een ReadResult met twee belangrijke gegevens:
- De gegevens die zijn gelezen in de vorm van
ReadOnlySequence<byte>
. - Een Booleaanse waarde
IsCompleted
die aangeeft of het einde van de gegevens (EOF) is bereikt.
- De gegevens die zijn gelezen in de vorm van
Nadat u het einde van het regelscheidingsteken (EOL) hebt gevonden en de regel hebt geparserd:
- De logica verwerkt de buffer om over te slaan wat al is verwerkt.
PipeReader.AdvanceTo
wordt aangeroepen om te vertellenPipeReader
hoeveel gegevens er zijn verbruikt en onderzocht.
De lezer- en schrijflussen eindigen door aan te roepen Complete
. Complete
laat de onderliggende pipe het geheugen vrijgeven dat is toegewezen.
Backpressure en stroombesturing
In het ideale voorbeeld werken lezen en parseren samen:
- De leesthread verbruikt gegevens uit het netwerk en plaatst deze in buffers.
- De parseringsthread is verantwoordelijk voor het samenstellen van de juiste gegevensstructuren.
Het parseren kost doorgaans meer tijd dan het kopiëren van blokken gegevens uit het netwerk:
- De leesthread loopt voor op de parseringsthread.
- De leesthread moet vertragen of meer geheugen toewijzen om de gegevens voor de parseringsthread op te slaan.
Voor optimale prestaties is er een balans tussen frequente pauzes en het toewijzen van meer geheugen.
Om het voorgaande probleem op te lossen, heeft de Pipe
twee instellingen voor het beheren van de gegevensstroom:
- PauseWriterThreshold: bepaalt hoeveel gegevens moeten worden gebufferd voordat aanroepen worden FlushAsync onderbroken.
- ResumeWriterThreshold: Bepaalt hoeveel gegevens de lezer moet observeren voordat oproepen worden
PipeWriter.FlushAsync
hervat.
- Retourneert een onvolledige
ValueTask<FlushResult>
hoeveelheid gegevens in dePipe
kruisingenPauseWriterThreshold
. ValueTask<FlushResult>
Wordt voltooid wanneer deze lager wordt danResumeWriterThreshold
.
Er worden twee waarden gebruikt om snelle cyclussen te voorkomen, die kunnen optreden als één waarde wordt gebruikt.
Voorbeelden
// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);
PipeScheduler
Wanneer u doorgaans asynchrone code gebruikt async
en await
asynchroon wordt hervat op een TaskScheduler of de huidige SynchronizationContextcode.
Bij het uitvoeren van I/O is het belangrijk om nauwkeurige controle te hebben over waar de I/O wordt uitgevoerd. Met dit besturingselement kunt u effectief profiteren van CPU-caches. Efficiënte caching is essentieel voor krachtige apps zoals webservers. PipeScheduler biedt controle over waar asynchrone callbacks worden uitgevoerd. Standaard:
- De huidige SynchronizationContext wordt gebruikt.
- Als er geen
SynchronizationContext
is, wordt de thread-pool gebruikt om callbacks uit te voeren.
public static void Main(string[] args)
{
var writeScheduler = new SingleThreadPipeScheduler();
var readScheduler = new SingleThreadPipeScheduler();
// Tell the Pipe what schedulers to use and disable the SynchronizationContext.
var options = new PipeOptions(readerScheduler: readScheduler,
writerScheduler: writeScheduler,
useSynchronizationContext: false);
var pipe = new Pipe(options);
}
// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
new BlockingCollection<(Action<object> Action, object State)>();
private readonly Thread _thread;
public SingleThreadPipeScheduler()
{
_thread = new Thread(DoWork);
_thread.Start();
}
private void DoWork()
{
foreach (var item in _queue.GetConsumingEnumerable())
{
item.Action(item.State);
}
}
public override void Schedule(Action<object?> action, object? state)
{
if (state is not null)
{
_queue.Add((action, state));
}
// else log the fact that _queue.Add was not called.
}
}
PipeScheduler.ThreadPool is de PipeScheduler implementatie waarmee callbacks naar de thread-pool worden geplaatst. PipeScheduler.ThreadPool
is de standaardinstelling en over het algemeen de beste keuze. PipeScheduler.Inline kan onbedoelde gevolgen veroorzaken, zoals impasses.
Pijp opnieuw instellen
Het is vaak efficiënt om het Pipe
object opnieuw te gebruiken. Als u de pipe opnieuw wilt instellen, roept PipeReader Reset u aan wanneer zowel de PipeReader
als PipeWriter
de pijp is voltooid.
PipeReader
PipeReader beheert het geheugen namens de beller. Bel altijd na het bellenPipeReader.ReadAsync.PipeReader.AdvanceTo Hiermee wordt PipeReader
aangegeven wanneer de beller klaar is met het geheugen, zodat deze kan worden bijgehouden. De ReadOnlySequence<byte>
geretourneerde PipeReader.ReadAsync
waarde is alleen geldig tot de aanroep .PipeReader.AdvanceTo
Het is illegaal om te gebruiken ReadOnlySequence<byte>
na het bellen PipeReader.AdvanceTo
.
PipeReader.AdvanceTo
heeft twee SequencePosition argumenten:
- Het eerste argument bepaalt hoeveel geheugen is verbruikt.
- Het tweede argument bepaalt hoeveel van de buffer is waargenomen.
Het markeren van gegevens als verbruikt betekent dat de pijp het geheugen kan retourneren naar de onderliggende buffergroep. Als u gegevens markeert zoals waargenomen, bepaalt u wat de volgende aanroep doet PipeReader.ReadAsync
. Als u alles markeert zoals waargenomen, betekent dit dat de volgende aanroep PipeReader.ReadAsync
niet wordt geretourneerd totdat er meer gegevens naar de pijp worden geschreven. Elke andere waarde zal de volgende aanroep maken om onmiddellijk te PipeReader.ReadAsync
retourneren met de waargenomen en niet-geobserveerde gegevens, maar niet met gegevens die al zijn gebruikt.
Scenario's voor streaminggegevens lezen
Er zijn een aantal typische patronen die zich voordoen bij het lezen van streaminggegevens:
- Met een gegevensstroom parseert u één bericht.
- Met een gegevensstroom parseert u alle beschikbare berichten.
In de volgende voorbeelden wordt de methode gebruikt voor het TryParseLines
parseren van berichten uit een ReadOnlySequence<byte>
. TryParseLines
parseert één bericht en werkt de invoerbuffer bij om het geparseerde bericht uit de buffer te knippen. TryParseLines
maakt geen deel uit van .NET, het is een door de gebruiker geschreven methode die in de volgende secties wordt gebruikt.
bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);
Eén bericht lezen
De volgende code leest één bericht van een PipeReader
en retourneert het bericht naar de aanroeper.
async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader,
CancellationToken cancellationToken = default)
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
// In the event that no message is parsed successfully, mark consumed
// as nothing and examined as the entire buffer.
SequencePosition consumed = buffer.Start;
SequencePosition examined = buffer.End;
try
{
if (TryParseLines(ref buffer, out Message message))
{
// A single message was successfully parsed so mark the start of the
// parsed buffer as consumed. TryParseLines trims the buffer to
// point to the data after the message was parsed.
consumed = buffer.Start;
// Examined is marked the same as consumed here, so the next call
// to ReadSingleMessageAsync will process the next message if there's
// one.
examined = consumed;
return message;
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(consumed, examined);
}
}
return null;
}
Met de voorgaande code wordt:
- Parseert één bericht.
- Werkt de verbruikte en onderzochte
SequencePosition
SequencePosition
gegevens bij zodat deze verwijzen naar het begin van de ingekorte invoerbuffer.
De twee SequencePosition
argumenten worden bijgewerkt omdat TryParseLines
het geparseerde bericht uit de invoerbuffer wordt verwijderd. Over het algemeen moet bij het parseren van één bericht uit de buffer de onderzochte positie een van de volgende zijn:
- Het einde van het bericht.
- Het einde van de ontvangen buffer als er geen bericht is gevonden.
De case met één bericht heeft het meeste potentieel voor fouten. Het doorgeven van de verkeerde waarden die moeten worden onderzocht , kan resulteren in een uitzondering op het geheugen of een oneindige lus. Zie de sectie Algemene problemen van PipeReader in dit artikel voor meer informatie.
Meerdere berichten lezen
Met de volgende code worden alle berichten van een PipeReader
en aanroepen ProcessMessageAsync
op elke code gelezen.
async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
Opzegging
PipeReader.ReadAsync
:
- Ondersteunt het doorgeven van een CancellationToken.
- Genereert een OperationCanceledException als de
CancellationToken
status wordt geannuleerd terwijl er een leesbewerking in behandeling is. - Ondersteunt een manier om de huidige leesbewerking te annuleren via PipeReader.CancelPendingRead, waardoor er geen uitzondering wordt gegenereerd. Het aanroepen
PipeReader.CancelPendingRead
zorgt ervoor dat de huidige of volgende aanroepPipeReader.ReadAsync
een ReadResult metIsCanceled
set retourneert.true
Dit kan handig zijn voor het stoppen van de bestaande leeslus op een niet-destructieve en niet-uitzonderlijke manier.
private PipeReader reader;
public MyConnection(PipeReader reader)
{
this.reader = reader;
}
public void Abort()
{
// Cancel the pending read so the process loop ends without an exception.
reader.CancelPendingRead();
}
public async Task ProcessMessagesAsync()
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
if (result.IsCanceled)
{
// The read was canceled. You can quit without reading the existing data.
break;
}
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
Veelvoorkomende problemen met PipeReader
Het doorgeven van de verkeerde waarden aan
consumed
ofexamined
kan leiden tot het lezen van gegevens die al zijn gelezen.Het doorgeven
buffer.End
als onderzocht kan het volgende tot gevolg hebben:- Vastgelopen gegevens
- Mogelijk een OOM-uitzondering (Out of Memory) als er geen gegevens worden gebruikt. Bijvoorbeeld bij
PipeReader.AdvanceTo(position, buffer.End)
het verwerken van één bericht tegelijk vanuit de buffer.
Het doorgeven van de verkeerde waarden aan
consumed
ofexamined
kan resulteren in een oneindige lus. Alsbuffer.Start
dit bijvoorbeeld niet is gewijzigd,PipeReader.AdvanceTo(buffer.Start)
wordt de volgende aanroepPipeReader.ReadAsync
onmiddellijk geretourneerd voordat nieuwe gegevens binnenkomen.Het doorgeven van de verkeerde waarden aan
consumed
ofexamined
kan resulteren in oneindige buffering (uiteindelijke OOM).Als u de
ReadOnlySequence<byte>
aanroep na aanroepPipeReader.AdvanceTo
gebruikt, kan dit leiden tot beschadiging van het geheugen (gebruik na gratis gebruik).Het niet aanroepen
PipeReader.Complete/CompleteAsync
kan leiden tot een geheugenlek.Het controleren ReadResult.IsCompleted en afsluiten van de leeslogica voordat de buffer wordt verwerkt, resulteert in gegevensverlies. De voorwaarde voor het afsluiten van de lus moet zijn gebaseerd op
ReadResult.Buffer.IsEmpty
enReadResult.IsCompleted
. Als u dit onjuist doet, kan dit resulteren in een oneindige lus.
Problematische code
❌Gegevensverlies
Het ReadResult
laatste segment van de gegevens kan worden geretourneerd wanneer IsCompleted
deze is ingesteld op true
. Als u deze gegevens niet leest voordat u de leeslus afsluit, gaan gegevens verloren.
Waarschuwing
Gebruik niet de volgende code. Het gebruik van dit voorbeeld leidt tot gegevensverlies, loopt vast, beveiligingsproblemen en mag NIET worden gekopieerd. Het volgende voorbeeld wordt gegeven om veelvoorkomende problemen met PipeReader uit te leggen.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> dataLossBuffer = result.Buffer;
if (result.IsCompleted)
break;
Process(ref dataLossBuffer, out Message message);
reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}
Waarschuwing
Gebruik de voorgaande code NIET . Het gebruik van dit voorbeeld leidt tot gegevensverlies, loopt vast, beveiligingsproblemen en mag NIET worden gekopieerd. Het voorgaande voorbeeld wordt gegeven om veelvoorkomende problemen met PipeReader uit te leggen.
❌Oneindige lus
De volgende logica kan resulteren in een oneindige lus als de Result.IsCompleted
is true
, maar er nooit een volledig bericht in de buffer staat.
Waarschuwing
Gebruik niet de volgende code. Het gebruik van dit voorbeeld leidt tot gegevensverlies, loopt vast, beveiligingsproblemen en mag NIET worden gekopieerd. Het volgende voorbeeld wordt gegeven om veelvoorkomende problemen met PipeReader uit te leggen.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
break;
Process(ref infiniteLoopBuffer, out Message message);
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
Waarschuwing
Gebruik de voorgaande code NIET . Het gebruik van dit voorbeeld leidt tot gegevensverlies, loopt vast, beveiligingsproblemen en mag NIET worden gekopieerd. Het voorgaande voorbeeld wordt gegeven om veelvoorkomende problemen met PipeReader uit te leggen.
Hier volgt nog een stukje code met hetzelfde probleem. Er wordt gecontroleerd op een niet-lege buffer voordat deze wordt gecontroleerd ReadResult.IsCompleted
. Omdat het zich in een else if
bevindt, wordt het voor altijd herhaald als er nooit een volledig bericht in de buffer staat.
Waarschuwing
Gebruik niet de volgende code. Het gebruik van dit voorbeeld leidt tot gegevensverlies, loopt vast, beveiligingsproblemen en mag NIET worden gekopieerd. Het volgende voorbeeld wordt gegeven om veelvoorkomende problemen met PipeReader uit te leggen.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (!infiniteLoopBuffer.IsEmpty)
Process(ref infiniteLoopBuffer, out Message message);
else if (result.IsCompleted)
break;
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
Waarschuwing
Gebruik de voorgaande code NIET . Het gebruik van dit voorbeeld leidt tot gegevensverlies, loopt vast, beveiligingsproblemen en mag NIET worden gekopieerd. Het voorgaande voorbeeld wordt gegeven om veelvoorkomende problemen met PipeReader uit te leggen.
❌Niet-reagerende toepassing
Onvoorwaardelijke aanroepen PipeReader.AdvanceTo
met buffer.End
in de examined
positie kan ertoe leiden dat de toepassing niet meer reageert bij het parseren van één bericht. De volgende aanroep om pas terug te PipeReader.AdvanceTo
keren:
- Er zijn meer gegevens naar de pijp geschreven.
- En de nieuwe gegevens zijn nog niet eerder onderzocht.
Waarschuwing
Gebruik niet de volgende code. Het gebruik van dit voorbeeld leidt tot gegevensverlies, loopt vast, beveiligingsproblemen en mag NIET worden gekopieerd. Het volgende voorbeeld wordt gegeven om veelvoorkomende problemen met PipeReader uit te leggen.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> hangBuffer = result.Buffer;
Process(ref hangBuffer, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);
if (message != null)
return message;
}
Waarschuwing
Gebruik de voorgaande code NIET . Het gebruik van dit voorbeeld leidt tot gegevensverlies, loopt vast, beveiligingsproblemen en mag NIET worden gekopieerd. Het voorgaande voorbeeld wordt gegeven om veelvoorkomende problemen met PipeReader uit te leggen.
❌Onvoldoende geheugen (OOM)
Met de volgende voorwaarden blijft de volgende code bufferen totdat er een OutOfMemoryException probleem optreedt:
- Er is geen maximale berichtgrootte.
- De gegevens die door de
PipeReader
gegevens worden geretourneerd, maken geen volledig bericht. Het maakt bijvoorbeeld geen volledig bericht omdat de andere kant een groot bericht schrijft (bijvoorbeeld een bericht van 4 GB).
Waarschuwing
Gebruik niet de volgende code. Het gebruik van dit voorbeeld leidt tot gegevensverlies, loopt vast, beveiligingsproblemen en mag NIET worden gekopieerd. Het volgende voorbeeld wordt gegeven om veelvoorkomende problemen met PipeReader uit te leggen.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;
Process(ref thisCouldOutOfMemory, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);
if (message != null)
return message;
}
Waarschuwing
Gebruik de voorgaande code NIET . Het gebruik van dit voorbeeld leidt tot gegevensverlies, loopt vast, beveiligingsproblemen en mag NIET worden gekopieerd. Het voorgaande voorbeeld wordt gegeven om veelvoorkomende problemen met PipeReader uit te leggen.
❌Geheugenbeschadiging
Bij het schrijven van helpers die de buffer lezen, moet elke geretourneerde nettolading worden gekopieerd voordat u aanroept Advance
. In het volgende voorbeeld wordt geheugen geretourneerd dat is Pipe
verwijderd en opnieuw kan worden gebruikt voor de volgende bewerking (lezen/schrijven).
Waarschuwing
Gebruik niet de volgende code. Het gebruik van dit voorbeeld leidt tot gegevensverlies, loopt vast, beveiligingsproblemen en mag NIET worden gekopieerd. Het volgende voorbeeld wordt gegeven om veelvoorkomende problemen met PipeReader uit te leggen.
public class Message
{
public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
Environment.FailFast("This code is terrible, don't use it!");
Message message = null;
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
ReadHeader(ref buffer, out int length);
if (length <= buffer.Length)
{
message = new Message
{
// Slice the payload from the existing buffer
CorruptedPayload = buffer.Slice(0, length)
};
buffer = buffer.Slice(length);
}
if (result.IsCompleted)
break;
reader.AdvanceTo(buffer.Start, buffer.End);
if (message != null)
{
// This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
// was captured.
break;
}
}
return message;
}
Waarschuwing
Gebruik de voorgaande code NIET . Het gebruik van dit voorbeeld leidt tot gegevensverlies, loopt vast, beveiligingsproblemen en mag NIET worden gekopieerd. Het voorgaande voorbeeld wordt gegeven om veelvoorkomende problemen met PipeReader uit te leggen.
PipeWriter
De PipeWriter beheert buffers voor het schrijven namens de beller. PipeWriter
implementeert IBufferWriter<byte>
. IBufferWriter<byte>
maakt het mogelijk om toegang te krijgen tot buffers om schrijfbewerkingen uit te voeren zonder extra bufferkopieën.
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
// Request at least 5 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(5);
// Write directly into the buffer.
int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);
// Tell the writer how many bytes were written.
writer.Advance(written);
await writer.FlushAsync(cancellationToken);
}
De vorige code:
- Vraagt een buffer aan van ten minste 5 bytes van het
PipeWriter
gebruik GetMemory. - Schrijft bytes voor de ASCII-tekenreeks naar de geretourneerde
Memory<byte>
tekenreeks"Hello"
. - Aanroepen Advance om aan te geven hoeveel bytes naar de buffer zijn geschreven.
- Hiermee worden de
PipeWriter
bytes naar het onderliggende apparaat verzonden.
De vorige schrijfmethode maakt gebruik van de buffers die door de PipeWriter
. Het kan ook hebben gebruikt PipeWriter.WriteAsync, wat:
- Kopieert de bestaande buffer naar de
PipeWriter
. - Oproepen
GetSpan
,Advance
indien van toepassing en oproepen FlushAsync.
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");
// Write helloBytes to the writer, there's no need to call Advance here
// (Write does that).
await writer.WriteAsync(helloBytes, cancellationToken);
}
Opzegging
FlushAsync ondersteunt het doorgeven van een CancellationToken. Het doorgeven van een CancellationToken
resultaat in een OperationCanceledException
als het token wordt geannuleerd terwijl er een leegmaken in behandeling is. PipeWriter.FlushAsync
ondersteunt een manier om de huidige flush-bewerking te annuleren zonder PipeWriter.CancelPendingFlush een uitzondering op te geven. Het aanroepen PipeWriter.CancelPendingFlush
veroorzaakt de huidige of volgende oproep naar PipeWriter.FlushAsync
of PipeWriter.WriteAsync
retourneert een FlushResult met IsCanceled
de instelling true
. Dit kan nuttig zijn voor het stoppen van de opbrengst op een niet-destructieve en niet-uitzonderlijke manier.
Veelvoorkomende problemen met PipeWriter
- GetSpan en GetMemory retourneer een buffer met ten minste de aangevraagde hoeveelheid geheugen. Neem niet de exacte buffergrootten aan.
- Er is geen garantie dat opeenvolgende aanroepen dezelfde buffer of dezelfde buffer met dezelfde grootte retourneren.
- Er moet een nieuwe buffer worden aangevraagd na het aanroepen Advance om meer gegevens te kunnen schrijven. De eerder verkregen buffer kan niet worden weggeschreven naar.
- Bellen
GetMemory
ofGetSpan
terwijl er een onvolledig gesprek is,FlushAsync
is niet veilig. - Het aanroepen
Complete
ofCompleteAsync
terwijl er niet-geflusheerde gegevens zijn, kan leiden tot beschadiging van het geheugen.
Tips voor het gebruik van PipeReader en PipeWriter
Met de volgende tips kunt u de System.IO.Pipelines klassen gebruiken:
- Voltooi altijd de PipeReader en PipeWriter, inclusief een uitzondering indien van toepassing.
- PipeReader.AdvanceTo Bel altijd na het bellenPipeReader.ReadAsync.
- Regelmatig tijdens
await
PipeWriter.FlushAsync het schrijven en altijd controleren FlushResult.IsCompleted. Schrijf af alsIsCompleted
dat zo istrue
, zoals dat aangeeft dat de lezer is voltooid en geeft niet langer om wat er is geschreven. - PipeWriter.FlushAsync Bel na het schrijven van iets waartoe u toegang wilt
PipeReader
hebben. - Roep niet
FlushAsync
aan als de lezer pas kan beginnen alsFlushAsync
deze is voltooid, omdat dit een impasse kan veroorzaken. - Zorg ervoor dat slechts één context eigenaar is van een
PipeReader
ofPipeWriter
of toegang heeft tot deze context. Deze typen zijn niet thread-safe. - Nooit toegang krijgen tot een ReadResult.Buffer na het aanroepen
AdvanceTo
of voltooien van dePipeReader
.
IDuplexPipe
Het IDuplexPipe is een contract voor typen die zowel lezen als schrijven ondersteunen. Een netwerkverbinding wordt bijvoorbeeld vertegenwoordigd door een IDuplexPipe
.
In tegenstelling tot Pipe
, dat een PipeReader
en een PipeWriter
bevat, IDuplexPipe
vertegenwoordigt een enkele zijde van een volledige duplex-verbinding. Dat betekent dat wat naar de PipeWriter
brief wordt geschreven, niet wordt gelezen uit de PipeReader
.
Stromen
Wanneer u streamgegevens leest of schrijft, leest u doorgaans gegevens met behulp van een de-serializer en schrijft u gegevens met behulp van een serializer. De meeste van deze stream-API's voor lezen en schrijven hebben een Stream
parameter. Om het gemakkelijker te maken om te integreren met deze bestaande API's PipeReader
en PipeWriter
een AsStream methode beschikbaar te maken. AsStream retourneert een Stream
implementatie rond de PipeReader
of PipeWriter
.
Stream-voorbeeld
PipeReader
en PipeWriter
exemplaren kunnen worden gemaakt met behulp van de statische Create
methoden op basis van een Stream object en optionele overeenkomstige aanmaakopties.
De StreamPipeReaderOptions mogelijkheid voor controle over het maken van het PipeReader
exemplaar met de volgende parameters:
- StreamPipeReaderOptions.BufferSize is de minimale buffergrootte in bytes die worden gebruikt bij het huren van geheugen van de pool, en standaard ingesteld op
4096
. - StreamPipeReaderOptions.LeaveOpen vlag bepaalt of de onderliggende stroom al dan niet open blijft nadat de
PipeReader
bewerking is voltooid en dat de standaardwaarden zijn ingesteldfalse
op . - StreamPipeReaderOptions.MinimumReadSize vertegenwoordigt de drempelwaarde van resterende bytes in de buffer voordat een nieuwe buffer wordt toegewezen, en wordt standaard ingesteld op
1024
. - StreamPipeReaderOptions.Pool wordt gebruikt bij het
MemoryPool<byte>
toewijzen van geheugen en wordt standaard ingesteld opnull
.
De StreamPipeWriterOptions mogelijkheid voor controle over het maken van het PipeWriter
exemplaar met de volgende parameters:
- StreamPipeWriterOptions.LeaveOpen vlag bepaalt of de onderliggende stroom al dan niet open blijft nadat de
PipeWriter
bewerking is voltooid en dat de standaardwaarden zijn ingesteldfalse
op . - StreamPipeWriterOptions.MinimumBufferSize vertegenwoordigt de minimale buffergrootte die moet worden gebruikt bij het huren van geheugen van de Pool, en standaard ingesteld op
4096
. - StreamPipeWriterOptions.Pool wordt gebruikt bij het
MemoryPool<byte>
toewijzen van geheugen en wordt standaard ingesteld opnull
.
Belangrijk
Bij het maken PipeReader
en PipeWriter
uitvoeren van exemplaren met behulp van de Create
methoden moet u rekening houden met de levensduur van het Stream
object. Als u toegang tot de stream nodig hebt nadat de lezer of schrijver ermee klaar is, moet u de LeaveOpen
vlag true
instellen op de aanmaakopties. Anders wordt de stream gesloten.
In de volgende code ziet u hoe u exemplaren kunt maken en PipeWriter
exemplaren gebruikt met behulp van PipeReader
de Create
methoden uit een stream.
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
class Program
{
static async Task Main()
{
using var stream = File.OpenRead("lorem-ipsum.txt");
var reader = PipeReader.Create(stream);
var writer = PipeWriter.Create(
Console.OpenStandardOutput(),
new StreamPipeWriterOptions(leaveOpen: true));
WriteUserCancellationPrompt();
var processMessagesTask = ProcessMessagesAsync(reader, writer);
var userCanceled = false;
var cancelProcessingTask = Task.Run(() =>
{
while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
{
WriteUserCancellationPrompt();
}
userCanceled = true;
// No exceptions thrown
reader.CancelPendingRead();
writer.CancelPendingFlush();
});
await Task.WhenAny(cancelProcessingTask, processMessagesTask);
Console.WriteLine(
$"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
}
static void WriteUserCancellationPrompt() =>
Console.WriteLine("Press 'C' to cancel processing...\n");
static async Task ProcessMessagesAsync(
PipeReader reader,
PipeWriter writer)
{
try
{
while (true)
{
ReadResult readResult = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = readResult.Buffer;
try
{
if (readResult.IsCanceled)
{
break;
}
if (TryParseLines(ref buffer, out string message))
{
FlushResult flushResult =
await WriteMessagesAsync(writer, message);
if (flushResult.IsCanceled || flushResult.IsCompleted)
{
break;
}
}
if (readResult.IsCompleted)
{
if (!buffer.IsEmpty)
{
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
catch (Exception ex)
{
Console.Error.WriteLine(ex);
}
finally
{
await reader.CompleteAsync();
await writer.CompleteAsync();
}
}
static bool TryParseLines(
ref ReadOnlySequence<byte> buffer,
out string message)
{
SequencePosition? position;
StringBuilder outputMessage = new();
while(true)
{
position = buffer.PositionOf((byte)'\n');
if (!position.HasValue)
break;
outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
.AppendLine();
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
};
message = outputMessage.ToString();
return message.Length != 0;
}
static ValueTask<FlushResult> WriteMessagesAsync(
PipeWriter writer,
string message) =>
writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}
De toepassing gebruikt een StreamReader bestand voor het lezen van het lorem-ipsum.txt als een stroom en moet eindigen met een lege regel. De FileStream wordt doorgegeven aan PipeReader.Create, waarmee een PipeReader
object wordt geïnstitueerd. De consoletoepassing geeft vervolgens de standaarduitvoerstroom door aan PipeWriter.Create het gebruik van Console.OpenStandardOutput(). In het voorbeeld wordt annulering ondersteund.