Rx.NET은 이벤트와 비동기 데이터를 컬렉션처럼 다루게 해주는 라이브러리입니다. IObservable 시퀀스를 LINQ 스타일로 조합해 유지보수성과 가독성을 높일 수 있습니다. 이 글에서는 핵심 개념과 Observable 활용법을 간략히 소개합니다.
1. Rx.NET 한눈에 보기
- Push 모델: 데이터가 준비될 때 밀어줍니다(OnNext, OnError, OnCompleted).
- LINQ to Events: Select/Where 같은 연산자로 스트림을 변환/필터링합니다.
- 비동기/이벤트 통합: 타이머, UI 이벤트, 네트워크 호출을 같은 방식으로 다룹니다.
2. Observable/Observer 핵심
- IObservable: 데이터 스트림의 생산자입니다.
- IObserver: OnNext/OnError/OnCompleted를 구현하는 소비자입니다.
- 구독은 IDisposable을 반환하며, Dispose로 안전하게 해제합니다.
3. 빠른 시작: Interval 구독
using System;
using System.Reactive.Linq;
class Program
{
static void Main()
{
var sub = Observable.Interval(TimeSpan.FromSeconds(1))
.Take(5) // 5개 후 완료
.Subscribe(x => Console.WriteLine($"Tick: {x}")
, ex => Console.WriteLine($"Error: {ex.Message}")
, () => Console.WriteLine("Completed"));
Console.ReadLine();
sub.Dispose(); // 필요 시 조기 해제
}
}
Interval은 Cold Observable로, 구독마다 독립적으로 시작합니다.
4. Observable 만들기 4가지
1) 이벤트를 Observable로
using System;
using System.Reactive.Linq;
// WinForms 예시
var clicks = Observable.FromEventPattern(handler => button.Click += handler
, handler => button.Click -= handler);
var times = clicks.Select(_ => DateTimeOffset.Now);
var sub = times.Subscribe(t => label.Text = $"Clicked @ {t:HH:mm:ss}");
2) Create로 커스텀
using System;
using System.Reactive.Linq;
var custom = Observable.Create<int>(observer => {
observer.OnNext(1);
observer.OnNext(2);
observer.OnCompleted();
return System.Reactive.Disposables.Disposable.Empty;
});
3) 타이머/간단 시퀀스
var once = Observable.Timer(TimeSpan.FromSeconds(1)); // 1초 후 1회 방출
var range = Observable.Range(1, 3); // 1,2,3
4) 비동기를 스트림으로
using System;
using System.Net.Http;
using System.Reactive.Linq;
var http = new HttpClient();
IObservable<string> getJson = Observable.FromAsync(() => http.GetStringAsync("https://example.com/data"));
var sub = getJson.Subscribe(json => Console.WriteLine(json), ex => Console.WriteLine(ex.Message));
5. 실전 연산자: 필터링/변환/결합
입력 디바운스(검색창 최적화)
using System;
using System.Reactive.Linq;
var textChanges = Observable.FromEventPattern(h => textBox.TextChanged += h
, h => textBox.TextChanged -= h)
.Select(_ => textBox.Text)
.Throttle(TimeSpan.FromMilliseconds(300)) // 디바운스
.DistinctUntilChanged()
.Where(t => t.Length >= 2);
var sub = textChanges.Subscribe(query => Search(query));
결합/머지
var a = Observable.Interval(TimeSpan.FromSeconds(1)).Select(x => (int)x);
var b = Observable.Interval(TimeSpan.FromSeconds(2)).Select(x => (int)x * 10);
var latestSum = a.CombineLatest(b, (x, y) => x + y); // 최신값 합
var zipped = a.Zip(b, (x, y) => (x, y)); // 순서 맞춰 페어링
var merged = a.Merge(b.Select(x => x / 10)); // 같은 타입 스트림 합치기
비동기 확장
// 텍스트 쿼리 -> HTTP -> 파싱 체인
var results = textChanges.SelectMany(q => Observable.FromAsync(() => http.GetStringAsync($"/api?q={q}"))).Select(Parse);
6. Cold vs Hot, Subject와 Publish
- Cold: 구독 시마다 새로 실행(Interval, Timer, FromAsync 등).
- Hot: 생산이 구독과 무관하게 진행(Subject, UI 이벤트, 센서 등).
using System;
using System.Reactive.Linq;
// Cold를 공유해 Hot처럼 사용
var shared = Observable.Interval(TimeSpan.FromSeconds(1))
.Publish()
.RefCount(); // 첫 구독 시작, 마지막 해제 시 중지
var s1 = shared.Subscribe(x => Console.WriteLine($"A:{x}"));
var s2 = shared.Subscribe(x => Console.WriteLine($"B:{x}"));
Subject는 핫 소스이지만 남용 시 메모리/결합도 문제가 생길 수 있습니다. 가능하면 Publish/RefCount로 공유하거나, 필요한 경우에만 Subject를 사용합니다.
7. 스케줄러와 UI 스레드
- SubscribeOn: 생산자 실행 스레드 지정
- ObserveOn: 이후 연산/구독 콜백 스레드 지정
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
// UI 업데이트는 UI 스레드로 마샬링(예: WPF)
IObservable<long> ticks = Observable.Interval(TimeSpan.FromMilliseconds(500))
.SubscribeOn(NewThreadScheduler.Default)
.ObserveOn(System.Reactive.Concurrency.DispatcherScheduler.Current);
var sub = ticks.Subscribe(x => label.Content = x);
WinForms는 Control.Invoke를 감싸는 SynchronizationContext를 사용할 수 있습니다. 패키지별로 DispatcherScheduler가 포함되며, UI 초기화 이후에 Current가 활성화됩니다.
8. 오류 처리와 재시도
using System;
using System.Reactive.Linq;
IObservable<string> resilient = getJson.Timeout(TimeSpan.FromSeconds(3))
.Retry(2) // 일시 오류 재시도
.Catch(Observable.Return("{}")); // 폴백
OnError로 스트림이 종료되므로, 재시도/복구가 필요하면 Catch/Retry를 연산자로 조합합니다.
9. 수명/메모리 관리 팁
- 구독 해제: using 또는 Dispose 호출로 누수를 방지합니다.
- CompositeDisposable로 여러 구독을 묶어 관리합니다.
- TakeUntil/TakeWhile로 자동 해제를 걸어두면 안전합니다.
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
var bag = new CompositeDisposable();
var stop = new System.Reactive.Subjects.Subject<Unit>();
var sub = Observable.Interval(TimeSpan.FromSeconds(1))
.TakeUntil(stop) // stop 신호 시 자동 해제
.Finally(() => Console.WriteLine("cleaned"))
.Subscribe(Console.WriteLine);
bag.Add(sub);// 어딘가에서 종료 시그널발행
stop.OnNext(Unit.Default);
stop.OnCompleted();
bag.Dispose();
10. 테스트 힌트
Microsoft.Reactive.Testing의 TestScheduler로 가상 시간을 사용하면 타이밍 의존 로직을 안정적으로 테스트할 수 있습니다(디바운스/타임아웃 등).
요약: Rx.NET은 이벤트와 비동기를 하나의 일관된 Observable 모델로 통합합니다. 작은 연산자를 조합해 복잡도를 낮추고, 스케줄러/수명 관리를 함께 고려하면 실전에서 강력한 효과를 얻을 수 있습니다.
'C#' 카테고리의 다른 글
| C# AssemblyLoadContext로 플러그인 아키텍처 만들기 (0) | 2026.05.08 |
|---|---|
| C# 애플리케이션에서 메모리 풀(Object Pool) 구현하기 (0) | 2026.05.08 |
| C# System.IO.Pipelines를 활용한 스트리밍 데이터 처리 (0) | 2026.05.07 |
| C# Parallel.For와 Parallel.ForEach로 데이터 병렬 처리 (0) | 2026.05.07 |
| C# Channel<T>를 이용한 고성능 생산자-소비자 패턴 구현 (1) | 2026.05.06 |