본문 바로가기

C#

C# 비동기 스트림(IAsyncEnumerable) 처리와 응용

IAsyncEnumerable는 대용량 I/O를 지연(lazy) 처리하면서 메모리를 아끼고, 자연스럽게 back-pressure(생산과 소비 균형)를 제공하는 C# 8+의 비동기 스트림입니다. 파일/네트워크/DB 등 느린 소스를 한 줄(하나)씩 받아 처리할 때 유용합니다.

1. 기본 문법: async iterator + await foreach

생산자는 async iterator로 IAsyncEnumerable을 만들고, 소비자는 await foreach로 순회합니다.

using System; 
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class Demo
{
    // 소비 측의 WithCancellation 토큰을 연결하려면 [EnumeratorCancellation]을 지정합니다.
    public static async IAsyncEnumerable GenerateNumbersAsync(
        int count = 10,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        for (int i = 0; i < count; i++)
        {
            ct.ThrowIfCancellationRequested();
            await Task.Delay(200, ct).ConfigureAwait(false);
            yield return i;
        }
    }

    public static async Task RunAsync(CancellationToken ct = default)
    {
        await foreach (var n in GenerateNumbersAsync().WithCancellation(ct).ConfigureAwait(false))
        {
            Console.WriteLine(n);
        }
    }
}

포인트: yield return으로 항목을 하나씩 내보내며, await foreach는 각 항목을 기다리며 처리합니다.

2. 취소와 타임아웃

열거를 중단하고 싶으면 CancellationToken을 연결합니다. await foreach에 WithCancellation을 붙이거나, 메서드 인자로 토큰을 직접 전달합니다.

using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancelSample
{
    public static async Task RunAsync()
    {
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1));

        try
        {
            await foreach (var n in Demo.GenerateNumbersAsync(100).WithCancellation(cts.Token).ConfigureAwait(false))
            {
                Console.WriteLine(n);
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("취소되었습니다.");
        }
    }
}

팁: 라이브러리 코드라면 await foreach 뒤에 ConfigureAwait(false)를 붙여 컨텍스트 캡처 비용을 줄이는 것이 좋습니다.

3. 파일/HTTP 스트리밍 예시

큰 파일을 한 줄씩, HTTP 응답을 라인 단위로 처리하는 예시입니다.

using System;
using System.Collections.Generic;
using System.IO;
using System.Net.Http;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class Streaming
{
    public static async IAsyncEnumerable<string> ReadLinesAsync(
        string path,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        await using var stream = File.OpenRead(path);
        using var reader = new StreamReader(stream);

        while (!reader.EndOfStream)
        {
            ct.ThrowIfCancellationRequested();
            var line = await reader.ReadLineAsync().ConfigureAwait(false);
            if (line is null) yield break;
            yield return line;
        }
    }

    private static readonly HttpClient _http = new HttpClient();

    public static async IAsyncEnumerable<string> GetLinesAsync(
        string url,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        await using var stream = await _http.GetStreamAsync(url, ct).ConfigureAwait(false);
        using var reader = new StreamReader(stream);

        while (!reader.EndOfStream)
        {
            ct.ThrowIfCancellationRequested();
            var line = await reader.ReadLineAsync().ConfigureAwait(false);
            if (line is null) yield break;
            yield return line;
        }
    }
}

파일/네트워크를 한 번에 모두 읽지 않고, 들어오는 만큼 처리하므로 메모리 사용이 안정적입니다.

4. 변환/필터링/수집 유틸

간단한 비동기 Where/Select/ToListAsync 유틸입니다. 외부 패키지 없이도 파이프라인을 구성할 수 있습니다.

using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncLinqLite
{
    public static async IAsyncEnumerable<T> WhereAwait<T>(
        this IAsyncEnumerable<T> source,
        Func<T, ValueTask<bool>> predicate,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        await foreach (var item in source.WithCancellation(ct).ConfigureAwait(false))
        {
            if (await predicate(item).ConfigureAwait(false))
                yield return item;
        }
    }

    public static async IAsyncEnumerable<TResult> SelectAwait<T, TResult>(
        this IAsyncEnumerable<T> source,
        Func<T, ValueTask<TResult>> projector,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        await foreach (var item in source.WithCancellation(ct).ConfigureAwait(false))
        {
            yield return await projector(item).ConfigureAwait(false);
        }
    }

    public static async Task<List<T>> ToListAsync<T>(
        this IAsyncEnumerable<T> source,
        CancellationToken ct = default)
    {
        var list = new List<T>();
        await foreach (var item in source.WithCancellation(ct).ConfigureAwait(false))
            list.Add(item);
        return list;
    }
}

// 사용 예시
// await foreach (var name in Streaming.ReadLinesAsync(path)
//     .WhereAwait(line => new ValueTask<bool>(line.StartsWith("INFO")))
//     .SelectAwait(line => new ValueTask<string>(line.ToUpperInvariant())))
// {
//     Console.WriteLine(name);
// }

복잡한 쿼리가 필요하면 System.Linq.Async 패키지(별도 NuGet)를 사용하는 것도 방법입니다.

5. 제한된 동시성 처리

각 항목을 I/O 바운드 작업으로 가공해야 한다면 동시성(Concurrent)으로 처리 시간을 줄일 수 있습니다. 아래는 간단한 제한 동시성 Select 예시입니다.

using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class Concurrency
{
    public static async IAsyncEnumerable<TResult> SelectParallel<T, TResult>(
        this IAsyncEnumerable<T> source,
        int degreeOfParallelism,
        Func<T, Task<TResult>> projector,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        var gate = new SemaphoreSlim(degreeOfParallelism);
        var tasks = new List<Task<TResult>>();

        await foreach (var item in source.WithCancellation(ct).ConfigureAwait(false))
        {
            await gate.WaitAsync(ct).ConfigureAwait(false);
            var t = Task.Run(async () =>
            {
                try { return await projector(item).ConfigureAwait(false); }
                finally { gate.Release(); }
            }, ct);
            tasks.Add(t);
        }

        foreach (var t in tasks)
            yield return await t.ConfigureAwait(false);
    }
}

// 사용 예시
// await foreach (var r in Streaming.ReadLinesAsync(path)
//     .SelectParallel(8, line => DoIoWorkAsync(line)))
// {
//     Console.WriteLine(r);
// }

간단한 구현이라 결과 순서가 보장되지 않거나 메모리에 모았다가 내보낼 수 있습니다. 순서 보장/즉시 방출이 필요하면 Channel/Buffer 등을 활용해 개선합니다.

6. 베스트 프랙티스 체크리스트

- 생산자: async iterator 내부에서 I/O에는 반드시 await 사용, using/await using으로 리소스 수명 관리합니다.

- 소비자: await foreach에 WithCancellation(token)과 ConfigureAwait(false)를 고려합니다.

- 예외: 생산자 내부 예외는 소비 시점에 표면화됩니다. 필요한 곳에서 try/catch로 감쌉니다.

- 모으기: 최종 집계가 필요하면 ToListAsync 같은 유틸을 사용합니다. 모든 항목을 메모리에 담아야 하는지 검토합니다.

- 패키지: 복잡한 쿼리는 System.Linq.Async를 검토합니다.

IAsyncEnumerable는 “필요한 만큼, 가능한 한 빨리” 처리하는 스트리밍 파이프라인을 간결하게 만들어줍니다. 위 패턴을 프로젝트에 맞게 조합해 보시기 바랍니다.