點燈坊

失くすものさえない今が強くなるチャンスよ

使用 switchMap() 與 mergeMap() 實現分頁

Sam Xiao's Avatar 2020-06-29

由於 DOM Event 為 Observable,API Request 亦為 Observable,若 Table 亦必須由 N + 1 Query 呼叫其他 API 取得資料,這就造成了 Observable of Observable of Observable 的三層 Higher Order Observable,這該如何使用 RxJS 實現呢 ?

Version

macOS Catalina 10.15.5
WebStorm 2020.1.2
Vue 2.6.11
RxJS 6.5.5

Single Column

Browser

paging000

Page 1Page 2Page 3 可以直接跳到指定 page。

Data

[
  {
    "id": 1,
    "page": 1,
    "data": [
      {
        "id": 1,
        "title": "FP in JavaScript",
        "price": 100,
        "categoryId": 1,
        "publisherId": 1
      },
      {
        "id": 2,
        "title": "RxJS in Action",
        "price": 200,
        "categoryId": 2,
        "publisherId": 1
      },
      {
        "id": 3,
        "title": "Speaking JavaScript",
        "price": 300,
        "categoryId": 3,
        "publisherId": 2
      }
    ]
  },
  {
    "id": 2,
    "page": 2,
    "data": [
      {
        "id": 4,
        "title": "Domain Modeling Made Functional",
        "price": 400,
        "categoryId": 4,
        "publisherId": 3
      },
      {
        "id": 5,
        "title": "Impatient JavaScript",
        "price": 500,
        "categoryId": 3,
        "publisherId": 4
      },
      {
        "id": 6,
        "title": "Exploring ReasonML",
        "price": 600,
        "categoryId": 5,
        "publisherId": 4
      }
    ]
  },
  {
    "id": 3,
    "page": 3,
    "data": [
      {
        "id": 7,
        "title": "Node.js in Action",
        "price": 700,
        "categoryId": 6,
        "publisherId": 2
      },
      {
        "id": 8,
        "title": "JavaScript The Good Part",
        "price": 800,
        "categoryId": 3,
        "publisherId": 2
      },
      {
        "id": 9,
        "title": "Get Programming with Haskell",
        "price": 900,
        "categoryId": 7,
        "publisherId": 1
      }
    ]
  }
]

http://localhost:3000/books/:page 會回傳指定 page 的書籍,其中 category 僅回傳 categoryId,必須再呼叫其他 API 取得 category

[
  {
    "id": 1,
    "category": "FP"
  },
  {
    "id": 2,
    "category": "FRP"
  },
  {
    "id": 3,
    "category": "JS"
  },
  {
    "id": 4,
    "category": "DDD"
  },
  {
    "id": 5,
    "category": "ReasonML"
  },
  {
    "id": 6,
    "category": "Node"
  },
  {
    "id": 7,
    "category": "Haskell"
  }
]

http://localhost:3000/categories/:categoryId 會回傳 category

switchMap() + mergeMap()

<template>
  <div>
    <button v-stream:click="{ subject: flipPage$, data: 1 }">Page 1</button>
    <button v-stream:click="{ subject: flipPage$, data: 2 }">Page 2</button>
    <button v-stream:click="{ subject: flipPage$, data: 3 }">Page 3</button>
    <ul>
      <li v-for="(x, i) in books$" :key="i">
        {{ x.title }} / {{ x.price }} / {{ x.category }}
      </li>
    </ul>
  </div>
</template>

<script>
import { ajax } from 'rxjs/ajax'
import { forkJoin } from 'rxjs'
import { pluck, map, mergeMap, switchMap, delay } from 'rxjs/operators'

let fetchBooks$ = x => ajax(`http://localhost:3000/books/${x}`).pipe(
  pluck('response', 'data'),
  delay(1000)
)

let fetchCategory$ = x => ajax(`http://localhost:3000/categories/${x.categoryId}`).pipe(
  pluck('response', 'category'),
  map(category => ({...x, category })),
  delay(5000)
)

let subscriptions = function() {
  let books$ = this.flipPage$.pipe(
    pluck('data'),
    switchMap(fetchBooks$),
    mergeMap(x => forkJoin(x.map(fetchCategory$)))
  )

  return { books$ }
}

