Node.jsのStream API

node.js

Stream APIとは

NodeでStreamデータを扱うためのもの。 例えばサイズが大きいファイルの入出力をStreamとして扱うことでバッファを最小限にできる。

StreamはEventEmitterで、 Readable streamやWritable stream、ReadableとWritableを合わせたDuplex streamと Readしたものを加工してWriteするTransform streamの種類があり、 それぞれ特定の関数が実装されている必要がある。

Readable stream

Readable streamにはflowingpaused二つのモードがある。 最初はpausedモードで、readableになってからread()することで読むことができる。

const fs = require('fs');
let readable = fs.createReadStream('sample.txt');
var i = 0;
readable.on('readable', () => {
  let chunk;
  while (null !== (chunk = readable.read(10))) {
    console.log(`${i++}: ${chunk}`);
  }
});
dable.on('end', () => {
  console.log('end');
});
$ cat sample.txt
abcdefghijklmnopqrstuvwxyz
1234567890
あいうえお

$ node main.js
0: abcdefghij
1: klmnopqrst
2: uvwxyz
123
3: 4567890
あい
4: うえお

end

dataのイベントハンドラーを追加するか、後で書くpipeを使うとflowingモードになる。

const fs = require('fs');
let readable = fs.createReadStream('sample.txt');
var i = 0;
readable.on('data', (chunk) => {
  console.log(`${i++}: ${chunk}`);
});
readable.on('end', () => {
  console.log('end');
});
0: abcdefghijklmnopqrstuvwxyz
1234567890
あいうえお

end

エラーハンドリングはこんな感じ。

const fs = require('fs');
let readable = fs.createReadStream('error.txt');
readable.on('data', (chunk) => {
  console.log(`${i++}: ${chunk}`);
});
readable.on('error', (error) => {
  console.log(error);
});
$ node main.js
{ Error: ENOENT: no such file or directory, open 'error.txt'
    at Error (native) errno: -2, code: 'ENOENT', syscall: 'open', path: 'error.txt' }

実装

実装する関数は_read

const Readable = require('stream').Readable;

class Random extends Readable {
  constructor(opt) {
    super(opt); 
  }
  
  _read() {
    
    // error handling
    // if(err){ 
    //   this.emit('error', err)
    //   return
    // }
    
    this.push(Math.random()+'');
  }
}

Writable stream

output.txtに出力するWritable stream。

let writable = fs.createWriteStream('output.txt')

writable.write('hoge\n');
$ cat output.txt
hoge

入力の流量が多く、Writable streamのバッファがhighWaterMarkを超えてしまうと、write()はfalseを返す。そのまま書き込み続けるとメモリを食いつぶしてしまうので、 全てのバッファが捌けてdrainイベントが発行されるまで書き込みを止めてback-pressureとする必要がある。 ただし、pipeを使う場合このあたりはやってくれるので、あまり気にすることはない。

実装

実装する関数は_writeと、バッファされているchunkをまとめて扱うなら_writev

const Writable = require('stream').Writable;

class DummyWritable extends Writable {
  constructor(opt) {
    super(opt);
  }

  _write(chunk, encoding, callback) {
    const chunkStr = chunk.toString()
    if (chunkStr == 'this is error') {
      callback(new Error('chunk is invalid'));
    } else {
      console.log(chunkStr);
      callback();
    }
  }
}

pipe

Readable streamをWritable streamとつなげる。

const fs = require('fs');
let readable = fs.createReadStream('sample.txt');
let writable = fs.createWriteStream('output.txt');
readable.pipe(writable);
$ cat output.txt
abcdefghijklmnopqrstuvwxyz
1234567890
あいうえお

注意すべきなのは、pipeしたものをまとめてエラーハンドリングすることはできないこと。

const fs = require('fs');
let readable = fs.createReadStream('error.txt');
let writable = fs.createWriteStream('output.txt');
let piped = readable.pipe(writable);

piped.on('error', (error) => {
  console.log(error);
});
events.js:160
      throw er; // Unhandled 'error' event
      ^

Error: ENOENT: no such file or directory, open 'error.txt'
    at Error (native)

面倒だが、毎度エラーハンドリングする必要がある。

const fs = require('fs');
let readable = fs.createReadStream('error.txt');
let writable = fs.createWriteStream('output.txt');
const errorHandling = (err) => { console.log(err) }
let piped = readable.on('error', errorHandling).pipe(writable);

pipeを組み合わせると、こんな風にcsvをfetchして加工し、文字コードを変えて出力するといったこともStreamでできる。

const fetch = require('node-fetch');
const Iconv = require('iconv').Iconv;
const iconv = new Iconv('UTF-8', 'SHIFT_JIS//IGNORE');
const csv = require('csv');
const fs = require('fs');

const errorHandling = (err) => { console.log(err); };

const outputFile = fs.createWriteStream('output.csv');

fetch('http://example.com/test.csv').then((res) => {

  res.body
  .pipe(csv.parse({columns : true}))
  .on('error', errorHandling)
  .pipe(csv.transform(function(record){
    if(record['hoge'] < 100000){
      return null;
    }
    return record;
  }))
  .on('error', errorHandling)
  .pipe(csv.stringify({header: true}))
  .on('error', errorHandling)
  .pipe(iconv)
  .on('error', errorHandling)
  .pipe(outputFile)
  .on('error', errorHandling);

}).then(() => console.log("done")).catch((err) => console.log(err));

pipeではないが、readlineのcreateInterfaceに入力と出力のStreamを渡すと、 行ごとに処理することができる。

const fs = require('fs');
const readline = require('readline');

let readable = fs.createReadStream('sample.txt');
const rl = readline.createInterface({
  input: readable,
  output: process.stdout
});
abcdefghijklmnopqrstuvwxyz
1234567890
あいうえお

RxJSで扱う

StreamはEventEmitterなのでRxJSのfromEvent()でObservableとして扱うこともできる。ただしv5にはpipeがない(v4にはある)ので、pipeする場合は自分でSubscribeしてwriteする必要がありそう。

RxJSでRxをはじめる - sambaiz.net

const fs = require('fs');
const Rx = require('rxjs/Rx');
const writable = fs.createWriteStream('output.txt')

Rx.Observable.fromEvent(process.stdin, 'data')
.map((v) => `- ${v}`)
.subscribe((v) => write(v));

function write(v){
  // TODO: back-pressure
  writable.write(v);
}
$ node main.js 
aiueo
kakikukeko
^C

$ cat output.txt 
- aiueo
- kakikukeko