当前位置 : 主页 > 编程语言 > c++ >

RxSwitcher取消订阅的开关 - 轻量总共也就几行代码 - 没有入侵性,不需要继承

来源:互联网 收集:自由互联 发布时间:2021-06-30
RxSwitcher.java package com.samzhai.rxinterceptor;import io.reactivex.Observable;import io.reactivex.ObservableSource;import io.reactivex.ObservableTransformer;import io.reactivex.functions.Predicate;import io.reactivex.subjects.BehaviorSub
RxSwitcher.java
package com.samzhai.rxinterceptor;

import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.ObservableTransformer;
import io.reactivex.functions.Predicate;
import io.reactivex.subjects.BehaviorSubject;

/**
 * Created by 翟勇 on 2017/11/1.
 * 用于取消RxJava的事件订阅,使用的原理是takeUtil
 * 

* 通过一个Behavior,进行事件的发送作为开关,在takeUtil里面进行判断,如果是OFF就停止订阅,如果是ON就执行订阅 */ public class RxSwitcher { private BehaviorSubject bs; /** * 初始化的时候对bs进行赋值 */ public RxSwitcher() { bs = BehaviorSubject.create(); } /** * 使用的时候就是使用这个方法,在原本的Observable的操作过程中添加compose操作符, * 并调用此方法传入compose中。 * * @param * @return */ public ObservableTransformer injectSwitcher() { return new ObservableTransformer () { @Override public ObservableSource apply(Observable upstream) { return upstream.takeUntil(bs.filter(new Predicate () { @Override public boolean test(Switcher switcher) throws Exception { // 通过判断当前是否关闭来决定是否继续操作 // 因为是takeUntil操作符,意思是反的,所以判断的是OFF,不是判断ON return switcher == Switcher.OFF; } })); } }; } /** * 临时关闭,即先关掉使流程结束,再将开关打开,以便可以下一次的操作 */ public void turnOff() { bs.onNext(Switcher.OFF); bs.onNext(Switcher.ON); } /** * 直接关闭,不能重新开始操作,除非手动调用turnOn */ public void turnDie() { bs.onNext(Switcher.OFF); } /** * 打开开关,以便可以正常使用订阅 */ public void turnOn() { bs.onNext(Switcher.ON); } /** * 枚举,作为开与关的标示 */ private enum Switcher { ON, OFF } }

网友评论