點燈坊

失くすものさえない今が強くなるチャンスよ

使用 exhaustMap() 每隔一段時間重新讀取 API

Sam Xiao's Avatar 2020-06-26

實務上有些資料會不斷變動,因此希望每隔一段時間就重新呼叫 API 讀取資料,為避免 asynchronous 資料尚未完成就呼叫 API,我們會希望回傳成功後經過一段時間再呼叫,這種常見的需求該如何使用 RxJS 實現呢 ?

Version

macOS Catalina 10.15.4
WebStorm 2020.1
Vue 2.6.11
RxJS 6.5.5

Browser

exhaustmap000

顯示所有書籍的 titleprice

Data

[
  {
    "id": 1,
    "title": "FP in JavaScript",
    "price": 100,
    "categoryId": 1,
    "image": "fpjs.jpg"
  },
  {
    "id": 2,
    "title": "RxJS in Action",
    "price": 200,
    "categoryId": 2,
    "image": "rxjs.jpg"
  },
  {
    "id": 3,
    "title": "Speaking JavaScript",
    "price": 300,
    "categoryId": 3,
    "image": "spjs.jpg"
  }
]

http://localhost:3000/books 回傳以上 Object Array。

mergeMap()

<template>
  <div>
    <ul>
      <li v-for="(x, i) in books$" :key="i">
        Title: {{ x.title }}, Price: {{ x.price }}
      </li>
    </ul>
  </div>
</template>

<script>
import { ajax } from 'rxjs/ajax'
import { interval } from 'rxjs'
import { mergeMap, pluck } from 'rxjs/operators'

let fetchBooks$ = ajax(`http://localhost:3000/books`).pipe(
  pluck('response')
)

let subscriptions = function() {
  let books$ = interval(1000).pipe(
    mergeMap(_ => fetchBooks$)
  )

  return { books$ }
}

export default {
  name: 'app',
  subscriptions,
}
</script>

第 3 行

<ul>
  <li v-for="(x, i) in books$" :key="i">
    Title: {{ x.title }}, Price: {{ x.price }}
  </li>
</ul>

使用 <li>v-for 顯示所有 books$ 資料。

16 行

let fetchBooks$ = ajax(`http://localhost:3000/books`).pipe(
  pluck('response')
)

fetchBooks$() 回傳為 Observable。

21 行

let books$ = interval(1000).pipe(
  mergeMap(_ => fetchBooks$)
)

若需求為每 1 秒鐘呼叫一次 API 重新讀取資料,傳統會使用 setInterval() 搭配 callback 去讀取 API,但 RxJS 給了我們另外一種思維。

既然每一秒都要讀取 API,可視為 stream,使用 interval(1000) 產生 Observable,再由此 Observable 透過 map() 產生另外一個讀取 API 所回傳的 Observable,但這會產生 Higher Order Observable,因此直覺會使用 mergeMap()

mergeMap()
將 Higher Order Observable 攤平並保持原 Inner Observable 的 timing 順序與個數

exhaustMap003

由 marble diagram 可發現 mergeMap() 會忠實反應 Inner Observable 的 timing,就本例而言,若在發出第一次 API request 之後有資料新增,理論上第二次 API request 才會看到新增資料,但因為 asynchronous 特性,並不保證第一次 API request 率先回傳,很可能第二次 API request 先回傳,因此可能先看到資料新增,之後又看到資料不見。

此外,也會發現是每隔 1 秒就發出 API request,而非原需求 API request 完成之後的 1 秒才發出 API request。

concatMap()

let books$ = interval(1000).pipe(
  concatMap(_ => fetchBooks$)
)

若改用 concatMap(),可改善依照 Outer Observable 順序回傳 Inner Observable 需求,也就是不會先看到第二次 API request,再看到第一次 API request 情形。

此外,concatMap() 也會等第一次 API request 完成後,隔 1 秒才發出下一次 API request,算是符合原需求。

不過 concatMap() 並非完美,由於 interval(1000) 持續發出,concatMap() 可能永遠也無法消化 interval(1000) 所發出的 Outer Observable。

concatMap()
將 Higher Order Observable 攤平並保持原 Outer Observable 的順序與 Inner Observable 個數

exhaustmap004

50 雖然在 30 還沒結束就出現,concatMap() 會依照原 Outer Observable 的順序顯示 Inner Observable。

switchMap()

let books$ = interval(1000).pipe(
  switchMap(_ => fetchBooks$)
)

若改用 switchMap() 乍看之下也不會錯,但 swtichMap() 是每 1 秒鐘持續發出 API request,若前一個 API request 尚未完成,將以新的 API request 結果取代。

但這並不是 API request 完成後的 1 秒才發出下一個 API request,因此不符合原需求。

switchMap()
將 Higher Order Observable 攤平,若有新的 Inner Observable 將捨棄尚未完成的 Inner Observale

exhaustmap001

5030 還沒結束時就出現,swtichMap() 會優先顯示 50 並捨棄尚未完成的 30

exhaustMap()

let books$ = interval(1000).pipe(
  exhaustMap(_ => fetchBooks$)
)

exhaustMap() 會等前一個 API request 完成後的下一秒才呼叫下一個 API request,因此符合需求,也不會有 concatMap() 永遠無法 Outer Observable 的窘境,exhaustMap() 會捨棄之。

exhaustMap()
將 Higher Order Observable 攤平,在原 Inner Observable 尚未完成前將捨棄新出現的 Inner Observable

exhaustMap002

5030 還沒結束時就出現,exhaustMap() 會優先顯示 30 並捨棄尚未出現的 50

Conclusion

  • 就需求而言,concatMap() 已經很接近,但 exhaustMap() 才能完全符合需求

Reference

RxJS, mergeMap()
RxJS, concatMap()
RxJS, switchMap()
RxJS, exhaustMap()