본문 바로가기

C#

C# Parallel.For와 Parallel.ForEach로 데이터 병렬 처리

데이터 병렬 처리는 CPU 코어를 최대한 활용해 반복 작업을 빠르게 처리하는 기법입니다. C#에서는 Task Parallel Library(TPL)의 Parallel.For와 Parallel.ForEach로 간단하게 구현할 수 있습니다. 실무에서 성능 이슈를 만났을 때, 이 두 메서드로 안전하고 빠르게 가속하는 방법과 주의점을 정리합니다.

1. 기본 사용법: 반복을 코어에 분배합니다

반복문을 자동으로 분할해 여러 스레드에서 동시에 실행합니다. 순서 보장은 없으며, 각 반복은 독립적이어야 안전합니다.

using System;
using System.Linq;
using System.Threading.Tasks;

var data = Enumerable.Range(1, 10_000).ToArray();

// Parallel.For: 인덱스 기반
Parallel.For(0, data.Length, i =>
{
    data[i] = data[i] * data[i]; // CPU 바운드 예시
});

// Parallel.ForEach: 컬렉션 기반
Parallel.ForEach(data, x =>
{
    var r = Math.Sqrt(x); // 각 요소 처리
});

한 반복에서 예외가 발생하면 전체가 AggregateException으로 래핑되어 던져집니다. 공유 상태를 수정할 때는 동기화가 필요합니다.

2. ParallelOptions로 병렬성 제어와 취소

코어 수 제한과 취소 토큰으로 제어하면 예측 가능성과 응답성을 높일 수 있습니다.

using System;
using System.Threading;
using System.Threading.Tasks;

var cts = new CancellationTokenSource();
var options = new ParallelOptions
{
    MaxDegreeOfParallelism = Environment.ProcessorCount, // 필요시 코어 수 조정
    CancellationToken = cts.Token
};

// 예시: 500ms 후 취소
var cancelTask = Task.Run(() =>
{
    Thread.Sleep(500);
    cts.Cancel();
});

try
{
    Parallel.For(0, 10_000_000, options, i =>
    {
        // CPU 바운드 작업
        double v = Math.Sqrt(i);
        // 필요시 명시적 체크
        options.CancellationToken.ThrowIfCancellationRequested();
    });
}
catch (OperationCanceledException)
{
    Console.WriteLine("작업이 취소되었습니다.");
}

MaxDegreeOfParallelism은 항상 높을수록 좋은 것이 아닙니다. 캐시 미스, 스레드 경합이 늘 수 있으니 벤치마크로 최적값을 찾습니다.

3. 예외 처리와 빠른 중단: Break vs Stop

Parallel.For/ForEach는 여러 스레드에서 예외가 모일 수 있어 AggregateException으로 전달됩니다. 일부 조건에서 조기 종료가 필요할 때는 Break/Stop을 구분합니다.

using System;
using System.Threading.Tasks;

try
{
    Parallel.ForEach(new[] { 1, 2, 3, 4 }, x =>
    {
        if (x == 3) throw new InvalidOperationException("테스트 예외");
    });
}
catch (AggregateException ex)
{
    foreach (var e in ex.Flatten().InnerExceptions)
        Console.WriteLine(e.Message);
}

// Break: 특정 인덱스 이후 반복은 생략(For 전용, 순방향 기준)
var result = Parallel.For(0, 1_000_000, (i, state) =>
{
    if (i == 12_345) state.Break();
});

if (result.LowestBreakIteration.HasValue)
    Console.WriteLine($"Break 호출: {result.LowestBreakIteration}");

Stop은 모든 남은 반복을 가능한 빨리 멈추려는 의도입니다. Break는 범위를 제한하지만 이미 예약된 작업은 일부 더 실행될 수 있습니다.

4. 안전한 집계: 스레드 로컬 패턴으로 락 최소화

공유 합계/통계를 계산할 때 매 반복마다 락을 잡으면 성능이 급락합니다. 스레드 로컬 누적 후 한 번에 병합합니다.

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var numbers = Enumerable.Range(1, 1_000_000).ToArray();
long sum = 0;

// 스레드 로컬 초기값 -> 본문에서 지역 누적 -> 최종 병합
Parallel.For<long>(
    0, numbers.Length,
    () => 0L,
    (i, state, local) => local + numbers[i],
    local => Interlocked.Add(ref sum, local)
);

Console.WriteLine(sum);

double 누적이 필요하면 Interlocked.CompareExchange로 커스텀하거나, 스레드 로컬을 합칠 때만 lock을 사용합니다.

5. 큰 컬렉션 최적화: Partitioner로 범위 분할

인덱스 범위를 묶어 처리하면 스케줄링 오버헤드를 줄이고 캐시 지역성을 높일 수 있습니다.

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var data = Enumerable.Range(1, 10_000_000).ToArray();
long sum = 0;

// 인덱스 범위 파티셔닝
var ranges = Partitioner.Create(0, data.Length);
Parallel.ForEach(ranges, range =>
{
    long local = 0;
    for (int i = range.Item1; i < range.Item2; i++)
        local += data[i];

    Interlocked.Add(ref sum, local);
});

Console.WriteLine(sum);

순서가 중요 없다면 Partitioner.Create(컬렉션)으로 동적 파티셔닝도 가능합니다. 작은 작업을 크게 묶어(grain size) 호출 횟수를 줄이면 이득이 큽니다.

6. 공유 컬렉션: Concurrent 계열 사용

여러 반복이 동시에 컬렉션에 쓰기한다면 ConcurrentBag/Queue/Dictionary를 사용합니다. List/Dictionary는 보호 없이 쓰면 안 됩니다.

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

var data = Enumerable.Range(1, 1000);
var bag = new ConcurrentBag<int>();

Parallel.ForEach(data, x =>
{
    bag.Add(x * x);
});

Console.WriteLine($"Count = {bag.Count}");

Concurrent 컬렉션은 편리하지만 오버헤드가 있습니다. 가능하면 섹션 4처럼 스레드 로컬 후 병합이 더 빠릅니다.

7. 언제 쓰고, 언제 피해야 할까요?

권장: CPU 바운드(해시, 암호화, 이미지 처리, 수치 계산), 반복당 작업량이 충분히 큰 경우, 순서가 중요 없는 경우입니다.

비권장: I/O 바운드나 비동기 작업(Parallel은 async/await와 바로 섞기 어렵습니다), 반복당 작업량이 매우 작아 스케줄링 비용이 더 큰 경우, 강한 순서 보장이 필요한 경우입니다. I/O 바운드는 Task.WhenAll 또는 .NET 6의 Parallel.ForEachAsync를 고려합니다.

8. 성능 팁과 진단

Stopwatch 또는 BenchmarkDotNet으로 벤치마크하고, MaxDegreeOfParallelism을 조절해 최적점을 찾습니다. 워밍업을 포함하고, 릴리스 빌드 + 서버 GC 설정을 확인합니다. 성능 회귀는 ETW/PerfView나 dotnet-trace로 확인합니다.

9. 실무 체크리스트

1) 반복이 독립적인가요? 2) 공유 상태는 스레드 안전한가요? 3) 집계는 스레드 로컬로 병합하나요? 4) 적절한 병렬도와 취소를 설정했나요? 5) 데이터 파티셔닝으로 캐시 지역성을 챙겼나요? 6) 벤치마크로 실제 이득을 검증했나요?

Parallel.For/ForEach는 올바른 문제에 쓰면 매우 강력합니다. 위 원칙과 패턴을 적용하면 안정적으로 성능을 끌어올릴 수 있습니다.