ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

rxjava背压,flutterplugin迁移升级

2022-01-29 17:05:52  阅读:160  来源: 互联网

标签:Observable void 背压 flutterplugin Override rxjava new public


或者这样理解:

水坝在储水的过程中同样也向下游放水来保持坝内的水位,但是如果发大洪水,上游水量很大,而大坝处理能力有限,坝内的水位必定会上升甚至最终漫过大坝。

在RxJava中,阻塞不一定会出现异常,但是肯定会多少对系统的性能和功能造成一定的影响。


阻塞是怎么形成的?

正如上面所说的当下游不能快速处理上游发来的事件事件时,而造成的事件阻塞现象。

RxJava1.0

RxJava1.0中,Observable是支持背压的,翻下源码,可以看到在Rxjava1.0中的Buffer的大小为16

Observable.java 3551行

public final Observable<List> buffer(Observable boundary) {
return buffer(boundary, 16);
}

Observable.create(new Observable.OnSubscribe() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0;; i++) { //无限循环发事件
subscriber.onNext(i);
}
}
}).subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d(TAG, “” + integer);
}
});

效果如下:

由于缓存池buffer的大小为16,照理说程序运行肯定会抛出个我们熟悉的异常
MissingBackpressureException啊,但是结果却是令我们万分诧异,我的内存啊……。

其实,原因很简单,由于RxJava观察者线程和被观察者处于同一线程,在同一个线程中,被观察需要等待观察者将事件处理完毕后才会继续发送下面的事件,所以上面才会出现这样的情况。

那么,我们让他们处于不同的线程再试下

Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
for (int i = 0; ; i++) { //无限循环发事件
emitter.onNext(i);
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer() {
@Override
public void accept(Integer integer) throws Exception {

Log.d(TAG, “” + integer);
}
});

你要的MissingBackpressureException拿去不谢。

什么?你不信Rxjava1.0中的Buffer大小是16?你不信算了……
开玩笑,我是一个不以理服人的人吗?很显然,是的!

继续吃上面的大栗子:

Observable.create(new Observable.OnSubscribe() {
@Override
public v
oid call(Subscriber<? super Integer> subscriber) {
for (int i = 0;i<16; i++) { //短时间,发送16个事件
subscriber.onNext(i);
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d(TAG, “” + integer);
}
});

看看这是啥?这不就是下游对上游事件的相应吗?请注意,这次我们仅仅发送了16个事件,下游就能正常处理事件了,但是如果我们把循环值改成17,我们再来看看。

Observable.create(new Observable.OnSubscribe() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0;i<17; i++) {//短时间,发送17个事件
subscriber.onNext(i);
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d(TAG, “” + integer);
}
});

没错,没给你闹着玩,他真的又抛出大家喜欢的异常了,没毛病。

不知道,大家看完了RxJava1.0的背压,对它有什么看法?下面是我对RxJava1.0背压的一些理解

  1. 首先,RxJava1.0的背压事件缓存池很小,只有16,不能够处理较大量的并发事件。
  2. Rxjava1.0 中上游(被观察者)无法得知下游(观察者)对事件的处理能力和事件处理进度,只能把事件一股脑的抛给下游。
  3. Rxjava1.0有很多事件不被能正确的背压,从而抛出MissingBackpressureException

RxJava2.0

RXJava2.0中Observable不再支持背压,多出了Flowable来支持背压操作

既然说Observable不再支持背压,那么我们随便搞应该就不会报哪个MissingBackpressureException了吧?

Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
for (int i = 0; ; i++) { //无限循环发事件
emitter.onNext(i);
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer() {
@Override
public void accept(Integer integer) throws Exception {

Log.d(TAG, “” + integer);
}
});

上面的例子中我们创建了一个Observable(被观察者/上游)来发送无限循环的事件,观察者(下游)让下游来处理事件。

虽然说,不报异常了但是这内存也是看的我眼疼,崩溃也是正常现象啊,这么看来背压操作还是很有必要的啊,不然程序分分钟崩溃一次,怪我咯^&

