본문 바로가기

C#

C# 반응형 프로그래밍(Rx.NET) 개념과 Observable 활용

Rx.NET은 이벤트와 비동기 데이터를 컬렉션처럼 다루게 해주는 라이브러리입니다. IObservable 시퀀스를 LINQ 스타일로 조합해 유지보수성과 가독성을 높일 수 있습니다. 이 글에서는 핵심 개념과 Observable 활용법을 간략히 소개합니다.

1. Rx.NET 한눈에 보기

- Push 모델: 데이터가 준비될 때 밀어줍니다(OnNext, OnError, OnCompleted).
- LINQ to Events: Select/Where 같은 연산자로 스트림을 변환/필터링합니다.
- 비동기/이벤트 통합: 타이머, UI 이벤트, 네트워크 호출을 같은 방식으로 다룹니다.

2. Observable/Observer 핵심

- IObservable: 데이터 스트림의 생산자입니다.
- IObserver: OnNext/OnError/OnCompleted를 구현하는 소비자입니다.
- 구독은 IDisposable을 반환하며, Dispose로 안전하게 해제합니다.

3. 빠른 시작: Interval 구독

using System;
using System.Reactive.Linq;

class Program
{
	static void Main()
    {        
    	var sub = Observable.Interval(TimeSpan.FromSeconds(1))
        					.Take(5) // 5개 후 완료
                            .Subscribe(x => Console.WriteLine($"Tick: {x}")
                                     , ex => Console.WriteLine($"Error: {ex.Message}")
                                     , () => Console.WriteLine("Completed"));
        Console.ReadLine();
        sub.Dispose(); // 필요 시 조기 해제    
    }
}

Interval은 Cold Observable로, 구독마다 독립적으로 시작합니다.

4. Observable 만들기 4가지

1) 이벤트를 Observable로

using System;
using System.Reactive.Linq;

// WinForms 예시
var clicks = Observable.FromEventPattern(handler => button.Click += handler
									   , handler => button.Click -= handler);
var times = clicks.Select(_ => DateTimeOffset.Now);
var sub = times.Subscribe(t => label.Text = $"Clicked @ {t:HH:mm:ss}");

2) Create로 커스텀

using System;
using System.Reactive.Linq;

var custom = Observable.Create<int>(observer => {
                                    	observer.OnNext(1);
                                        observer.OnNext(2);
                                        observer.OnCompleted();
                                        
                                        return System.Reactive.Disposables.Disposable.Empty;
                                    });

3) 타이머/간단 시퀀스

var once = Observable.Timer(TimeSpan.FromSeconds(1)); // 1초 후 1회 방출
var range = Observable.Range(1, 3); // 1,2,3

4) 비동기를 스트림으로

using System;
using System.Net.Http;
using System.Reactive.Linq;

var http = new HttpClient();

IObservable<string> getJson = Observable.FromAsync(() => http.GetStringAsync("https://example.com/data"));
var sub = getJson.Subscribe(json => Console.WriteLine(json), ex => Console.WriteLine(ex.Message));

5. 실전 연산자: 필터링/변환/결합

입력 디바운스(검색창 최적화)

using System;
using System.Reactive.Linq;

var textChanges = Observable.FromEventPattern(h => textBox.TextChanged += h
                                            , h => textBox.TextChanged -= h)
                            .Select(_ => textBox.Text)
                            .Throttle(TimeSpan.FromMilliseconds(300)) // 디바운스    
                            .DistinctUntilChanged()    
                            .Where(t => t.Length >= 2);
var sub = textChanges.Subscribe(query => Search(query));

결합/머지

var a = Observable.Interval(TimeSpan.FromSeconds(1)).Select(x => (int)x);
var b = Observable.Interval(TimeSpan.FromSeconds(2)).Select(x => (int)x * 10);

