错误处理

有两种主要的方法来处理流中的错误。你可以重试流并保证流最终会正常运行,或者处理错误并进行转换。

重试 - 现在怎么样?

当你认为错误是由于某些原因是暂时导致的,那么这种方法是适用的。通常不稳定的网络是个很好的例子。当网络不稳定时端点可能会在你多次尝试后才能回应。要点是你的首次尝试可能失败,但重试x次并且在两次尝试之间有一定的时间间隔,最终端点会回应。

retry

retry() 操作符可以让我们重试整个流,只接收一个参数,参数的值是要重试的次数,函数签名如下:

  1. retry([times])

重要的是要注意当错误回调被调用的话, retry() 操作符会有延迟。下面代码中的错误回调会立即被调用:

  1. let stream$ = Rx.Observable.of(1,2,3)
  2. .map(value => {
  3. if(value > 2) { throw 'error' }
  4. });
  5. stream$.subscribe(
  6. data => console.log(data),
  7. err => console.log(err)
  8. )

这个流很快的就死了,错误回调被调用,这个时候 retry() 操作符登场。像下面这样把它附加上即可:

  1. let stream$ = Rx.Observable.of(1,2,3)
  2. .map(value => {
  3. if(value > 2) { throw 'error' }
  4. })
  5. .retry(5)

这将运行值序列5次,最后放弃并进入错误回调。然而在这个案例中,由于编写代码的方式,它只会生成5次1,2。所以我们的代码并没有真正利用操作符的最大潜力。你可能想要的是能够在每次尝试之间改变一些东西。想象下你的 observable 看起来像这样:

  1. let urlsToHit$ = Rx.Observable.of(url, url2, url3);

在这一点上,它清楚地表明,在你的第一次尝试中,端点可能回应的不好,或者根本就没有回应,所以重试x次是很有用的。

然而在调用 ajax 的情况下,并想象一下我们的业务场景中网络不稳定,那么立即重试是没有意义的,所以我们需要再找到一个更好的操作符,那就是 retryWhen()

retryWhen

retryWhen() 操作符让我们有机会对流进行操作并恰当地处理。

  1. retryWhen( stream => {
  2. // 希望能在更好的条件下返回
  3. })

现在我们来写段简单的代码:

  1. let values$ = Rx.Observable
  2. .of( 1,2,3,4 )
  3. .map(val => {
  4. if(val === 2) { throw 'err'; }
  5. else return val;
  6. })
  7. .retryWhen( stream => {
  8. return stream;
  9. } );
  10. values$.subscribe(
  11. data => console.log('Retry when - data',data),
  12. err => console.error('Retry when - Err',err)
  13. )

这样写的话会一直返回 1,直到我们用完内存为止,由于缺少结束条件,算法总是会在值2上崩溃,并将永远重试流。我们需要做的就是以某种方式告知错误已经修复。如果流尝试点击网址而不是发出数字,响应端点将会被压垮,所以在这种情况下,我们必须写这样的东西:

  1. let values$ = Rx.Observable.interval(1000).take(5);
  2. let errorFixed = false;
  3. values$
  4. .map((val) => {
  5. if(errorFixed) { return val; }
  6. else if( val > 0 && val % 2 === 0) {
  7. errorFixed = true;
  8. throw { error : 'error' };
  9. } else {
  10. return val;
  11. }
  12. })
  13. .retryWhen((err) => {
  14. console.log('retrying the entire sequence');
  15. return err.delay(200);
  16. })
  17. .subscribe((val) => { console.log('value',val) })
  18. // 0 1 '等待200毫秒' retrying the whole sequence 0 1 2 3 4

然而,这与我们用 retry() 运算符所做的很多类似,上面的代码只会重试一次。使用 retryWhen() 真正的好处是可以改变操作符中返回的流,也就是这里调用的 delay() 操作符,像这样:

  1. .retryWhen((err) => {
  2. console.log('retrying the entire sequence');
  3. return err.delay(200)
  4. })

这会确保在流重试前有200毫秒的延迟,如果是在 ajax 场景下,可以确保端点有足够的时间重整旗鼓,然后开始响应。

陷阱

retryWhen() 中使用 delay() 操作符来确保重试晚一点发生,在这个案例中可以给网络一个恢复的机会。

retryWhen 和 delay 一起使用没有次数限制

