
Improving System.IO.Pipelines buffer utilization

Henric Jungheim 20 Reputation points
2024-07-31T18:14:05.4766667+00:00

Our current mechanism for writing to a System.IO.Pipelines Pipe is a loop that writes small bits of data to the PipeWriter. Inside that loop, we call FlushAsync() after at least N bytes have been written. We discovered a case where MinimumSegmentSize was not an integer multiple of the size of those "small bits" we're writing. This resulted in the reader getting data with segments alternating in size between almost MinimumSegmentSize and nearly zero (the one "small bit" that made it into the buffer).
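The pattern described above can be observed with a small, self-contained sketch. It is not the original application code: `SegmentSizeDemo`, `RunAsync`, and `DrainAsync` are hypothetical names, the item contents are filler bytes, and the writer and reader are deliberately run in lock-step on one task so the run is repeatable. It simply records the length of every segment the reader is handed.

```csharp
using System;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Threading.Tasks;

public static class SegmentSizeDemo
{
    // Writes itemCount items of itemSize bytes each, flushing whenever at least
    // flushThreshold bytes are unflushed, and returns the length of every
    // segment the reader was handed.
    public static async Task<List<int>> RunAsync(
        int itemSize, int itemCount, int flushThreshold, int minimumSegmentSize)
    {
        var pipe = new Pipe(new PipeOptions(
            minimumSegmentSize: minimumSegmentSize,
            useSynchronizationContext: false));
        var segmentLengths = new List<int>();

        for (var i = 0; i < itemCount; i++)
        {
            var buffer = pipe.Writer.GetMemory(itemSize);
            buffer.Span[..itemSize].Fill((byte)i); // stand-in for thing.CopyTo(...)
            pipe.Writer.Advance(itemSize);

            if (pipe.Writer.UnflushedBytes >= flushThreshold)
            {
                await pipe.Writer.FlushAsync();
                await DrainAsync(pipe.Reader, segmentLengths);
            }
        }

        // Push through whatever is left; skip the read if nothing was written,
        // since ReadAsync would otherwise wait for data that never arrives.
        if (pipe.Writer.UnflushedBytes > 0)
        {
            await pipe.Writer.FlushAsync();
            await DrainAsync(pipe.Reader, segmentLengths);
        }

        await pipe.Writer.CompleteAsync();
        await pipe.Reader.CompleteAsync();
        return segmentLengths;
    }

    // Reads everything currently available and records each segment's length.
    private static async Task DrainAsync(PipeReader reader, List<int> segmentLengths)
    {
        var result = await reader.ReadAsync();
        foreach (var segment in result.Buffer)
        {
            segmentLengths.Add(segment.Length);
        }
        reader.AdvanceTo(result.Buffer.End);
    }
}
```

Calling `SegmentSizeDemo.RunAsync(100, 100, 4096, 4096)` and printing the returned list is one way to check whether short trailing segments show up for a given item size and threshold. The exact lengths depend on the runtime's pooling and on reader timing, so a concurrent reader may see a different pattern.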

For now, our work-around is to detect that a buffer is as full as it's ever going to get by looking at the size of the buffer returned by .GetMemory(). We now do something like this:

  PipeOptions _pipeOptions = ... ;
  const int FlushThreshold = ... ;
...
  await foreach (var thing in source.ReadEmAllAsync())
  {
    var size = thing.Size;
    var buffer = writer.GetMemory(size);

    if (buffer.Length >= _pipeOptions.MinimumSegmentSize
      && writer.UnflushedBytes >= FlushThreshold)
    {
        await writer.FlushAsync();
        buffer = writer.GetMemory(size);
    }
    
    thing.CopyTo(buffer.Span[..size]);
    writer.Advance(size);
  }

The buffers presented to the reader are now almost always full, but is this really the right way to solve the problem? It doesn't seem right for the code using the PipeWriter to have to know so much about the pipe's internal buffering strategy. Is there some cleaner way to flush any and all completed buffers, without pushing through tiny partial buffers?

In our application, in addition to the obvious memory usage implications, this has a throughput impact since the code that reads from the pipe is more efficient when presented with large chunks of contiguous data. Write consolidation might not be a primary application for System.IO.Pipelines, but except for this glitch we are now working around, it works really well for that purpose.

A call to .Advance(0) is left out of the code above because it results in zero-length segments being presented to the reader. Is the code still a correct usage of the API, given that it calls .GetMemory() without a corresponding .Advance()?

Thanks.

Developer technologies | .NET | Other

Answer accepted by question author

Anonymous
2024-08-01T03:22:05.4933333+00:00

Hi @Henric Jungheim, welcome to Microsoft Q&A.

Given the constraints and the need to manage buffer sizes efficiently without relying heavily on internal details, you can use Advance and FlushAsync more strategically: track how much space is left in the buffer returned by GetMemory, and flush only when the next item would not fit or when the unflushed byte count passes a threshold.

var remaining = int.MaxValue;
const int flushThreshold = 131070; // Adjust based on your needs

await foreach (var thing in source.ReadEmAllAsync())
{
    var size = thing.Size;

    // If the remaining space in the buffer is not enough for the new data
    if (size > remaining)
    {
        await writer.FlushAsync();
        remaining = int.MaxValue; // Reset remaining after flush
    }

    var buffer = writer.GetMemory(size);
    thing.CopyTo(buffer.Span[..size]);

    remaining = buffer.Length - size;
    writer.Advance(size);

    // Optionally, flush based on a threshold to avoid over-accumulating
    if (writer.UnflushedBytes >= flushThreshold)
    {
        await writer.FlushAsync();
        remaining = int.MaxValue; // Reset remaining after flush
    }
}
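If the goal is to keep this bookkeeping out of the calling code, the same idea can be wrapped in a small helper. The following is only a sketch under assumptions: `SegmentAwareWriter` is a hypothetical class, not part of System.IO.Pipelines, and it still relies on the observation that `GetMemory` starts a new segment when the requested size does not fit in the current one. It flushes only when the current segment is as full as it will get and at least `flushThreshold` bytes are pending.

```csharp
using System;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not a framework type): hides the "remaining space"
// tracking from the code that produces the data.
public sealed class SegmentAwareWriter
{
    private readonly PipeWriter _writer;
    private readonly long _flushThreshold;

    // Space left in the most recently returned buffer; unknown before the
    // first GetMemory call, so start with "plenty".
    private int _remaining = int.MaxValue;

    public SegmentAwareWriter(PipeWriter writer, long flushThreshold)
    {
        _writer = writer ?? throw new ArgumentNullException(nameof(writer));
        _flushThreshold = flushThreshold;
    }

    public async ValueTask WriteAsync(
        ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default)
    {
        // The next item does not fit, so the current segment will not get any
        // fuller. Flush now if enough bytes have accumulated.
        if (data.Length > _remaining && _writer.UnflushedBytes >= _flushThreshold)
        {
            await _writer.FlushAsync(cancellationToken);
        }

        var buffer = _writer.GetMemory(data.Length);
        data.CopyTo(buffer);

        _remaining = buffer.Length - data.Length;
        _writer.Advance(data.Length);
    }

    // Flushes any trailing partial segment, for use once the source is exhausted.
    public ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default)
        => _writer.FlushAsync(cancellationToken);
}
```

The producer loop then reduces to `await helper.WriteAsync(data)` per item plus one final `FlushAsync()`. The dependence on the pipe's segment behavior has not gone away; it is just confined to one place.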

Best Regards,

Jiale




1 additional answer

  1. Henric Jungheim 20 Reputation points
    2024-08-01T16:18:22.2166667+00:00

    This work-around doesn't need to call .GetMemory() multiple times:

      PipeOptions _pipeOptions = ... ;
      const int FlushThreshold = ... ;
    ...
      var remaining = int.MaxValue;
      await foreach (var thing in source.ReadEmAllAsync())
      {
        var size = thing.Size;
    
        if (size > remaining)
        {
          // We could also check if the total bytes written
          // exceeds FlushThreshold before flushing and still
          // get good buffer utilization.
          await writer.FlushAsync();
        }
    
        var buffer = writer.GetMemory(size);
        
        thing.CopyTo(buffer.Span[..size]);
    
        remaining = buffer.Length - size;
    
        writer.Advance(size);
      }
    

    So the current Pipe API does not provide any way to do this without the code making assumptions about the given pipe's buffering strategy?
