Transform Streams and piping


1 | What is a Transform Stream?


A Transform Stream is a Stream where the output is computed in some way from the input.


Custom Transform Stream implementations must implement the stream.Transform.prototype._transform() method and may also implement the stream.Transform.prototype._flush() method (out of scope of this book).


2 | The stream.Transform.prototype._transform() method


The stream.Transform.prototype._transform() method transforms an input into an output. Let us visually represent what is happening:


Figure: a Transform Stream

Upstream of a Transform Stream, there is a Readable Stream (1). This Readable Stream (1) can be a standalone Readable Stream, or it can be the Readable Stream of another Transform Stream.


The Readable Stream (1) passes a chunk of data (one chunk after the other) to the Writable Stream (2) of the Transform Stream. Then this Writable Stream (2) passes the chunk of data to the stream.Transform.prototype._transform() method. The method processes the chunk of data and pushes the data to the Readable Stream (3) of the Transform Stream with the method stream.Transform.prototype.push().


The stream.Transform.prototype._transform() method takes 3 arguments:


  1. The mandatory chunk argument. By default, it is either a Buffer or a String. It can also be any other valid JavaScript value or reference (except null), if the Transform Stream has the option writableObjectMode set to true.
  2. The optional encoding argument. If the chunk argument is a String, the encoding argument specifies its character encoding.
  3. The mandatory callback argument. This argument is a (callback) function that must be called once the supplied chunk has been processed, and only when the chunk is completely consumed: it is therefore typically the last statement in the body of the method stream.Transform.prototype._transform(). The first argument passed to the callback must be an Error object if an error occurred while processing the input, or null otherwise. If a second argument is passed to the callback, it is forwarded to the stream.Transform.prototype.push() method, as shown in the sketch after this list.
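To illustrate this forwarding mechanism, here is a minimal sketch (the Stream name upper is ours): instead of calling this.push() explicitly, the transformed chunk is passed as the second argument of the callback.


const stream = require('stream');

// Minimal sketch: the second argument of callback() is forwarded to push()
const upper = new stream.Transform({
  transform(chunk, encoding, callback) {
    // null: no error occurred; the second argument is the transformed chunk
    callback(null, chunk.toString().toUpperCase());
  }
});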

3 | How to implement a Transform Stream?


There are 2 different ways of implementing a Transform Stream (in our order of preference and recommendation):


  1. By using the constructor function stream.Transform() and the method stream.Transform.prototype._transform().
  2. By using a subclass of stream.Transform and the method stream.Transform.prototype._transform().

In this chapter, we will explore only the first way, which is the simplest and most straightforward way to implement Transform Streams.
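Nevertheless, for the sake of comparison, here is a minimal sketch of the second way (the class name UpperCase is ours): the method _transform() is defined on a subclass of stream.Transform.


const stream = require('stream');

// Minimal sketch of the subclass approach
class UpperCase extends stream.Transform {
  _transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  }
}

const upperCase = new UpperCase();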


The constructor function stream.Transform()


Custom Transform Streams that call the constructor function stream.Transform() must implement the method stream.Transform.prototype._transform().


stream.Transform() takes an options object as its argument. This object lists the possible options for both the Writable Stream and the Readable Stream of the Transform Stream. There are also options that concern either the Writable Stream only, the Readable Stream only or the Transform Stream as a whole. Here are the most useful ones, for those that concern only one side of the Stream or the Stream as a whole:


  1. writableObjectMode (Writable side). When set to true, the Writable side of the Stream accepts any valid JavaScript value or reference (except null) as a chunk, not only Buffers and Strings.
  2. readableObjectMode (Readable side). When set to true, the Readable side of the Stream can push any valid JavaScript value or reference (except null), not only Buffers and Strings.
  3. transform (whole Stream). The function used as the stream.Transform.prototype._transform() method.
  4. flush (whole Stream). The function used as the stream.Transform.prototype._flush() method (out of scope of this book).

Here are the other options:


  1. objectMode. A shorthand that sets both writableObjectMode and readableObjectMode to true.
  2. highWaterMark. The size of the internal buffer (in bytes or, in object mode, in number of objects) above which the Stream applies backpressure; writableHighWaterMark and readableHighWaterMark set it for one side only.
We strongly advise that you set the options readableObjectMode and writableObjectMode to true (or, equivalently, the single option objectMode to true) when working with Transform Streams and iterable objects.
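As a quick illustration (the Stream names identityA and identityB are ours), the following two Transform Streams are configured identically with respect to object mode:


const stream = require('stream');

// objectMode: true is a shorthand...
const identityA = new stream.Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    callback(null, chunk); // pass every value through unchanged
  }
});

// ...for setting both one-sided options to true
const identityB = new stream.Transform({
  writableObjectMode: true,
  readableObjectMode: true,
  transform(chunk, encoding, callback) {
    callback(null, chunk);
  }
});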


4 | Piping Readable, Transform and Writable Streams


Example 1: process.stdin and process.stdout


In this example, we use 2 built-in Node.js Streams:


  1. process.stdin is a Readable Stream that reads what a user types in the Command Line Interface (CLI). stdin stands for standard input. When process.stdin appears in a program, the program displays a cursor on the screen and waits for user input.
  2. process.stdout is a Writable Stream that writes data to the screen.

const stream = require('stream');

// Implementation of the Transform Stream
const shout = new stream.Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().trim().toUpperCase() + '!\n');
    callback();
  }
});

// Piping
stream.pipeline(
  process.stdin,
  shout,
  process.stdout,
  (err) => {
    if (err) {
      console.error('failed', err);
    } else {
      console.log('Done!');
    }
  }
);

In the previous code, the shout Transform Stream converts what the user types in (chunk) to upper case and adds an exclamation mark, with all whitespace removed from the beginning and the end of the String.


We pipe the Readable Stream process.stdin, the shout Transform Stream and the Writable Stream process.stdout together.
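For example, if you type hello world and press Enter, the program prints HELLO WORLD! on the next line. On Unix-like systems, pressing Ctrl+D closes process.stdin and ends the pipeline.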


Example 2: multiple Transform Streams


Here is an example of several Transform Streams that are used one after the other to perform several transformations on an initial Readable Stream.


For each Transform Stream, we perform a transformation by defining the stream.Transform.prototype._transform() method via the transform property of the object passed as an argument to each call of the constructor function stream.Transform().


Since our initial Readable Stream consists of the elements of an iterable object, we make sure to set the relevant object-mode options to true in the Transform Streams that receive or push values other than Buffers and Strings: writableObjectMode and readableObjectMode in double, and writableObjectMode in stringify. With these options set to true, we make sure that:


  1. Each chunk of data passed to the different Streams is always a single element of the array.
  2. The chunk keeps its original format (here, a Number) and is not implicitly converted into a Buffer or a String along the way.

Note that in the last Transform Stream, the transform function must push Strings or Buffers; otherwise, Node.js throws an error, because the Readable side of that Stream does not operate in object mode.


// Creation of an iterable object: array of integers
let array = [];

for (let i = 0; i <= 100; i += 1) {
  array.push(i);
}

// Implementation of the Readable Stream
const stream = require('stream');
const source = stream.Readable.from(array);

// Implementation of the Writable Stream
const fs = require('fs');
const destination = fs.createWriteStream('destination.txt');

// Implementation of a 1st Transform Stream
const double = new stream.Transform({
  writableObjectMode: true,
  readableObjectMode: true,

  transform(chunk, encoding, callback) {
    this.push(chunk * 2);
    callback();
  }
});

// Implementation of a 2nd Transform Stream
const stringify = new stream.Transform({
  writableObjectMode: true,

  transform(chunk, encoding, callback) {
    this.push(String(chunk));
    callback();
  }
});

// Implementation of a 3rd Transform Stream
const punctuate = new stream.Transform({

  transform(chunk, encoding, callback) {
    this.push(chunk + '!');
    callback();
  }
});

// Implementation of a 4th Transform Stream
const addNewLine = new stream.Transform({

  transform(chunk, encoding, callback) {
    this.push(chunk + '\n');
    callback();
  }
});

// Piping
stream.pipeline(
  source,
  double,
  stringify,
  punctuate,
  addNewLine,
  destination,
  (err) => {
    if (err) {
      console.error('failed', err);
    } else {
      console.log('Successful copy!');
    }
  }
);
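After running this program, destination.txt contains the even numbers from 0 to 200 (the doubled elements of the array), one per line and each followed by an exclamation mark: 0!, 2!, 4!, and so on.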

Example 3: neutral Transform Stream


The built-in Transform Stream PassThrough is a neutral Transform Stream. It does not transform anything: it simply passes the input across to the output. This seems quite useless, except if you want to perform some testing or monitor the data that is piped.


In the following example, we use the PassThrough Stream to count the number of chunks that are piped. Since PassThrough is a Transform Stream, it has a Readable side. Therefore, we can register an event listener on the standard data event emitted by Readable Streams when they read data in flowing mode.


Note that we have to make sure that the chunks of data that are passed to the PassThrough Stream are Buffers or Strings; otherwise, Node.js throws an error. By default, the objectMode of a PassThrough Stream (and more specifically its writableObjectMode) is set to false, so it can only be passed Buffers or Strings.


// Creation of an iterable object: array of integers
let array = [];

for (let i = 0; i <= 99; i += 1) {
  array.push(String(i));
}

// Implementation of the Readable Stream
const stream = require('stream');
const source = stream.Readable.from(array);

// Implementation of the Writable Stream
const fs = require('fs');
const destination = fs.createWriteStream('destination.txt');

// Implementation of the neutral Transform Stream
const neutral = new stream.PassThrough();

// Gather data about the piping
let total = 0;
neutral.on('data', (chunk) => {
  total += 1;
  console.log('Chunks:', total);
});

// Piping
stream.pipeline(
  source,
  neutral,
  destination,
  (err) => {
    if (err) {
      console.error('failed', err);
    } else {
      console.log('Successful copy!');
    }
  }
);
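When you run this program, the data listener logs Chunks: 1 up to Chunks: 100, one data event per element of the array, before Successful copy! is printed. The file destination.txt contains the integers 0 to 99 concatenated without any separator, since no Transform Stream adds punctuation or newlines here.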

Example 4: a throttle Transform Stream


In this example, we define a Transform Stream that does not transform the data but "throttles" it. This lets us control the flow of data that is piped. In our case, each chunk of data is processed every 100 milliseconds. The "throttle" Stream is the 1st Transform Stream in our example.


// Creation of an iterable object: array of integers
let array = [];

for (let i = 0; i <= 100; i += 1) {
  array.push(i);
}

// Implementation of the Readable Stream
const stream = require('stream');
const source = stream.Readable.from(array);

// Implementation of the Writable Stream
const fs = require('fs');
const destination = fs.createWriteStream('destination.txt');

// Implementation of a 1st Transform Stream: the "throttle" Stream
const throttle = new stream.Transform({
  objectMode: true,

  transform(chunk, encoding, callback) {
    console.log(chunk);
    this.push(chunk);
    setTimeout(callback, 100);
  }
});

// Implementation of a 2nd Transform Stream
const punctuate = new stream.Transform({
  writableObjectMode: true,

  transform(chunk, encoding, callback) {
    let punctuated = chunk.toString() + '!\n';
    this.push(punctuated);
    callback();
  }
});

// Piping
stream.pipeline(
  source,
  throttle,
  punctuate,
  destination,
  (err) => {
    if (err) {
      console.error('failed', err);
    } else {
      console.log('Successful copy!');
    }
  }
);
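Since the array contains 101 elements (0 to 100) and the throttle Stream waits 100 milliseconds before requesting the next chunk, the whole pipeline takes roughly 10 seconds to complete.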

Author: Dimitri Alamkan