package com.samzhai.rxinterceptor; import io.reactivex.Observable; import io.reactivex.ObservableSource; import io.reactivex.ObservableTransformer; import io.reactivex.functions.Predicate; import io.reactivex.subjects.BehaviorSubject; /** * Created by 翟勇 on 2017/11/1. * 用于取消RxJava的事件订阅,使用的原理是takeUtil ** 通过一个Behavior,进行事件的发送作为开关,在takeUtil里面进行判断,如果是OFF就停止订阅,如果是ON就执行订阅 */ public class RxSwitcher { private BehaviorSubject
bs; /** * 初始化的时候对bs进行赋值 */ public RxSwitcher() { bs = BehaviorSubject.create(); } /** * 使用的时候就是使用这个方法,在原本的Observable的操作过程中添加compose操作符, * 并调用此方法传入compose中。 * * @param * @return */ public ObservableTransformer injectSwitcher() { return new ObservableTransformer () { @Override public ObservableSource apply(Observable upstream) { return upstream.takeUntil(bs.filter(new Predicate () { @Override public boolean test(Switcher switcher) throws Exception { // 通过判断当前是否关闭来决定是否继续操作 // 因为是takeUntil操作符,意思是反的,所以判断的是OFF,不是判断ON return switcher == Switcher.OFF; } })); } }; } /** * 临时关闭,即先关掉使流程结束,再将开关打开,以便可以下一次的操作 */ public void turnOff() { bs.onNext(Switcher.OFF); bs.onNext(Switcher.ON); } /** * 直接关闭,不能重新开始操作,除非手动调用turnOn */ public void turnDie() { bs.onNext(Switcher.OFF); } /** * 打开开关,以便可以正常使用订阅 */ public void turnOn() { bs.onNext(Switcher.ON); } /** * 枚举,作为开与关的标示 */ private enum Switcher { ON, OFF } }