點燈坊

失くすものさえない今が強くなるチャンスよ

使用 switchMap() 保留最新 Inner Observable

Sam Xiao's Avatar 2020-06-26

Front-end 兩大 Asynchronous 就是 DOM Event 與 API Request,實務上常將這兩個 Asynchronous 視為 Observable 一起處理,這導致了 Higher Order Observable 出現,RxJS 提供了 mergeMap()concatMap()switchMap()exhaustMap() 處理,本文探討 switchMap()

Version

macOS Catalina 10.15.5
WebStorm 2020.1.2
Vue 2.6.11
RxJS 6.5.5

map()

let { interval, of } = Rx
let { take, map, delay, mergeMap } = RxOperators

let fetchOuter$ = t => interval(t).pipe(
  map(x => x + 1), 
  take(3)
)

let delayLut = {
  1: 3000,
  2: 1000,
  3: 2000
}

let fetchInner$ = n => of(n).pipe(
  delay(delayLut[n])
)

fetchOuter$(1000).pipe(
  map(fetchInner$)
)

第 4 行

let fetchOuter$ = t => interval(t).pipe(
  map(x => x + 1), 
  take(3)
)

fetchOuter$() 產生 Outer Observable,可傳入 t 決定 interval() 時間,並只取前 3 筆。

switchmap000

第 9 行

let delayLut = {
  1: 3000,
  2: 1000,
  3: 2000
}

let fetchInner$ = n => of(n).pipe(
  delay(delayLut[n])
)

fetchInner$() 產生 Inner Observable,其值由 n 傳入,並由 of() 包成 Observable 回傳。

較特殊的是尚有 delayLut Object 決定其 delay 時間,主要是要模擬 asynchronous 回傳時間並不固定,先呼叫不一定保證先回傳結果,如 1 回傳需 3 秒,但 2 回傳僅需 1 秒,也就是會發現 2 雖然較晚呼叫,但卻是先得到結果。

switchmap001

可發現 2 時間最短只需 1 秒,其次為 32 秒,1 則需要 3 秒最晚回傳。

19 行

fetchOuter$(1000).pipe(
  map(fetchInner$)
)

fetchOuter$() 產生間隔為 1 秒的 Outer Observable,且每個值又呼叫 fetchInner$() 產生 Inner Observable。

原本應該為 map(x => fetchInner$(x)),可省略 x 使其 point-free

switchmap002

黑色為 Outer Observable,每個值都產生 Inner Observable,因此出現了 Observable of Observable,也就是 Higher Order Observable。

switchMap()

let { interval, of } = Rx
let { take, map, delay, switchMap } = RxOperators

let fetchOuter$ = t => interval(t).pipe(
  map(x => x + 1), 
  take(3)
)

let delayLut = {
  1: 3000,
  2: 1000,
  3: 2000
}

let fetchInner$ = n => of(n).pipe(
  delay(delayLut[n])
)

fetchOuter$(1000).pipe(
  switchMap(fetchInner$)
)

19 行

fetchOuter$(1000).pipe(
  switchMap(fetchInner$)
)

若想在眾多 Higher Order Observable 中只取最新一筆,之前都捨棄,可改用 switchMap() 將 Higher Order Observable 攤平。

switchMap()
將 Higher Order Observable 攤平,若有新的 Inner Observable 將捨棄尚未完成的 Inner Observale

switchmap006

50 在 30 還沒結束時就出現,swtichMap() 會優先顯示 50 並捨棄尚未完成的 30。

switchmap003

map() + switchAll()

let { interval, of, from } = Rx
let { take, map, delay, switchAll } = RxOperators

let fetchOuter$ = t => interval(t).pipe(
  map(x => x + 1), 
  take(3)
)

let delayLut = {
  1: 3000,
  2: 1000,
  3: 2000
}

let fetchInner$ = n => of(n).pipe(
  delay(delayLut[n])
)

fetchOuter$(1000).pipe(
  map(fetchInner$),
  switchAll()
)

19 行

fetchOuter$(1000).pipe(
  map(fetchInner$),
  switchAll()
)

事實上 switchMap() 就是 map() + switchAll()

這就要談到 mergeAll()switchAll() 的不同。

mergeAll() 會直接將 Inner Observable 依照原本 Inner Observable 的 timing 攤平。

switchAll() 只會取 Inner Observable 最新的一筆,其餘全部捨棄。

switchAll()
將 Inner Observable 攤平並只取同時間最新的 Inner Observable,因此個數可能捨棄

switchmap007

將 Inner Observable 攤平,ce 將取最新的 edf 將取最新的 f

switchmap004

特別將 switchMap()mergeMap()map() + switchAll()map() 的 timing 都放在一起比較:

  • mergeMap() 的 timing 與個數與 map() 完全相同,只是將 map() 攤平而已

  • switchMap() 則取 Inner Observable 最新一筆,捨棄前兩筆

  • map() + switchAll()switchMap() 完全等效