var latestSum = a.CombineLatest(b, (x, y) => x + y); // 최신값 합
var zipped = a.Zip(b, (x, y) => (x, y)); // 순서 맞춰 페어링
var merged = a.Merge(b.Select(x => x / 10)); // 같은 타입 스트림 합치기

비동기 확장

// 텍스트 쿼리 -> HTTP -> 파싱 체인
var results = textChanges.SelectMany(q => Observable.FromAsync(() => http.GetStringAsync($"/api?q={q}"))).Select(Parse);

6. Cold vs Hot, Subject와 Publish

- Cold: 구독 시마다 새로 실행(Interval, Timer, FromAsync 등).
- Hot: 생산이 구독과 무관하게 진행(Subject, UI 이벤트, 센서 등).

using System;
using System.Reactive.Linq; 

// Cold를 공유해 Hot처럼 사용
var shared = Observable.Interval(TimeSpan.FromSeconds(1))
                       .Publish()
                       .RefCount(); // 첫 구독 시작, 마지막 해제 시 중지
var s1 = shared.Subscribe(x => Console.WriteLine($"A:{x}"));
var s2 = shared.Subscribe(x => Console.WriteLine($"B:{x}"));

Subject는 핫 소스이지만 남용 시 메모리/결합도 문제가 생길 수 있습니다. 가능하면 Publish/RefCount로 공유하거나, 필요한 경우에만 Subject를 사용합니다.

7. 스케줄러와 UI 스레드

- SubscribeOn: 생산자 실행 스레드 지정
- ObserveOn: 이후 연산/구독 콜백 스레드 지정

using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

// UI 업데이트는 UI 스레드로 마샬링(예: WPF)
IObservable<long> ticks = Observable.Interval(TimeSpan.FromMilliseconds(500))
                                    .SubscribeOn(NewThreadScheduler.Default)
                                    .ObserveOn(System.Reactive.Concurrency.DispatcherScheduler.Current);
var sub = ticks.Subscribe(x => label.Content = x);

WinForms는 Control.Invoke를 감싸는 SynchronizationContext를 사용할 수 있습니다. 패키지별로 DispatcherScheduler가 포함되며, UI 초기화 이후에 Current가 활성화됩니다.

8. 오류 처리와 재시도

using System;
using System.Reactive.Linq;

IObservable<string> resilient = getJson.Timeout(TimeSpan.FromSeconds(3))
                                       .Retry(2) // 일시 오류 재시도
                                       .Catch(Observable.Return("{}")); // 폴백

OnError로 스트림이 종료되므로, 재시도/복구가 필요하면 Catch/Retry를 연산자로 조합합니다.

9. 수명/메모리 관리 팁

- 구독 해제: using 또는 Dispose 호출로 누수를 방지합니다.
- CompositeDisposable로 여러 구독을 묶어 관리합니다.
- TakeUntil/TakeWhile로 자동 해제를 걸어두면 안전합니다.

using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

var bag = new CompositeDisposable();
var stop = new System.Reactive.Subjects.Subject<Unit>();
var sub = Observable.Interval(TimeSpan.FromSeconds(1))
                    .TakeUntil(stop) // stop 신호 시 자동 해제    
                    .Finally(() => Console.WriteLine("cleaned"))    
                    .Subscribe(Console.WriteLine);
bag.Add(sub);// 어딘가에서 종료 시그널발행
stop.OnNext(Unit.Default);
stop.OnCompleted();
bag.Dispose();

10. 테스트 힌트

Microsoft.Reactive.Testing의 TestScheduler로 가상 시간을 사용하면 타이밍 의존 로직을 안정적으로 테스트할 수 있습니다(디바운스/타임아웃 등).

요약: Rx.NET은 이벤트와 비동기를 하나의 일관된 Observable 모델로 통합합니다. 작은 연산자를 조합해 복잡도를 낮추고, 스케줄러/수명 관리를 함께 고려하면 실전에서 강력한 효과를 얻을 수 있습니다.