如何解决阻塞

当然,提到如何解决阻塞问题吗,大家肯定会首先想到“背压”啊,好吧背压策略确实很神奇,但是它也不是万能的啊,你不了解也不能乱用啊,它也是哥,不要太迷信它

在上面的例子里,就上一个,不是其他的!在上面的例子里我们再RxJava2.0中是使用Observable一直发送死循环事件,由于下游没有任何背压策略,所以上游的每个事件,下游都要一一进行处理,所以程序的内存就一直开车,最后翻车也再说难免。

确实,是因为上游在短时间发送太多的事件,让下游来不及处理就造成了事件的阻塞,那么我们是否可以用一些自己的方法来解决这种阻塞呢?

使用背压啊!

“你妹的,说好的用自己的办法呢?”

首先,我们分析阻塞形成的原因,无非是因为下面的原因啊:

  1. 上游的水流过快(上游发送事件过快)
  2. 上游水流量过大(上游发送事件过多)

总结来说就是短时间发送的事件过多,下游忙不过来!

好吧,首先我们用第一种办法试下,让上游发送事件的速度慢点

//控制发送速度,减少内存消耗

Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
for (int i = 0; ; i++) { //无限循环发事件
emitter.onNext(i);
Thread.sleep(1000);
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer() {
@Override
public void accept(Integer integer) throws Exception {

Log.d(TAG, “” + integer);
}
});

这样来看,我的内存就稳定,老铁稳。

那么,试试第二种方法,下游少接收点事件

//定时取样
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
for (int i = 0; ; i++) { //无限循环发事件
emitter.onNext(i);

}
}
}).sample(1, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer() {
@Override
public void accept(Integer integer) throws Exception {

Log.d(TAG, “” + integer);
}
});

或者是用过滤操作符,过滤掉一些上游事件

