본문 바로가기

C#

C# System.IO.Pipelines를 활용한 스트리밍 데이터 처리

System.IO.Pipelines는 고성능 스트리밍 I/O를 위해 설계된 API로, 최소한의 복사, 명확한 백프레셔, 효율적인 파싱을 제공합니다. 네트워크 소켓, 파일 스트림 등에서 연속적으로 도착하는 데이터를 안전하고 빠르게 처리할 수 있습니다.

핵심은 PipeReader와 PipeWriter를 통해 데이터를 읽고 쓰면서 ReadOnlySequence를 기반으로 메시지 경계를 파싱하는 것입니다.

1. 기본 개념 요약

Pipe는 생산자-소비자 모델입니다. Writer는 버퍼에 채우고 FlushAsync로 내보내며, Reader는 ReadAsync로 버퍼를 받아 파싱 후 AdvanceTo(consumed, examined)로 소비한 지점을 표시합니다. consumed는 실제로 사용 완료한 위치, examined는 다음 Read에서 다시 보길 원하는 마지막 위치입니다.

2. 라인 기반 프로토콜 처리 예제 (CRLF, \n)

아래 예제는 NetworkStream에서 데이터를 Pipe로 채우고, Reader에서 줄 단위로 파싱합니다. 부분 메시지, 경계 파싱, 완료 처리까지 포함합니다.

using System;
using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

public static class LinePipeline
{
    public static async Task RunAsync(NetworkStream stream, CancellationToken token)
    {
        var pipe = new Pipe();
        Task writing = FillPipeAsync(stream, pipe.Writer, token);
        Task reading = ReadPipeAsync(pipe.Reader, token);
        await Task.WhenAll(reading, writing);
    }

    private static async Task FillPipeAsync(Stream stream, PipeWriter writer, CancellationToken token)
    {
        const int minBufferSize = 512;
        try
        {
            while (true)
            {
                Memory<byte> memory = writer.GetMemory(minBufferSize);
                int bytesRead = await stream.ReadAsync(memory, token).ConfigureAwait(false);
                if (bytesRead == 0) break; // EOF
                writer.Advance(bytesRead);

                FlushResult result = await writer.FlushAsync(token).ConfigureAwait(false);
                if (result.IsCompleted || result.IsCanceled) break;
            }
        }
        finally
        {
            await writer.CompleteAsync().ConfigureAwait(false);
        }
    }

    private static async Task ReadPipeAsync(PipeReader reader, CancellationToken token)
    {
        try
        {
            while (true)
            {
                ReadResult result = await reader.ReadAsync(token).ConfigureAwait(false);
                ReadOnlySequence<byte> buffer = result.Buffer;

                while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
                {
                    ProcessLine(line);
                }

                reader.AdvanceTo(buffer.Start, buffer.End);

                if (result.IsCompleted && buffer.Length == 0)
                    break;
            }
        }
        finally
        {
            await reader.CompleteAsync().ConfigureAwait(false);
        }
    }

    private static bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
    {
        SequencePosition? position = buffer.PositionOf((byte)'\n');
        if (position == null)
        {
            line = default;
            return false;
        }

        line = buffer.Slice(0, position.Value);
        // 이동: '\n' 다음으로 버퍼를 진행
        SequencePosition next = buffer.GetPosition(1, position.Value);
        buffer = buffer.Slice(next);
        return true;
    }

    private static void ProcessLine(ReadOnlySequence<byte> line)
    {
        // CR 제거 및 문자열 변환 (데모용으로 ToArray 사용, 실제 서비스는 Span/SequenceReader 활용 권장)
        string text = Encoding.UTF8.GetString(line.ToArray()).TrimEnd('\r');
        Console.WriteLine(text);
    }
}

핵심 포인트는 PositionOf로 줄 경계를 찾고, AdvanceTo로 정확한 consumed/examined를 지정해 불필요한 복사를 방지하며 부분 메시지를 다음 Read로 넘기는 것입니다.

3. Stream을 직접 감싸기: PipeReader.Create / PipeWriter.Create

Pipe를 수동으로 구성하지 않고 기존 Stream을 바로 감싸 처리할 수 있습니다. 아래 예제는 길이-프리픽스 프레이밍(헤더 4바이트 + 페이로드) 파싱과 전송입니다.

using System;
using System.Buffers.Binary;
using System.IO;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;

public static class FramedProtocol
{
    public static async Task ReadFramesAsync(Stream stream, CancellationToken token)
    {
        PipeReader reader = PipeReader.Create(stream);
        try
        {
            while (true)
            {
                ReadResult result = await reader.ReadAsync(token).ConfigureAwait(false);
                ReadOnlySequence<byte> buffer = result.Buffer;

                while (TryReadFrame(ref buffer, out ReadOnlySequence<byte> payload))
                {
                    HandleFrame(payload);
                }

                reader.AdvanceTo(buffer.Start, buffer.End);
                if (result.IsCompleted) break;
            }
        }
        finally
        {
            await reader.CompleteAsync().ConfigureAwait(false);
        }
    }

    private static bool TryReadFrame(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> payload)
    {
        payload = default;
        if (buffer.Length < 4) return false;

        Span<byte> header = stackalloc byte[4];
        buffer.Slice(0, 4).CopyTo(header);
        int len = BinaryPrimitives.ReadInt32LittleEndian(header);
        if (len < 0) throw new InvalidOperationException("Invalid length");
        if (buffer.Length < 4 + len) return false; // 부분 메시지

        payload = buffer.Slice(4, len);
        buffer = buffer.Slice(4 + len);
        return true;
    }

    private static void HandleFrame(ReadOnlySequence<byte> payload)
    {
        // 필요한 처리 수행 (파싱/디코딩 등)
    }

    public static async Task WriteFrameAsync(Stream stream, ReadOnlyMemory<byte> payload, CancellationToken token)
    {
        PipeWriter writer = PipeWriter.Create(stream);
        Span<byte> header = stackalloc byte[4];
        BinaryPrimitives.WriteInt32LittleEndian(header, payload.Length);

        writer.Write(header);
        writer.Write(payload.Span);
        await writer.FlushAsync(token).ConfigureAwait(false);
        await writer.CompleteAsync().ConfigureAwait(false);
    }
}

PipeReader.Create와 PipeWriter.Create는 기존 라이브러리와의 호환성을 높여주며, 파이프의 메모리 관리 이점을 그대로 활용할 수 있습니다.

4. 백프레셔, 취소, 완료 처리

FlushAsync는 소비자 속도에 따라 대기할 수 있어 자연스러운 백프레셔를 제공합니다. ReadAsync, FlushAsync 모두 CancellationToken을 지원하므로 서버 종료나 타임아웃 시 빠르게 중단할 수 있습니다. 스트림이 종료되면 Writer/Reader에 대해 반드시 CompleteAsync를 호출해 리소스를 반환합니다.

5. 성능 팁

가능하면 ToArray로 전체 복사하지 말고 ReadOnlySequence와 SequenceReader를 활용해 부분 파싱합니다. 큰 메시지는 BinaryPrimitives로 헤더를 직접 디코딩합니다. AdvanceTo에서 examined를 적절히 지정해 불필요한 wake-up을 줄입니다. PipeOptions로 버퍼 임계값을 조정해 메모리 사용량과 지연을 균형 있게 설정합니다.

정리하면, Pipelines는 스트리밍 데이터 경계 파싱, 백프레셔 제어, 최소 복사를 자연스럽게 구현할 수 있는 도구입니다. 기존 Stream 기반 코드에서도 점진적으로 도입해 성능과 안정성을 개선할 수 있습니다.