export default {
  name: 'app',
  domStreams: [
    'flipPage$'
  ],
  subscriptions
}
</script>

第 3 行

<button v-stream:click="{ subject: flipPage$, data: 1 }">Page 1</button>
    <button v-stream:click="{ subject: flipPage$, data: 2 }">Page 2</button>
    <button v-stream:click="{ subject: flipPage$, data: 3 }">Page 3</button>

v-stream:click 使用 data 帶入指定 page。

第 6 行

<ul>
  <li v-for="(x, i) in books$" :key="i">
    {{ x.title }} / {{ x.price }} / {{ x.category }}
  </li>
</ul>

只使用單一 books$ 顯示。

19 行

let fetchBooks$ = x => ajax(`http://localhost:3000/books/${x}`).pipe(
  pluck('response', 'data'),
  delay(1000)
)

呼叫 http://localhost:3000/books/:page,並加上 delay 1 秒讓效果更明顯。

24 行

let fetchCategory$ = x => ajax(`http://localhost:3000/categories/${x.categoryId}`).pipe(
  pluck('response', 'category'),
  map(category => ({...x, category })),
  delay(5000)
)

呼叫 http://localhost:3000/categories/:categoryId,並加上 delay 5 秒鐘讓效果更為明顯。

特別以 map()bookcategory 合併成新 object。

31 行

let books$ = this.flipPage$.pipe(
  pluck('data'),
  switchMap(fetchBooks$),
  mergeMap(x => forkJoin(x.map(fetchCategory$)))
)

分別使用了 switchMap()mergeMap()

switchMap() 是為了攤平 flipPage$fetchBooks$ 所產生的 Higher Order Observable,且只取最新的 Inner Observable。

mergeMap() 是為了攤平 fetchBooks$fetchCategory$ 所產生的 Higher Order Observable,由於沒有特殊需求,因此使用最原始的 mergeMap()

x.map(fetchCategory$) 回傳為 Observable Array,因此使用了 forkJoin() 取得內部值。

Point-free

<template>
  <div>
    <button v-stream:click="{ subject: flipPage$, data: 1 }">Page 1</button>
    <button v-stream:click="{ subject: flipPage$, data: 2 }">Page 2</button>
    <button v-stream:click="{ subject: flipPage$, data: 3 }">Page 3</button>
    <ul>
      <li v-for="(x, i) in books$" :key="i">
        {{ x.title }} / {{ x.price }} / {{ x.category }}
      </li>
    </ul>
  </div>
</template>

<script>
import { ajax } from 'rxjs/ajax'
import { forkJoin, pipe } from 'rxjs'
import { pluck, map, mergeMap, switchMap, delay } from 'rxjs/operators'
import { map as fmap } from 'ramda'

let fetchBooks$ = x => ajax(`http://localhost:3000/books/${x}`).pipe(
  pluck('response', 'data'),
  delay(1000)
)

let fetchCategory$ = x => ajax(`http://localhost:3000/categories/${x.categoryId}`).pipe(
  pluck('response', 'category'),
  map(category => ({...x, category })),
  delay(5000)
)

let subscriptions = function() {
  let books$ = this.flipPage$.pipe(
    pluck('data'),
    switchMap(fetchBooks$),
    mergeMap(pipe(fmap(fetchCategory$), forkJoin))
  )

  return { books$ }
}

export default {
  name: 'app',
  domStreams: [
    'flipPage$'
  ],
  subscriptions
}
</script>

32 行

let books$ = this.flipPage$.pipe(
  pluck('data'),
  switchMap(fetchBooks$),
  mergeMap(pipe(fmap(fetchCategory$), forkJoin))
)

forkJoin() 較特殊之處是他不是 RxJS 的 operator,而隸屬於 rxjs 之下,因此使用上無法 pipe() 串其他 operator,而出現 forkJoin(x.map(fetchCategory$)) 類似 f(g(x)) 狀況。

此外,因為 x 為 Array,因此使用了原生的 map(),這使的 mergeMap() 無法 point-free。

18 行

import { map as fmap } from 'ramda'

引用了 Ramda 的 map(),因為與 RxJS 的 map() 同名,因此 alias 成 fmap()

35 行

mergeMap(pipe(fmap(fetchCategory$), forkJoin))

