由於 DOM Event 為 Observable,API Request 亦為 Observable,若 Table 亦必須由 N + 1 Query 呼叫其他 API 取得資料,這就造成了 Observable of Observable of Observable 的三層 Higher Order Observable,這該如何使用 RxJS 實現呢 ?
Version
macOS Catalina 10.15.5
WebStorm 2020.1.2
Vue 2.6.11
RxJS 6.5.5
Single Column
Browser
Page 1
、Page 2
、Page 3
可以直接跳到指定 page。
Data
[
{
"id": 1,
"page": 1,
"data": [
{
"id": 1,
"title": "FP in JavaScript",
"price": 100,
"categoryId": 1,
"publisherId": 1
},
{
"id": 2,
"title": "RxJS in Action",
"price": 200,
"categoryId": 2,
"publisherId": 1
},
{
"id": 3,
"title": "Speaking JavaScript",
"price": 300,
"categoryId": 3,
"publisherId": 2
}
]
},
{
"id": 2,
"page": 2,
"data": [
{
"id": 4,
"title": "Domain Modeling Made Functional",
"price": 400,
"categoryId": 4,
"publisherId": 3
},
{
"id": 5,
"title": "Impatient JavaScript",
"price": 500,
"categoryId": 3,
"publisherId": 4
},
{
"id": 6,
"title": "Exploring ReasonML",
"price": 600,
"categoryId": 5,
"publisherId": 4
}
]
},
{
"id": 3,
"page": 3,
"data": [
{
"id": 7,
"title": "Node.js in Action",
"price": 700,
"categoryId": 6,
"publisherId": 2
},
{
"id": 8,
"title": "JavaScript The Good Part",
"price": 800,
"categoryId": 3,
"publisherId": 2
},
{
"id": 9,
"title": "Get Programming with Haskell",
"price": 900,
"categoryId": 7,
"publisherId": 1
}
]
}
]
http://localhost:3000/books/:page
會回傳指定 page
的書籍,其中 category
僅回傳 categoryId
,必須再呼叫其他 API 取得 category
。
[
{
"id": 1,
"category": "FP"
},
{
"id": 2,
"category": "FRP"
},
{
"id": 3,
"category": "JS"
},
{
"id": 4,
"category": "DDD"
},
{
"id": 5,
"category": "ReasonML"
},
{
"id": 6,
"category": "Node"
},
{
"id": 7,
"category": "Haskell"
}
]
http://localhost:3000/categories/:categoryId
會回傳 category
。
switchMap() + mergeMap()
<template>
<div>
<button v-stream:click="{ subject: flipPage$, data: 1 }">Page 1</button>
<button v-stream:click="{ subject: flipPage$, data: 2 }">Page 2</button>
<button v-stream:click="{ subject: flipPage$, data: 3 }">Page 3</button>
<ul>
<li v-for="(x, i) in books$" :key="i">
{{ x.title }} / {{ x.price }} / {{ x.category }}
</li>
</ul>
</div>
</template>
<script>
import { ajax } from 'rxjs/ajax'
import { forkJoin } from 'rxjs'
import { pluck, map, mergeMap, switchMap, delay } from 'rxjs/operators'
let fetchBooks$ = x => ajax(`http://localhost:3000/books/${x}`).pipe(
pluck('response', 'data'),
delay(1000)
)
let fetchCategory$ = x => ajax(`http://localhost:3000/categories/${x.categoryId}`).pipe(
pluck('response', 'category'),
map(category => ({...x, category })),
delay(5000)
)
let subscriptions = function() {
let books$ = this.flipPage$.pipe(
pluck('data'),
switchMap(fetchBooks$),
mergeMap(x => forkJoin(x.map(fetchCategory$)))
)
return { books$ }
}
export default {
name: 'app',
domStreams: [
'flipPage$'
],
subscriptions
}
</script>
第 3 行
<button v-stream:click="{ subject: flipPage$, data: 1 }">Page 1</button>
<button v-stream:click="{ subject: flipPage$, data: 2 }">Page 2</button>
<button v-stream:click="{ subject: flipPage$, data: 3 }">Page 3</button>
v-stream:click
使用 data
帶入指定 page。
第 6 行
<ul>
<li v-for="(x, i) in books$" :key="i">
{{ x.title }} / {{ x.price }} / {{ x.category }}
</li>
</ul>
只使用單一 books$
顯示。
19 行
let fetchBooks$ = x => ajax(`http://localhost:3000/books/${x}`).pipe(
pluck('response', 'data'),
delay(1000)
)
呼叫 http://localhost:3000/books/:page
,並加上 delay 1
秒讓效果更明顯。
24 行
let fetchCategory$ = x => ajax(`http://localhost:3000/categories/${x.categoryId}`).pipe(
pluck('response', 'category'),
map(category => ({...x, category })),
delay(5000)
)
呼叫 http://localhost:3000/categories/:categoryId
,並加上 delay 5
秒鐘讓效果更為明顯。
特別以 map()
將 book
與 category
合併成新 object。
31 行
let books$ = this.flipPage$.pipe(
pluck('data'),
switchMap(fetchBooks$),
mergeMap(x => forkJoin(x.map(fetchCategory$)))
)
分別使用了 switchMap()
與 mergeMap()
。
switchMap()
是為了攤平 flipPage$
與 fetchBooks$
所產生的 Higher Order Observable,且只取最新的 Inner Observable。
mergeMap()
是為了攤平 fetchBooks$
與 fetchCategory$
所產生的 Higher Order Observable,由於沒有特殊需求,因此使用最原始的 mergeMap()
。
由 x.map(fetchCategory$)
回傳為 Observable Array,因此使用了 forkJoin()
取得內部值。
Point-free
<template>
<div>
<button v-stream:click="{ subject: flipPage$, data: 1 }">Page 1</button>
<button v-stream:click="{ subject: flipPage$, data: 2 }">Page 2</button>
<button v-stream:click="{ subject: flipPage$, data: 3 }">Page 3</button>
<ul>
<li v-for="(x, i) in books$" :key="i">
{{ x.title }} / {{ x.price }} / {{ x.category }}
</li>
</ul>
</div>
</template>
<script>
import { ajax } from 'rxjs/ajax'
import { forkJoin, pipe } from 'rxjs'
import { pluck, map, mergeMap, switchMap, delay } from 'rxjs/operators'
import { map as fmap } from 'ramda'
let fetchBooks$ = x => ajax(`http://localhost:3000/books/${x}`).pipe(
pluck('response', 'data'),
delay(1000)
)
let fetchCategory$ = x => ajax(`http://localhost:3000/categories/${x.categoryId}`).pipe(
pluck('response', 'category'),
map(category => ({...x, category })),
delay(5000)
)
let subscriptions = function() {
let books$ = this.flipPage$.pipe(
pluck('data'),
switchMap(fetchBooks$),
mergeMap(pipe(fmap(fetchCategory$), forkJoin))
)
return { books$ }
}
export default {
name: 'app',
domStreams: [
'flipPage$'
],
subscriptions
}
</script>
32 行
let books$ = this.flipPage$.pipe(
pluck('data'),
switchMap(fetchBooks$),
mergeMap(pipe(fmap(fetchCategory$), forkJoin))
)
forkJoin()
較特殊之處是他不是 RxJS 的 operator,而隸屬於 rxjs
之下,因此使用上無法 pipe()
串其他 operator,而出現 forkJoin(x.map(fetchCategory$))
類似 f(g(x))
狀況。
此外,因為 x
為 Array,因此使用了原生的 map()
,這使的 mergeMap()
無法 point-free。
18 行
import { map as fmap } from 'ramda'
引用了 Ramda 的 map()
,因為與 RxJS 的 map()
同名,因此 alias 成 fmap()
。
35 行
mergeMap(pipe(fmap(fetchCategory$), forkJoin))
如此 fmap()
可使 mergeMap()
能 point-free, 也能使用 pipe()
將 fmap()
與 forkJoin()
串起來。
Asynchronous Update
<template>
<div>
<button v-stream:click="{ subject: flipPage$, data: 1 }">Page 1</button>
<button v-stream:click="{ subject: flipPage$, data: 2 }">Page 2</button>
<button v-stream:click="{ subject: flipPage$, data: 3 }">Page 3</button>
<ul>
<li v-for="(x, i) in books" :key="i">
{{ x.title }} / {{ x.price }} / {{ x.category }}
</li>
</ul>
</div>
</template>
<script>
import { ajax } from 'rxjs/ajax'
import { pluck, map, mergeMap, switchMap, delay, tap } from 'rxjs/operators'
import { map as fmap, addIndex, forEach } from 'ramda'
let fetchBooks$ = x => ajax(`http://localhost:3000/books/${x}`).pipe(
pluck('response', 'data'),
delay(1000)
)
let fetchCategory$ = (x, i) => ajax(`http://localhost:3000/categories/${x.categoryId}`).pipe(
pluck('response', 'category'),
map(category => ({...x, i, category })),
delay(5000)
)
let subscriptions = function() {
this.flipPage$.pipe(
pluck('data'),
switchMap(fetchBooks$),
tap(x => this.books = x),
mergeMap(addIndex(fmap)(fetchCategory$)),
tap(forEach(x => this.$set(this.books, x.i, x)))
).subscribe()
}
export default {
name: 'app',
domStreams: [
'flipPage$'
],
data: () => ({
books: []
}),
subscriptions
}
</script>
前例使用了 forkJoin()
,若 fetchCategory$()
特別慢,會明顯感覺 button 按下後,要等一段時間才會全部出現 title
、price
與 category
。
若想要 books 的 title
、price
先出現,而 category
再 asynchronous 慢慢出現,則要使用 asynchronous update 寫法。
45 行
data: () => ({
books: []
}),
因為 fetchCategory$()
要 asynchronous 更新,因此 books
從 Observable 改成 Data。
24 行
let fetchCategory$ = (x, i) => ajax(`http://localhost:3000/categories/${x.categoryId}`).pipe(
pluck('response', 'category'),
map(category => ({...x, i, category })),
delay(5000)
)
特別加上了 i
傳入 index
,稍後 this.$set()
更新 books
會使用。
31 行
this.flipPage$.pipe(
pluck('data'),
switchMap(fetchBooks$),
tap(x => this.books = x),
mergeMap(addIndex(fmap)(fetchCategory$)),
tap(forEach(x => this.$set(this.books, x.i, x)))
).subscribe()
因為不再回傳 Observable,因此 flipPage$()
要加上 subscribe()
才會啟動 stream。
33 行
switchMap(fetchBooks$),
tap(x => this.books = x),
switchMap(fetchBooks$)
主要在於獲得 title
與 price
先顯示,因此要使用 tap()
先寫入 books
。
35 行
mergeMap(addIndex(fmap)(fetchCategory$)),
tap(forEach(x => this.$set(this.books, x.i, x)))
Ramda 的 fmap()
的 callback 只有 x
,但目前 fetchCategory$
還需要有 index
,因此使用 addIndex(fmap)
產生有 (x, i)
的 callback。
最後使用 tap()
配合 Ramda 的 forEach()
與 Vue 的 this.$set()
寫入 books
。
Multiple Column
Browser
除了 category
外,顯示多了 publisher
。
看似只多了一個 field,但寫法卻不太一樣。
Data
[
{
"id": 1,
"publisher": "Manning"
},
{
"id": 2,
"publisher": "O'Reilly"
},
{
"id": 3,
"publisher": "Pragmatic"
},
{
"id": 4,
"publisher": "eBook"
},
{
"id": 5,
"publisher": "ReasonML"
},
{
"id": 6,
"publisher": "Node"
},
{
"id": 7,
"publisher": "Haskell"
}
]
http://localhost:3000/publishers/:publisherId
會回傳 publisher
。
zip()
<template>
<div>
<button v-stream:click="{ subject: flipPage$, data: 1 }">Page 1</button>
<button v-stream:click="{ subject: flipPage$, data: 2 }">Page 2</button>
<button v-stream:click="{ subject: flipPage$, data: 3 }">Page 3</button>
<ul>
<li v-for="(x, i) in books$" :key="i">
{{ x.title }} / {{ x.price }} / {{ x.category }} / {{ x.publisher }}
</li>
</ul>
</div>
</template>
<script>
import { ajax } from 'rxjs/ajax'
import { zip, forkJoin } from 'rxjs'
import { map, pluck, switchMap, delay, mergeMap } from 'rxjs/operators'
let fetchBooks$ = x => ajax(`http://localhost:3000/books/${x}`).pipe(
pluck('response', 'data'),
delay(1000)
)
let fetchCategory$ = x => ajax(`http://localhost:3000/categories/${x.categoryId}`).pipe(
pluck('response', 'category'),
delay(5000)
)
let fetchPublisher$ = x => ajax(`http://localhost:3000/publishers/${x.publisherId}`).pipe(
pluck('response', 'publisher'),
delay(6000)
)
let fetchChild$ = x => {
let category$ = fetchCategory$(x).pipe(
map(category => ({ category }))
)
let publisher$ = fetchPublisher$(x).pipe(
map(publisher => ({ publisher }))
)
return zip(category$, publisher$).pipe(
map(([ { category }, { publisher }]) => ({...x, category, publisher }))
)
}
let subscriptions = function() {
let books$ = this.flipPage$.pipe(
pluck('data'),
switchMap(fetchBooks$),
mergeMap(x => forkJoin(x.map(fetchChild$)))
)
return { books$ }
}
export default {
name: 'app',
domStreams: [
'flipPage$'
],
subscriptions
}
</script>
29 行
let fetchPublisher$ = x => ajax(`http://localhost:3000/publishers/${x.publisherId}`).pipe(
pluck('response', 'publisher'),
delay(6000)
)
呼叫 http://localhost:3000/publishers/:publisherId
,並加上 delay 6
秒與 fetchCategory$()
有所差異。
49 行
let books$ = this.flipPage$.pipe(
pluck('data'),
switchMap(fetchBooks$),
mergeMap(x => forkJoin(x.map(fetchChild$)))
)
一樣使用 swtichMap()
與 mergeMap()
組合,差異只在 fetchChild$()
。
34 行
let fetchChild$ = x => {
let category$ = fetchCategory$(x).pipe(
map(category => ({ category }))
)
let publisher$ = fetchPublisher$(x).pipe(
map(publisher => ({ publisher }))
)
return zip(category$, publisher$).pipe(
map(([ { category }, { publisher }]) => ({...x, category, publisher }))
)
}
單獨取得 category$
與 publisher$
,最後使用 zip()
組合 category$
與 publisher$
。
Point-free
<template>
<div>
<button v-stream:click="{ subject: flipPage$, data: 1 }">Page 1</button>
<button v-stream:click="{ subject: flipPage$, data: 2 }">Page 2</button>
<button v-stream:click="{ subject: flipPage$, data: 3 }">Page 3</button>
<ul>
<li v-for="(x, i) in books$" :key="i">
{{ x.title }} / {{ x.price }} / {{ x.category }} / {{ x.publisher }}
</li>
</ul>
</div>
</template>
<script>
import { ajax } from 'rxjs/ajax'
import { zip, forkJoin, pipe } from 'rxjs'
import { map, pluck, switchMap, delay, mergeMap } from 'rxjs/operators'
import { map as fmap } from 'ramda'
let fetchBooks$ = x => ajax(`http://localhost:3000/books/${x}`).pipe(
pluck('response', 'data'),
delay(1000)
)
let fetchCategory$ = x => ajax(`http://localhost:3000/categories/${x.categoryId}`).pipe(
pluck('response', 'category'),
delay(5000)
)
let fetchPublisher$ = x => ajax(`http://localhost:3000/publishers/${x.publisherId}`).pipe(
pluck('response', 'publisher'),
delay(6000)
)
let fetchChild$ = x => {
let category$ = fetchCategory$(x).pipe(
map(category => ({ category }))
)
let publisher$ = fetchPublisher$(x).pipe(
map(publisher => ({ publisher }))
)
return zip(category$, publisher$).pipe(
map(([ { category }, { publisher }]) => ({...x, category, publisher }))
)
}
let subscriptions = function() {
let books$ = this.flipPage$.pipe(
pluck('data'),
switchMap(fetchBooks$),
mergeMap(pipe(fmap(fetchChild$), forkJoin))
)
return { books$ }
}
export default {
name: 'app',
domStreams: [
'flipPage$'
],
subscriptions
}
</script>
50 行
let books$ = this.flipPage$.pipe(
pluck('data'),
switchMap(fetchBooks$),
mergeMap(pipe(fmap(fetchChild$), forkJoin))
)
一樣使用 Ramda 的 fmap()
取代 x.map()
,最後使用 pipe()
組合 fmap()
與 forkJoin()
。
Asynchronous Update
<template>
<div>
<button v-stream:click="{ subject: flipPage$, data: 1 }">Page 1</button>
<button v-stream:click="{ subject: flipPage$, data: 2 }">Page 2</button>
<button v-stream:click="{ subject: flipPage$, data: 3 }">Page 3</button>
<ul>
<li v-for="(x, i) in books" :key="i">
{{ x.title }} / {{ x.price }} / {{ x.category }} / {{ x.publisher }}
</li>
</ul>
</div>
</template>
<script>
import { ajax } from 'rxjs/ajax'
import { zip } from 'rxjs'
import { map, pluck, switchMap, tap, delay, mergeMap } from 'rxjs/operators'
import { map as fmap, addIndex, forEach } from 'ramda'
let fetchBooks$ = x => ajax(`http://localhost:3000/books/${x}`).pipe(
pluck('response', 'data'),
delay(1000)
)
let fetchCategory$ = x => ajax(`http://localhost:3000/categories/${x.categoryId}`).pipe(
pluck('response', 'category'),
delay(5000)
)
let fetchPublisher$ = x => ajax(`http://localhost:3000/publishers/${x.publisherId}`).pipe(
pluck('response', 'publisher'),
delay(6000)
)
let fetchChild$ = (x, i) => {
let category$ = fetchCategory$(x).pipe(
map(category => ({ category }))
)
let publisher$ = fetchPublisher$(x).pipe(
map(publisher => ({ publisher }))
)
return zip(category$, publisher$).pipe(
map(([ { category }, { publisher }]) => ({...x, i, category, publisher }))
)
}
let subscriptions = function() {
this.flipPage$.pipe(
pluck('data'),
switchMap(fetchBooks$),
tap(x => this.books = x),
mergeMap(addIndex(fmap)(fetchChild$)),
tap(forEach(x => this.$set(this.books, x.i, x)))
).subscribe()
}
export default {
name: 'app',
domStreams: [
'flipPage$'
],
data: () => ({
books: []
}),
subscriptions
}
</script>
50 行
this.flipPage$.pipe(
pluck('data'),
switchMap(fetchBooks$),
tap(x => this.books = x),
mergeMap(addIndex(fmap)(fetchChild$)),
tap(forEach(x => this.$set(this.books, x.i, x)))
).subscribe()
一樣使用 addIndex(fmap)
產生有 index
的 callback,最後使用 Ramda 的 forEach()
與 Vue 的 this.$set()
寫入 books
。
Conclusion
- 本例融合了
switchMap()
與mergeMap()
,其中switchMap()
是為了攤平 DOM Event 與 API request 造成的 Higher Order Observable;而mergeMap()
是為了攤平 N+1 Query 所造成的 Higher Order Observable - 因為實務上 user 可以快速按下
Next
button 或指定頁數換頁,因此使用switchMap()
只取最新 Inner Observable - N + 1 Query 為傳統 Higher Order Observable,因此使用最基礎的
mergeMap()
即可
Sample Code
完整範例可在我的 GitHub 上找到
Reference
RxJS, switchMap()
RxJS, mergeMap()
RxJS, forkJon()
RxJS, zip()
Ramda, map()
Ramda, addIndex()