到目前为止我一直在使用Rx Java,但我开始使用来自projectreactor.io的reactor-core,因为它遵循反应流规范. 在下面的测试中,我创建了一个生成随机数的热Flux(ConnectableFlux).我立即连接()它预取2
在下面的测试中,我创建了一个生成随机数的热Flux(ConnectableFlux).我立即连接()它预取256个值(我可以在日志中看到它们实际上有258个).我等待5秒钟来模拟订阅者直到一段时间后才会订阅.
主线程唤醒后,RnApp订阅了ConnectableFlux,randomNumberGenerator.subscribe(new RnApp());.然后调用RnApp.onSubscribe()并请求10个元素.之后,java.lang.IllegalStateException:Queue full?!异常被引发(调用RnApp.onError()),为什么?
订户:
public class RnApp implements Subscriber<Float>{ private Subscription subscription; private List<Float> randomNumbers = new ArrayList<Float>(); @Override public void onComplete() { System.out.println("onComplete"); } @Override public void onError(Throwable err) { err.printStackTrace(); } @Override public void onNext(Float f) { if(this.randomNumbers.size()>=10){ this.subscription.cancel(); }else{ this.randomNumbers.add(f); } } @Override public void onSubscribe(Subscription subs) { this.subscription = subs; this.subscription.request(10); } }
出版商测试:
@Test public void randomNumberReading() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); ConnectableFlux<Float> randomNumberGenerator = ConnectableFlux.<Float>create( (c) -> { SecureRandom sr = new SecureRandom(); int i = 1; while(true){ try { Thread.sleep(10); } catch (Exception e) { e.printStackTrace(); } System.out.println("-----------------------------------------------------"+(i++)); c.onNext(sr.nextFloat()); } }).log().subscribeOn(Computations.concurrent()).publish(); randomNumberGenerator.connect(); Thread.sleep(5000); randomNumberGenerator.subscribe(new RnApp()); latch.await(); }
日志:
11:12:05.125 [main] DEBUG r.core.util.Logger$LoggerFactory - Using Slf4j logging framework 11:12:05.363 [concurrent-1] INFO reactor.core.publisher.FluxLog - onSubscribe(io.pivotal.literx.Part10SubscribeOnPublishOn$$Lambda$1/1586600255@29d4caeb) 11:12:05.371 [concurrent-1] INFO reactor.core.publisher.FluxLog - request(256) -----------------------------------------------------1 11:12:06.000 [concurrent-1] INFO reactor.core.publisher.FluxLog - onNext(0.39189225) -----------------------------------------------------2 ... -----------------------------------------------------257 11:12:08.683 [concurrent-1] INFO reactor.core.publisher.FluxLog - onNext(0.34729618) -----------------------------------------------------258 11:12:08.697 [concurrent-1] INFO reactor.core.publisher.FluxLog - onNext(0.7729547) java.lang.IllegalStateException: Queue full?! at reactor.core.publisher.FluxPublish$State.onNext(FluxPublish.java:246) at reactor.core.publisher.FluxSubscribeOn$SubscribeOnPipeline.onNext(FluxSubscribeOn.java:134) at reactor.core.publisher.FluxLog$LoggerBarrier.doNext(FluxLog.java:130) at reactor.core.subscriber.SubscriberBarrier.onNext(SubscriberBarrier.java:85) at reactor.core.subscriber.SubscriberWithContext.onNext(SubscriberWithContext.java:92) at io.pivotal.literx.Part10SubscribeOnPublishOn.lambda$1(Part10SubscribeOnPublishOn.java:132) at reactor.core.publisher.FluxGenerate$ForEachBiConsumer.accept(FluxGenerate.java:145) at reactor.core.publisher.FluxGenerate$ForEachBiConsumer.accept(FluxGenerate.java:114) at reactor.core.publisher.FluxGenerate$SubscriberProxy.request(FluxGenerate.java:245) at reactor.core.subscriber.SubscriberBarrier.doRequest(SubscriberBarrier.java:146) at reactor.core.publisher.FluxLog$LoggerBarrier.doRequest(FluxLog.java:160) at reactor.core.subscriber.SubscriberBarrier.request(SubscriberBarrier.java:135) at reactor.core.util.DeferredSubscription.set(DeferredSubscription.java:71) at reactor.core.publisher.FluxSubscribeOn$SubscribeOnPipeline.onSubscribe(FluxSubscribeOn.java:129) at reactor.core.publisher.FluxLog$LoggerBarrier.doOnSubscribe(FluxLog.java:122) at reactor.core.subscriber.SubscriberBarrier.onSubscribe(SubscriberBarrier.java:67) at reactor.core.publisher.FluxGenerate.subscribe(FluxGenerate.java:72) at reactor.core.publisher.FluxLog.subscribe(FluxLog.java:67) at reactor.core.publisher.FluxSubscribeOn$SourceSubscribeTask.run(FluxSubscribeOn.java:363) at reactor.core.publisher.Computations$ProcessorWorker.onNext(Computations.java:919) at reactor.core.publisher.Computations$ProcessorWorker.onNext(Computations.java:883) at reactor.core.publisher.WorkQueueProcessor$QueueSubscriberLoop.run(WorkQueueProcessor.java:842) at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.lang.Thread.run(Unknown Source)与RxJava一样,如果您使用的是create(),那么您可以自己处理取消和背压.您可以使用标准运算符构建生成器:
ConnectableFlux<Double> secureRandomFlux = Flux.using( () -> new SecureRandom(), sr -> Flux.interval(10, TimeUnit.MILLISECONDS) .map(v -> sr.nextDouble()) .onBackpressureDrop() sr -> { } ).publish();