먼저 알아 둘 개념
hot observable / cold observable
hot
생성과 동시에 emit 되기 시작하고 구독하는 Observer는 아무때나 관찰을 시작할 수 있다. hot observable 중에는 Connectable Oabservable 이 있는데 구독을 하더라도 connect() 가 실행되기 전까지는 아이템을 emit 하지 않는다. 반대로 구독을 하지 않더라도 connect()가 실행이 되면 아이템을 emit 하고 그 중간에 구독을 하면 그 때부터는 emit된 아이템을 받아서 처리할 수 있다.
cold
구독을 하면 그 때부터 아이템을 emit 하게 된다. 일반적인 Observable이 cold observable에 속한다.
구독을 여러개 하면, 그 때마다 각각 아이템을 emit하고 받는 과정이 진행된다.
publish
일반 Observable을 Connectable Observable로 변환한다.
Connectable Observable
멀티캐스팅으로 진행되는 Observable로써 connect()를 실행하면 그 때부터 emit이 되기 시작한다.
subscribe로 구독을 하더라도 connect()를 실행하지 않고서는 emit이 되지 않고 connect()를 실행해야 emit이 시작되기 때문에 emit되는 시점을 조절을 할 수 있다.
멀티캐스팅이 되기 때문에 subscribe를 여러개 걸어두고(구독) connect()를 해서 emit을 시작 시키면
한번의 연산으로 모든 구독자들에게 emit된 값을 보낼 수가 있다.
구독과 상관없이 connect()로써 emit이 진행되므로 unsubscribe 와 별개로 직접 연결을 끊어줘야 한다.
- // 개미님이 쓰신 코드를 참고하였습니다.
- ConnectableObservable<String> observable = Observable
- .range(0, 4)
- .timestamp()
- .map(timestamped -> {
- System.out.println("\n________________map 연산________________\n");
- return String.format("[%d] %d", timestamped.getValue(), timestamped.getTimestampMillis());
- })
- .doOnNext(value -> count.increase())
- .publish();
- observable.subscribe(value -> {
- System.out.println("subscriber1 : " + value);
- });
- observable.subscribe(value -> {
- System.out.println("subscriber2 : " + value);
- });
- observable.connect();
- System.out.println("연산횟수 : " + count.count());
- /** 결과
- ________________map 연산________________
- subscriber1 : [0] 1466580780476
- subscriber2 : [0] 1466580780476
- ________________map 연산________________
- subscriber1 : [1] 1466580780476
- subscriber2 : [1] 1466580780476
- ________________map 연산________________
- subscriber1 : [2] 1466580780477
- subscriber2 : [2] 1466580780477
- ________________map 연산________________
- subscriber1 : [3] 1466580780477
- subscriber2 : [3] 1466580780477
- 연산횟수 : 4
- */
(출처 : https://moka-a.github.io/android/rxAndroid_study/)
share (publish.refCount)
refCount
Connectable Observable를 일반 Observable같이 행동하게 만든다.
share 메서드 내부를 보면 단순히 return publish().refCount(); 이 호출되고 있다.
refCount 에 count라는 이름이 붙은 만큼, 구독자가 얼마나 있는지 추적을 하면서 진행이 된다.
처음 구독(subscribe)이 되면 그 때부터 connect()가 되어 emit이 진행되고 그다음 추가로 구독이 이뤄지면 또 그 구독자들도 emit이 별도로 진행이 된다.
publish 오퍼레이터와는 다르게 구독을 하는 구독자들이 모두 없어지면 emit이 더이상 진행되지 않는다.
따라서 emit이 1부터 계속 증가가 되면서 진행된다고 했을 때,
publish()는 connect()로 연결을 한 경우에는 구독을 모두 해제하고 다시 연결을 해도 emit 을 이어서(가령 5부터) 받을 수 있지만 share()는 구독을 모두 해제하고 다시 연결을 하면 1부터 새로 받게 된다.
- Observable<String> observable = Observable
- .range(0, 4)
- .timestamp()
- .map(timestamped -> {
- System.out.println("\n________________map 연산________________\n");
- return String.format("[%d] %d", timestamped.getValue(), timestamped.getTimestampMillis());
- })
- .doOnNext(value -> count.increase())
- .publish().refCount();
- observable.subscribe(value -> {
- System.out.println("subscriber1 : " + value);
- });
- System.out.println("*******************************************************");
- observable.subscribe(value -> {
- System.out.println("subscriber2 : " + value);
- });
- System.out.println("연산횟수 : " + count.count());
- /** 결과
- ________________map 연산________________
- subscriber1 : [0] 1466581248156
- ________________map 연산________________
- subscriber1 : [1] 1466581248157
- ________________map 연산________________
- subscriber1 : [2] 1466581248157
- ________________map 연산________________
- subscriber1 : [3] 1466581248157
- *****************************************
- ________________map 연산________________
- subscriber2 : [0] 1466581248158
- ________________map 연산________________
- subscriber2 : [1] 1466581248158
- ________________map 연산________________
- subscriber2 : [2] 1466581248159
- ________________map 연산________________
- subscriber2 : [3] 1466581248159
- 연산횟수 : 8
- */
(출처 : https://moka-a.github.io/android/rxAndroid_study/)
위 결과로만 보면 사실상 일반 Observable 과 다를바가 없어 보인다.
하지만 PublishSubject와 같은 것을 이용해서 emit을 나중에 진행시키면 한번의 연산으로 모든 구독자들에게 emit된 값을 받아볼 수 있게 하는 멀티캐스팅이 가능하다.
- PublishSubject<Integer> publishSubject = PublishSubject.create();
- Observable<String> observable = publishSubject
- .timestamp()
- .map( timestamped -> {
- System.out.println("\n________________map 연산________________\n");
- count.increase();
- return String.format("[%d] %d", timestamped.getValue(), timestamped.getTimestampMillis());
- } )
- .publish().refCount();
- observable.subscribe( value -> {
- System.out.println("subscriber1 : " + value);
- });
- observable.subscribe(value -> {
- System.out.println("subscriber2 : " + value);
- });
- publishSubject.onNext( 1 );
- publishSubject.onNext( 2 );
- publishSubject.onNext( 3 );
- publishSubject.onNext( 4 );
- System.out.println("연산횟수 : " + count.count());
- /** 결과
- ________________map 연산________________
- subscriber1 : [1] 1466583114891
- subscriber2 : [1] 1466583114891
- ________________map 연산________________
- subscriber1 : [2] 1466583114891
- subscriber2 : [2] 1466583114891
- ________________map 연산________________
- subscriber1 : [3] 1466583114891
- subscriber2 : [3] 1466583114891
- ________________map 연산________________
- subscriber1 : [4] 1466583114892
- subscriber2 : [4] 1466583114892
- 연산횟수 : 4
- */
https://moka-a.github.io/android/rxAndroid_study/
https://gist.github.com/QuadFlask/145e80b4ac54d1541e2d38d9ce762a57
http://tiii.tistory.com/26
http://blog.kaush.co/2015/01/21/rxjava-tip-for-the-day-share-publish-refcount-and-all-that-jazz/
http://www.tailec.com/blog/understanding-publish-connect-refcount-share
http://reactivex.io/documentation/operators.html#connectable
https://brunch.co.kr/@tilltue/18