Try T.M Engineer Blog

多摩市で生息するエンジニアが「アウトプットする事は大事だ」と思って初めたブログ

Node.js Streamを勉強してみた

Node.jsを理解していくにあたり、Streamの理解は必須になってくるので、Streamのドキュメントを(まずは、さらっと理解程度に)読んでみた。

Streamとは?

Node.jsでストリーミングデータを操作するためのインターフェースのこと。インターフェースなので、ストリームという実装があるわけではなく、概念が決まっているだけ。

その概念は色々なもの( fs, process.stdin/stdout/stderrなど, HTTP request on the server/response on the client)で使用される。

ストリームには、以下4種類のインターフェースが定義されている。

  • Writable: データを書き込むことができるストリーム
  • Readable: データを読み取ることができるストリーム
  • Duplex: ReadableとWritableの両方ができるストリーム
  • Transform: Duplexデータ読み込みおよび書き込み時に、データの変更、変換ができるストリーム

オブジェクトモード

Node.js APIによって作成されたすべてのストリームは、文字列または、Bufferオブジェクトでのみ動作する。

バッファリング

WritableとReadable共にデータを内部バッファに保存する仕組みを持つ。

理由としては、一度に大量のデータを読み込み、または書き込みを行わないために、予め閾値(highWaterMark)を決めており、その分のデータを内部バッファに保存するためだそう。

注意点として、highWaterMarkは閾値であり、制限ではないとのこと。「一般に、厳密なメモリ制限は適用されません」とのこと。

EventEmitter

ストリームは、EventEmitterを継承している。EventEmitterとはイベント駆動でプログラムを動かすライブラリのこと。

たとえば以下の場合、 .onで、イベント名と実行内容を定義して、 .emit(イベント名)でイベントを発火させて実行する。

import EventEmitter from 'events'
const event = new EventEmitter();

console.log('1')
event.on('event', () => { // `.on`でイベントと実行内容を定義
    console.log('2')
})
console.log('3')

event.emit('event') // `.emit`でイベントを発火。結果は、132となる

ストリームもこのEventEmitterを継承しているため、動かし方も上記のような方法になる。

import fs from "fs"

const readStream = fs.createReadStream("assets/sample1.txt", { encoding: "utf-8" })
readStream.on("data", chunk => console.log(chunk)); // `.on`でイベント名と実行内容を定義

ストリームの種類とイベント・API

Writable: 書き込み

用意されているイベント: close, drain, error, finish, pipe, unpipe

Readable: 読み込み

用意されているイベント: close, data, end, error, pause, readable, resume

Duplex & Transform: 書き込み、読み込み、変換

用意されているイベント: WritableとReadableの両方

stream API

ストリームを実装できるように、同期版APIが用意されている。

stream/promises API

ストリームを実装できるように、非同期版APIが用意されている。(ぇ、そうなの?)

感覚を掴んでみる

Node.jsの Fyle System (fs) モジュールは、Streamの仕組みが使われている。
createReadStreamcreateWriteStreamを使えば、Streamを生成できるので、 createReadStream を使ってStreamの感覚を掴んでみることにする。

import * as fs from "fs"

// こうすることで、28byteずつ読み込む
const readStream = fs.createReadStream("assets/sample1.txt", {
  encoding: "utf-8",
  highWaterMark: 28
});

// 28byteずつ"data"イベントが発火する
readStream.on("data", chunk => console.log(chunk))
// すべての読み込みが完了したときにイベントが発火する
readStream.on("end", () => console.log('すべての読み込みが完了しました。'))

Streamについて、調べていると以下のような事を仰っている記事に辿り着いた。

qiita.com

Promise におけるフロー制御に Stream を導入することが難しい

なるほど。これを読むと、たしかにStreamとPromiseの相性の悪さがわかります。

"data"イベントを使用するのではなくfor awaitを使って制御するのが良さそうですね。以後、参考にさせていただきますm(= =)m

import * as fs from "fs"

// こうすることで、28byteずつ読み込む
const readStream = fs.createReadStream("assets/sample1.txt", {
  encoding: "utf-8",
  highWaterMark: 28
});

// すべての読み込みが完了したときにイベントが発火する
readStream.on("end", () => console.log('すべての読み込みが完了しました。'))

// async/awaitを使用する
async function main() {
  for await ( const chunk of readStream ) {
    console.log(chunk)
  }  
}

main()

まとめ

というわけで、今回は以下を学んだ。

そもそもStreamを学びたいと思った理由は、AWS S3のファイルを取得した際、型がStreamだったので「やべぇ・・・わからん!」となったのがキッカケ。 とりあえず、それを解決する程度には理解できたと思う。。。

  • Streamはインターフェースであり、Node.jsの様々なモジュールで使用されている。
  • StreamはEventEmitterであり、イベント駆動型でPromiseでのフロー制御には気をつけること。