到目前为止,当我们想要重试整个流x次时使用的是 retry() 操作符,当我们想要在重试之间有一些延迟时间时使用的是 retryWhen() 操作符,但是如果我们两者都想要,可以做到吗?可以的。我们需要考虑一下要以某种方式记住到目前为止我们的尝试次数。引入一个外部变量用来保持这个数量是非常诱人的,但那不是函数式做事的方式,记住副作用是被禁止的。那么我们该如何解决呢?有一个叫做 scan() 的操作符,它允许我们累积每次迭代的值。所以如果在 retryWhen() 中使用 scan 的话,我们就可以追踪尝试的次数:

  1. let ATTEMPT_COUNT = 3;
  2. let DELAY = 1000;
  3. let delayWithTimes$ = Rx.Observable.of(1,2,3)
  4. .map( val => {
  5. if(val === 2) throw 'err'
  6. else return val;
  7. })
  8. .retryWhen(e => e.scan((errorCount, err) => {
  9. if (errorCount >= ATTEMPT_COUNT) {
  10. throw err;
  11. }
  12. return errorCount + 1;
  13. }, 0).delay(DELAY));
  14. delayWithTimes$.subscribe(
  15. val => console.log('delay and times - val',val),
  16. err => console.error('delay and times - err',err)
  17. )

转换 - 这个没什么好看的

这个方法是当出现错误时你选择将错误重制成一个有效的 Observable 。

所以我们可以通过创建一个 Observable 来体现这一点,这个 Observable 的使命就是报错

  1. let error$ = Rx.Observable.throw('crash');
  2. error$.subscribe(
  3. data => console.log( data ),
  4. err => console.log( err ),
  5. () => console.log('complete')
  6. )

这段代码只会执行错误回调而不会执行完成回调。

修补它

我们可以通过引入 catch() 操作符来进行修补。它是这样使用的:

  1. let errorPatched$ = error$.catch(err => { return Rx.Observable.of('Patched' + err) });
  2. errorPatched$.subscribe((data) => console.log(data) );

如你所见,使用 .catch() 进行修补并返回一个新的 Observable 修复 流。问题是这是否是你想要的。流确实存活下来最终完成了,它可以发出崩溃之后发生的任何值。

如果这不是你想要的,那么上面的重试方法可能会更适合你,决定权在你手中。

多个流呢?

你没想到会这么容易吧?当你编写 RxJS 代码时,通常会处理多个流,如果你知道在哪放置 catch() 操作符的话,那么使用 catch() 的方法是很棒的。

  1. let badStream$ = Rx.Observable.throw('crash');
  2. let goodStream$ = Rx.Observable.of(1,2,3,);
  3. let merged$ = Rx.Observable.merge(
  4. badStream$,
  5. goodStream$
  6. );
  7. merged$.subscribe(
  8. data => console.log(data),
  9. err => console.error(err),
  10. () => console.log('merge completed')
  11. )

猜猜发生了什么?1)错误和值都发出了,流也完成了 2)错误和值都发出了 3)只发出了错误

遗憾的是发生的是 3)。这意味着我们几乎没有处理错误。

修复 - 所以我们需要修复错误。我们使用 catch() 操作符进行修复。问题在哪呢?

来试试这个?

  1. let mergedPatched$ = Rx.Observable.merge(
  2. badStream$,
  3. goodStream$
  4. ).catch(err => Rx.Observable.of(err));
  5. mergedPatched$.subscribe(
  6. data => console.log(data),
  7. err => console.error(err),
  8. () => console.log('patchedMerged completed')
  9. )

在这个案例中,得到结果的是 crashpatchedMerged completed 。所以流是完成了的,但我们还是没有得到 goodStream$ 的值。所以这是一个更好的解决方法,但还不够好。

更好的修复 - 在 merge() 后面添加 catch() 操作符可以确保流完成,但是还不够好。我们来尝试下在 merge 之前进行 catch 操作。

  1. let preMergedPatched$ = Rx.Observable.merge(
  2. badStream$.catch(err => Rx.Observable.of(err)),
  3. goodStream$
  4. ).catch(err => Rx.Observable.of(err));
  5. preMergedPatched$.subscribe(
  6. data => console.log(data),
  7. err => console.error(err),
  8. () => console.log('pre patched merge completed')
  9. )

瞧,我们得到了值,我们捕获了错误并将错误作为一个新的 Observable 发出,并且流也完成了。

陷阱 catch() 所放的位置很重要。

适者生存

还有另外一种情况可能会很有意思。上面的场景假设你希望发出所有的,错误信息、值、所有的一切。

如果你关心的不是这些呢,你只关心流的值怎么办?我们来假设一下,有一个叫做 onErrorResumeNext() 的操作符

  1. let secondBadStream$ = Rx.Observable.throw('bam');
  2. let gloriaGaynorStream$ = Rx.Observable.of('I will survive');
  3. let emitSurviving = Rx.Observable.onErrorResumeNext(
  4. badStream$,
  5. secondBadStream$,
  6. gloriaGaynorStream$
  7. );
  8. emitSurviving.subscribe(
  9. data => console.log(data),
  10. err => console.error(err),
  11. () => console.log('Survival of the fittest, completed')
  12. )

输出的结果是 ‘I will survive’ 和 ‘Survival of the fittest, completed’ 。