Goのio packageのReader/Writer/Closer/Seeker interfaceとストリーム処理

golangaws

Goのio packageにはデータの読み書きに関わるインタフェース、Reader/Writer/Closer/Seeker およびこれらを組み合わせた ReadSeeker などが定義されている。

最大でlen(p)バイトpに読み込んで、読み込んだバイト数を返す。 最後まで読んだらerrでio.EOFを返すが、これは最後のバイトと同時でもその後でも良いことになっている。

type Reader interface {
	Read(p []byte) (n int, err error)
}

var EOF = errors.New("EOF")

データを書き込み、そのバイト数を返す。全て書き込めなかった(len(p) != n)場合はエラーを返す必要がある。

type Writer interface {
	Write(p []byte) (n int, err error)
}

2回以上呼んだときの挙動は規定されていない。

type Closer interface {
	Close() error
}

オフセットと起点を渡して読み書きする地点を変更し、Startからのオフセットを返す。

type Seeker interface {
	Seek(offset int64, whence int) (int64, error)
}

const (
	SeekStart   = 0 // seek relative to the origin of the file
	SeekCurrent = 1 // seek relative to the current offset
	SeekEnd     = 2 // seek relative to the end
)

ReaderとWriterを繋げてストリーム処理を行うことで、メモリ使用量を抑えることができる。 io.Copy(Writer, Reader)はEOFまでRead()してWrite()し、 io.Pipe()Write()したデータをRead()できるようにする。 io.Pipe()はバッファを持たないので、非同期で読み書きしないとdeadlockする。

import (
	"encoding/json"
	"io"
)

func marshal(data interface{}, writer io.Writer) {
	b, err := json.Marshal(data)
	if err != nil {
		panic(err)
	}
	writer.Write(b)
}

func encode(data interface{}, writer io.Writer) {
	// It can be used to connect code expecting an io.Reader with code expecting an io.Writer
	// The data is copied directly from the Write to the corresponding Read (or Reads); there is no internal buffering.
	r, w := io.Pipe()
	go func() {
		if err := json.NewEncoder(w).Encode(data); err != nil {
			panic(err)
		}
		w.Close() // => EOF
	}()
	// Copy copies from src to dst until either EOF is reached on src or an error occurs
	if _, err := io.Copy(writer, r); err != nil {
		panic(err)
	}
}

ベンチマークを取る。

go testでベンチマークを取ってpprofで時間がかかっている箇所を調べる - sambaiz-net

ちなみに、ioutil packageはgo1.16でdeprecatedになりDiscardもio packageに移ったのでそちらを使っている。

import (
	"io"
	"strings"
	"testing"
)

var data = map[string]string{
	strings.Repeat("A", 10000): strings.Repeat("B", 10000),
	strings.Repeat("C", 10000): strings.Repeat("D", 10000),
}

func BenchmarkMarshal(b *testing.B) {
	for i := 0; i < b.N; i++ {
		marshal(data, io.Discard)
	}
}

func BenchmarkEncode(b *testing.B) {
	for i := 0; i < b.N; i++ {
		encode(data, io.Discard)
	}
}

一度に処理するjson.Marshal()と比べるとメモリ使用量が1/50となっている。

$ go test -bench . -benchmem
...
BenchmarkMarshal-4         27168             48238 ns/op           41377 B/op         11 allocs/op
BenchmarkEncode-4          25454             53223 ns/op             850 B/op         16 allocs/op

インタフェースが噛み合わず一旦全てロードしなくてはいけないこともある。 aws-sdk-goのs3.PutUpload()io.ReadSeeker を取るので、Seek() できない bytes.Buffer などを渡すことができず bytes.NewReader(buf.Bytes()) することになるわけだが、 s3manager.Upload()というのもあって、 こちらはio.Readerを取るのでそのままアップロードできる。