//过滤器 过滤操作
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
for (int i = 0; ; i++) { //无限循环发事件
emitter.onNext(i);

}
}
}).filter(new Predicate() {
@Override
public boolean test(Integer integer) throws Exception {
return integer % 100 == 0;

}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer() {
@Override
public void accept(Integer integer) throws Exception {

Log.d(TAG, “” + integer);
}
});


背压策略

上面唠唠叨叨说了那么多,基本上也给大家阐明了阻塞形成的原因和解决阻塞的方法,基本策略就是减少发送事件的频率和减少发送事件的数量。

But……

我们手动让上游发送事件的速度满下来貌似是不可取的,你想让上游的速度十多快呢?上游需要等多久呢?

还有……

我们依旧无法知道下游处理事件的能力,无法很好地处理阻塞的事件。

当然,你们肯定会说RxJava2.0不是很好地支了背压了吗?是的,确实比较好地对阻塞做了处理,咱们来看下吧。

在RxJava2.0中官方,推出了Flowable 和Subscriber用来支持背压,同样的去除了Observable对背压的支持,对的就像你上面看到的,Observable不再支持背压了,即使阻塞崩溃也不会抛出MissingBackpressureException

还是上代码看看,Flowable的用法吧。

Flowable.create(FlowableOnSubscribe source, BackpressureStrategy mode)

创建Flowable会默认让传入一个FlowableOnSubscribe和一个BackpressureStrategy,FlowableOnSubscribe很好理解就是一个就是Flowable的一个被观察者源,而BackpressureStrategy就是Flowable提供的背压策略

有哪些策略还是上源码看下吧:

public enum BackpressureStrategy {
/**

  • OnNext events are written without any buffering or dropping.
  • Downstream has to deal with any overflow.
  • Useful when one applies one of the custom-parameter onBackpressureXXX operators.

/
MISSING,
/
*

  • Signals a MissingBackpressureException in case the downstream can’t keep up.
    /
    ERROR,
    /
    *
  • Buffers all onNext values until the downstream consumes it.
    /
    BUFFER,
    /
    *
  • Drops the most recent onNext value if the downstream can’t keep up.
    /
    DROP,
    /
    *
  • Keeps only the latest onNext value, overwriting any previous value if the
  • downstream can’t keep up.
    */
    LATEST
    }

MISSING:
如果流的速度无法保持同步,可能会抛出MissingBackpressureException或IllegalStateException。

BUFFER
上游不断的发出onNext请求,直到下游处理完,也就是和Observable一样了,缓存池无限大,最后直到程序崩溃

ERROR
会在下游跟不上速度时抛出MissingBackpressureException。

DROP
会在下游跟不上速度时把onNext的值丢弃。

LATEST
会一直保留最新的onNext的值,直到被下游消费掉。


先不看上面的策略,我们最起码先看看Flowable怎么用吧

Flowable.create(new FlowableOnSubscribe() {
@Override
public void subscribe(FlowableEmitter e) throws Exception {
Log.d(TAG, “emit 1”);
emitter.onNext(1);
Log.d(TAG, “emit 2”);
emitter.onNext(2);
Log.d(TAG, “emit 3”);
emitter.onNext(3);
Log.d(TAG, “emit complete”);
emitter.onComplete();

}
}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber() {
@Override
public void onSubscribe(Subscription s) {

}

@Override
public void onNext(Integer s) {
Log.d(TAG, "onNext: " + integer);
}

@Override
public void one rror(Throwable t) {
Log.d(TAG, “onError”+t);

}

@Override
public void onComplete() {
Log.d(TAG, “onComplete”);

}
});

上游 Flowable 构建FlowableEmitter用来发送上游事件,这里的背压策略我们采用ERROR,下游方法中出现了一个和原来

@Override
public void onSubscribe(Subscription s) {

}

Subscription.java

public interface Subscription {

public void request(long n);

public void cancel();

}

这里需要重点说明一下,在Flowable中背压采取拉取响应的方式来进行水流控制,也就是说Subscription是控制上下游水流的主要方式,一般的,我们需要调用Subscription.request()来传入一个下游目前能处理的事件的数量

那么,我们不传会怎么样?

备注:这里上下游是在不同的线程里进行的,如果在同一个线程里,它也会抛出一个MissingBackpressureException,让你去设置 调用request()方法

咦,我上游发送的事件,下游一个没收到啊

那么也就是说上游不能发射事件,是因为你没有调用request方法,因为你不调用request()上游不知道下游能处理事件的能力啊。

那么,也就是说我必须要调用request方法咯,那么我们就调用一下喽,官方说默认推荐使用Long.MAX_VALUE。

好吧,那么我们来试下吧,加上如下代码。

@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE); //下游处理事件能力值
}

咦,还真正常了啊。

那么,我们设置个2试试?

s.request(2);

也就是说我们下游告诉上游我们能处理2个事件,这样上游就缓存池中取出了2个事件给发送给了下游。这点相比Rxjava1.0可以说是智能了很多,并不会一股脑的抛给下游而是又下游来主动拉取事件。

ERROR

Flowable既然可以跑了,那么咱们就来试试背压吧,我们还是采用BackpressureStrategy.ERROR这个策略,如果下游处理不过来就抛出异常。

Flowable.create(new FlowableOnSubscribe() {
@Override
public void subscribe(FlowableEmitter emitter) throws Exception {
for (int i = 0;i< 128; i++) {
Log.d(TAG, "emit " + i);
emitter.onNext(i);
}
}
}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber() {
可以跑了,那么咱们就来试试背压吧,我们还是采用BackpressureStrategy.ERROR这个策略,如果下游处理不过来就抛出异常。

Flowable.create(new FlowableOnSubscribe() {
@Override
public void subscribe(FlowableEmitter emitter) throws Exception {
for (int i = 0;i< 128; i++) {
Log.d(TAG, "emit " + i);
emitter.onNext(i);
}
}
}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber() {

标签:Observable,void,背压,flutterplugin,Override,rxjava,new,public
来源: https://blog.csdn.net/m0_66264673/article/details/122745367

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有