System.IO.Pipelines is a new library that is designed to make it easier to do high-performance IO in .NET. It’s a library targeting .NET Standard that works on all .NET implementations.
Pipelines was born from the work the .NET Core team did to make Kestrel one of the fastest web servers in the industry. What started as an implementation detail inside of Kestrel progressed into a re-usable API that shipped in .NET Core 2.1 as a first-class BCL API (System.IO.Pipelines) available for all .NET developers.
What problem does it solve?
Correctly parsing data from a stream or socket is dominated by boilerplate code and has many corner cases, leading to complex code that is difficult to maintain.
Achieving high performance and being correct, while also dealing with this complexity is difficult. Pipelines aims to solve this complexity.
What extra complexity exists today?
Let’s start with a simple problem. We want to write a TCP server that receives line-delimited messages (delimited by \n) from a client.
TCP Server with NetworkStream
DISCLAIMER: As with all performance sensitive work, each of the scenarios should be measured within the context of your application. The overhead of the various techniques mentioned may not be necessary depending on the scale your networking applications need to handle.
The typical code you would write in .NET before pipelines looks something like this:
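A minimal sketch of that kind of code (ProcessLine is a hypothetical callback that handles a single line):

```csharp
using System.Net.Sockets;
using System.Threading.Tasks;

async Task ProcessLinesAsync(NetworkStream stream)
{
    var buffer = new byte[1024];
    // Read once and assume the whole line arrived in this single call
    await stream.ReadAsync(buffer, 0, buffer.Length);

    // Process a single line from the buffer
    ProcessLine(buffer);
}
```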
This code might work when testing locally, but it has several errors:
- The entire message (end of line) may not have been received in a single call to ReadAsync.
- It’s ignoring the result of stream.ReadAsync(), which returns how much data was actually filled into the buffer.
- It doesn’t handle the case where multiple lines come back in a single ReadAsync call.
These are some of the common pitfalls when reading streaming data. To account for this, we need to make a few changes:
- We need to buffer the incoming data until we have found a new line.
- We need to parse all of the lines returned in the buffer.
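A sketch of that buffered version (again, ProcessLine is a hypothetical helper, here taking a buffer, offset and length):

```csharp
using System;
using System.Net.Sockets;
using System.Threading.Tasks;

async Task ProcessLinesAsync(NetworkStream stream)
{
    var buffer = new byte[1024];
    var bytesBuffered = 0;
    var bytesConsumed = 0;

    while (true)
    {
        // Read into the space remaining in the buffer
        var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, buffer.Length - bytesBuffered);
        if (bytesRead == 0)
        {
            // EOF
            break;
        }

        // Keep track of the amount of buffered bytes
        bytesBuffered += bytesRead;

        int linePosition;
        do
        {
            // Look for an EOL in the buffered data
            linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed, bytesBuffered - bytesConsumed);

            if (linePosition >= 0)
            {
                // Calculate the length of the line based on the offset
                var lineLength = linePosition - bytesConsumed;

                // Process the line
                ProcessLine(buffer, bytesConsumed, lineLength);

                // Move past the line we processed (including the \n)
                bytesConsumed += lineLength + 1;
            }
        }
        while (linePosition >= 0);
    }
}
```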
Once again, this might work in local testing but it’s possible that the line is bigger than 1KiB (1024 bytes). We need to resize the input buffer until we have found a new line.
Also, we’re allocating buffers on the heap as longer lines are processed. We can improve this by using the ArrayPool<byte> to avoid repeated buffer allocations as we parse longer lines from the client.
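A sketch of the pooled, growable-buffer version (ProcessLine remains a hypothetical helper; the parsing loop is the same as above):

```csharp
using System;
using System.Buffers;
using System.Net.Sockets;
using System.Threading.Tasks;

async Task ProcessLinesAsync(NetworkStream stream)
{
    byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
    var bytesBuffered = 0;
    var bytesConsumed = 0;

    while (true)
    {
        // Calculate the amount of bytes remaining in the buffer
        var bytesRemaining = buffer.Length - bytesBuffered;

        if (bytesRemaining == 0)
        {
            // The buffer is full: rent a bigger one, copy the buffered data
            // over, and return the old buffer to the pool. Note that consumed
            // bytes are never reclaimed, so memory usage only grows.
            var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
            Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
            ArrayPool<byte>.Shared.Return(buffer);
            buffer = newBuffer;
            bytesRemaining = buffer.Length - bytesBuffered;
        }

        var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
        if (bytesRead == 0)
        {
            // EOF
            break;
        }

        bytesBuffered += bytesRead;

        int linePosition;
        do
        {
            // Look for an EOL in the buffered data
            linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed, bytesBuffered - bytesConsumed);

            if (linePosition >= 0)
            {
                var lineLength = linePosition - bytesConsumed;
                ProcessLine(buffer, bytesConsumed, lineLength);
                bytesConsumed += lineLength + 1;
            }
        }
        while (linePosition >= 0);
    }
}
```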
This code works, but now we’re resizing the buffer, which results in more buffer copies. It also uses more memory, as the logic doesn’t shrink the buffer after lines are processed. To avoid this, we can store a list of buffers instead of resizing each time we cross the 1KiB buffer size.
Also, we don’t allocate a new 1KiB buffer until the existing one is completely full. This means we can end up passing smaller and smaller buffers to ReadAsync, which will result in more calls into the operating system.
To mitigate this, we’ll allocate a new buffer when there’s less than 512 bytes remaining in the existing buffer:
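A sketch of that approach. BufferSegment is an illustrative type, and IndexOf and ProcessLine are hypothetical helpers that walk the segment list (IndexOf returns the segment index and offset of the delimiter, or a negative index if not found):

```csharp
using System.Buffers;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Threading.Tasks;

public class BufferSegment
{
    public byte[] Buffer { get; set; }
    public int Count { get; set; }
    public int Remaining => Buffer.Length - Count;
}

async Task ProcessLinesAsync(NetworkStream stream)
{
    const int minimumBufferSize = 512;

    var segments = new List<BufferSegment>();
    var bytesConsumed = 0;
    var bytesConsumedBufferIndex = 0;
    var segment = new BufferSegment { Buffer = ArrayPool<byte>.Shared.Rent(1024) };

    segments.Add(segment);

    while (true)
    {
        // Allocate a new segment once less than 512 bytes remain in the current one
        if (segment.Remaining < minimumBufferSize)
        {
            segment = new BufferSegment { Buffer = ArrayPool<byte>.Shared.Rent(1024) };
            segments.Add(segment);
        }

        var bytesRead = await stream.ReadAsync(segment.Buffer, segment.Count, segment.Remaining);
        if (bytesRead == 0)
        {
            break;
        }

        // Keep track of the amount of buffered bytes in the current segment
        segment.Count += bytesRead;

        while (true)
        {
            // Look for an EOL across the list of segments (hypothetical helper)
            var (segmentIndex, segmentOffset) = IndexOf(segments, (byte)'\n', bytesConsumedBufferIndex, bytesConsumed);

            if (segmentIndex >= 0)
            {
                // Process the line, which may span segments (hypothetical helper)
                ProcessLine(segments, segmentIndex, segmentOffset);

                bytesConsumedBufferIndex = segmentIndex;
                bytesConsumed = segmentOffset + 1;
            }
            else
            {
                break;
            }
        }

        // Return fully consumed segments to the pool so the list doesn't grow unbounded
        for (var i = bytesConsumedBufferIndex - 1; i >= 0; --i)
        {
            ArrayPool<byte>.Shared.Return(segments[i].Buffer);
            segments.RemoveAt(i);
        }
        bytesConsumedBufferIndex = 0;
    }
}
```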
This code just got much more complicated. We’re keeping track of the filled buffers as we look for the delimiter. To do this, we’re using a List<BufferSegment> to represent the buffered data while looking for the new line delimiter. As a result, ProcessLine and IndexOf now accept a List<BufferSegment> instead of a byte[], offset and count. Our parsing logic now needs to handle one or more buffer segments.
Our server now handles partial messages, and it uses pooled memory to reduce overall memory consumption but there are still a couple more changes we need to make:
- The byte[] buffers we’re using from the ArrayPool<byte> are just regular managed arrays. This means whenever we do a ReadAsync or WriteAsync, those buffers get pinned for the lifetime of the asynchronous operation (in order to interop with the native IO APIs on the operating system). This has performance implications for the garbage collector, since pinned memory cannot be moved, which can lead to heap fragmentation. Depending on how long the async operations are pending, the pool implementation may need to change.
- The throughput can be optimized by decoupling the reading and processing logic. This creates a batching effect that lets the parsing logic consume larger chunks of buffers, instead of reading more data only after parsing a single line. This introduces some additional complexity:
  - We need two loops that run independently of each other: one that reads from the Socket and one that parses the buffers.
  - We need a way to signal the parsing logic when data becomes available.
  - We need to decide what happens if the loop reading from the Socket is “too fast”. We need a way to throttle the reading loop if the parsing logic can’t keep up. This is commonly referred to as “flow control” or “back pressure”.
  - We need to make sure things are thread safe. We’re now sharing a set of buffers between the reading loop and the parsing loop, and those run independently on different threads.
  - The memory management logic is now spread across two different pieces of code: the code that rents from the buffer pool is reading from the socket, and the code that returns buffers to the pool is the parsing logic.
  - We need to be extremely careful with how we return buffers after the parsing logic is done with them. If we’re not careful, it’s possible to return a buffer that’s still being written to by the Socket reading logic.
The complexity has gone through the roof (and we haven’t even covered all of the cases). High performance networking usually means writing very complex code in order to eke out more performance from the system.
The goal of System.IO.Pipelines is to make writing this type of code easier.
TCP server with System.IO.Pipelines
Let’s take a look at what this example looks like with System.IO.Pipelines:
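A sketch close to the original sample (ProcessLine is a hypothetical helper that handles one complete line):

```csharp
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Net.Sockets;
using System.Threading.Tasks;

async Task ProcessLinesAsync(Socket socket)
{
    var pipe = new Pipe();
    Task writing = FillPipeAsync(socket, pipe.Writer);
    Task reading = ReadPipeAsync(pipe.Reader);

    await Task.WhenAll(reading, writing);
}

async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
    const int minimumBufferSize = 512;

    while (true)
    {
        // Ask the PipeWriter for at least 512 bytes of memory
        Memory<byte> memory = writer.GetMemory(minimumBufferSize);
        try
        {
            int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
            if (bytesRead == 0)
            {
                break;
            }
            // Tell the PipeWriter how much was read from the Socket
            writer.Advance(bytesRead);
        }
        catch
        {
            // Swallow-and-stop for brevity; log in real code
            break;
        }

        // Make the data available to the PipeReader
        FlushResult result = await writer.FlushAsync();
        if (result.IsCompleted)
        {
            break;
        }
    }

    // Signal the PipeReader that there's no more data coming
    writer.Complete();
}

async Task ReadPipeAsync(PipeReader reader)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;
        SequencePosition? position;

        do
        {
            // Look for an EOL in the buffer
            position = buffer.PositionOf((byte)'\n');

            if (position != null)
            {
                // Process the line (hypothetical helper)
                ProcessLine(buffer.Slice(0, position.Value));

                // Skip the line plus the \n character
                buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
            }
        }
        while (position != null);

        // Tell the PipeReader how much we consumed and how much we examined
        reader.AdvanceTo(buffer.Start, buffer.End);

        if (result.IsCompleted)
        {
            break;
        }
    }

    // Mark the PipeReader as complete
    reader.Complete();
}
```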
The pipelines version of our line reader has two loops:
- FillPipeAsync reads from the Socket and writes into the PipeWriter.
- ReadPipeAsync reads from the PipeReader and parses incoming lines.
Unlike the original examples, there are no explicit buffers allocated anywhere. This is one of pipelines’ core features. All buffer management is delegated to the PipeReader/PipeWriter implementations.
This makes it easier for consuming code to focus solely on the business logic instead of complex buffer management.
In the first loop, we first call PipeWriter.GetMemory(int) to get some memory from the underlying writer; then we call PipeWriter.Advance(int) to tell the PipeWriter how much data we actually wrote to the buffer. We then call PipeWriter.FlushAsync() to make the data available to the PipeReader.
In the second loop, we’re consuming the buffers written by the PipeWriter, which ultimately come from the Socket. When the call to PipeReader.ReadAsync() returns, we get a ReadResult which contains two important pieces of information: the data that was read, in the form of a ReadOnlySequence<byte>, and a bool IsCompleted that lets the reader know if the writer is done writing (EOF). After finding the end of line (EOL) delimiter and parsing the line, we slice the buffer to skip what we’ve already processed, and then we call PipeReader.AdvanceTo to tell the PipeReader how much data we have consumed.
At the end of each of the loops, we complete both the reader and the writer. This lets the underlying Pipe release all of the memory it allocated.
System.IO.Pipelines
Partial Reads
Besides handling the memory management, the other core pipelines feature is the ability to peek at data in the Pipe without actually consuming it.
PipeReader has two core APIs: ReadAsync and AdvanceTo. ReadAsync gets the data in the Pipe; AdvanceTo tells the PipeReader that these buffers are no longer required by the reader, so they can be discarded (for example, returned to the underlying buffer pool).
Here’s an example of an HTTP parser that reads partial data buffered in the Pipe until a valid start line is received.
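A hedged sketch of that pattern (ParseRequestLine is a hypothetical helper; the key is the two-argument AdvanceTo that consumes nothing but marks everything as examined):

```csharp
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Threading.Tasks;

async Task ReadRequestLineAsync(PipeReader reader)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;

        // Until a full line is buffered, we only *peek* at the data
        SequencePosition? lf = buffer.PositionOf((byte)'\n');

        if (lf != null)
        {
            // We have a full start line: parse it (hypothetical helper)
            // and consume it, including the \n
            ParseRequestLine(buffer.Slice(0, lf.Value));
            reader.AdvanceTo(buffer.GetPosition(1, lf.Value));
            break;
        }

        // Consume nothing, but mark everything as examined so the next
        // ReadAsync waits for *more* data instead of returning the same bytes
        reader.AdvanceTo(buffer.Start, buffer.End);

        if (result.IsCompleted)
        {
            break;
        }
    }
}
```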
ReadOnlySequence<T>
The Pipe implementation stores a linked list of buffers that get passed between the PipeWriter and PipeReader. PipeReader.ReadAsync exposes a ReadOnlySequence<T>, a new BCL type that represents a view over one or more segments of ReadOnlyMemory<T>, similar to how Span<T> and Memory<T> provide a view over arrays and strings.
The Pipe internally maintains pointers to where the reader and writer are in the overall set of allocated data and updates them as data is written or read. The SequencePosition represents a single point in the linked list of buffers and can be used to efficiently slice the ReadOnlySequence<T>.
Since the ReadOnlySequence<T> can support one or more segments, it’s typical for high performance processing logic to split fast and slow paths based on single or multiple segments.
For example, here’s a routine that converts an ASCII ReadOnlySequence<byte> into a string:
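A sketch of one way to write it, using string.Create (available in .NET Core 2.1) to decode directly into the string’s backing memory:

```csharp
using System;
using System.Buffers;
using System.Text;

string GetAsciiString(ReadOnlySequence<byte> buffer)
{
    // Fast path: the data is in a single segment
    if (buffer.IsSingleSegment)
    {
        return Encoding.ASCII.GetString(buffer.First.Span);
    }

    // Slow path: decode each segment into the new string's buffer.
    // ASCII is one byte per char, so advancing by segment.Length is safe.
    return string.Create((int)buffer.Length, buffer, (span, sequence) =>
    {
        foreach (var segment in sequence)
        {
            Encoding.ASCII.GetChars(segment.Span, span);
            span = span.Slice(segment.Length);
        }
    });
}
```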
Back pressure and flow control
In a perfect world, reading and parsing work as a team: the reading thread consumes the data from the network and puts it in buffers, while the parsing thread is responsible for constructing the appropriate data structures. Normally, parsing will take more time than just copying blocks of data from the network. As a result, the reading thread can easily overwhelm the parsing thread. The result is that the reading thread will have to either slow down or allocate more memory to store the data for the parsing thread. For optimal performance, there is a balance between frequent pauses and allocating more memory.
To solve this problem, the pipe has two settings to control the flow of data: the PauseWriterThreshold and the ResumeWriterThreshold. The PauseWriterThreshold determines how much data should be buffered before calls to PipeWriter.FlushAsync pause. The ResumeWriterThreshold controls how much the reader has to consume before writing can resume.
PipeWriter.FlushAsync “blocks” when the amount of data in the Pipe crosses PauseWriterThreshold and “unblocks” when it becomes lower than ResumeWriterThreshold. Two values are used to prevent thrashing around the limit.
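For illustration, here’s how those thresholds are set via PipeOptions (the specific numbers are arbitrary, not recommended values):

```csharp
using System.IO.Pipelines;

// Pause FlushAsync once 64 KiB is buffered; resume once the reader
// has drained the pipe below 32 KiB
var pipe = new Pipe(new PipeOptions(
    pauseWriterThreshold: 64 * 1024,
    resumeWriterThreshold: 32 * 1024));
```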
Scheduling IO
Usually when using async/await, continuations are called either on thread pool threads or on the current SynchronizationContext.
When doing IO it’s very important to have fine-grained control over where that IO is performed so that one can take advantage of CPU caches more effectively, which is critical for high-performance applications like web servers. Pipelines exposes a PipeScheduler that determines where asynchronous callbacks run. This gives the caller fine-grained control over exactly what threads are used for IO.
An example of this in practice is in the Kestrel Libuv transport where IO callbacks run on dedicated event loop threads.
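For illustration (this is not Kestrel’s actual configuration), the schedulers are supplied via PipeOptions:

```csharp
using System.IO.Pipelines;

// Run PipeReader continuations on the thread pool, and PipeWriter
// continuations inline on the thread that completed the operation
var pipe = new Pipe(new PipeOptions(
    readerScheduler: PipeScheduler.ThreadPool,
    writerScheduler: PipeScheduler.Inline));
```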
Other benefits of the PipeReader pattern:
- Some underlying systems support a “bufferless wait”, that is, a buffer never needs to be allocated until there’s actually data available in the underlying system. For example, on Linux with epoll, it’s possible to wait until data is ready before actually supplying a buffer to do the read. This avoids the problem where having a large number of threads waiting for data requires reserving a huge amount of memory up front.
- The default Pipe makes it easy to write unit tests against networking code, because the parsing logic is separated from the networking code, so unit tests can run the parsing logic against in-memory buffers rather than consuming directly from the network. It also makes it easy to test those hard-to-test patterns where partial data is sent. ASP.NET Core uses this to test various aspects of Kestrel’s HTTP parser.
- Systems that allow exposing the underlying OS buffers (like the Registered IO APIs on Windows) to user code are a natural fit for pipelines, since buffers are always provided by the PipeReader implementation.
Other Related types
As part of making System.IO.Pipelines, we also added a number of new primitive BCL types:
- MemoryPool<T>, IMemoryOwner<T>, MemoryManager<T> – .NET Core 1.0 added ArrayPool<T>, and in .NET Core 2.1 we now have a more general abstraction for a pool that works over any Memory<T>. This provides an extensibility point that lets you plug in more advanced allocation strategies as well as control how buffers are managed (e.g., provide pre-pinned buffers instead of purely managed arrays).
- IBufferWriter<T> – Represents a sink for writing synchronous buffered data. (PipeWriter implements this.)
- IValueTaskSource – ValueTask<T> has existed since .NET Core 1.1, but has gained some superpowers in .NET Core 2.1 to allow allocation-free awaitable async operations. See https://github.com/dotnet/corefx/issues/27445 for more details.
How do I use Pipelines?
The APIs exist in the System.IO.Pipelines NuGet package.
Here’s an example of a .NET Core 2.1 server application that uses pipelines to handle line-based messages (our example above): https://github.com/davidfowl/TcpEcho. It should run with dotnet run (or by running it in Visual Studio). It listens to a socket on port 8087 and writes out received messages to the console. You can use a client like netcat or PuTTY to make a connection to 8087 and send line-based messages to see it working.
Today Pipelines powers Kestrel and SignalR, and we hope to see it at the center of many networking libraries and components from the .NET community.

Thanks for the great blog David!
I’ve been writing some TCP code based on SocketAsyncEventArgs (https://docs.microsoft.com/en-us/dotnet/api/system.net.sockets.socketasynceventargs?view=netcore-2.1). Will I see much of a performance/memory allocation improvement using Pipelines?
It definitely *can*. It’ll probably help you write more correct code first off. It’ll also force you into patterns that can improve your memory allocations and throughput (as described in the article). I’d definitely ask you to try the API out and see what you end up with.
How should we consume network streams and files using Pipelines?
Thank you. BTW, “delimeter” should be “delimiter”
Thanks!
Should it be `bytesConsumedBufferIndex = segmentIndex;` instead of `bytesConsumedBufferIndex = segmentOffset;`?
Great post!
Excellent!
The given code assumes ASCII encoding when searching for the newline. A short note about that might have been good, before people start blindly copying this code 🙂
I understand actually fixing it is outside the scope of the example, unless there is a clever way of dealing with (for example) UTF-8 that I’m not aware of…
The newline sequence is the same in UTF-8, all 7 bit ASCII characters are the same in the UTF formats. For UTF-8 support beyond ASCII, change the Encoding.ASCII to Encoding.UTF8, and verify that the read buffer is not ending mid-character.
There is a clever way. Or rather, UTF-8 is clever. It never uses values of 0-127 inside multibyte characters, so you are safe to just search through to a newline and then convert the bytes up to that point into a string using UTF8.GetString().
See: https://en.wikipedia.org/wiki/UTF-8
Thanks for this! Can we please have a post like this for system.threading.channels? The readme from the corefxlabs repo seems to be the only existing documentation, and it appears to be out of date. The comments in the code are great, but aside from the hints in the early readme there doesn’t seem to be much describing how it’s all intended to fit together. Even a pointer to somewhere it’s being used in another project – haven’t had much luck finding usage so far.
SignalR uses Channels for its data streaming feature.
Really nice article!
I wonder what it would take to abstract this even more, maybe using `IObservable` and Rx, though. I always found Rx’s way of representing this kind of push operation to be incredibly streamlined and simple to comprehend.
Even though the new API is drastically simpler than the old version, I still find it somewhat complex and hard to read. Maybe if another abstraction level is introduced it would be possible to make this a little bit more intuitive.
That’s fair, but Rx really isn’t appropriate at this level of abstraction. You can certainly build higher-level object streams on top of pipelines once the data is parsed, but at the raw byte level you need more fine-grained control over things like allocating memory and back pressure (something Rx isn’t particularly good at).
I’ve been using a custom TCP binary protocol not delimited by lines or any other character(s), just binary-serialized objects with a length prefix. Is this library able to handle this kind of behavior, or only line-delimited messages?
Definitely yes. You just keep reading until you have buffered the length you need. You can even get smart by asking the pipe for buffers that match the expected lengths.
Anyone doing that and writing critical code, please beware of DDoS attacks via length prefixes. A bug or a hacker can cause an OutOfMemoryException in your server. Limiting the message size will prevent it.
The default PauseWriterThreshold will protect you from OOMing if you’re using the pipe for buffering.
Yes, it can. Your scenario is easier, so the stream code would not have been quite as tricky, but you’ll still get good benefits from pipelines.
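To make that concrete, a hedged sketch of reading 4-byte little-endian length-prefixed messages with PipeReader (ProcessMessage is a hypothetical handler; the MaxMessageSize check guards against the length-prefix attack mentioned above):

```csharp
using System;
using System.Buffers;
using System.Buffers.Binary;
using System.IO;
using System.IO.Pipelines;
using System.Threading.Tasks;

const int MaxMessageSize = 1024 * 1024; // cap hostile or buggy length prefixes

async Task ReadLengthPrefixedAsync(PipeReader reader)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;

        // Parse as many complete messages as are currently buffered
        while (TryReadMessage(ref buffer, out ReadOnlySequence<byte> payload))
        {
            ProcessMessage(payload); // hypothetical handler
        }

        // Consume what we parsed; mark the rest as examined so the next
        // ReadAsync waits for more data
        reader.AdvanceTo(buffer.Start, buffer.End);

        if (result.IsCompleted)
        {
            break;
        }
    }

    reader.Complete();
}

bool TryReadMessage(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> payload)
{
    payload = default;

    if (buffer.Length < 4)
    {
        return false; // not enough data for the prefix yet
    }

    // The 4-byte prefix may span segments, so copy it out first
    Span<byte> prefix = stackalloc byte[4];
    buffer.Slice(0, 4).CopyTo(prefix);
    int length = BinaryPrimitives.ReadInt32LittleEndian(prefix);

    if (length < 0 || length > MaxMessageSize)
    {
        throw new InvalidDataException($"Invalid message length: {length}");
    }

    if (buffer.Length < 4 + length)
    {
        return false; // the full message hasn't arrived yet
    }

    payload = buffer.Slice(4, length);
    buffer = buffer.Slice(4 + length);
    return true;
}
```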
Don’t catch (Exception) without rethrowing.
Great Post, Fantastic Library
David,
What about using this with things other than TCP sockets?
E.g.: serial ports for GPS data and other connected devices such as industrial control, or perhaps byte-oriented streams coming in from USB devices like payment card terminals (PCSC), automated tills, and such like.
Yep! Any incoming byte stream can be represented using Pipelines.
How do I handle serial port data using Pipelines?
Excellent!
Now to try a RegEx on incoming pipelines… for streaming regex (when possible; of course there are some regexes that end up needing the whole string).
This would have been really great to have 10 years ago, but I’m not about to refactor all our legacy code to use this.
I can’t see the code examples. Is there something missing?
Fantastic stuff!
UWP?
Excellent, thank you for the thorough explanation. Having struggled with 2 TCP listener projects I can see the advantages beside the more elegant way to deal with memory and performance. I’ll definitely keep this in mind should I ever need it again.
David, how can one use this same mechanic from a .NET Framework application or a .NET Standard library?
Because for a .NET Core application this works quite nicely, but it seems that the Socket.ReceiveAsync overload used is only available within the .NET Core implementation of SocketTaskExtensions. So far I have not been able to come up with a solution for the non-Core scopes, but I might be missing something…
I’ve got the same. But to use this very interesting pipe approach I have just created an additional .NET Core lib. I hope we get a chance to use SocketTaskExtensions in a .NET Standard lib in some way in the future. Anyway, I will definitely use it in my hackster.io project – https://www.hackster.io/contests/wiznet/ideas/8276.
In the last example, how do you decode a multi-segment sequence as UTF-8? I think this cannot work:

```csharp
foreach (var segment in sequence)
{
    Encoding.UTF8.GetChars(segment.Span, span);
    span = span.Slice(segment.Length);
}
```