merge
2つのstreamの両方の値がemitされる。
Rx.Observable.merge(
stream1,
stream2
).subscribe(
data => console.log(`merge ${data}`),
err => console.log(`merge ${err}`)
);
forkJoin
completeしたときの最後の値を配列としてemitする。 非同期で一つ値をemitするようなstreamで、Promise.allのようなことをしたいときはこれ。
Rx.Observable.forkJoin(
stream1,
stream2
).subscribe(
data => console.log(` forkJoin: ${data}`),
err => console.log(` forkJoin: ${err}`)
)
concat
前のstreamがcompleteしたら次のstreamの値がemitされる。
Rx.Observable.concat(
stream1,
stream2
).subscribe(
data => console.log(` concat ${data}`),
err => console.log(` concat ${err}`)
);
combineLatest
stream自体を結合するのではなく値を結合する。 この例だと、stream1でemitされた値がa、stream2で最後にemitされた値がbのときa+bをemitする。 combineする値がない場合はemitされない。
stream1.combineLatest(stream2, (a, b) => a + b).subscribe(
data => console.log(` combineLatest ${data}`),
err => console.log(` combineLatest ${err}`)
);
同時に実行したときの結果
const stream1 = Rx.Observable.interval(100).map(v => `stream 1-${v+1}`).take(3);
const stream2 = Rx.Observable.interval(100).map(v => `stream 2-${v+1}`).take(3).delay(150);
merge stream 1-1
concat stream 1-1
merge stream 1-2
concat stream 1-2
merge stream 2-1 <- mergeではstream1はcompleteしていないが、stream2がemitされる
merge stream 1-3
concat stream 1-3
combineLatest stream 1-3stream 2-1 <- stream2の値がemitされたのでcombineする
merge stream 2-2
combineLatest stream 1-3stream 2-2
forkJoin: stream 1-3,stream 2-3 <- stream1とstream2がcompleteしたのでforkJoinでemitされる
combineLatest stream 1-3stream 2-3
merge stream 2-3
concat stream 2-1 <- concatではstream1がcompleteしたので、stream2がemitされる
concat stream 2-2
concat stream 2-3