yamaday0u Blog Written by yamaday0u

【Node.js】Stream APIを使ってデータを少しずつ処理してみよう

Node.js

こんにちは、やまだゆうです。

今回はNode.jsのStreamについて勉強してみました。本記事で紹介しているサンプルプログラムはぼくのGithubで公開していますので、クローンして触ってみることをおすすめします。

この記事の内容

  1. Streamとは
  2. fsモジュールを使ってファイルをストリームで複製する
  3. Transformを使って変換する

スポンサーリンク

Streamとは

Node.jsのStreamはストリーミングデータを扱うためのインターフェースです。Streamを活用することにより、対象のデータを小さな単位のデータに分けてちょっとずつ処理をすることができます。

Youtubeのストリーミング再生を考えるとイメージしやすいと思います。

Youtubeの動画って動画ファイルを全体を読み込んでから再生するわけではなく、今見ている部分のちょっと先までを少しずつ読み込んで再生していますよね。

Ksv_gracisによるPixabayからの画像

Streamの種類

Node.jsのStreamには以下の4つの種類があります。Types of streamsStream | Node.js v21.2.0 Documentation)

  • Readable:読み込み用のStream
  • Writable:書き込み用のStream
  • Duplex:ReadableかつWritableなStream
  • Transform:修正可能なDuplexなStream

fsモジュールを使ってファイルをストリームで複製する

簡単な例でStreamを試してみましょう。abc.csvというファイルの内容をStreamで別のファイルに複製してみましょう。

column1,column2,column3
a,b,c
d,e,f
g,h,i
j,k,l
m,n,o

以下のメインプログラムを実行します。

const { createReadStream, createWriteStream } = require("fs");

const readableStream = createReadStream("abc.csv");
const writableStream = createWriteStream("def.csv");

readableStream.pipe(writableStream);

実行結果は以下のとおりです。abc.csvファイルと同じディレクトリにdef.csvファイルが作成されます。

column1,column2,column3
a,b,c
d,e,f
g,h,i
j,k,l
m,n,o

pipe()とは

上記のメインプログラムにpipe()関数を使用しました。pipe()関数はReadable StreamとWritable Streamをつなぐ役割を担います。

readable.pipe(destination[, options])

pipe()関数は引数にWritable型を取り、Readable型のデータをWritable型に変換します。pipe()の型定義はこんな感じになっています。

class internal extends EventEmitter {
        pipe<T extends NodeJS.WritableStream>(
            destination: T,
            options?: {
                end?: boolean | undefined;
            },
        ): T;
        // ...
    }

Promiseと併用したい場合はpipeline()を使おう

Node.jsのバージョン15からはStreams Promises APIが追加されました。Promiseと併用してStreamの処理を実装するときはこちらを使いましょう。

const { createReadStream, createWriteStream } = require("fs");
const { pipeline } = require("stream/promises");

const readableStream = createReadStream("abc.csv");
const writableStream = createWriteStream("def.csv");

const run = async () => {
  await pipeline( // 基本のpipeline()は第1引数にReadable型、第2引数にWritable型を取る
    readableStream,
    writableStream
  )
};

try {
  run();
} catch (error) {
  console.log(error);
}

スポンサーリンク

Transformを使って変換する

Transformは入力されたデータに任意の計算を実行して出力するStreamです。

元のファイルを全部大文字にして新しいファイルに出力するプログラムのサンプルを見てみましょう。「the simplified constructor approach」と呼ばれる方法でTransformインスタンスを作成します。

const { createReadStream, createWriteStream } = require("fs");
const { pipeline } = require("stream/promises");

const readableStream = createReadStream("abc.csv");
const writableStream = createWriteStream("def.csv");

const { Transform } = require("stream");
const toUpperCaseTransformer = new Transform({ // 読み取ったデータを大文字に変換するTransformを作成
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
  }
});

const run = async () => {
  await pipeline(
    readableStream,
    toUpperCaseTransformer, // pipeline()の第2引数としてTrasnformを渡す
    writableStream
  )
};

try {
  run();
} catch (error) {
  console.log(error);
}
COLUMN1,COLUMN2,COLUMN3
A,B,C
D,E,F
G,H,I
J,K,L
M,N,O

chunkの中身はなに?

先ほどのTransformの定義の中でchunkという引数がありました。この中にはどんなデータがあるのか見てみましょう。

const toUpperCaseTransformer = new Transform({ // 読み取ったデータを大文字に変換するTransformを作成
  transform(chunk, encoding, callback) {
    console.log(chunk);
    this.push(chunk.toString().toUpperCase());
  }
});

// 結果
// <Buffer 63 6f 6c 75 6d 6e 31 2c 63 6f 6c 75 6d 6e 32 2c 63 6f 6c 75 6d 6e 33 0a 61 2c 62 2c 63 0a 64 2c 65 2c 66 0a 67 2c 68 2c 69 0a 6a 2c 6b 2c 6c 0a 6d 2c ... 3 more bytes>

Buffer型のデータでした。さらにこれをtoString()で文字列に変換して確認します。

const toUpperCaseTransformer = new Transform({ // 読み取ったデータを大文字に変換するTransformを作成
  transform(chunk, encoding, callback) {
    console.log(chunk.toString());
    this.push(chunk.toString().toUpperCase());
  }
});

// 結果
// column1,column2,column3
// a,b,c
// d,e,f
// g,h,i
// j,k,l
// m,n,o

Readable Streamで読み取ったデータが1行ずつchunkとして渡ってきていることがわかりました!

参考資料

yamaday0uを応援お願いします!あなたの1クリックが励みになります。
>> にほんブログ村