點燈坊

失くすものさえない今が強くなるチャンスよ

如何以 RxJS 顯示 Nested Observable ?

Sam Xiao's Avatar 2020-05-29

若 API 遵循 RESTful 精神設計,常會第一個 API 僅回傳 id,若要取得其值就要呼叫第二個 API,也就是所謂 N + 1 Query,RxJS 對這種常見需求有很漂亮寫法。

Version

macOS Catalina 10.15.4
WebStorm 2020.1.1
Vue 2.6.11
Vue-rx 6.2
RxJS 6.5.5

Browser

nested000

FPFRPJS 並不是從單一 API 取得,而是呼叫另外一個 API。

Double Side Effect

<template>
  <div>
    <ul v-for="(x, i) in books" :key="i">
      <li>{{ x.title }} / {{ x.price }} / {{ categories[i] }}</li>
    </ul>
  </div>
</template>

<script>
import { ajax } from 'rxjs/ajax'
import { forkJoin } from 'rxjs'
import { map, pluck, share } from 'rxjs/operators'

let fetchBooks$ = ajax('http://localhost:3000/books')

let fetchCategory$ = x => ajax(`http://localhost:3000/categories/${x.categoryId}`)

let mounted = function() {
  let books$ = fetchBooks$.pipe(
    pluck('response'),
    share()
  )

  books$.subscribe(x => this.books = x)

  books$.pipe(
    map(x => x.map(x => fetchCategory$(x))),
  ).subscribe(x => forkJoin(x).subscribe(x => {
    this.categories = x.map(x => x.response.value)
  }))
}

export default {
  name:'App',
  data: () => ({
    books: [],
    categories: []
  }),
  mounted
}
</script>

35 行

data: () => ({
  books: [],
  categories: []
}),

由於需呼叫兩個 API,直覺會使用兩個 side effect。

19 行

let books$ = fetchBooks$.pipe(
  pluck('response'),
  share()
)

呼叫第一個 API 取得 Object Array Observable,內含 titleprice property,將使用 categoryId 呼叫第二個 API。

因為稍後會使用兩次 books$ Observable,為避免產生兩次 API request,所以特別加上 share() operator。

24 行

books$.subscribe(x => this.books = x)

使用 subscribe() 寫下第一個 side effect: books data。

26 行

books$.pipe(
  map(x => x.map(x => fetchCategory$(x))),
)

最直覺會使用 map() 產生新 Observable,但由於 x 為 Array,所以必須再使用 Array 的 map() 改變其內部值。

nested001

由於 ajax() 回傳 Observable,目前結果為 Observable Array,這正是使用 forkJoin() 時機。

forkJoin()
Observable 版的 Promise.all(),可將 Observable Array 成為 Array

nested003

28 行

.subscribe(x => forkJoin(x).subscribe(x => {
  this.categories = x.map(x => x.response.value)
}))

x 為 Array,forkJoin() 之後為新的 Observable Array,再次使用 subscribe() 取得內部 Array,因此使用 Array 的 map() 取得 responsevalue

這是 RxJS 最原始使用方式,但由於在 subscribe() 中呼叫 forkJoin(),因此出現了 nested subscribe,很明顯這種寫法很糟糕

Refactoring

<template>
  <div>
    <ul v-for="(x, i) in books" :key="i">
      <li>{{ x.title }} / {{ x.price }} / {{ categories[i] }}</li>
    </ul>
  </div>
</template>

<script>
import { ajax } from 'rxjs/ajax'
import { forkJoin } from 'rxjs'
import { map, pluck, share } from 'rxjs/operators'

let fetchBooks$ = ajax('http://localhost:3000/books').pipe(
  pluck('response')
)

let fetchCategory$ = x => ajax(`http://localhost:3000/categories/${x.categoryId}`).pipe(
  pluck('response', 'value')
)

let mounted = function() {
  let books$ = fetchBooks$.pipe(
    share()
  )

  books$.subscribe(x => this.books = x)

  books$.pipe(
    map(x => forkJoin(x.map(x => fetchCategory$(x))))
  ).subscribe(x => console.log(x))
}

