TakeUntil 的官方文档对这个操作符的解释是:
Emit values until provided observable emits.
即它可以被赋予另一个起锚定作用的 Observable,当该锚定 Observable emit 值时,原始的 Observable 就停止发射值,进入 complete 操作。
看一个实际的例子:
import { interval, timer } from 'rxjs'; import { takeUntil } from 'rxjs/operators'; //emit value every 1s const source = interval(1000); //after 5 seconds, emit value const timer$ = timer(5000); //when timer emits after 5s, complete source const example = source.pipe(takeUntil(timer$)); //output: 0,1,2,3 const subscribe = example.subscribe(val => console.log(val));
source Observable 每个1秒的时间间隔,发射一个从 0 开始递增间隔为 1 的整数序列。
我们构造了一个 timer Observable,超时时间间隔为 5 秒,也就是说,在第五秒时,该 Observable 会发送一个值。这个 timer Observable 传入 takeUntil,作为一个 notification Observable,五秒钟之后,source Observable 就会停止发射整数。
最后上述程序执行的输出:4 秒之内打印 0~4,然后结束。
另一个例子:
const interval = interval(1000); const clicks = fromEvent(document, 'click'); const result = interval.pipe(takeUntil(clicks)); result.subscribe(x => console.log(x));
这个例子里,interval 作为原始的 Observable,clicks 作为 notification Observable,整个程序的表现形式是,每个 1 秒有一个递增1的整数序列打印,直至页面发生点击事件时,原始 interval Observable 终止。
看另一个例子:
// RxJS v6+ import { interval } from 'rxjs/observable/interval'; import { takeUntil, filter, scan, map, withLatestFrom } from 'rxjs/operators'; //emit value every 1s const source = interval(1000); //is number even? const isEven = val => val % 2 === 0; //only allow values that are even const evenSource = source.pipe(filter(isEven)); //keep a running total of the number of even numbers out const evenNumberCount = evenSource.pipe(scan((acc, _) => acc + 1, 0)); //do not emit until 5 even numbers have been emitted const fiveEvenNumbers = evenNumberCount.pipe(filter(val => val > 5)); const example = evenSource.pipe( //also give me the current even number count for display withLatestFrom(evenNumberCount), map(([val, count]) => `Even number (${count}) : ${val}`), //when five even numbers have been emitted, complete source observable takeUntil(fiveEvenNumbers) ); /* Even number (1) : 0, Even number (2) : 2 Even number (3) : 4 Even number (4) : 6 Even number (5) : 8 */ const subscribe = example.subscribe(val => console.log(val));
我们逐行分析这个例子的逻辑:
const evenSource = source.pipe(filter(isEven));
产生一个每隔1秒发射一个偶数的 Observable.
const evenNumberCount = evenSource.pipe(scan((acc, _) => acc + 1, 0));
对产生的偶数的个数进行累加。
const fiveEvenNumbers = evenNumberCount.pipe(filter(val => val > 5));
当产生的偶数个数大于 5 时,发射值。这个 Observable 作为 takeUntil 的 notification Observable 使用。
const example = evenSource.pipe( //also give me the current even number count for display withLatestFrom(evenNumberCount), map(([val, count]) => `Even number (${count}) : ${val}`), //when five even numbers have been emitted, complete source observable takeUntil(fiveEvenNumbers) );
- 使用 eventSource 和 eventNumberCount,通过
withLatestFrom
将两个 Observable 进行连接,从而在 map Operator 里,可以同时打印出当前发射的偶数值和偶数总量。通过 takeUntil 传入一个只有在偶数总数个数大于 5 时才发射值的 Observable,可以做到偶数总数大于 5 之后,让 interval 停止值的发送。
最后的执行效果:
到此这篇关于Rxjs TakeUntil 操作符内容梳理总结的文章就介绍到这了,更多相关Rxjs TakeUntil 内容请搜索自由互联以前的文章或继续浏览下面的相关文章希望大家以后多多支持自由互联!