Node.jsのStream API
node.jsStream APIとは
NodeでStreamデータを扱うためのもの。 例えばサイズが大きいファイルの入出力をStreamとして扱うことでバッファを最小限にできる。
StreamはEventEmitterで、 Readable streamやWritable stream、ReadableとWritableを合わせたDuplex streamと Readしたものを加工してWriteするTransform streamの種類があり、 それぞれ特定の関数が実装されている必要がある。
Readable stream
Readable streamにはflowing
とpaused
の
二つのモードがある。
最初はpaused
モードで、readableになってからread()することで読むことができる。
const fs = require('fs');
let readable = fs.createReadStream('sample.txt');
var i = 0;
readable.on('readable', () => {
let chunk;
while (null !== (chunk = readable.read(10))) {
console.log(`${i++}: ${chunk}`);
}
});
dable.on('end', () => {
console.log('end');
});
$ cat sample.txt
abcdefghijklmnopqrstuvwxyz
1234567890
あいうえお
$ node main.js
0: abcdefghij
1: klmnopqrst
2: uvwxyz
123
3: 4567890
あい
4: うえお
end
dataのイベントハンドラーを追加するか、後で書くpipeを使うとflowing
モードになる。
const fs = require('fs');
let readable = fs.createReadStream('sample.txt');
var i = 0;
readable.on('data', (chunk) => {
console.log(`${i++}: ${chunk}`);
});
readable.on('end', () => {
console.log('end');
});
0: abcdefghijklmnopqrstuvwxyz
1234567890
あいうえお
end
エラーハンドリングはこんな感じ。
const fs = require('fs');
let readable = fs.createReadStream('error.txt');
readable.on('data', (chunk) => {
console.log(`${i++}: ${chunk}`);
});
readable.on('error', (error) => {
console.log(error);
});
$ node main.js
{ Error: ENOENT: no such file or directory, open 'error.txt'
at Error (native) errno: -2, code: 'ENOENT', syscall: 'open', path: 'error.txt' }
実装
実装する関数は_read
。
const Readable = require('stream').Readable;
class Random extends Readable {
constructor(opt) {
super(opt);
}
_read() {
// error handling
// if(err){
// this.emit('error', err)
// return
// }
this.push(Math.random()+'');
}
}
Writable stream
output.txtに出力するWritable stream。
let writable = fs.createWriteStream('output.txt')
writable.write('hoge\n');
$ cat output.txt
hoge
入力の流量が多く、Writable streamのバッファがhighWaterMarkを超えてしまうと、write()はfalseを返す。そのまま書き込み続けるとメモリを食いつぶしてしまうので、 全てのバッファが捌けてdrainイベントが発行されるまで書き込みを止めてback-pressureとする必要がある。 ただし、pipeを使う場合このあたりはやってくれるので、あまり気にすることはない。
実装
実装する関数は_write
と、バッファされているchunkをまとめて扱うなら_writev
。
const Writable = require('stream').Writable;
class DummyWritable extends Writable {
constructor(opt) {
super(opt);
}
_write(chunk, encoding, callback) {
const chunkStr = chunk.toString()
if (chunkStr == 'this is error') {
callback(new Error('chunk is invalid'));
} else {
console.log(chunkStr);
callback();
}
}
}
pipe
Readable streamをWritable streamとつなげる。
const fs = require('fs');
let readable = fs.createReadStream('sample.txt');
let writable = fs.createWriteStream('output.txt');
readable.pipe(writable);
$ cat output.txt
abcdefghijklmnopqrstuvwxyz
1234567890
あいうえお
注意すべきなのは、pipeしたものをまとめてエラーハンドリングすることはできないこと。
const fs = require('fs');
let readable = fs.createReadStream('error.txt');
let writable = fs.createWriteStream('output.txt');
let piped = readable.pipe(writable);
piped.on('error', (error) => {
console.log(error);
});
events.js:160
throw er; // Unhandled 'error' event
^
Error: ENOENT: no such file or directory, open 'error.txt'
at Error (native)
面倒だが、毎度エラーハンドリングする必要がある。
const fs = require('fs');
let readable = fs.createReadStream('error.txt');
let writable = fs.createWriteStream('output.txt');
const errorHandling = (err) => { console.log(err) }
let piped = readable.on('error', errorHandling).pipe(writable);
pipeを組み合わせると、こんな風にcsvをfetchして加工し、文字コードを変えて出力するといったこともStreamでできる。
const fetch = require('node-fetch');
const Iconv = require('iconv').Iconv;
const iconv = new Iconv('UTF-8', 'SHIFT_JIS//IGNORE');
const csv = require('csv');
const fs = require('fs');
const errorHandling = (err) => { console.log(err); };
const outputFile = fs.createWriteStream('output.csv');
fetch('http://example.com/test.csv').then((res) => {
res.body
.pipe(csv.parse({columns : true}))
.on('error', errorHandling)
.pipe(csv.transform(function(record){
if(record['hoge'] < 100000){
return null;
}
return record;
}))
.on('error', errorHandling)
.pipe(csv.stringify({header: true}))
.on('error', errorHandling)
.pipe(iconv)
.on('error', errorHandling)
.pipe(outputFile)
.on('error', errorHandling);
}).then(() => console.log("done")).catch((err) => console.log(err));
pipeではないが、readlineのcreateInterfaceに入力と出力のStreamを渡すと、 行ごとに処理することができる。
const fs = require('fs');
const readline = require('readline');
let readable = fs.createReadStream('sample.txt');
const rl = readline.createInterface({
input: readable,
output: process.stdout
});
abcdefghijklmnopqrstuvwxyz
1234567890
あいうえお
RxJSで扱う
StreamはEventEmitterなのでRxJSのfromEvent()でObservableとして扱うこともできる。ただしv5にはpipeがない(v4にはある)ので、pipeする場合は自分でSubscribeしてwriteする必要がありそう。
const fs = require('fs');
const Rx = require('rxjs/Rx');
const writable = fs.createWriteStream('output.txt')
Rx.Observable.fromEvent(process.stdin, 'data')
.map((v) => `- ${v}`)
.subscribe((v) => write(v));
function write(v){
// TODO: back-pressure
writable.write(v);
}
$ node main.js
aiueo
kakikukeko
^C
$ cat output.txt
- aiueo
- kakikukeko