如此 fmap() 可使 mergeMap() 能 point-free, 也能使用 pipe()fmap()forkJoin() 串起來。

Asynchronous Update

<template>
  <div>
    <button v-stream:click="{ subject: flipPage$, data: 1 }">Page 1</button>
    <button v-stream:click="{ subject: flipPage$, data: 2 }">Page 2</button>
    <button v-stream:click="{ subject: flipPage$, data: 3 }">Page 3</button>
    <ul>
      <li v-for="(x, i) in books" :key="i">
        {{ x.title }} / {{ x.price }} / {{ x.category }}
      </li>
    </ul>
  </div>
</template>

<script>
import { ajax } from 'rxjs/ajax'
import { pluck, map, mergeMap, switchMap, delay, tap } from 'rxjs/operators'
import { map as fmap, addIndex, forEach } from 'ramda'

let fetchBooks$ = x => ajax(`http://localhost:3000/books/${x}`).pipe(
  pluck('response', 'data'),
  delay(1000)
)

let fetchCategory$ = (x, i) => ajax(`http://localhost:3000/categories/${x.categoryId}`).pipe(
  pluck('response', 'category'),
  map(category => ({...x, i, category })),
  delay(5000)
)

let subscriptions = function() {
  this.flipPage$.pipe(
    pluck('data'),
    switchMap(fetchBooks$),
    tap(x => this.books = x),
    mergeMap(addIndex(fmap)(fetchCategory$)),
    tap(forEach(x => this.$set(this.books, x.i, x)))
  ).subscribe()
}

export default {
  name: 'app',
  domStreams: [
    'flipPage$'
  ],
  data: () => ({
    books: []
  }),
  subscriptions
}
</script>

前例使用了 forkJoin(),若 fetchCategory$() 特別慢,會明顯感覺 button 按下後,要等一段時間才會全部出現 titlepricecategory

若想要 books 的 titleprice 先出現,而 category 再 asynchronous 慢慢出現,則要使用 asynchronous update 寫法。

45 行

data: () => ({
  books: []
}),

因為 fetchCategory$() 要 asynchronous 更新,因此 books 從 Observable 改成 Data。

24 行

let fetchCategory$ = (x, i) => ajax(`http://localhost:3000/categories/${x.categoryId}`).pipe(
  pluck('response', 'category'),
  map(category => ({...x, i, category })),
  delay(5000)
)

特別加上了 i 傳入 index,稍後 this.$set() 更新 books 會使用。

31 行

this.flipPage$.pipe(
  pluck('data'),
  switchMap(fetchBooks$),
  tap(x => this.books = x),
  mergeMap(addIndex(fmap)(fetchCategory$)),
  tap(forEach(x => this.$set(this.books, x.i, x)))
).subscribe()

因為不再回傳 Observable,因此 flipPage$() 要加上 subscribe() 才會啟動 stream。

33 行

 switchMap(fetchBooks$),
 tap(x => this.books = x),

switchMap(fetchBooks$) 主要在於獲得 titleprice 先顯示,因此要使用 tap() 先寫入 books

35 行

mergeMap(addIndex(fmap)(fetchCategory$)),
tap(forEach(x => this.$set(this.books, x.i, x)))

Ramda 的 fmap() 的 callback 只有 x,但目前 fetchCategory$ 還需要有 index,因此使用 addIndex(fmap) 產生有 (x, i) 的 callback。

最後使用 tap() 配合 Ramda 的 forEach() 與 Vue 的 this.$set() 寫入 books

Multiple Column

Browser

paging001

除了 category 外,顯示多了 publisher

看似只多了一個 field,但寫法卻不太一樣。

Data

[
  {
    "id": 1,
    "publisher": "Manning"
  },
  {
    "id": 2,
    "publisher": "O'Reilly"
  },
  {
    "id": 3,
    "publisher": "Pragmatic"
  },
  {
    "id": 4,
    "publisher": "eBook"
  },
  {
    "id": 5,
    "publisher": "ReasonML"
  },
  {
    "id": 6,
    "publisher": "Node"
  },
  {
    "id": 7,
    "publisher": "Haskell"
  }
]

http://localhost:3000/publishers/:publisherId 會回傳 publisher

zip()

