먼저 알아 둘 개념


hot observable / cold observable


hot

생성과 동시에 emit 되기 시작하고 구독하는 Observer는 아무때나 관찰을 시작할 수 있다. hot observable 중에는 Connectable Oabservable 이 있는데 구독을 하더라도 connect() 가 실행되기 전까지는 아이템을 emit 하지 않는다. 반대로 구독을 하지 않더라도 connect()가 실행이 되면 아이템을 emit 하고 그 중간에 구독을 하면 그 때부터는 emit된 아이템을 받아서 처리할 수 있다.


cold

구독을 하면 그 때부터 아이템을 emit 하게 된다. 일반적인 Observable이 cold observable에 속한다.

구독을 여러개 하면, 그 때마다 각각 아이템을 emit하고 받는 과정이 진행된다.



publish

일반 Observable을 Connectable Observable로 변환한다.


Connectable Observable

멀티캐스팅으로 진행되는 Observable로써 connect()를 실행하면 그 때부터 emit이 되기 시작한다.

subscribe로 구독을 하더라도 connect()를 실행하지 않고서는 emit이 되지 않고 connect()를 실행해야 emit이 시작되기 때문에 emit되는 시점을 조절을 할 수 있다.

멀티캐스팅이 되기 때문에 subscribe를 여러개 걸어두고(구독) connect()를 해서 emit을 시작 시키면

한번의 연산으로 모든 구독자들에게 emit된 값을 보낼 수가 있다.

구독과 상관없이 connect()로써 emit이 진행되므로 unsubscribe 와 별개로 직접 연결을 끊어줘야 한다.


  1. // 개미님이 쓰신 코드를 참고하였습니다.  
  2. ConnectableObservable<String> observable = Observable  
  3.     .range(04)  
  4.     .timestamp()  
  5.     .map(timestamped -> {  
  6.         System.out.println("\n________________map 연산________________\n");  
  7.         return String.format("[%d] %d", timestamped.getValue(), timestamped.getTimestampMillis());  
  8.     })  
  9.     .doOnNext(value -> count.increase())  
  10.     .publish();  
  11.   
  12. observable.subscribe(value -> {  
  13.     System.out.println("subscriber1 : " + value);  
  14. });  
  15.   
  16. observable.subscribe(value -> {  
  17.     System.out.println("subscriber2 : " + value);  
  18. });  
  19.   
  20. observable.connect();  
  21. System.out.println("연산횟수 : " + count.count());          
  22.   
  23. /** 결과 
  24.     ________________map 연산________________ 
  25.  
  26.     subscriber1 : [0] 1466580780476 
  27.     subscriber2 : [0] 1466580780476 
  28.  
  29.     ________________map 연산________________ 
  30.  
  31.     subscriber1 : [1] 1466580780476 
  32.     subscriber2 : [1] 1466580780476 
  33.  
  34.     ________________map 연산________________ 
  35.  
  36.     subscriber1 : [2] 1466580780477 
  37.     subscriber2 : [2] 1466580780477 
  38.  
  39.     ________________map 연산________________ 
  40.  
  41.     subscriber1 : [3] 1466580780477 
  42.     subscriber2 : [3] 1466580780477 
  43.     연산횟수 : 4 
  44. */  

