Node streams are a fantastic abstraction for evented programming. They’re also notoriously hard to implement. In this post, I’d like to walk through implementing a streams2 Readable stream to wrap an API.
The API
Suppose we have a web service that returns a list of customers. There might be a large number of customers, so this service paginates the results and expects us to provide a page number when requesting data. A sample request to the API might look something like the line below.
curl 'http://localhost:3000/customers?page=1'
And here’s an example of a response.
{"customers":[{"name":"drew"},{"name":"john"}],"nextPage":"/customers?page=2","isLastPage":false}
This API makes it convenient to fetch the next page of results by providing its URL in the nextPage key of the response. It also tells us, via the isLastPage key, whether we’ve reached the last page of results.
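To make the role of nextPage and isLastPage concrete, here’s a minimal sketch of a client following the pagination links. It assumes canned responses shaped like the sample above; the responses table and the getPage and collectNames helpers are hypothetical stand-ins for real HTTP calls, not part of the API.

```javascript
// Hypothetical canned responses, shaped like the sample response above.
var responses = {
  '/customers?page=1': {
    customers: [{name: 'drew'}, {name: 'john'}],
    nextPage: '/customers?page=2',
    isLastPage: false
  },
  '/customers?page=2': {
    customers: [{name: 'bill'}, {name: 'bob'}],
    nextPage: '/customers?page=3',
    isLastPage: false
  },
  '/customers?page=3': {
    customers: [{name: 'sam'}],
    nextPage: '/customers?page=4',
    isLastPage: true
  }
};

// Hypothetical synchronous stand-in for an HTTP GET against the API.
function getPage(path) {
  return responses[path];
}

// Follow nextPage from response to response until isLastPage is true.
function collectNames(firstPage) {
  var names = [];
  var path = firstPage;
  var page;
  do {
    page = getPage(path);
    page.customers.forEach(function (customer) {
      names.push(customer.name);
    });
    path = page.nextPage;
  } while (!page.isLastPage);
  return names;
}

console.log(collectNames('/customers?page=1'));
```

Note that this loop eagerly walks every page. The stream we build later follows the same links, but only when a consumer asks for more data.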
To make it simple to test the stream that will wrap this API, I’ve created an express app that implements our description. Let’s take a look at the code.
var express = require('express');
var app = express();

var callCount = 0;
var customers = [
  {name: "drew"},
  {name: "john"},
  {name: "bill"},
  {name: "bob"},
  {name: "sam"}
];
var pageSize = 2;

app.get('/customers', function (req, res) {
  callCount += 1;
  // Always pass a radix so the page number is parsed as base 10
  var page = parseInt(req.query.page, 10);
  var startingIndex = (page - 1) * pageSize;
  var endingIndex = Math.min(startingIndex + pageSize, customers.length);
  var currentCustomers = customers.slice(startingIndex, endingIndex);
  res.send(JSON.stringify({
    customers: currentCustomers,
    nextPage: "/customers?page=" + (page + 1),
    isLastPage: endingIndex === customers.length
  }));
});

// Test helpers for asserting on the number of requests made
app.resetCallCount = function () {
  callCount = 0;
};

app.getCallCount = function () {
  return callCount;
};

module.exports = app;
Note that I’ve specified an artificially small pageSize and a small number of customers to make testing easier. Also note that the callCount variable and its associated functions are provided so that our tests can make assertions about the number of requests we’ve made to the API.
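As a quick worked example of the paging arithmetic, the sketch below reproduces the index calculation from the handler for 5 customers and a pageSize of 2. The pageBounds helper and the customerCount variable are hypothetical names introduced only for this illustration.

```javascript
var customerCount = 5; // same number of customers as the test API
var pageSize = 2;

// Hypothetical helper mirroring the index math in the request handler.
function pageBounds(page) {
  var startingIndex = (page - 1) * pageSize;
  var endingIndex = Math.min(startingIndex + pageSize, customerCount);
  return {
    startingIndex: startingIndex,
    endingIndex: endingIndex,
    isLastPage: endingIndex === customerCount
  };
}

console.log(pageBounds(1)); // indexes 0 to 2, not the last page
console.log(pageBounds(2)); // indexes 2 to 4, not the last page
console.log(pageBounds(3)); // indexes 4 to 5, the last page
```

With these numbers it takes three requests to see all five customers, which is the call count the first test below expects.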
Tests
Before we dive into implementing our stream, let’s write a few tests to specify the desired behavior. Here are the two key things we should test:
- We can treat our stream as any other node stream
- It lazily fetches results from the API as needed
Below are two tests written using mocha. The method we’ll be implementing is customer.all(). This will eventually return our stream that wraps the API above.
var assert = require('assert');
var api = require('./support/api');
var customer = require('../lib/customer');
var Writable = require('stream').Writable;

describe('customer', function () {
  before(function () {
    api.listen(3000);
  });

  beforeEach(function () {
    api.resetCallCount();
  });

  describe('#all', function () {
    it('returns a stream of customers from the api', function (done) {
      // Declared with var so we don't leak an implicit global
      var customers = [];
      var stream = customer.all();
      stream.on('data', function (customer) {
        customers.push(customer);
      });
      stream.on('end', function () {
        assert.equal(5, customers.length);
        assert.equal('drew', customers[0].name);
        assert.equal(3, api.getCallCount());
        done();
      });
    });

    it('consumes lazily when piped', function (done) {
      var ws = new Writable({objectMode: true});
      var writeCount = 0;
      ws._write = function (chunk, enc, next) {
        writeCount += 1;
        if (writeCount < 2) {
          next();
        } else {
          // Never call next() again, so the stream stops being consumed
          assert.equal(1, api.getCallCount());
          done();
        }
      };
      customer.all().pipe(ws);
    });
  });
});
On line 2, we require the API we discussed above. On line 3, we require the customer module we’re about to write. Before running any tests, we start our API server listening on port 3000. Before each test, we reset the call count on our API server.
Our first test verifies that we can interact with the return value from customer.all() using the stream API. We attach listeners for the data and end events to the stream; attaching the data listener switches it into flowing mode. When the stream emits the end event, we verify that we’ve received all 5 customers, that the first customer has the name “drew”, and that we’ve made only 3 calls to the API.
Our second test verifies that we fetch results from our API lazily. We ensure this by writing a custom Writable stream that only asks for the first two customers from our stream and verifies that the API has been called only once to retrieve them. Finally, we pipe the result of customer.all() into our Writable stream to kick things off.
Implementing the Stream
We know what our API looks like and we have tests to verify that our stream will work correctly. Now we need to implement it. First, let’s take a look at the customer module. It’s extremely simple because it just returns a new instance of CustomerStream.
var CustomerStream = require('./customer-stream');

exports.all = function () {
  return new CustomerStream();
};
Now we’ve arrived at the real meat of this post. It’s time to dive into the implementation of our stream itself. Before doing so, let’s talk a bit about how to implement a Readable stream.
First, we need to inherit from the Readable stream base class. Next, we need to implement a _read method. Our _read method will be called each time someone requests data from our stream. Each time _read is called, we’re expected to call the push method (provided by the base class) at least once. In fact, _read will not be called again until we’ve pushed at least one value. Calling push adds a value to the stream so that our consumer can access it.
There are a few other things we need to know about the push method. First, if push ever returns false, that tells us we should not push any more values until _read is called again. Second, when we’re done pushing all our data, we need to call push with null to signal the end of the stream.
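Before applying these rules to our API, here’s a minimal sketch of the same contract on a toy stream that needs no server. CountStream is a hypothetical example class, not part of this post’s final code: each call to _read pushes exactly one object, and once the limit is passed it pushes null to end the stream.

```javascript
var Readable = require('stream').Readable;
var util = require('util');

// Hypothetical toy stream that emits {n: 1}, {n: 2}, ... up to a limit.
function CountStream(limit) {
  Readable.call(this, {objectMode: true});
  this.current = 1;
  this.limit = limit;
}
util.inherits(CountStream, Readable);

CountStream.prototype._read = function () {
  if (this.current > this.limit) {
    // Nothing left to emit, so signal the end of the stream
    this.push(null);
    return;
  }
  // Exactly one push per _read call satisfies the "at least once" rule
  this.push({n: this.current});
  this.current += 1;
};

// Consume it like any other Readable stream
var stream = new CountStream(3);
stream.on('data', function (item) {
  console.log(item.n);
});
stream.on('end', function () {
  console.log('done');
});
```

Because this toy stream produces its values synchronously, it never has to buffer anything itself. Our customer stream is more involved only because each fetch returns several values at once and arrives asynchronously.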
So what does our _read method need to do? The high-level steps are as follows:
- If we have any customers in memory that we’ve fetched previously, we should push them.
- If we don’t have any customers and we’re on the last page, it’s time to stop.
- If neither of the previous two statements is true, we need to fetch new results from the server, buffer them, and push them.
Let’s take a look at the code. I’ll discuss a few pieces of it in detail.
var Readable = require('stream').Readable;
var request = require('request');
var util = require('util');

function CustomerStream() {
  // Object mode lets us push plain objects instead of buffers or strings
  Readable.call(this, {objectMode: true});
  this.customers = [];
  this.nextPage = '/customers?page=1';
  this.isLastPage = false;
}
util.inherits(CustomerStream, Readable);

CustomerStream.prototype.fetchNextPage = function () {
  request('http://localhost:3000' + this.nextPage, function (error, response, body) {
    if (error) {
      // Surface request failures to the consumer as a stream error
      this.emit('error', error);
      return;
    }
    // Named data so it doesn't shadow the response argument
    var data = JSON.parse(body);
    this.customers = this.customers.concat(data.customers);
    this.nextPage = data.nextPage;
    this.isLastPage = data.isLastPage;
    this.pushBufferedCustomers();
  }.bind(this));
};

CustomerStream.prototype.pushBufferedCustomers = function () {
  while (this.customers.length > 0) {
    var customer = this.customers.shift();
    if (this.push(customer) === false) {
      break;
    }
  }
};

CustomerStream.prototype._read = function () {
  if (this.customers.length > 0) {
    this.pushBufferedCustomers();
  } else if (this.isLastPage) {
    this.push(null);
  } else {
    this.fetchNextPage();
  }
};

module.exports = CustomerStream;
Let’s first look at the constructor. We call the Readable stream’s constructor, making sure it is in object mode. This allows us to push objects to the stream. Next we initialize our customers, nextPage and isLastPage properties. Finally, right after the constructor, we make sure CustomerStream inherits from Readable.
We’ll examine the _read method next. Notice that the body of the method very closely follows the steps we described above. If we have any customers available, we push them. If we’re on the last page, we push null to indicate that the stream is complete. Otherwise, we fetch our next page of results.
The pushBufferedCustomers method shifts customers off the front of the customers array one at a time and pushes them. If any of those pushes returns false, it stops.
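To see that loop in isolation, here’s a small sketch that drains an array through a fake push function. Both pushBuffered and limitedPush are hypothetical names for this illustration; limitedPush imitates a stream whose buffer fills up on the third value.

```javascript
// Same loop as pushBufferedCustomers, but with the push function passed in.
function pushBuffered(buffer, push) {
  while (buffer.length > 0) {
    var item = buffer.shift();
    if (push(item) === false) {
      break;
    }
  }
}

// Hypothetical stand-in for stream.push: it accepts every value it is given,
// but returns false once it is holding three of them.
var accepted = [];
function limitedPush(item) {
  accepted.push(item);
  return accepted.length < 3;
}

var buffer = ['drew', 'john', 'bill', 'bob', 'sam'];
pushBuffered(buffer, limitedPush);

console.log(accepted); // [ 'drew', 'john', 'bill' ]
console.log(buffer);   // [ 'bob', 'sam' ]
```

The value that triggered the false return has still been handed over, which matches how push behaves on a real Readable: false means “stop for now”, not “rejected”. The remaining values stay in our array until _read is called again.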
Finally, the fetchNextPage method uses the request library to call our API. It then parses the body, adds the customers to the array, and updates nextPage and isLastPage. The call to pushBufferedCustomers at the end of the request callback is very important. As I said earlier, each call to _read expects at least one call to push. If we didn’t push any results after fetching customers from the API, our stream would hang indefinitely waiting on results.
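The same rule has one edge case worth sketching: if the API ever returned a page with no customers, the loop in pushBufferedCustomers would never call push and the stream would stall. One possible guard, shown below purely as an assumption on my part, is to re-run the _read decision after each fetch so that every fetch ends in a push. GuardedStream, fetchPage and the pages table are hypothetical; fetchPage stands in for the HTTP request so the sketch runs without a server.

```javascript
var Readable = require('stream').Readable;
var util = require('util');

// Hypothetical canned pages; the last page is deliberately empty.
var pages = {
  '/customers?page=1': {
    customers: [{name: 'drew'}],
    nextPage: '/customers?page=2',
    isLastPage: false
  },
  '/customers?page=2': {
    customers: [],
    nextPage: '/customers?page=3',
    isLastPage: true
  }
};

// Hypothetical asynchronous stand-in for the HTTP request.
function fetchPage(path, callback) {
  setImmediate(function () {
    callback(null, pages[path]);
  });
}

function GuardedStream() {
  Readable.call(this, {objectMode: true});
  this.customers = [];
  this.nextPage = '/customers?page=1';
  this.isLastPage = false;
}
util.inherits(GuardedStream, Readable);

GuardedStream.prototype._read = function () {
  var self = this;
  if (self.customers.length > 0) {
    self.push(self.customers.shift());
  } else if (self.isLastPage) {
    self.push(null);
  } else {
    fetchPage(self.nextPage, function (error, page) {
      if (error) {
        self.emit('error', error);
        return;
      }
      self.customers = self.customers.concat(page.customers);
      self.nextPage = page.nextPage;
      self.isLastPage = page.isLastPage;
      // Re-run the decision: push a customer, end the stream, or fetch again
      self._read();
    });
  }
};

var guarded = new GuardedStream();
guarded.on('data', function (customer) {
  console.log(customer.name);
});
guarded.on('end', function () {
  console.log('end');
});
```

For simplicity this sketch pushes one customer per _read call rather than draining the whole buffer; either approach satisfies the “at least one push” rule.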
Wrap Up
We now have a working stream that wraps our paginated API.
Edit: removed the link to the full code example.