點燈坊

失くすものさえない今が強くなるチャンスよ

使用 exhaustMap() 捨棄完成前的 Inner Observable

Sam Xiao's Avatar 2020-06-26

Front-end 兩大 Asynchronous 就是 DOM Event 與 API Request,實務上常將這兩個 Asynchronous 視為 Observable 一起處理,這導致了 Higher Order Observable 出現,RxJS 提供了 mergeMap()concatMap()switchMap()exhaustMap() 處理,本文探討 exhaustMap()

Outline

[TOC]

Version

macOS Catalina 10.15.5
WebStorm 2020.1.2
Vue 2.6.11
RxJS 6.5.5

map()

let { interval, of } = Rx
let { take, map, delay, mergeMap } = RxOperators

let fetchOuter$ = t => interval(t).pipe(
  map(x => x + 1), 
  take(3)
)

let delayLut = {
  1: 3000,
  2: 1000,
  3: 2000
}

let fetchInner$ = n => of(n).pipe(
  delay(delayLut[n])
)

fetchOuter$(1000).pipe(
  map(fetchInner$)
)

第 4 行

let fetchOuter$ = t => interval(t).pipe(
  map(x => x + 1), 
  take(3)
)

fetchOuter$() 產生 Outer Observable,可傳入 t 決定 interval() 時間,並只取前 3 筆。

exhaustmap000

第 9 行

let delayLut = {
  1: 3000,
  2: 1000,
  3: 2000
}

let fetchInner$ = n => of(n).pipe(
  delay(delayLut[n])
)

fetchInner$() 產生 Inner Observable,其值由 n 傳入,並由 of() 包成 Observable 回傳。

較特殊的是尚有 delayLut Object 決定其 delay 時間,主要是要模擬 asynchronous 回傳時間並不固定,先呼叫不一定保證先回傳結果,如 1 回傳需 3 秒,但 2 回傳僅需 1 秒,也就是會發現 2雖然較晚呼叫,但卻是先得到結果。

exhaustmap001

可發現 2 時間最短只需 1 秒,其次為 32 秒,1 則需要 3 秒最晚回傳。

19 行

fetchOuter$(1000).pipe(
  map(fetchInner$)
)

fetchOuter$() 產生間隔為 1 秒的 Outer Observable,且每個值又呼叫 fetchInner$() 產生 Inner Observable。

原本應該為 map(x => fetchInner$(x)),可省略 x 使其 point-free

exhaustmap002

黑色為 Outer Observable,每個值都產生 Inner Observable,因此出現了 Observable of Observable,也就是 Higher Order Observable。

concatMap()

let { interval, of } = Rx
let { take, map, delay, concatMap } = RxOperators

let fetchOuter$ = t => interval(t).pipe(
  map(x => x + 1), 
  take(3)
)

let delayLut = {
  1: 3000,
  2: 1000,
  3: 2000
}

let fetchInner$ = n => of(n).pipe(
  delay(delayLut[n])
)

fetchOuter$(1000).pipe(
  concatMap(fetchInner$)
)

19 行

fetchOuter$(1000).pipe(
  concatMap(fetchInner$)
)

若希望最後結果與 Outer Observable 順序相同,也就是 123,這就要改用 concatMap()

exhaustmap006

exhaustMap()

let { interval, of } = Rx
let { take, map, delay, exhaustMap } = RxOperators

let fetchOuter$ = t => interval(t).pipe(
  map(x => x + 1), 
  take(3)
)

let delayLut = {
  1: 3000,
  2: 1000,
  3: 2000
}

let fetchInner$ = n => of(n).pipe(
  delay(delayLut[n])
)

fetchOuter$(1000).pipe(
  exhaustMap(fetchInner$)
)

19 行

fetchOuter$(1000).pipe(
  exhaustMap(fetchInner$)
)

若希望最後結果與 Outer Observable 順序相同,且每個 Inner Observale 執行完成前出現的 Inner Observable 都被捨棄,這就要改用 exhaustMap()

exhaustmap003

由於 23 皆在 1 出現之前就產生,因此被捨棄只剩下 1

exhaustMap()
在每個 Inner Observable 執行完成前出現的 Inner Observable 將被捨棄

exhaustmap010

50303030 未完成就出現, 因此 50 被捨棄。

map() + exhaust()

let { interval, of } = Rx
let { take, map, delay, exhaust } = RxOperators

let fetchOuter$ = t => interval(t).pipe(
  map(x => x + 1), 
  take(3)
)

let delayLut = {
  1: 3000,
  2: 1000,
  3: 2000
}

let fetchInner$ = n => of(n).pipe(
  delay(delayLut[n])
)

fetchOuter$(1000).pipe(
  map(fetchInner$),
  exhaust()
)

19 行

fetchOuter$(1000).pipe(
  map(fetchInner$),
  exhaust()
)

事實上 exhaustMap() 就是 map() + exhaust()

這就要談到 concatAll()exhaust() 的不同。

