https://github.com/ReactiveX/rxjs
Rx(ReactiveX)とは
非同期処理をうまく扱えるようにするライブラリ。いろんな言語で実装されている。 非同期処理の結果はObservableなStreamに流される。 ObservableはIteratableのように扱うことができる。
RxはObserver pattern を拡張したもの。 Observer patternというのは、Subjectが、Observeしている全てのObserverに対して通知を送るデザインパターン。 C#などのeventのそれ。
C#のdelegateとevent - sambaiz.net
試してみる
inputのkeyupイベントのObservableを作成し、それをsubscribe()して出力している。
<html>
<head>
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>
</head>
<body>
<input type="text" id="input" />
<script>
const inputForm = document.querySelector('#input');
const keyups = Rx.Observable.fromEvent(inputForm, 'keyup');
keyups.subscribe(
data => console.log(data),
err => console.log(err)
);
</script>
</body>
</html>
入力するとこんなのが出力される。
KeyboardEvent {isTrusted: true, key: "a", code: "KeyA", location: 0, ctrlKey: false…}
KeyboardEvent {isTrusted: true, key: "b", code: "KeyB", location: 0, ctrlKey: false…}
KeyboardEvent {isTrusted: true, key: "c", code: "KeyC", location: 0, ctrlKey: false…}
Observable
create
next()でObservableに値をemitし、complete()で終了させる。
error()でエラーをemitするとそれ以後の値はemitされない。
Rx.Observable.create(function (observer) {
observer.next("AAAAA");
observer.next("BBBBB");
observer.next("CCCCC");
observer.complete();
}).subscribe(
data => console.log(data),
err => {},
() => console.log("completed")
);
AAAA
BBBB
CCCC
completed
from
配列などのIteratableをObservableに変換する。
Rx.Observable.from([1,2,3]).subscribe(
data => console.log(data),
err => {},
() => console.log("completed")
);
1
2
3
completed
fromEvent
上で使ったやつ。
Rx.Observable.fromEvent(document.querySelector('#input'), 'keyup').subscribe(
data => console.log(data),
err => {},
() => console.log("completed")
);
KeyboardEvent {isTrusted: true, key: "a", code: "KeyA", location: 0, ctrlKey: false…}
fromPromise
PromiseもObservableに変換できる。
Rx.Observable.fromPromise(Promise.resolve("ok")).subscribe(
data => console.log(data),
err => {},
() => console.log("completed")
);
ok
completed
interval
一定時間ごとにemitし続ける。
Rx.Observable.interval(1000).subscribe(
data => console.log(data),
err => {},
() => console.log("completed")
);
0
1
2
3
RxJSでObservableを結合する(merge, forkJoin, concat, combineLatest) - sambaiz.net
Operator
Observableのメソッド。新しいObservableを作って返す。
上で試したkeyupのObservableにいろいろやってみる。
pluck
nestされたプロパティを指定する。この例だと.target.value。
const keyups = Rx.Observable.fromEvent(inputForm, 'keyup')
.pluck('target', 'value');
h
ho
hog
hoge
filter
フィルタリングする。この例だと長さが2より大きいものだけがemitされる。
const keyups = Rx.Observable.fromEvent(inputForm, 'keyup')
.pluck('target', 'value')
.filter(text => text.length > 2 );
hog
hoge
map
map。この例だとvalue: ${text}のフォーマットでemitされる。
const keyups = Rx.Observable.fromEvent(inputForm, 'keyup')
.pluck('target', 'value')
.filter(text => text.length > 2 )
.map(text => `value: ${text}`);
value: hog
value: hoge
reduce
reduce。emitされるのはcompleteされたときなので、takeUntil()で 渡したObservableが何かemitしたときにcompleteさせるようにしている。
const keyups = Rx.Observable.fromEvent(inputForm, 'keyup')
.takeUntil(Rx.Observable.interval(5000))
.pluck('target', 'value')
.filter(text => text.length > 2 )
.map(text => `value: ${text}`)
.reduce((acc, curr) => `${acc} ${curr}`, "");
keyups.subscribe(
data => console.log(data),
err => console.log(err),
() => console.log("completed")
);
value: aaa value: aaaa value: aaaaa value: aaaaaa value: aaaaaaa value: aaaaaaaa value: aaaaaaaaa value: aaaaaaaaaa value: aaaaaaaaaaa
completed
Subject
Observerでもあり、Observableでもあるブリッジのようなもの。
これまでのObservableはSubscribeされるまでemitしない"Cold"なものだったが、 SubjectはそんなObservableをSubscribeし、それをトリガーにemitするので、 “Cold"なObservableを常にemitし得る"Hot"なものに変えることができる。
// ColdなObservable
const cold = Rx.Observable.from([1,2,3]);
// Coldだと、いつから、何回読んでも同じ値が得られる
// 1, 2, 3, completed
cold.subscribe(
data => console.log(data),
err => {},
() => console.log("completed")
);
// 1, 2, 3, completed
cold.subscribe(
data => console.log(data),
err => {},
() => console.log("completed")
);
(Publish)Subject
Subscribeした時点からemitされたアイテムをemitする。それまでにemitされたアイテムはしない。
const subject = new Rx.Subject();
subject.subscribe(
data => console.log(`1: ${data}`),
err => {},
() => console.log("1: completed")
);
subject.next("AAA") // 1: AAA
subject.subscribe(
data => console.log(`2: ${data}`),
err => {},
() => console.log("2: completed")
);
subject.next("BBB");
subject.complete();
1: AAA
1: BBB
2: BBB
1: completed
2: completed
AsyncSubject
complete時に最後にemitされた値だけをemitする。
const subject = new Rx.AsyncSubject();
subject.subscribe(
data => console.log(`1: ${data}`),
err => {},
() => console.log("1: completed")
);
subject.next("AAA");
subject.next("BBB");
subject.complete();
subject.subscribe(
data => console.log(`2: ${data}`),
err => {},
() => console.log("2: completed")
);
1: BBB
1: completed
2: BBB
2: completed
BehaviorSubject
Subscribeしたとき、最近のアイテムをemitする。あとはSubjectと同じ。
const subject = new Rx.BehaviorSubject("ZZZ")
subject.subscribe(
data => console.log(`1: ${data}`),
err => {},
() => console.log("1: completed")
);
subject.next("AAA");
subject.next("BBB");
subject.subscribe(
data => console.log(`2: ${data}`),
err => {},
() => console.log("2: completed")
);
subject.next("CCC");
subject.complete();
1: ZZZ
1: AAA
1: BBB
2: BBB
1: CCC
2: CCC
1: completed
2: completed
ReplaySubject
いつSubscribeしてもbufferにある全てのアイテムをemitする。
const subject = new Rx.ReplaySubject(2) // buffer size = 2
subject.subscribe(
data => console.log(`1: ${data}`),
err => {},
() => console.log("1: completed")
);
subject.next("AAA");
subject.next("BBB");
subject.next("CCC");
subject.next("DDD");
subject.subscribe(
data => console.log(`2: ${data}`),
err => {},
() => console.log("2: completed")
);
subject.complete();
buffer size = 2 なので2がSubscribeしたときにはAAAとBBBはもうない。
1: AAA
1: BBB
1: CCC
1: DDD
2: CCC
2: DDD
1: completed
2: completed