(출처 : https://moka-a.github.io/android/rxAndroid_study/)


share (publish.refCount)


refCount

Connectable Observable를 일반 Observable같이 행동하게 만든다.


share 메서드 내부를 보면 단순히 return publish().refCount(); 이 호출되고 있다.


refCount 에 count라는 이름이 붙은 만큼, 구독자가 얼마나 있는지 추적을 하면서 진행이 된다.

처음 구독(subscribe)이 되면 그 때부터 connect()가 되어 emit이 진행되고 그다음 추가로 구독이 이뤄지면 또 그 구독자들도 emit이 별도로 진행이 된다.

publish 오퍼레이터와는 다르게 구독을 하는 구독자들이 모두 없어지면 emit이 더이상 진행되지 않는다.


따라서 emit이 1부터 계속 증가가 되면서 진행된다고 했을 때,

publish()는 connect()로 연결을 한 경우에는 구독을 모두 해제하고 다시 연결을 해도 emit 을 이어서(가령 5부터) 받을 수 있지만 share()는 구독을 모두 해제하고 다시 연결을 하면 1부터 새로 받게 된다.



  1. Observable<String> observable = Observable  
  2.     .range(04)  
  3.     .timestamp()  
  4.     .map(timestamped -> {  
  5.         System.out.println("\n________________map 연산________________\n");  
  6.         return String.format("[%d] %d", timestamped.getValue(), timestamped.getTimestampMillis());  
  7.     })  
  8.     .doOnNext(value -> count.increase())  
  9.     .publish().refCount();  
  10.   
  11. observable.subscribe(value -> {  
  12.     System.out.println("subscriber1 : " + value);  
  13. });  
  14. System.out.println("*******************************************************");  
  15. observable.subscribe(value -> {  
  16.     System.out.println("subscriber2 : " + value);  
  17. });  
  18.   
  19. System.out.println("연산횟수 : " + count.count());    
  20.   
  21. /** 결과 
  22.     ________________map 연산________________ 
  23.  
  24.     subscriber1 : [0] 1466581248156 
  25.  
  26.     ________________map 연산________________ 
  27.  
  28.     subscriber1 : [1] 1466581248157 
  29.  
  30.     ________________map 연산________________ 
  31.  
  32.     subscriber1 : [2] 1466581248157 
  33.  
  34.     ________________map 연산________________ 
  35.  
  36.     subscriber1 : [3] 1466581248157 
  37.     ***************************************** 
  38.     ________________map 연산________________ 
  39.  
  40.     subscriber2 : [0] 1466581248158 
  41.  
  42.     ________________map 연산________________ 
  43.  
  44.     subscriber2 : [1] 1466581248158 
  45.  
  46.     ________________map 연산________________ 
  47.  
  48.     subscriber2 : [2] 1466581248159 
  49.  
  50.     ________________map 연산________________ 
  51.  
  52.     subscriber2 : [3] 1466581248159 
  53.     연산횟수 : 8 
  54. */  

(출처 : https://moka-a.github.io/android/rxAndroid_study/)


위 결과로만 보면 사실상 일반 Observable 과 다를바가 없어 보인다.

하지만 PublishSubject와 같은 것을 이용해서 emit을 나중에 진행시키면 한번의 연산으로 모든 구독자들에게 emit된 값을 받아볼 수 있게 하는 멀티캐스팅이 가능하다.


  1. PublishSubject<Integer> publishSubject = PublishSubject.create();  
  2.   
  3. Observable<String> observable = publishSubject  
  4.         .timestamp()  
  5.         .map( timestamped -> {  
  6.             System.out.println("\n________________map 연산________________\n");  
  7.             count.increase();  
  8.             return String.format("[%d] %d", timestamped.getValue(), timestamped.getTimestampMillis());  
  9.         } )  
  10.         .publish().refCount();  
  11.   
  12. observable.subscribe( value -> {  
  13.     System.out.println("subscriber1 : " + value);  
  14. });  
  15.   
  16. observable.subscribe(value -> {  
  17.     System.out.println("subscriber2 : " + value);  
  18. });  
  19.   
  20. publishSubject.onNext( 1 );  
  21. publishSubject.onNext( 2 );  
  22. publishSubject.onNext( 3 );  
  23. publishSubject.onNext( 4 );  
  24.   
  25. System.out.println("연산횟수 : " + count.count());  
  26.   
  27. /** 결과 
  28.     ________________map 연산________________ 
  29.  
  30.     subscriber1 : [1] 1466583114891 
  31.     subscriber2 : [1] 1466583114891 
  32.  
  33.     ________________map 연산________________ 
  34.  
  35.     subscriber1 : [2] 1466583114891 
  36.     subscriber2 : [2] 1466583114891 
  37.  
  38.     ________________map 연산________________ 
  39.  
  40.     subscriber1 : [3] 1466583114891 
  41.     subscriber2 : [3] 1466583114891 
  42.  
  43.     ________________map 연산________________ 
  44.  
  45.     subscriber1 : [4] 1466583114892 
  46.     subscriber2 : [4] 1466583114892 
  47.     연산횟수 : 4 
  48. */  



https://moka-a.github.io/android/rxAndroid_study/

https://gist.github.com/QuadFlask/145e80b4ac54d1541e2d38d9ce762a57

http://tiii.tistory.com/26

http://blog.kaush.co/2015/01/21/rxjava-tip-for-the-day-share-publish-refcount-and-all-that-jazz/

http://www.tailec.com/blog/understanding-publish-connect-refcount-share

http://reactivex.io/documentation/operators.html#connectable

https://brunch.co.kr/@tilltue/18

Posted by 윤연식
,