實務上有些資料會不斷變動,因此希望每隔一段時間就重新呼叫 API 讀取資料,為避免 asynchronous 資料尚未完成就呼叫 API,我們會希望回傳成功後經過一段時間再呼叫,這種常見的需求該如何使用 RxJS 實現呢 ?
Version
macOS Catalina 10.15.4
WebStorm 2020.1
Vue 2.6.11
RxJS 6.5.5
Browser
顯示所有書籍的 title
與 price
。
Data
[
{
"id": 1,
"title": "FP in JavaScript",
"price": 100,
"categoryId": 1,
"image": "fpjs.jpg"
},
{
"id": 2,
"title": "RxJS in Action",
"price": 200,
"categoryId": 2,
"image": "rxjs.jpg"
},
{
"id": 3,
"title": "Speaking JavaScript",
"price": 300,
"categoryId": 3,
"image": "spjs.jpg"
}
]
http://localhost:3000/books
回傳以上 Object Array。
mergeMap()
<template>
<div>
<ul>
<li v-for="(x, i) in books$" :key="i">
Title: {{ x.title }}, Price: {{ x.price }}
</li>
</ul>
</div>
</template>
<script>
import { ajax } from 'rxjs/ajax'
import { interval } from 'rxjs'
import { mergeMap, pluck } from 'rxjs/operators'
let fetchBooks$ = ajax(`http://localhost:3000/books`).pipe(
pluck('response')
)
let subscriptions = function() {
let books$ = interval(1000).pipe(
mergeMap(_ => fetchBooks$)
)
return { books$ }
}
export default {
name: 'app',
subscriptions,
}
</script>
第 3 行
<ul>
<li v-for="(x, i) in books$" :key="i">
Title: {{ x.title }}, Price: {{ x.price }}
</li>
</ul>
使用 <li>
與 v-for
顯示所有 books$
資料。
16 行
let fetchBooks$ = ajax(`http://localhost:3000/books`).pipe(
pluck('response')
)
fetchBooks$()
回傳為 Observable。
21 行
let books$ = interval(1000).pipe(
mergeMap(_ => fetchBooks$)
)
若需求為每 1
秒鐘呼叫一次 API 重新讀取資料,傳統會使用 setInterval()
搭配 callback 去讀取 API,但 RxJS 給了我們另外一種思維。
既然每一秒都要讀取 API,可視為 stream,使用 interval(1000)
產生 Observable,再由此 Observable 透過 map()
產生另外一個讀取 API 所回傳的 Observable,但這會產生 Higher Order Observable,因此直覺會使用 mergeMap()
。
mergeMap()
將 Higher Order Observable 攤平並保持原 Inner Observable 的 timing 順序與個數
由 marble diagram 可發現 mergeMap()
會忠實反應 Inner Observable 的 timing,就本例而言,若在發出第一次 API request 之後有資料新增,理論上第二次 API request 才會看到新增資料,但因為 asynchronous 特性,並不保證第一次 API request 率先回傳,很可能第二次 API request 先回傳,因此可能先看到資料新增,之後又看到資料不見。
此外,也會發現是每隔 1
秒就發出 API request,而非原需求 API request 完成之後的 1
秒才發出 API request。
concatMap()
let books$ = interval(1000).pipe(
concatMap(_ => fetchBooks$)
)
若改用 concatMap()
,可改善依照 Outer Observable 順序回傳 Inner Observable 需求,也就是不會先看到第二次 API request,再看到第一次 API request 情形。
此外,concatMap()
也會等第一次 API request 完成後,隔 1
秒才發出下一次 API request,算是符合原需求。
不過 concatMap()
並非完美,由於 interval(1000)
持續發出,concatMap()
可能永遠也無法消化 interval(1000)
所發出的 Outer Observable。
concatMap()
將 Higher Order Observable 攤平並保持原 Outer Observable 的順序與 Inner Observable 個數
50
雖然在 30
還沒結束就出現,concatMap()
會依照原 Outer Observable 的順序顯示 Inner Observable。
switchMap()
let books$ = interval(1000).pipe(
switchMap(_ => fetchBooks$)
)
若改用 switchMap()
乍看之下也不會錯,但 swtichMap()
是每 1
秒鐘持續發出 API request,若前一個 API request 尚未完成,將以新的 API request 結果取代。
但這並不是 API request 完成後的 1
秒才發出下一個 API request,因此不符合原需求。
switchMap()
將 Higher Order Observable 攤平,若有新的 Inner Observable 將捨棄尚未完成的 Inner Observale
50
在 30
還沒結束時就出現,swtichMap()
會優先顯示 50
並捨棄尚未完成的 30
。
exhaustMap()
let books$ = interval(1000).pipe(
exhaustMap(_ => fetchBooks$)
)
exhaustMap()
會等前一個 API request 完成後的下一秒才呼叫下一個 API request,因此符合需求,也不會有 concatMap()
永遠無法 Outer Observable 的窘境,exhaustMap()
會捨棄之。
exhaustMap()
將 Higher Order Observable 攤平,在原 Inner Observable 尚未完成前將捨棄新出現的 Inner Observable
50
在 30
還沒結束時就出現,exhaustMap()
會優先顯示 30
並捨棄尚未出現的 50
。
Conclusion
- 就需求而言,
concatMap()
已經很接近,但exhaustMap()
才能完全符合需求
Reference
RxJS, mergeMap()
RxJS, concatMap()
RxJS, switchMap()
RxJS, exhaustMap()