System.IO.Pipelines i .NET
System.IO.Pipelines är ett bibliotek som är utformat för att göra det enklare att utföra högpresterande I/O i .NET. Det är ett bibliotek som är inriktat på .NET Standard som fungerar på alla .NET-implementeringar.
Biblioteket är tillgängligt i Nuget-paketet System.IO.Pipelines .
Vilket problem löser System.IO.Pipelines?
Appar som parsar strömmande data består av exempelkod med många specialiserade och ovanliga kodflöden. Pannplåten och specialfallskoden är komplexa och svåra att underhålla.
System.IO.Pipelines
har konstruerats för att:
- Ha högpresterande parsning av strömmande data.
- Minska kodkomplexiteten.
Följande kod är typisk för en TCP-server som tar emot radavgränsade meddelanden (avgränsade av '\n'
) från en klient:
async Task ProcessLinesAsync(NetworkStream stream)
{
var buffer = new byte[1024];
await stream.ReadAsync(buffer, 0, buffer.Length);
// Process a single line from the buffer
ProcessLine(buffer);
}
Föregående kod har flera problem:
- Hela meddelandet (radslutet) kanske inte tas emot i ett enda anrop till
ReadAsync
. - Den ignorerar resultatet av
stream.ReadAsync
.stream.ReadAsync
returnerar hur mycket data som lästes. - Den hanterar inte det fall där flera rader läss i ett enda
ReadAsync
anrop. - Den allokerar en
byte
matris med varje läsning.
För att åtgärda de föregående problemen krävs följande ändringar:
Buffring av inkommande data tills en ny rad hittas.
Parsa alla rader som returneras i bufferten.
Det är möjligt att linjen är större än 1 KB (1 024 byte). Koden måste ändra storlek på indatabufferten tills avgränsare hittas för att passa den fullständiga raden inuti bufferten.
- Om bufferten ändras görs fler buffertkopior eftersom längre rader visas i indata.
- För att minska slöseri med utrymme komprimerar du bufferten som används för att läsa linjer.
Överväg att använda buffertpooler för att undvika att allokera minne upprepade gånger.
Följande kod åtgärdar några av dessa problem:
async Task ProcessLinesAsync(NetworkStream stream)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
var bytesBuffered = 0;
var bytesConsumed = 0;
while (true)
{
// Calculate the amount of bytes remaining in the buffer.
var bytesRemaining = buffer.Length - bytesBuffered;
if (bytesRemaining == 0)
{
// Double the buffer size and copy the previously buffered data into the new buffer.
var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
// Return the old buffer to the pool.
ArrayPool<byte>.Shared.Return(buffer);
buffer = newBuffer;
bytesRemaining = buffer.Length - bytesBuffered;
}
var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
if (bytesRead == 0)
{
// EOF
break;
}
// Keep track of the amount of buffered bytes.
bytesBuffered += bytesRead;
var linePosition = -1;
do
{
// Look for a EOL in the buffered data.
linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
bytesBuffered - bytesConsumed);
if (linePosition >= 0)
{
// Calculate the length of the line based on the offset.
var lineLength = linePosition - bytesConsumed;
// Process the line.
ProcessLine(buffer, bytesConsumed, lineLength);
// Move the bytesConsumed to skip past the line consumed (including \n).
bytesConsumed += lineLength + 1;
}
}
while (linePosition >= 0);
}
}
Den tidigare koden är komplex och hanterar inte alla problem som identifierats. Nätverk med höga prestanda innebär vanligtvis att skriva komplex kod för att maximera prestanda. System.IO.Pipelines
har utformats för att göra det enklare att skriva den här typen av kod.
Pipa
Klassen Pipe kan användas för att skapa ett PipeWriter/PipeReader
par. Alla data som skrivs PipeWriter
till är tillgängliga i PipeReader
:
var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;
Grundläggande användning av rör
async Task ProcessLinesAsync(Socket socket)
{
var pipe = new Pipe();
Task writing = FillPipeAsync(socket, pipe.Writer);
Task reading = ReadPipeAsync(pipe.Reader);
await Task.WhenAll(reading, writing);
}
async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
const int minimumBufferSize = 512;
while (true)
{
// Allocate at least 512 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
try
{
int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
if (bytesRead == 0)
{
break;
}
// Tell the PipeWriter how much was read from the Socket.
writer.Advance(bytesRead);
}
catch (Exception ex)
{
LogError(ex);
break;
}
// Make the data available to the PipeReader.
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
// By completing PipeWriter, tell the PipeReader that there's no more data coming.
await writer.CompleteAsync();
}
async Task ReadPipeAsync(PipeReader reader)
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
{
// Process the line.
ProcessLine(line);
}
// Tell the PipeReader how much of the buffer has been consumed.
reader.AdvanceTo(buffer.Start, buffer.End);
// Stop reading if there's no more data coming.
if (result.IsCompleted)
{
break;
}
}
// Mark the PipeReader as complete.
await reader.CompleteAsync();
}
bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
// Look for a EOL in the buffer.
SequencePosition? position = buffer.PositionOf((byte)'\n');
if (position == null)
{
line = default;
return false;
}
// Skip the line + the \n.
line = buffer.Slice(0, position.Value);
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
return true;
}
Det finns två loopar:
FillPipeAsync
läser frånSocket
och skriver tillPipeWriter
.ReadPipeAsync
läser frånPipeReader
och parsar inkommande rader.
Det finns inga explicita buffertar allokerade. All bufferthantering delegeras till implementeringarna PipeReader
och PipeWriter
. Om du delegerar bufferthantering blir det enklare för användning av kod att fokusera enbart på affärslogik.
I den första loopen:
- PipeWriter.GetMemory(Int32) anropas för att hämta minne från den underliggande skrivaren.
- PipeWriter.Advance(Int32) anropas för att berätta
PipeWriter
hur mycket data som skrevs till bufferten. - PipeWriter.FlushAsync anropas för att göra data tillgängliga för
PipeReader
.
I den andra loopen PipeReader
förbrukar den buffertar som skrivits av PipeWriter
. Buffertarna kommer från socketen. Anropet till PipeReader.ReadAsync
:
Returnerar en ReadResult som innehåller två viktiga informationsdelar:
- De data som lästes i form av
ReadOnlySequence<byte>
. - Ett booleskt värde
IsCompleted
som anger om dataslutet (EOF) har nåtts.
- De data som lästes i form av
Efter att ha hittat slutet av linjens avgränsare (EOL) och parsat linjen:
- Logiken bearbetar bufferten för att hoppa över det som redan bearbetas.
PipeReader.AdvanceTo
anropas för att berättaPipeReader
hur mycket data som har förbrukats och undersökts.
Läsaren och skrivarslingorna slutar med att anropa Complete
. Complete
låter den underliggande Pipe frigöra det minne som den allokerade.
Backpressure och flödeskontroll
Helst fungerar läsning och parsning tillsammans:
- Lästråden förbrukar data från nätverket och placerar dem i buffertar.
- Parsningstråden ansvarar för att konstruera lämpliga datastrukturer.
Vanligtvis tar parsning längre tid än att bara kopiera datablock från nätverket:
- Lästråden hamnar före parsningstråden.
- Lästråden måste antingen sakta ned eller allokera mer minne för att lagra data för parsningstråden.
För optimala prestanda finns det en balans mellan frekventa pauser och allokering av mer minne.
För att lösa föregående problem har de Pipe
två inställningarna för att styra dataflödet:
- PauseWriterThreshold: Avgör hur mycket data som ska bufferas innan anropen pausas FlushAsync .
- ResumeWriterThreshold: Avgör hur mycket data läsaren måste observera innan anropen återupptas
PipeWriter.FlushAsync
.
- Returnerar en ofullständig
ValueTask<FlushResult>
när mängden data iPipe
korsarPauseWriterThreshold
. - Slutförs
ValueTask<FlushResult>
när den blir lägre änResumeWriterThreshold
.
Två värden används för att förhindra snabb cykling, vilket kan inträffa om ett värde används.
Exempel
// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);
PipeScheduler
Vanligtvis när du använder async
och återupptas asynkron kod på antingen en TaskScheduler eller den aktuella SynchronizationContextawait
.
När du gör I/O är det viktigt att ha detaljerad kontroll över var I/O utförs. Med den här kontrollen kan du dra nytta av CPU-cacheminnen på ett effektivt sätt. Effektiv cachelagring är avgörande för högpresterande appar som webbservrar. PipeScheduler ger kontroll över var asynkrona motringningar körs. Som standard:
- SynchronizationContext Strömmen används.
- Om det inte finns använder
SynchronizationContext
den trådpoolen för att köra återanrop.
public static void Main(string[] args)
{
var writeScheduler = new SingleThreadPipeScheduler();
var readScheduler = new SingleThreadPipeScheduler();
// Tell the Pipe what schedulers to use and disable the SynchronizationContext.
var options = new PipeOptions(readerScheduler: readScheduler,
writerScheduler: writeScheduler,
useSynchronizationContext: false);
var pipe = new Pipe(options);
}
// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
new BlockingCollection<(Action<object> Action, object State)>();
private readonly Thread _thread;
public SingleThreadPipeScheduler()
{
_thread = new Thread(DoWork);
_thread.Start();
}
private void DoWork()
{
foreach (var item in _queue.GetConsumingEnumerable())
{
item.Action(item.State);
}
}
public override void Schedule(Action<object?> action, object? state)
{
if (state is not null)
{
_queue.Add((action, state));
}
// else log the fact that _queue.Add was not called.
}
}
PipeScheduler.ThreadPool är implementeringen PipeScheduler som köar återanrop till trådpoolen. PipeScheduler.ThreadPool
är standard och det bästa valet. PipeScheduler.Inline kan orsaka oavsiktliga konsekvenser som dödlägen.
Röråterställning
Det är ofta effektivt att återanvända objektet Pipe
. Om du vill återställa röret anropar du PipeReader Reset när både PipeReader
och PipeWriter
är klara.
PipeReader
PipeReader hanterar minnet åt anroparen. Ring PipeReader.AdvanceTo alltid efter att ha ringt PipeReader.ReadAsync. Detta meddelar PipeReader
när anroparen är klar med minnet så att det kan spåras. Den ReadOnlySequence<byte>
returnerade från PipeReader.ReadAsync
är endast giltig tills anropet till PipeReader.AdvanceTo
. Det är olagligt att använda ReadOnlySequence<byte>
efter att ha anropat PipeReader.AdvanceTo
.
PipeReader.AdvanceTo
tar två SequencePosition argument:
- Det första argumentet avgör hur mycket minne som förbrukades.
- Det andra argumentet avgör hur mycket av bufferten som observerades.
Att markera data som förbrukade innebär att röret kan returnera minnet till den underliggande buffertpoolen. Om data markeras som observerade styrs vad nästa anrop till PipeReader.ReadAsync
gör. Att markera allt som observerats innebär att nästa anrop till PipeReader.ReadAsync
inte returneras förrän mer data har skrivits till röret. Alla andra värden gör nästa anrop för att PipeReader.ReadAsync
returnera omedelbart med observerade och oobserverade data, men inte data som redan har förbrukats.
Läsa scenarier för strömmande data
Det finns ett par typiska mönster som uppstår när du försöker läsa strömmande data:
- Med en dataström parsar du ett enda meddelande.
- Med en dataström parsar du alla tillgängliga meddelanden.
I följande exempel används TryParseLines
metoden för att parsa meddelanden från en ReadOnlySequence<byte>
. TryParseLines
parsar ett enda meddelande och uppdaterar indatabufferten för att trimma det parsade meddelandet från bufferten. TryParseLines
är inte en del av .NET, det är en användarskriven metod som används i följande avsnitt.
bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);
Läsa ett enda meddelande
Följande kod läser ett enda meddelande från en PipeReader
och returnerar det till anroparen.
async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader,
CancellationToken cancellationToken = default)
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
// In the event that no message is parsed successfully, mark consumed
// as nothing and examined as the entire buffer.
SequencePosition consumed = buffer.Start;
SequencePosition examined = buffer.End;
try
{
if (TryParseLines(ref buffer, out Message message))
{
// A single message was successfully parsed so mark the start of the
// parsed buffer as consumed. TryParseLines trims the buffer to
// point to the data after the message was parsed.
consumed = buffer.Start;
// Examined is marked the same as consumed here, so the next call
// to ReadSingleMessageAsync will process the next message if there's
// one.
examined = consumed;
return message;
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(consumed, examined);
}
}
return null;
}
Koden ovan:
- Parsar ett enda meddelande.
- Uppdaterar den förbrukade
SequencePosition
och undersöktaSequencePosition
så att den pekar på början av den trimmade indatabufferten.
De två SequencePosition
argumenten uppdateras eftersom TryParseLines
det tolkade meddelandet tas bort från indatabufferten. När du parsar ett enda meddelande från bufferten bör den undersökta positionen vanligtvis vara något av följande:
- Slutet av meddelandet.
- Slutet på den mottagna bufferten om inget meddelande hittades.
Det enskilda meddelandefallet har störst risk för fel. Om du skickar fel värden som ska undersökas kan det leda till ett undantag från minnet eller en oändlig loop. Mer information finns i avsnittet Om vanliga problem med PipeReader i den här artikeln.
Läsa flera meddelanden
Följande kod läser alla meddelanden från en och anropar ProcessMessageAsync
var och enPipeReader
.
async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
Annullering
PipeReader.ReadAsync
:
- Har stöd för att skicka en CancellationToken.
- Utlöser ett OperationCanceledException om det
CancellationToken
avbryts medan det finns en väntande läsning. - Stöder ett sätt att avbryta den aktuella läsåtgärden via PipeReader.CancelPendingRead, vilket undviker att skapa ett undantag. Anrop
PipeReader.CancelPendingRead
gör att det aktuella eller nästa anropet tillPipeReader.ReadAsync
returnerar ett ReadResult medIsCanceled
inställt påtrue
. Detta kan vara användbart för att stoppa den befintliga läsloopen på ett icke-destruktivt och icke-exceptionellt sätt.
private PipeReader reader;
public MyConnection(PipeReader reader)
{
this.reader = reader;
}
public void Abort()
{
// Cancel the pending read so the process loop ends without an exception.
reader.CancelPendingRead();
}
public async Task ProcessMessagesAsync()
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
if (result.IsCanceled)
{
// The read was canceled. You can quit without reading the existing data.
break;
}
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
Vanliga problem med PipeReader
Om du skickar fel värden till eller
examined
kan det leda tillconsumed
att du läser redan lästa data.Att skicka
buffer.End
som undersökt kan resultera i:- Data som har stoppats
- Eventuellt ett undantag från slut på minne (OOM) om data inte används. Till exempel
PipeReader.AdvanceTo(position, buffer.End)
när du bearbetar ett enda meddelande i taget från bufferten.
Om du skickar fel värden till
consumed
ellerexamined
kan det resultera i en oändlig loop. Ombuffer.Start
det till exempelPipeReader.AdvanceTo(buffer.Start)
inte har ändrats kommer nästa anrop attPipeReader.ReadAsync
returneras omedelbart innan nya data tas emot.Om fel värden skickas till
consumed
ellerexamined
kan det resultera i oändlig buffring (eventuell OOM).När du använder efteranropet
ReadOnlySequence<byte>
PipeReader.AdvanceTo
kan det leda till minnesskada (använd efter kostnadsfritt).Om du inte anropar
PipeReader.Complete/CompleteAsync
kan det leda till en minnesläcka.Att kontrollera ReadResult.IsCompleted och avsluta läslogik innan bufferten bearbetas resulterar i dataförlust. Loopavslutsvillkoret ska baseras på
ReadResult.Buffer.IsEmpty
ochReadResult.IsCompleted
. Om du gör detta felaktigt kan det resultera i en oändlig loop.
Problematisk kod
❌Dataförlust
ReadResult
Kan returnera det sista datasegmentet när IsCompleted
är inställt på true
. Att inte läsa dessa data innan läsloopen avslutas resulterar i dataförlust.
Varning
Använd INTE följande kod. Med det här exemplet resulterar det i dataförlust, låsning, säkerhetsproblem och bör INTE kopieras. Följande exempel finns för att förklara PipeReader Vanliga problem.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> dataLossBuffer = result.Buffer;
if (result.IsCompleted)
break;
Process(ref dataLossBuffer, out Message message);
reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}
Varning
Använd INTE föregående kod. Med det här exemplet resulterar det i dataförlust, låsning, säkerhetsproblem och bör INTE kopieras. Föregående exempel tillhandahålls för att förklara Vanliga problem med PipeReader.
❌Oändlig loop
Följande logik kan resultera i en oändlig loop om Result.IsCompleted
är true
men det finns aldrig ett fullständigt meddelande i bufferten.
Varning
Använd INTE följande kod. Med det här exemplet resulterar det i dataförlust, låsning, säkerhetsproblem och bör INTE kopieras. Följande exempel finns för att förklara PipeReader Vanliga problem.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
break;
Process(ref infiniteLoopBuffer, out Message message);
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
Varning
Använd INTE föregående kod. Med det här exemplet resulterar det i dataförlust, låsning, säkerhetsproblem och bör INTE kopieras. Föregående exempel tillhandahålls för att förklara Vanliga problem med PipeReader.
Här är en annan kod med samma problem. Den söker efter en buffert som inte är tom innan du kontrollerar ReadResult.IsCompleted
. Eftersom den finns i en else if
loopar den för alltid om det aldrig finns ett fullständigt meddelande i bufferten.
Varning
Använd INTE följande kod. Med det här exemplet resulterar det i dataförlust, låsning, säkerhetsproblem och bör INTE kopieras. Följande exempel finns för att förklara PipeReader Vanliga problem.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (!infiniteLoopBuffer.IsEmpty)
Process(ref infiniteLoopBuffer, out Message message);
else if (result.IsCompleted)
break;
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
Varning
Använd INTE föregående kod. Med det här exemplet resulterar det i dataförlust, låsning, säkerhetsproblem och bör INTE kopieras. Föregående exempel tillhandahålls för att förklara Vanliga problem med PipeReader.
❌Program som inte svarar
Villkorslöst anrop PipeReader.AdvanceTo
med buffer.End
i examined
positionen kan leda till att programmet inte svarar när ett enda meddelande parsas. Nästa anrop till PipeReader.AdvanceTo
returnerar inte förrän:
- Mer data skrivs till röret.
- Och de nya data har inte undersökts tidigare.
Varning
Använd INTE följande kod. Med det här exemplet resulterar det i dataförlust, låsning, säkerhetsproblem och bör INTE kopieras. Följande exempel finns för att förklara PipeReader Vanliga problem.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> hangBuffer = result.Buffer;
Process(ref hangBuffer, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);
if (message != null)
return message;
}
Varning
Använd INTE föregående kod. Med det här exemplet resulterar det i dataförlust, låsning, säkerhetsproblem och bör INTE kopieras. Föregående exempel tillhandahålls för att förklara Vanliga problem med PipeReader.
❌Slut på minne (OOM)
Med följande villkor fortsätter följande kod buffring tills en OutOfMemoryException inträffar:
- Det finns ingen maximal meddelandestorlek.
- De data som returneras från
PipeReader
gör inte ett fullständigt meddelande. Det gör till exempel inte ett fullständigt meddelande eftersom den andra sidan skriver ett stort meddelande (till exempel ett 4 GB-meddelande).
Varning
Använd INTE följande kod. Med det här exemplet resulterar det i dataförlust, låsning, säkerhetsproblem och bör INTE kopieras. Följande exempel finns för att förklara PipeReader Vanliga problem.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;
Process(ref thisCouldOutOfMemory, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);
if (message != null)
return message;
}
Varning
Använd INTE föregående kod. Med det här exemplet resulterar det i dataförlust, låsning, säkerhetsproblem och bör INTE kopieras. Föregående exempel tillhandahålls för att förklara Vanliga problem med PipeReader.
❌Minnesskada
När du skriver hjälppersonal som läser bufferten bör alla returnerade nyttolaster kopieras innan du anropar Advance
. I följande exempel returneras minne som Pipe
har tagits bort och kan återanvända det för nästa åtgärd (läsa/skriva).
Varning
Använd INTE följande kod. Med det här exemplet resulterar det i dataförlust, låsning, säkerhetsproblem och bör INTE kopieras. Följande exempel finns för att förklara PipeReader Vanliga problem.
public class Message
{
public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
Environment.FailFast("This code is terrible, don't use it!");
Message message = null;
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
ReadHeader(ref buffer, out int length);
if (length <= buffer.Length)
{
message = new Message
{
// Slice the payload from the existing buffer
CorruptedPayload = buffer.Slice(0, length)
};
buffer = buffer.Slice(length);
}
if (result.IsCompleted)
break;
reader.AdvanceTo(buffer.Start, buffer.End);
if (message != null)
{
// This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
// was captured.
break;
}
}
return message;
}
Varning
Använd INTE föregående kod. Med det här exemplet resulterar det i dataförlust, låsning, säkerhetsproblem och bör INTE kopieras. Föregående exempel tillhandahålls för att förklara Vanliga problem med PipeReader.
PipeWriter
Hanterar PipeWriter buffertar för att skriva åt anroparen. PipeWriter
implementerar IBufferWriter<byte>
. IBufferWriter<byte>
gör det möjligt att få åtkomst till buffertar för att utföra skrivningar utan extra buffertkopior.
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
// Request at least 5 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(5);
// Write directly into the buffer.
int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);
// Tell the writer how many bytes were written.
writer.Advance(written);
await writer.FlushAsync(cancellationToken);
}
Föregående kod:
- Begär en buffert på minst 5 byte från
PipeWriter
att använda GetMemory. - Skriver byte för ASCII-strängen
"Hello"
till den returneradeMemory<byte>
. - Anrop Advance för att ange hur många byte som skrevs till bufferten.
- Tömer
PipeWriter
, som skickar byte till den underliggande enheten.
Den tidigare skrivmetoden använder buffertar som tillhandahålls av PipeWriter
. Det kunde också ha använt PipeWriter.WriteAsync, vilket:
- Kopierar den befintliga bufferten
PipeWriter
till . - Anropar
GetSpan
,Advance
efter behov och anropar FlushAsync.
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");
// Write helloBytes to the writer, there's no need to call Advance here
// (Write does that).
await writer.WriteAsync(helloBytes, cancellationToken);
}
Annullering
FlushAsync har stöd för att skicka en CancellationToken. Om du skickar ett CancellationToken
resultat i ett OperationCanceledException
om token avbryts medan en tömning väntar. PipeWriter.FlushAsync
stöder ett sätt att avbryta den aktuella tömningsåtgärden via PipeWriter.CancelPendingFlush utan att skapa ett undantag. Anrop PipeWriter.CancelPendingFlush
gör att det aktuella eller nästa anropet till PipeWriter.FlushAsync
eller PipeWriter.WriteAsync
returnerar ett FlushResult med IsCanceled
inställt på true
. Detta kan vara användbart för att stoppa den resulterande tömningen på ett icke-destruktivt och icke-exceptionellt sätt.
Vanliga problem med PipeWriter
- GetSpan och GetMemory returnera en buffert med minst den begärda mängden minne. Anta inte exakta buffertstorlekar.
- Det finns ingen garanti för att efterföljande anrop returnerar samma buffert eller buffert i samma storlek.
- En ny buffert måste begäras efter anropet Advance för att kunna fortsätta skriva mer data. Det går inte att skriva till den tidigare hämtade bufferten.
- Det är inte säkert att ringa
GetMemory
ellerGetSpan
när det finns ett ofullständigt samtal tillFlushAsync
. - Att anropa
Complete
ellerCompleteAsync
när det finns oupplösade data kan leda till minnesskada.
Tips för att använda PipeReader och PipeWriter
Följande tips hjälper dig att använda klasserna System.IO.Pipelines :
- Slutför alltid PipeReader och PipeWriter, inklusive ett undantag där det är tillämpligt.
- Ring PipeReader.AdvanceTo alltid efter att ha ringt PipeReader.ReadAsync.
- Skriv regelbundet
await
PipeWriter.FlushAsync och kontrollera FlushResult.IsCompletedalltid . Avbryt skrivning omIsCompleted
ärtrue
, eftersom det indikerar att läsaren har slutförts och inte längre bryr sig om vad som skrivs. - Anropa PipeWriter.FlushAsync när du har skrivit något som du vill
PipeReader
ha åtkomst till. - Anropa
FlushAsync
inte om läsaren inte kan starta förränFlushAsync
den är klar, eftersom det kan orsaka ett dödläge. - Se till att endast en kontext "äger" en
PipeReader
ellerPipeWriter
eller kommer åt dem. Dessa typer är inte trådsäkra. - Få aldrig åtkomst till en ReadResult.Buffer när du har anropat
AdvanceTo
eller slutförtPipeReader
.
IDuplexPipe
IDuplexPipe är ett kontrakt för typer som stöder både läsning och skrivning. Till exempel skulle en nätverksanslutning representeras av en IDuplexPipe
.
Till skillnad från Pipe
, som innehåller en PipeReader
och en PipeWriter
, IDuplexPipe
representerar en enda sida av en fullständig duplex-anslutning. Det innebär att det som skrivs till PipeWriter
inte kommer att läsas från PipeReader
.
Strömmar
När du läser eller skriver dataström läser du vanligtvis data med en de-serialiserare och skriver data med en serialiserare. De flesta av dessa läs- och skrivström-API:er har en Stream
parameter. För att göra det enklare att integrera med dessa befintliga API:er PipeReader
och PipeWriter
exponera en AsStream metod. AsStream returnerar en Stream
implementering runt PipeReader
eller PipeWriter
.
Stream-exempel
PipeReader
och PipeWriter
instanser kan skapas med hjälp av statiska Create
metoder givet ett Stream objekt och valfria motsvarande alternativ för skapande.
Tillåt StreamPipeReaderOptions kontroll över skapandet av instansen PipeReader
med följande parametrar:
- StreamPipeReaderOptions.BufferSize är den minsta buffertstorleken i byte som används när du hyr minne från poolen och standardvärdet är
4096
. - StreamPipeReaderOptions.LeaveOpen flagga avgör om den underliggande strömmen lämnas öppen efter slutföranden
PipeReader
och standardvärdet ärfalse
. - StreamPipeReaderOptions.MinimumReadSize representerar tröskelvärdet för återstående byte i bufferten innan en ny buffert allokeras och standardvärdet
1024
är . - StreamPipeReaderOptions.Pool är den
MemoryPool<byte>
som används vid allokering av minne och är standardvärdetnull
.
Tillåt StreamPipeWriterOptions kontroll över skapandet av instansen PipeWriter
med följande parametrar:
- StreamPipeWriterOptions.LeaveOpen flagga avgör om den underliggande strömmen lämnas öppen efter slutföranden
PipeWriter
och standardvärdet ärfalse
. - StreamPipeWriterOptions.MinimumBufferSize representerar den minsta buffertstorlek som ska användas när du hyr minne från Pool, och standardvärdet är
4096
. - StreamPipeWriterOptions.Pool är den
MemoryPool<byte>
som används vid allokering av minne och är standardvärdetnull
.
Viktigt!
När du skapar PipeReader
och PipeWriter
instanser med hjälp av Create
metoderna måste du överväga objektets Stream
livslängd. Om du behöver åtkomst till dataströmmen när läsaren eller skrivaren är klar med den måste du ange LeaveOpen
flaggan till true
för skapandealternativen. Annars stängs strömmen.
Följande kod visar hur du skapar och PipeWriter
instanser med hjälp Create
av PipeReader
metoderna från en dataström.
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
class Program
{
static async Task Main()
{
using var stream = File.OpenRead("lorem-ipsum.txt");
var reader = PipeReader.Create(stream);
var writer = PipeWriter.Create(
Console.OpenStandardOutput(),
new StreamPipeWriterOptions(leaveOpen: true));
WriteUserCancellationPrompt();
var processMessagesTask = ProcessMessagesAsync(reader, writer);
var userCanceled = false;
var cancelProcessingTask = Task.Run(() =>
{
while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
{
WriteUserCancellationPrompt();
}
userCanceled = true;
// No exceptions thrown
reader.CancelPendingRead();
writer.CancelPendingFlush();
});
await Task.WhenAny(cancelProcessingTask, processMessagesTask);
Console.WriteLine(
$"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
}
static void WriteUserCancellationPrompt() =>
Console.WriteLine("Press 'C' to cancel processing...\n");
static async Task ProcessMessagesAsync(
PipeReader reader,
PipeWriter writer)
{
try
{
while (true)
{
ReadResult readResult = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = readResult.Buffer;
try
{
if (readResult.IsCanceled)
{
break;
}
if (TryParseLines(ref buffer, out string message))
{
FlushResult flushResult =
await WriteMessagesAsync(writer, message);
if (flushResult.IsCanceled || flushResult.IsCompleted)
{
break;
}
}
if (readResult.IsCompleted)
{
if (!buffer.IsEmpty)
{
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
catch (Exception ex)
{
Console.Error.WriteLine(ex);
}
finally
{
await reader.CompleteAsync();
await writer.CompleteAsync();
}
}
static bool TryParseLines(
ref ReadOnlySequence<byte> buffer,
out string message)
{
SequencePosition? position;
StringBuilder outputMessage = new();
while(true)
{
position = buffer.PositionOf((byte)'\n');
if (!position.HasValue)
break;
outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
.AppendLine();
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
};
message = outputMessage.ToString();
return message.Length != 0;
}
static ValueTask<FlushResult> WriteMessagesAsync(
PipeWriter writer,
string message) =>
writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}
Programmet använder en StreamReader för att läsa lorem-ipsum.txt-filen som en ström, och den måste avslutas med en tom rad. FileStream skickas till PipeReader.Create, som instansierar ett PipeReader
objekt. Konsolprogrammet skickar sedan sin standardutdataström till PipeWriter.Create med hjälp av Console.OpenStandardOutput(). Exemplet stöder annullering.