export default {
  name:'App',
  data: () => ({
    books: [],
    categories: []
  }),
  mounted
}
</script>

29 行

books$.pipe(
  map(x => forkJoin(x.map(x => fetchCategories$(x))))
).subscribe(x => console.log(x))

之前寫法最少有兩點值得先改善:

  • 第二個 API 太晚從 response.value 取出值,應該在 ajax() 之後直接在 pipe() 內使用 pluck()
  • x.map() 回傳就是 Observable Array,可立即使用 forkJoin() 處理,這樣可以省下一個 subscribe()

nested001

這樣會發現結果為 Observable,且其 subscribe() 內的 x 仍然是 Observable,最後其內部才是 Observable Array。

這是因為 map() 本身回傳 Observable,且 forkJoin() 亦會回傳 Observable,因此造成 Observable of Observable。

flatMap()

<template>
  <div>
    <ul v-for="(x, i) in books" :key="i">
      <li>{{ x.title }} / {{ x.price }} / {{ categories[i] }}</li>
    </ul>
  </div>
</template>

<script>
import { ajax } from 'rxjs/ajax'
import { forkJoin } from 'rxjs'
import { flatMap, pluck, share } from 'rxjs/operators'

let fetchBooks$ = ajax('http://localhost:3000/books').pipe(
  pluck('response')
)

let fetchCategory$ = x => ajax(`http://localhost:3000/categories/${x.categoryId}`).pipe(
  pluck('response', 'value')
)

let mounted = function() {
  let books$ = fetchBooks$.pipe(
    share()
  )

  books$.subscribe(x => this.books = x)

  books$.pipe(
    flatMap(x => forkJoin(x.map(fetchCategory$)))
  ).subscribe(x => this.categories = x)
}
export default {
  name:'App',
  data:() => ({
    books:[],
    categories:[]
  }),
  mounted
}
</script>

29 行

