IAsyncEnumerable는 대용량 I/O를 지연(lazy) 처리하면서 메모리를 아끼고, 자연스럽게 back-pressure(생산과 소비 균형)를 제공하는 C# 8+의 비동기 스트림입니다. 파일/네트워크/DB 등 느린 소스를 한 줄(하나)씩 받아 처리할 때 유용합니다.
1. 기본 문법: async iterator + await foreach
생산자는 async iterator로 IAsyncEnumerable을 만들고, 소비자는 await foreach로 순회합니다.
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
public static class Demo
{
// 소비 측의 WithCancellation 토큰을 연결하려면 [EnumeratorCancellation]을 지정합니다.
public static async IAsyncEnumerable GenerateNumbersAsync(
int count = 10,
[EnumeratorCancellation] CancellationToken ct = default)
{
for (int i = 0; i < count; i++)
{
ct.ThrowIfCancellationRequested();
await Task.Delay(200, ct).ConfigureAwait(false);
yield return i;
}
}
public static async Task RunAsync(CancellationToken ct = default)
{
await foreach (var n in GenerateNumbersAsync().WithCancellation(ct).ConfigureAwait(false))
{
Console.WriteLine(n);
}
}
}
포인트: yield return으로 항목을 하나씩 내보내며, await foreach는 각 항목을 기다리며 처리합니다.
2. 취소와 타임아웃
열거를 중단하고 싶으면 CancellationToken을 연결합니다. await foreach에 WithCancellation을 붙이거나, 메서드 인자로 토큰을 직접 전달합니다.
using System;
using System.Threading;
using System.Threading.Tasks;
public static class CancelSample
{
public static async Task RunAsync()
{
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1));
try
{
await foreach (var n in Demo.GenerateNumbersAsync(100).WithCancellation(cts.Token).ConfigureAwait(false))
{
Console.WriteLine(n);
}
}
catch (OperationCanceledException)
{
Console.WriteLine("취소되었습니다.");
}
}
}
팁: 라이브러리 코드라면 await foreach 뒤에 ConfigureAwait(false)를 붙여 컨텍스트 캡처 비용을 줄이는 것이 좋습니다.
3. 파일/HTTP 스트리밍 예시
큰 파일을 한 줄씩, HTTP 응답을 라인 단위로 처리하는 예시입니다.
using System;
using System.Collections.Generic;
using System.IO;
using System.Net.Http;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
public static class Streaming
{
public static async IAsyncEnumerable<string> ReadLinesAsync(
string path,
[EnumeratorCancellation] CancellationToken ct = default)
{
await using var stream = File.OpenRead(path);
using var reader = new StreamReader(stream);
while (!reader.EndOfStream)
{
ct.ThrowIfCancellationRequested();
var line = await reader.ReadLineAsync().ConfigureAwait(false);
if (line is null) yield break;
yield return line;
}
}
private static readonly HttpClient _http = new HttpClient();
public static async IAsyncEnumerable<string> GetLinesAsync(
string url,
[EnumeratorCancellation] CancellationToken ct = default)
{
await using var stream = await _http.GetStreamAsync(url, ct).ConfigureAwait(false);
using var reader = new StreamReader(stream);
while (!reader.EndOfStream)
{
ct.ThrowIfCancellationRequested();
var line = await reader.ReadLineAsync().ConfigureAwait(false);
if (line is null) yield break;
yield return line;
}
}
}
파일/네트워크를 한 번에 모두 읽지 않고, 들어오는 만큼 처리하므로 메모리 사용이 안정적입니다.
4. 변환/필터링/수집 유틸
간단한 비동기 Where/Select/ToListAsync 유틸입니다. 외부 패키지 없이도 파이프라인을 구성할 수 있습니다.
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
public static class AsyncLinqLite
{
public static async IAsyncEnumerable<T> WhereAwait<T>(
this IAsyncEnumerable<T> source,
Func<T, ValueTask<bool>> predicate,
[EnumeratorCancellation] CancellationToken ct = default)
{
await foreach (var item in source.WithCancellation(ct).ConfigureAwait(false))
{
if (await predicate(item).ConfigureAwait(false))
yield return item;
}
}
public static async IAsyncEnumerable<TResult> SelectAwait<T, TResult>(
this IAsyncEnumerable<T> source,
Func<T, ValueTask<TResult>> projector,
[EnumeratorCancellation] CancellationToken ct = default)
{
await foreach (var item in source.WithCancellation(ct).ConfigureAwait(false))
{
yield return await projector(item).ConfigureAwait(false);
}
}
public static async Task<List<T>> ToListAsync<T>(
this IAsyncEnumerable<T> source,
CancellationToken ct = default)
{
var list = new List<T>();
await foreach (var item in source.WithCancellation(ct).ConfigureAwait(false))
list.Add(item);
return list;
}
}
// 사용 예시
// await foreach (var name in Streaming.ReadLinesAsync(path)
// .WhereAwait(line => new ValueTask<bool>(line.StartsWith("INFO")))
// .SelectAwait(line => new ValueTask<string>(line.ToUpperInvariant())))
// {
// Console.WriteLine(name);
// }
복잡한 쿼리가 필요하면 System.Linq.Async 패키지(별도 NuGet)를 사용하는 것도 방법입니다.
5. 제한된 동시성 처리
각 항목을 I/O 바운드 작업으로 가공해야 한다면 동시성(Concurrent)으로 처리 시간을 줄일 수 있습니다. 아래는 간단한 제한 동시성 Select 예시입니다.
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
public static class Concurrency
{
public static async IAsyncEnumerable<TResult> SelectParallel<T, TResult>(
this IAsyncEnumerable<T> source,
int degreeOfParallelism,
Func<T, Task<TResult>> projector,
[EnumeratorCancellation] CancellationToken ct = default)
{
var gate = new SemaphoreSlim(degreeOfParallelism);
var tasks = new List<Task<TResult>>();
await foreach (var item in source.WithCancellation(ct).ConfigureAwait(false))
{
await gate.WaitAsync(ct).ConfigureAwait(false);
var t = Task.Run(async () =>
{
try { return await projector(item).ConfigureAwait(false); }
finally { gate.Release(); }
}, ct);
tasks.Add(t);
}
foreach (var t in tasks)
yield return await t.ConfigureAwait(false);
}
}
// 사용 예시
// await foreach (var r in Streaming.ReadLinesAsync(path)
// .SelectParallel(8, line => DoIoWorkAsync(line)))
// {
// Console.WriteLine(r);
// }
간단한 구현이라 결과 순서가 보장되지 않거나 메모리에 모았다가 내보낼 수 있습니다. 순서 보장/즉시 방출이 필요하면 Channel/Buffer 등을 활용해 개선합니다.
6. 베스트 프랙티스 체크리스트
- 생산자: async iterator 내부에서 I/O에는 반드시 await 사용, using/await using으로 리소스 수명 관리합니다.
- 소비자: await foreach에 WithCancellation(token)과 ConfigureAwait(false)를 고려합니다.
- 예외: 생산자 내부 예외는 소비 시점에 표면화됩니다. 필요한 곳에서 try/catch로 감쌉니다.
- 모으기: 최종 집계가 필요하면 ToListAsync 같은 유틸을 사용합니다. 모든 항목을 메모리에 담아야 하는지 검토합니다.
- 패키지: 복잡한 쿼리는 System.Linq.Async를 검토합니다.
IAsyncEnumerable는 “필요한 만큼, 가능한 한 빨리” 처리하는 스트리밍 파이프라인을 간결하게 만들어줍니다. 위 패턴을 프로젝트에 맞게 조합해 보시기 바랍니다.
'C#' 카테고리의 다른 글
| C# ValueTask와 Task 성능 비교 및 활용 전략 (0) | 2026.05.27 |
|---|---|
| C# System.Buffers.ArrayPool<T>로 메모리 재활용하기 (0) | 2026.05.26 |
| C# 고급 이벤트 패턴: EventArgs 상속과 데이터 전달 (0) | 2026.05.26 |
| C# ThreadLocal<T>로 스레드별 데이터 관리 (0) | 2026.05.26 |
| C# 암시적/명시적 변환 연산자(implicit/explicit) 구현하기 (0) | 2026.05.25 |