concatAll() 會依照 Outer Observable 的執行順序將 Inner Observable 攤平,但會重新調整 subscribe 時間,在 Outer Observable 完成後才會 subscribe Inner Observable,所以 concatMap() 只能保留 Outer Observable 順序,也維持 Inner Observable 個數,卻喪失了 Inner Observable 原本 timing。

exhaust() 也會以 Outer Observable 的執行順序為基準,且在一個 Inner Observable 執行完成前捨棄其他 Inner Observable,因此 exhaustMap() 會保留 Inner Observable 原本 timing,但卻不保證 Inner Observable 個數,剛好與 concatMap() 相反。

exhaustmap004

特別將 concatMap()exhaustMap()map() + exhaust()map() 的 timing 都放在一起比較:

  • cocatMap() 的順序與 Outer Observable 相同,但 timing 與 map() 不同
  • exhaustMap() 只取 Inner Observable 的第一筆 1,因為 23 都在 1 之前,因此會被捨棄
  • map() + exhaust()exhaustMap() 完全等效

Example

exhaustmap005

Higher Order Observable 最常見應用是 DOM Event 與 API Request 結合,由於 DOM Event 為 asynchronous,而 API request 亦為 asynchronous,兩者都可視為 Observable,但由於當時網路速度與 server 忙碌程度,並不保證先呼叫的 API request 就會先回傳。

當 DOM Evnet 與 API Request 一起運用時,就會出現 Higher Order Observable。

concatMap()

<template>
  <div>
    <button v-stream:click="{ subject: click$, data: 1 }">1</button>
    <h1>{{ title$ }}</h1>
  </div>
</template>

<script>
import { ajax } from 'rxjs/ajax'
import { pluck, concatMap, delay } from 'rxjs/operators'

let fetchBook$ = x => ajax(`http://localhost:3000/books/${x}`).pipe(
  pluck('response', 'title'),
  delay(2000)
)

let subscriptions = function() {
  let title$ = this.click$.pipe(
    pluck('data'),
    concatMap(fetchBook$)
  )

  return { title$ }
}

export default {
  name: 'app',
  domStreams: ['click$'],
  subscriptions,
}
</script>

第 3 行

<button v-stream:click="{ subject: click$, data: 1 }">1</button>

只有一個 button,快速按下 3 次。

12 行

let fetchBook$ = x => ajax(`http://localhost:3000/books/${x}`).pipe(
  pluck('response', 'title'),
  delay(2000)
)

fetchBook$() 呼叫 API 回傳 title,特別加上delay 2 秒讓效果更明顯。

fetchBook$() 相當於產生 Inner Observable。

18 行

let title$ = this.click$.pipe(
  pluck('data'),
  concatMap(fetchBook$)
)

由於使用 concatMap(),因此按了 3 次會呼叫 3 次 API,只是三次 API 結果都相同,因此顯示都是 FP in JavaScript

exhaustmap008

concatMap() 會引起 3 次 API request。

exhaustMap()

<template>
  <div>
    <button v-stream:click="{ subject: click$, data: 1 }">1</button>
    <button v-stream:click="{ subject: click$, data: 2 }">2</button>
    <button v-stream:click="{ subject: click$, data: 3 }">3</button>
    <h1>{{ title$ }}</h1>
  </div>
</template>

<script>
import { ajax } from 'rxjs/ajax'
import { pluck, exhaustMap, delay } from 'rxjs/operators'

let delayLut = {
  1: 3000,
  2: 1000,
  3: 2000
}

let fetchBook$ = x => ajax(`http://localhost:3000/books/${x}`).pipe(
  pluck('response', 'title'),
  delay(delayLut[x])
)

let subscriptions = function() {
  let title$ = this.click$.pipe(
    pluck('data'),
    exhaustMap(fetchBook$)
  )

  return { title$ }
}

export default {
  name: 'app',
  domStreams: ['click$'],
  subscriptions,
}
</script>

由於按 3 次都是相同 API,且結果也相同,因此後面兩次 API request 可省略。

26 行

let title$ = this.click$.pipe(
  pluck('data'),
  exhaustMap(fetchBook$)
)

concatMap() 改成 exhaustMap(),且 API request 特別 delay 2 秒鐘,因此button 連按 3 次時,後面兩次 API request 尚未結束,因此其產生的 Inner Observable 被 exhaustMap() 捨棄,只有一次 API request。

exhaustmap009

exhaustMap() 只會引起 1 次 API request。

Conclusion

  • Higher Order Observable 為學習 RxJS 無法逃避課題,而初學者常常在 mergeMap()concatMap()exhaustMap()switchMap() 之間有選擇障礙
  • 由於 exhaustMap() 本質是 map() + exhaust(),因此第一個 Inner Observable 尚未結束時,已發生 Outer Observable 都會捨棄
  • 若該 API 無論呼叫幾次結果都一樣,則適合使用 exhaustMap(),可大幅減少不必要的 API request,如 login、register、refresh 之類應用

Reference

RxJS, map()
RxJS, exhaustMap()
Wojciech, RxJS: exhaustMap operator