books$.pipe(
  flatMap(x => forkJoin(x.map(fetchCategory$))
)

根據使用 Array 經驗,只要出現 Array of Array 時,在 ECMAScript 會使用 Array.prototype.flatMap(),在 Ramda 會使用 chain() 將 Array 攤平,同樣道理,當出現 Observable of Observable 時,也可使用 flatMpa() 將 Observable 攤平。

flatMap()
將 Observable of Observable 攤平成 Observable

nested004

flatMap() 也稱為 mergeMap()

tap()

<template>
  <div>
    <ul v-for="(x, i) in books" :key="i">
      <li>{{ x.title }} / {{ x.price }} / {{ categories[i] }} </li>
    </ul>
  </div>
</template>

<script>
import { ajax } from 'rxjs/ajax'
import { forkJoin } from 'rxjs'
import { flatMap, pluck, tap } from 'rxjs/operators'

let fetchBooks$ = ajax('http://localhost:3000/books').pipe(
  pluck('response')
)

let fetchCategory$ = x => ajax(`http://localhost:3000/categories/${x.categoryId}`).pipe(
  pluck('response', 'value')
)

let mounted = function() {
  fetchBooks$.pipe(
    tap(x => this.books = x),
    flatMap(x => forkJoin(x.map(fetchCategory$)))
  ).subscribe(x => this.categories = x)
}

export default {
  name:'App',
  data: () => ({
    books: [],
    categories: []
  }),
  mounted
}
</script>

24 行

tap(x => this.books = x),

Side effect 不是只能用 subscribe() 而已,也可使用 tap() 使 pipeline 不中斷,也不用建立新的 Observable,因此連 share() 都省了。

Single Side Effect

<template>
  <div>
    <ul v-for="(x, i) in books" :key="i">
      <li>{{ x.title }} / {{ x.price }} / {{ x.category }} </li>
    </ul>
  </div>
</template>

<script>
import { ajax } from 'rxjs/ajax'
import { forkJoin } from 'rxjs'
import { flatMap, pluck, map } from 'rxjs/operators'

let fetchBooks$ = ajax('http://localhost:3000/books').pipe(
  pluck('response')
)

let fetchCategory$ = x => ajax(`http://localhost:3000/categories/${x.categoryId}`).pipe(
  pluck('response', 'value'),
  map(category => ({...x, category}))
)

let mounted = function() {
  fetchBooks$.pipe(
    flatMap(x => forkJoin(x.map(fetchCategory$)))
  ).subscribe(x => this.books = x)
}

export default {
  name:'App',
  data: () => ({
    books: []
  }),
  mounted
}
</script>

18 行

let fetchCategory$ = x => ajax(`http://localhost:3000/categories/${x.categoryId}`).pipe(
  pluck('response', 'value'),
  map(category => ({...x, category}))
)

我們都知道 FP 強調盡量減少 side effect,可能由兩個 side effect 減少成單一 side effect 嗎 ?

在原本 fetchCategory() 內順便 map() 成新的包含 category property 的 book,這也是 HTML template 所預期的最後結果。

26 行

.subscribe(x => this.books = x)

如此只要單一 subscribe() 即可。

Subscription

<template>
  <div>
    <ul v-for="(x, i) in books$" :key="i">
      <li>{{ x.title }} / {{ x.price }} / {{ x.category }} </li>
    </ul>
  </div>
</template>

<script>
import { ajax } from 'rxjs/ajax'
import { forkJoin } from 'rxjs'
import { flatMap, pluck, map } from 'rxjs/operators'

let fetchBooks$ = ajax('http://localhost:3000/books').pipe(
  pluck('response')
)

let fetchCategory$ = x => ajax(`http://localhost:3000/categories/${x.categoryId}`).pipe(
  pluck('response', 'value'),
  map(category => ({...x, category}))
)

let subscriptions = function() {
  let books$ = fetchBooks$.pipe(
    flatMap(x => forkJoin(x.map(fetchCategory$)))
  )

  return { books$ }
}

export default {
  name:'App',
  subscriptions
}
</script>

23 行

let subscriptions = function() {
  let books$ = fetchBooks$.pipe(
    flatMap(x => forkJoin(x.map(fetchCategory$)))
  )

  return { books$ }
}

也可改用 Vue-rx 的 subscription() 取代 subscribe(),如此連 Vue 的 datathis 都不用使用,也可安心使用 arrow function,更有 FP 味道。

Ramda

<template>
  <div>
    <ul v-for="(x, i) in books$" :key="i">
      <li>{{ x.title }} / {{ x.price }} / {{ x.category }} </li>
    </ul>
  </div>
</template>

<script>
import { ajax } from 'rxjs/ajax'
import { forkJoin } from 'rxjs'
import { flatMap, pluck, map } from 'rxjs/operators'
import { compose, map as fmap } from 'ramda'

let fetchBooks$ = ajax('http://localhost:3000/books').pipe(
  pluck('response')
)

let fetchCategory$ = x => ajax(`http://localhost:3000/categories/${x.categoryId}`).pipe(
  pluck('response', 'value'),
  map(category => ({...x, category}))
)

let toCategory = compose(forkJoin, fmap(fetchCategory$))

let subscriptions = function() {
  let books$ = fetchBooks$.pipe(
    flatMap(toCategory)
  )

  return { books$ }
}

export default {
  name:'App',
  subscriptions
}
</script>

24 行

let toCategory = compose(forkJoin, fmap(fetchCategory$))

可將 forkJoin()map()compose() 組合成 toCategory()

Ramda 的 map() 與 RxJS 的 map() 同名,因此將 Ramda 的 map() 取別名為 fmap()

28 行

flatMap(toCategory)

如此 flatMap() 的 callback 也 point-free 了。

Conclusion

  • 當寫出 nested subscribe 時,一定是 something wrong,試著調整 operator 使用順序,避免不必要的 subscribe
  • 可能一開始不會發現該使用 flatMap(),一旦出現 Observable of Observable 時,就是該使用 flatMap() 時機
  • 當 RxJS 與 Ramda 搭配時,Ramda 可用來 point-free RxJS operator 的 callback 部分
  • forkJoin() 不是 operator 一直是我很疑惑的設計,這使的 forkJoin() 無法與其他 RxJS operator 組合,但意外發現卻可與 Ramda function 組合

Reference

RxJS, flatMap()
RxJS, forkJoin()