<template>
  <div>
    <button v-stream:click="{ subject: flipPage$, data: 1 }">Page 1</button>
    <button v-stream:click="{ subject: flipPage$, data: 2 }">Page 2</button>
    <button v-stream:click="{ subject: flipPage$, data: 3 }">Page 3</button>
    <ul>
      <li v-for="(x, i) in books$" :key="i">
        {{ x.title }} / {{ x.price }} / {{ x.category }} / {{ x.publisher }}
      </li>
    </ul>
  </div>
</template>

<script>
import { ajax } from 'rxjs/ajax'
import { zip, forkJoin } from 'rxjs'
import { map, pluck, switchMap, delay, mergeMap } from 'rxjs/operators'

let fetchBooks$ = x => ajax(`http://localhost:3000/books/${x}`).pipe(
  pluck('response', 'data'),
  delay(1000)
)

let fetchCategory$ = x => ajax(`http://localhost:3000/categories/${x.categoryId}`).pipe(
  pluck('response', 'category'),
  delay(5000)
)

let fetchPublisher$ = x => ajax(`http://localhost:3000/publishers/${x.publisherId}`).pipe(
  pluck('response', 'publisher'),
  delay(6000)
)

let fetchChild$ = x => {
  let category$ = fetchCategory$(x).pipe(
    map(category => ({ category }))
  )

  let publisher$ = fetchPublisher$(x).pipe(
    map(publisher => ({ publisher }))
  )

  return zip(category$, publisher$).pipe(
    map(([ { category }, { publisher }]) => ({...x, category, publisher }))
  )
}

let subscriptions = function() {
  let books$ = this.flipPage$.pipe(
    pluck('data'),
    switchMap(fetchBooks$),
    mergeMap(x => forkJoin(x.map(fetchChild$)))
  )

  return { books$ }
}

export default {
  name: 'app',
  domStreams: [
    'flipPage$'
  ],
  subscriptions
}
</script>

29 行

let fetchPublisher$ = x => ajax(`http://localhost:3000/publishers/${x.publisherId}`).pipe(
  pluck('response', 'publisher'),
  delay(6000)
)

呼叫 http://localhost:3000/publishers/:publisherId,並加上 delay 6 秒與 fetchCategory$() 有所差異。

49 行

let books$ = this.flipPage$.pipe(
  pluck('data'),
  switchMap(fetchBooks$),
  mergeMap(x => forkJoin(x.map(fetchChild$)))
)

一樣使用 swtichMap()mergeMap() 組合,差異只在 fetchChild$()

34 行

let fetchChild$ = x => {
  let category$ = fetchCategory$(x).pipe(
    map(category => ({ category }))
  )

  let publisher$ = fetchPublisher$(x).pipe(
    map(publisher => ({ publisher }))
  )

  return zip(category$, publisher$).pipe(
    map(([ { category }, { publisher }]) => ({...x, category, publisher }))
  )
}

單獨取得 category$publisher$,最後使用 zip() 組合 category$publisher$

Point-free

<template>
  <div>
    <button v-stream:click="{ subject: flipPage$, data: 1 }">Page 1</button>
    <button v-stream:click="{ subject: flipPage$, data: 2 }">Page 2</button>
    <button v-stream:click="{ subject: flipPage$, data: 3 }">Page 3</button>
    <ul>
      <li v-for="(x, i) in books$" :key="i">
        {{ x.title }} / {{ x.price }} / {{ x.category }} / {{ x.publisher }}
      </li>
    </ul>
  </div>
</template>

<script>
import { ajax } from 'rxjs/ajax'
import { zip, forkJoin, pipe } from 'rxjs'
import { map, pluck, switchMap, delay, mergeMap } from 'rxjs/operators'
import { map as fmap } from 'ramda'

let fetchBooks$ = x => ajax(`http://localhost:3000/books/${x}`).pipe(
  pluck('response', 'data'),
  delay(1000)
)

let fetchCategory$ = x => ajax(`http://localhost:3000/categories/${x.categoryId}`).pipe(
  pluck('response', 'category'),
  delay(5000)
)

let fetchPublisher$ = x => ajax(`http://localhost:3000/publishers/${x.publisherId}`).pipe(
  pluck('response', 'publisher'),
  delay(6000)
)