Example

switchmap005

Higher Order Observable 最常見應用是 DOM Event 與 API Request 結合,由於 DOM Event 為 asynchronous,而 API request 亦為 asynchronous,兩者都可視為 Observable,但由於當時網路速度與 server 忙碌程度,並不保證先呼叫的 API request 就會先回傳。

當 DOM Evnet 與 API Request 一起運用時,就會出現 Higher Order Observable。

mergeMap()

<template>
  <div>
    <button v-stream:click="{ subject: click$, data: 1 }">1</button>
    <button v-stream:click="{ subject: click$, data: 2 }">2</button>
    <button v-stream:click="{ subject: click$, data: 3 }">3</button>
    <h1>{{ title$ }}</h1>
  </div>
</template>

<script>
import { ajax } from 'rxjs/ajax'
import { pluck, mergeMap, delay } from 'rxjs/operators'

let delayLut = {
  1: 3000,
  2: 1000,
  3: 2000
}

let fetchBook$ = x => ajax(`http://localhost:3000/books/${x}`).pipe(
  pluck('response', 'title'),
  delay(delayLut[x])
)

let subscriptions = function() {
  let title$ = this.click$.pipe(
    pluck('data'),
    mergeMap(fetchBook$)
  )

  return { title$ }
}

export default {
  name: 'app',
  domStreams: ['click$'],
  subscriptions,
}
</script>

第 3 行

<button v-stream:click="{ subject: click$, data: 1 }">1</button>
<button v-stream:click="{ subject: click$, data: 2 }">2</button>
<button v-stream:click="{ subject: click$, data: 3 }">3</button>

3 個 button 都使用 click$ subject,唯 data 不一樣,這模擬了 table 分頁的 下一頁,依次按下 123,就類似 下一頁 快速按了 3 次,我們預期最終只顯示 第三頁,而 第一頁第二頁 全都捨棄不顯示。

14 行

let delayLut = {
  1: 3000,
  2: 1000,
  3: 2000
}

let fetchBook$ = x => ajax(`http://localhost:3000/books/${x}`).pipe(
  pluck('response', 'title'),
  delay(delayLut[x])
)

fetchBook$() 呼叫 API 回傳 title,特別加上 delayLutdelay() 模擬 API request 不見的先呼叫就先回傳。

fetchBook$() 相當於產生 Inner Observable。

26 行

let title$ = this.click$.pipe(
  pluck('data'),
  mergeMap(fetchBook$)
)

因為 click$ 為 Outer Observable,fetchBook$ 回傳 Inner Observable,這勢必產生 Higher Order Observable,直覺會使用 mergeMap() 攤平 Inner Observable。

但實際執行會發現依次顯示:

  • RxJS in Action
  • FP in JavaScript
  • Speaking JavaScript

而非只顯示 Speaking JavaScript

switchmap008

mergeMap() 會引起 3 次 API request。

switchMap()

<template>
  <div>
    <button v-stream:click="{ subject: click$, data: 1 }">1</button>
    <button v-stream:click="{ subject: click$, data: 2 }">2</button>
    <button v-stream:click="{ subject: click$, data: 3 }">3</button>
    <h1>{{ title$ }}</h1>
  </div>
</template>

<script>
import { ajax } from 'rxjs/ajax'
import { pluck, switchMap, delay } from 'rxjs/operators'

let delayLut = {
  1: 3000,
  2: 1000,
  3: 2000
}

let fetchBook$ = x => ajax(`http://localhost:3000/books/${x}`).pipe(
  pluck('response', 'title'),
  delay(delayLut[x])
)

let subscriptions = function() {
  let title$ = this.click$.pipe(
    pluck('data'),
    switchMap(fetchBook$)
  )

  return { title$ }
}

export default {
  name: 'app',
  domStreams: ['click$'],
  subscriptions,
}
</script>

26 行

let title$ = this.click$.pipe(
  pluck('data'),
  switchMap(fetchBook$)
)

mergeMap() 改用 switchMap() 後,將只取最新的 Speaking JavaScript 一筆,前兩筆 RxJS in ActionFP in JavaScript 將被捨棄,這正是我們所要的。

switchmap009

雖然 switchMap() 只顯示最新一筆,但仍然會引起 3 次 API request,只是經過 switchMap() 處理後才捨棄前兩筆。

Conclusion

  • Higher Order Observable 為學習 RxJS 無法逃避課題,而初學者常常在 mergeMap()concatMap()exhaustMap()switchMap() 之間有選擇障礙
  • 由於 switchMap() 本質是 map() + switchAll(),因此只取 Inner Observable 最新的一筆
  • 若要確保 DOM Event 只取最新的 API request,可改用 switchMap()
  • 雖然 switchMap() 會保留最新 Inner Observable,但仍會發出 API request,只是 switchMap() 捨棄舊有 Inner Observable 而已

Reference

RxJS, map()
RxJS, switchAll()
RxJS, switchMap()