Node.js - guide to streams

What are Node.js streams?

Streams are a core part of working with Node.js. If you use a framework like Next.js or Nest.js you might go a long time without using streams directly, but behind the scenes they're in use every time you run your web server.

Streams are a way to read and write data (file contents, network data, etc.) piece by piece rather than all at once. They are also very useful for processing data such as audio or video content.

A quick comparison of reading a file's contents without streams and with streams:

  • If you use fs.readFileSync(), it returns the entire file at once. This is fine for small files, but impractical when reading and parsing a huge file.

  • If you use streams, you receive the file in small chunks, one at a time. This reduces memory usage, especially for large files. It also means you can start processing sooner, because you don't have to wait for all of the data to be received or loaded into memory.
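The difference between the two approaches can be sketched like this. It is only a minimal illustration: the helper names countBytesAllAtOnce and countBytesStreamed and the demo file in the temp directory are made up for this example.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical demo file, created here so the example is self-contained.
const demoFile = path.join(os.tmpdir(), 'streams-guide-demo.txt');
fs.writeFileSync(demoFile, 'x'.repeat(200 * 1024)); // 200 KiB of data

// Without streams: the whole file is in memory before we can touch it.
function countBytesAllAtOnce(filePath) {
  return fs.readFileSync(filePath).length;
}

// With streams: chunks arrive one at a time
// (fs.createReadStream uses 64 KiB chunks by default).
function countBytesStreamed(filePath) {
  return new Promise((resolve, reject) => {
    let total = 0;
    let chunkCount = 0;
    fs.createReadStream(filePath)
      .on('data', (chunk) => {
        total += chunk.length;
        chunkCount += 1;
      })
      .on('end', () => resolve({ total, chunkCount }))
      .on('error', reject);
  });
}

countBytesStreamed(demoFile).then(({ total, chunkCount }) => {
  console.log(`read ${total} bytes in ${chunkCount} chunks`);
});
```

Both functions report the same byte count, but the streamed version only ever holds one chunk in memory at a time.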

Overview of the different types of streams in Node.js

Readable

These are streams that let you read data from them. Example: fs.createReadStream()

Writable

These are streams that let you write data to them.

Example: fs.createWriteStream()

Duplex

These are streams which let you both read and write data.

Transform

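These are Duplex streams that transform (modify) data as it passes through them: what you write in comes out changed on the readable side. Example: zlib.createGzip()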
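To make the four types a bit more concrete, here is a small sketch that wires an in-memory Readable through a Transform into a Writable. The variable names (source, upperCase, sink, received) are made up for this illustration, and upper-casing is just an arbitrary transformation.

```javascript
const { Readable, Writable, Duplex, Transform } = require('stream');

// Readable: a source you read from (built here from an in-memory array).
const source = Readable.from(['a', 'b', 'c']);

// Writable: a sink you write to (this one just collects what it receives).
const received = [];
const sink = new Writable({
  write(chunk, _encoding, callback) {
    received.push(chunk.toString());
    callback();
  },
});

// Transform: a Duplex whose output is computed from its input.
const upperCase = new Transform({
  transform(chunk, _encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  },
});

console.log(upperCase instanceof Duplex); // prints true

source.pipe(upperCase).pipe(sink);
sink.on('finish', () => {
  console.log(received.join('')); // prints "ABC"
});
```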

Using streams in Node.js

Events in streams

Example of listening to data events and then writing to another stream:

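const fs = require('fs');

// create a readable and a writable stream
const readStream = fs.createReadStream('./input-csv.txt');
const writableStream = fs.createWriteStream('./output-file.txt');

// listen to the 'data' event; doSomeProcessingOrTransform is a placeholder
// for your own function that takes a chunk and returns the data to write
readStream.on('data', (chunkOfData) => {
    writableStream.write(doSomeProcessingOrTransform(chunkOfData));
});

// close the writable stream once there is no more data to read
readStream.on('end', () => writableStream.end());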

(Note: pipes let you do this more simply!)
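One thing the 'data' listener approach leaves to you is backpressure: if the writable stream is slower than the readable one, chunks pile up in memory. Here is a rough sketch of handling that by hand, which is broadly what .pipe() takes care of for you. The file paths, copyWithBackpressure and upperCaseChunk (a stand-in for your own processing function) are all hypothetical names for this example.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical file paths in the temp directory, so the sketch is self-contained.
const inputPath = path.join(os.tmpdir(), 'streams-guide-input.txt');
const outputPath = path.join(os.tmpdir(), 'streams-guide-output.txt');
fs.writeFileSync(inputPath, 'hello streams\n'.repeat(10000));

// Stand-in for your own processing function: upper-cases each chunk.
function upperCaseChunk(chunk) {
  return chunk.toString().toUpperCase();
}

function copyWithBackpressure(from, to) {
  return new Promise((resolve, reject) => {
    const readStream = fs.createReadStream(from);
    const writableStream = fs.createWriteStream(to);

    readStream.on('data', (chunk) => {
      // write() returns false once the writable's internal buffer is full
      const canContinue = writableStream.write(upperCaseChunk(chunk));
      if (!canContinue) {
        readStream.pause(); // stop reading until the writer catches up
        writableStream.once('drain', () => readStream.resume());
      }
    });

    // No more input: end the writable so it flushes and emits 'finish'.
    readStream.on('end', () => writableStream.end());
    writableStream.on('finish', resolve);
    readStream.on('error', reject);
    writableStream.on('error', reject);
  });
}

copyWithBackpressure(inputPath, outputPath).then(() => console.log('done!'));
```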

Writing to a stream

Here is a simple example of writing to a stream in Node.js:

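const fs = require('fs');
// the 'a' flag appends to the file instead of overwriting it
const writeStream = fs.createWriteStream('./output-file.txt', {flags: 'a'});
writeStream.write('Hello, world!');
// call end() when you have nothing more to write
writeStream.end();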
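If you write more than a little data, it helps to know when the stream is full and when it has finished. The sketch below shows one possible way to do that with events.once(); writeLines and the file name are hypothetical and only here for illustration.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');
const { once } = require('events');

// Hypothetical helper: writes each line, then waits until everything is flushed.
async function writeLines(filePath, lines) {
  // default flags are 'w', so an existing file is overwritten
  const writeStream = fs.createWriteStream(filePath);

  for (const line of lines) {
    // write() returns false when the internal buffer is full;
    // wait for 'drain' before writing any more
    if (!writeStream.write(line + '\n')) {
      await once(writeStream, 'drain');
    }
  }

  writeStream.end(); // signal that no more data will be written
  await once(writeStream, 'finish'); // resolves once all data has been flushed
}

const linesFile = path.join(os.tmpdir(), 'streams-guide-lines.txt');
writeLines(linesFile, ['first line', 'second line']).then(() => {
  console.log('all lines written');
});
```

A nice side effect of events.once() is that the returned promise rejects if the stream emits 'error' while you are waiting.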

Reading from a stream

You read from a stream by listening for the 'data' event with on('data', ...), which hands you the data one chunk at a time.

e.g.

const fs = require('fs');
// filePath is the path of the file you want to read
const readStream = fs.createReadStream(filePath);

readStream.on('data', (chunk) => {
    console.log('received chunk of data', chunk);
});

You should also listen to errors with the on('error', ...) event listener:

readStream.on('error', (error) => {
    console.log('received error', error);
})

Turn a read stream's chunks into one string (with async/await)

A function like this wraps a read stream in a promise, so you can await it and get the contents as a single string:

function fromStreamToString (yourReadStream) {
  const chunks = [];

  return new Promise((res, rej) => {
    // when we receive a new chunk, push it to the chunks array
    yourReadStream.on('data', (chunk) => chunks.push(Buffer.from(chunk)));

    // when we get the 'end' event, resolve the promise with the output
    // of turning the chunks into a string:
    yourReadStream.on('end', () => res(Buffer.concat(chunks).toString('utf8')));

    // on stream error, reject the promise:
    yourReadStream.on('error', (err) => rej(err));
  })
}

// note: await must be inside an async function (or at the top level of an ES module)
const strOutput = await fromStreamToString(yourReadStream)
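Readable streams are also async iterables, so the same idea can be written with a for await loop instead of event listeners. This is just an alternative sketch; streamToString is a hypothetical name, and an in-memory stream stands in for a file stream.

```javascript
const { Readable } = require('stream');

// Collects every chunk of a readable stream and returns them as one string.
async function streamToString(readable) {
  const chunks = [];
  // each iteration yields the next chunk; a stream error is thrown
  // inside the loop, so it rejects the promise returned by this function
  for await (const chunk of readable) {
    chunks.push(Buffer.from(chunk));
  }
  return Buffer.concat(chunks).toString('utf8');
}

// Usage with an in-memory stream standing in for a file stream
streamToString(Readable.from(['Hello, ', 'world!'])).then((text) => {
  console.log(text); // prints "Hello, world!"
});
```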

Piping multiple streams together

You can use the .pipe() function to take a stream (like reading from a file) and pipe it to another stream (like a writable stream that will write to a file).

const fs = require('fs')
const readStream = fs.createReadStream('./from-file.txt')
const outputStream = fs.createWriteStream('./output-file.txt')

readStream.pipe(outputStream)

outputStream.on('finish', () => {
    console.log('done!')
})
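When you chain more than two streams, keep in mind that .pipe() does not forward errors from one stream to the next. Node's pipeline() function connects the streams and reports a failure in any of them in one place. Here is a minimal sketch, assuming Node.js 15 or later for the stream/promises module; gzipFile is a hypothetical helper name.

```javascript
const fs = require('fs');
const zlib = require('zlib');
const { pipeline } = require('stream/promises'); // Node.js 15+

// Hypothetical helper: read a file, gzip it, and write the compressed result.
async function gzipFile(inputPath, outputPath) {
  await pipeline(
    fs.createReadStream(inputPath), // readable: the source file
    zlib.createGzip(),              // transform: compresses each chunk
    fs.createWriteStream(outputPath) // writable: the destination file
  );
  // reaching this point means every stream finished without an error
}
```

You would call it with something like await gzipFile('./from-file.txt', './from-file.txt.gz'). If any of the three streams fails, the promise rejects and the other streams are destroyed.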

Using transformations on a stream

You can use transforms on a stream to change its contents as it is read or written.

You can use the Transform class for this (exported by the stream module):

const fs = require('fs');
const stream = require('stream');
const readStream = fs.createReadStream('input.txt');
const writeStream = fs.createWriteStream('output.txt');

function reverseChunk(input) {
    return input.toString().split("").reverse().join("")
}

const reversedStream = new stream.Transform({
    transform (data, _encoding, callback) {
        this.push(reverseChunk(data));
        callback();
    }
});

readStream // original read stream
    .pipe(reversedStream) // reverse the strings
    .pipe(writeStream) // pipe to an output stream
    .on('finish', () => {
        console.log("Done! output.txt now has reversed text");
    });
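Note that the transform function runs once per chunk, so for a file bigger than one chunk the example above reverses each chunk separately rather than the file as a whole. If you need the whole input reversed, one option is to collect the chunks and only emit the result in flush(), which runs once after the last chunk. This is a rough sketch with hypothetical names (createWholeInputReverser, reverseAll), and it gives up the memory benefit of streaming because everything is buffered.

```javascript
const { Readable, Transform } = require('stream');

// Hypothetical factory: buffers the whole input and emits it reversed at the end.
function createWholeInputReverser() {
  const chunks = [];
  return new Transform({
    transform(chunk, _encoding, callback) {
      chunks.push(chunk); // keep the chunk, emit nothing yet
      callback();
    },
    flush(callback) {
      // called once after the last chunk, before the stream ends
      const text = Buffer.concat(chunks).toString('utf8');
      this.push(text.split('').reverse().join(''));
      callback();
    },
  });
}

// Usage: an in-memory stream with two chunks stands in for a file stream.
async function reverseAll(parts) {
  let output = '';
  const reversed = Readable.from(parts).pipe(createWholeInputReverser());
  for await (const chunk of reversed) {
    output += chunk.toString();
  }
  return output;
}

reverseAll(['abc', 'def']).then((result) => {
  console.log(result); // prints "fedcba" (per-chunk reversal would give "cbafed")
});
```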

Error handling in streams

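Streams report errors asynchronously by emitting an 'error' event, so a regular try {...} catch {...} block around the code that creates the stream won't catch them.

You have to listen to the 'error' event. If nothing is listening for it, the error is thrown and will typically crash the process.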

e.g.

const fs = require('fs');
const someStream = fs.createReadStream('input.txt');
someStream.on('error', (error) => {
    console.error("An error occurred!", error);
});
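There is one case where try/catch does work: when you consume the stream through a promise-based API and await it, a stream error becomes a rejected promise. Here is a small sketch using pipeline() from stream/promises (assuming Node.js 15 or later); tryCopy and the file paths are hypothetical names for this example.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');
const { pipeline } = require('stream/promises'); // Node.js 15+

// Hypothetical helper: copies a file and reports failure instead of throwing.
async function tryCopy(from, to) {
  try {
    await pipeline(fs.createReadStream(from), fs.createWriteStream(to));
    return { ok: true };
  } catch (error) {
    // an error from either stream surfaces here as a rejected promise
    return { ok: false, code: error.code };
  }
}

// A path that should not exist, to trigger an error on purpose.
const missingFile = path.join(os.tmpdir(), 'streams-guide-no-such-dir', 'missing.txt');
const copyTarget = path.join(os.tmpdir(), 'streams-guide-copy.txt');

tryCopy(missingFile, copyTarget).then((result) => {
  console.log(result); // ok is false; code is 'ENOENT' when the source is missing
});
```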