let fetchChild$ = x => {
  let category$ = fetchCategory$(x).pipe(
    map(category => ({ category }))
  )

  let publisher$ = fetchPublisher$(x).pipe(
    map(publisher => ({ publisher }))
  )

  return zip(category$, publisher$).pipe(
    map(([ { category }, { publisher }]) => ({...x, category, publisher }))
  )
}

let subscriptions = function() {
  let books$ = this.flipPage$.pipe(
    pluck('data'),
    switchMap(fetchBooks$),
    mergeMap(pipe(fmap(fetchChild$), forkJoin))
  )

  return { books$ }
}

export default {
  name: 'app',
  domStreams: [
    'flipPage$'
  ],
  subscriptions
}
</script>

50 行

let books$ = this.flipPage$.pipe(
  pluck('data'),
  switchMap(fetchBooks$),
  mergeMap(pipe(fmap(fetchChild$), forkJoin))
)

一樣使用 Ramda 的 fmap() 取代 x.map(),最後使用 pipe() 組合 fmap()forkJoin()

Asynchronous Update

<template>
  <div>
    <button v-stream:click="{ subject: flipPage$, data: 1 }">Page 1</button>
    <button v-stream:click="{ subject: flipPage$, data: 2 }">Page 2</button>
    <button v-stream:click="{ subject: flipPage$, data: 3 }">Page 3</button>
    <ul>
      <li v-for="(x, i) in books" :key="i">
        {{ x.title }} / {{ x.price }} / {{ x.category }} / {{ x.publisher }}
      </li>
    </ul>
  </div>
</template>

<script>
import { ajax } from 'rxjs/ajax'
import { zip } from 'rxjs'
import { map, pluck, switchMap, tap, delay, mergeMap } from 'rxjs/operators'
import { map as fmap, addIndex, forEach } from 'ramda'

let fetchBooks$ = x => ajax(`http://localhost:3000/books/${x}`).pipe(
  pluck('response', 'data'),
  delay(1000)
)

let fetchCategory$ = x => ajax(`http://localhost:3000/categories/${x.categoryId}`).pipe(
  pluck('response', 'category'),
  delay(5000)
)

let fetchPublisher$ = x => ajax(`http://localhost:3000/publishers/${x.publisherId}`).pipe(
  pluck('response', 'publisher'),
  delay(6000)
)

let fetchChild$ = (x, i) => {
  let category$ = fetchCategory$(x).pipe(
    map(category => ({ category }))
  )

  let publisher$ = fetchPublisher$(x).pipe(
    map(publisher => ({ publisher }))
  )

  return zip(category$, publisher$).pipe(
    map(([ { category }, { publisher }]) => ({...x, i, category, publisher }))
  )
}

let subscriptions = function() {
  this.flipPage$.pipe(
    pluck('data'),
    switchMap(fetchBooks$),
    tap(x => this.books = x),
    mergeMap(addIndex(fmap)(fetchChild$)),
    tap(forEach(x => this.$set(this.books, x.i, x)))
  ).subscribe()
}

export default {
  name: 'app',
  domStreams: [
    'flipPage$'
  ],
  data: () => ({
    books: []
  }),
  subscriptions
}
</script>

50 行

this.flipPage$.pipe(
  pluck('data'),
  switchMap(fetchBooks$),
  tap(x => this.books = x),
  mergeMap(addIndex(fmap)(fetchChild$)),
  tap(forEach(x => this.$set(this.books, x.i, x)))
).subscribe()

一樣使用 addIndex(fmap) 產生有 index 的 callback,最後使用 Ramda 的 forEach() 與 Vue 的 this.$set() 寫入 books

Conclusion

  • 本例融合了 switchMap()mergeMap(),其中 switchMap() 是為了攤平 DOM Event 與 API request 造成的 Higher Order Observable;而 mergeMap() 是為了攤平 N+1 Query 所造成的 Higher Order Observable
  • 因為實務上 user 可以快速按下 Next button 或指定頁數換頁,因此使用 switchMap() 只取最新 Inner Observable
  • N + 1 Query 為傳統 Higher Order Observable,因此使用最基礎的 mergeMap() 即可

Sample Code

完整範例可在我的 GitHub 上找到

Reference

RxJS, switchMap()
RxJS, mergeMap()
RxJS, forkJon()
RxJS, zip()
Ramda, map()
Ramda, addIndex()