Node streams are a fantastic abstraction for evented programming. They’re also notoriously hard to implement. In this post, I’d like to walk through implementing a streams2 Readable stream to wrap an API.

The API

Suppose we have a web service that returns a list of customers. There might be a large number of customers, so this service paginates the results and expects us to provide a page number when requesting data. A sample request to the API might look something like the line below.

curl http://localhost:3000/customers?page=1

And here’s an example of a response.

{"customers":[{"name":"drew"},{"name":"john"}],"nextPage":"/customers?page=2","isLastPage":false}

This API makes it convenient to fetch the next page of result by providing the url in the nextPage key of the response. It also tells us if we’ve reached the last page of results.

To make is simple to test our stream that will warp this API, I’ve created an express app that implements our description. Let’s take a look at the code.

var express = require('express');
var app = express();

var callCount = 0;
var customers = [
  {name: "drew"},
  {name: "john"},
  {name: "bill"},
  {name: "bob"},
  {name: "sam"}
];
var pageSize = 2;

app.get('/customers', function (req, res) {
  callCount += 1;

  var page = parseInt(req.query.page);
  var startingIndex = (page - 1) * pageSize;
  var endingIndex = Math.min(startingIndex + pageSize, customers.length);

  var currentCustomers = customers.slice(startingIndex, endingIndex);

  res.send(JSON.stringify({
    customers: currentCustomers,
    nextPage: "/customers?page=" + (page + 1),
    isLastPage: endingIndex === customers.length
  }));
});

app.resetCallCount = function () {
  callCount = 0;
};

app.getCallCount = function () {
  return callCount;
};

module.exports = app;

Note that I’ve specified an artifically small pageSize as well as number of customers to make testing easier. Also note that the callCount variable and its associated functions are provided so that we can make assertions about the number of requests we’ve made to the API in our tests.

Tests

Before we dive into the implementing our stream, let’s write a few tests to specify the desired behavior. Here are the two key things we should test:

  1. We can treat our stream as any other node stream
  2. It lazily fetches results from the API as needed

Below are two tests written using mocha. The method we’ll be implementing is customer.all(). This will eventually return our stream that wraps the API above.

var assert = require('assert');
var api = require('./support/api');
var customer = require('../lib/customer');
var Writable = require('stream').Writable;
var _ = require('lodash');

describe('customer', function () {
  before(function () {
    api.listen(3000);
  });

  beforeEach(function () {
    api.resetCallCount();
  });

  describe('#all', function () {
    it('returns a stream of customers from the api', function (done) {
      customers = []

      var stream = customer.all()

      stream.on('data', function (customer) {
        customers.push(customer);
      });

      stream.on('end', function () {
        assert.equal(5, customers.length);
        assert.equal('drew', customers[0].name);
        assert.equal(3, api.getCallCount());

        done();
      });
    });

    it('consumes lazily when piped', function (done) {
      var ws = new Writable({objectMode: true});
      var writeCount = 0;

      ws._write = function (chunk, enc, next) {
        writeCount += 1;

        if (writeCount < 2) {
          next();
        } else {
          assert.equal(1, api.getCallCount());
          done();
        }
      };

      customer.all().pipe(ws);
    });
  });
});

On line 2, we require the API we discussed above. On line 3, we require the customer module we’re about the write. Before running any tests we start our API server listening on port 3000. Before each test we reset the call count on our API server.

Our first test verifies that we can interact with the return value from customer.all() using the stream API. We attach data and end events to the stream to switch it into flowing mode. When the stream emits the end event, we verify that we’ve received all 5 customers, we’ve only made 3 calls to the API and that the first customer has the name “drew”.

Our second test verifies that we fetch results from our API lazily. We ensure this by writing a custom Writable stream that only asks for the first two customers from our stream and verifies that it has only called the API once to retrieve them. Finally, we pipe the result of customer.all() into our Writable stream to kick things off.

Implementing the Stream

We know what our API looks like and we have tests to verify that our stream will work correctly. Now we need to implement it. First, let’s take a look at the customer module. It’s extremely simple because it just returns a new instance of CustomerStream.

var CustomerStream = require('./customer-stream');

exports.all = function () {
  return new CustomerStream();
};

Now we’ve arrived at the real meat of this post. It’s time to dive into the implementation of our stream itself. Before doing so, let’s talk a bit about how to implement a Readable stream.

First, we need to inherit from the Readable stream base class. Next, we need to implement a _read method. Our _read method will be called each time someone is requesting data from our stream. Each time _read is called, we’re expected to call the push method (provided by the base class) at least once. In fact, _read will not be called again until we’ve pushed at least one value. Calling push pushes a value onto the stream to allow our consume access to it.

There’s a few other things we need to know about the push method. First, if push ever returns false that tells us we should not push any more values until _read is called again. Second, when we’re done pushing all our data we need to call push with null to signal the end of the stream.

So what does our _read method need to do? The high level steps are as follows:

  1. If we have any customers in memory that we’ve fetched previously, we should push them.
  2. If we don’t have any customers and we’re on the last page, it’s time to stop.
  3. If neither of the previous two statements is true, we need to fetch new results from the server, buffer them, and push them.

Let’s take a look at the code. I’ll discuss a few pieces of it in detail.

var _ = require('lodash');
var Readable = require('stream').Readable;
var request = require('request');
var util = require('util');

function CustomerStream() {
  Readable.call(this, {objectMode: true});

  this.customers = [];
  this.nextPage = '/customers?page=1';
  this.isLastPage = false;
}
util.inherits(CustomerStream, Readable);

CustomerStream.prototype.fetchNextPage = function () {
  request('http://localhost:3000' + this.nextPage, function (error, response,
body) {
    var response = JSON.parse(body);

    this.customers = this.customers.concat(response.customers);
    this.nextPage = response.nextPage;
    this.isLastPage = response.isLastPage;

    this.pushBufferedCustomers();
  }.bind(this));
};

CustomerStream.prototype.pushBufferedCustomers = function () {
  while (this.customers.length > 0) {
    var customer = this.customers.shift();

    if (this.push(customer) === false) {
      break;
    }
  };
};

CustomerStream.prototype._read = function () {
  if (this.customers.length > 0) {
    this.pushBufferedCustomers();
  } else if (this.isLastPage) {
    this.push(null);
  } else {
    this.fetchNextPage();
  }
};

module.exports = CustomerStream;

Let’s first look at the constructor on lines 6 - 13. We call the Readable stream’s constructor making sure it is in object mode. This allows us to push objects to the stream. Next we initialize our customers, nextPage and isLastPage variables. Finally, we make sure CustomerStream inherits from Readable.

We’ll examine the _read method next. Notice that the body of the method very closely follows the steps we described above. If we have any customers available we push them. If we’re on the last page we push null to indicate that the stream is complete. Otherwise, we fetch our next page of results.

The pushBufferedCustomers method pops customers off the customers array one at a time and pushes them. If any of those pushes returns false, it stops.

Finally, the fetchNextPage method uses the request library to call our API. It then parses the body, adds the customers to the array, updates the nextPage and updates isLastPage. The call to pushBufferedCustomers on line 23 is very important. As I said earlier, each call to _read expects at least one call to push. If we don’t push any results after fetching customers from the API, our stream would hang indefinitely waiting on results.

Wrap Up

We now have a working stream that wraps our paginated API.

Edit: removed link full code example.