Node streams are a fantastic abstraction for evented programming. They’re also notoriously hard to implement. In this post, I’d like to walk through implementing a streams2 Readable stream to wrap an API.
Suppose we have a web service that returns a list of customers. There might be a large number of customers, so this service paginates the results and expects us to provide a page number when requesting data. A sample request to the API might look something like the line below.
And here’s an example of a response.
This API makes it convenient to fetch the next page of results by providing its URL in the `nextPage` key of the response. It also tells us whether we’ve reached the last page of results.
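To make that response contract concrete, here is a minimal sketch of a client that eagerly follows `nextPage` links until the API reports the last page. Only `nextPage` is named above. The `customers` and `isLastPage` field names, and the `getPage(url)` function (assumed to return a promise of one parsed page), are assumptions for illustration.

```javascript
// Hypothetical sketch: eagerly walk a paginated API by following nextPage.
// Assumes each page looks like { customers: [...], nextPage, isLastPage }.
async function fetchAllCustomers(getPage, firstUrl) {
  const customers = [];
  let url = firstUrl;
  while (true) {
    const page = await getPage(url); // one request per page
    customers.push(...page.customers);
    if (page.isLastPage) return customers; // the API says we're done
    url = page.nextPage; // the API hands us the next URL
  }
}
```

Collecting every page up front like this is exactly what a lazy stream avoids: it holds the whole result set in memory and makes every request before the caller sees a single customer.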
To make it simple to test the stream that will wrap this API, I’ve created an Express app that implements the API described above. Let’s take a look at the code.
Note that I’ve specified an artificially small `pageSize` and number of customers to make testing easier. Also note that the `callCount` variable and its associated functions are provided so that our tests can make assertions about the number of requests made to the API.
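For readers who want something runnable without Express, here is a minimal sketch of such a fake API using Node’s built-in `http` module instead. Everything specific in it is an assumption: the page size of 2, the five customers (with “drew” first, matching the tests described below), the `/customers?page=N` route, the relative `nextPage` URLs, the response fields, and the `handleRequest`, `getCallCount` and `resetCallCount` names.

```javascript
const http = require('http');

// Hypothetical fake of the paginated customers API (no Express needed).
const PAGE_SIZE = 2; // artificially small so paging is easy to test
const CUSTOMERS = ['drew', 'customer-2', 'customer-3', 'customer-4', 'customer-5']
  .map((name, i) => ({ id: i + 1, name }));

// Request counter so tests can assert how many calls were made.
let callCount = 0;
const getCallCount = () => callCount;
const resetCallCount = () => { callCount = 0; };

function handleRequest(req, res) {
  callCount += 1;
  const url = new URL(req.url, 'http://localhost');
  if (url.pathname !== '/customers') {
    res.statusCode = 404;
    res.end();
    return;
  }
  // Missing or invalid page numbers fall back to page 1.
  const page = Math.max(1, Number(url.searchParams.get('page')) || 1);
  const start = (page - 1) * PAGE_SIZE;
  const isLastPage = start + PAGE_SIZE >= CUSTOMERS.length;
  res.setHeader('Content-Type', 'application/json');
  res.end(JSON.stringify({
    customers: CUSTOMERS.slice(start, start + PAGE_SIZE),
    nextPage: isLastPage ? null : `/customers?page=${page + 1}`,
    isLastPage,
  }));
}

const server = http.createServer(handleRequest);
// server.listen(3000) would start it on the port the tests use.
```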
Before we dive into implementing our stream, let’s write a few tests to specify the desired behavior. There are two key things we should test:
- We can treat our stream like any other Node stream
- It lazily fetches results from the API as needed
Below are two tests written using Mocha. The method we’ll be implementing is `customer.all()`, which will eventually return our stream that wraps the API above.
The test file first requires the API we discussed above, then the `customer` module we’re about to write. Before running any tests, we start our API server listening on port 3000, and before each test we reset the call count on our API server.
Our first test verifies that we can interact with the return value from `customer.all()` using the standard stream API. We attach `data` and `end` listeners to the stream; the `data` listener is what switches it into flowing mode. When the stream emits the `end` event, we verify that we’ve received all 5 customers, that we’ve made only 3 calls to the API, and that the first customer has the name “drew”.
Our second test verifies that we fetch results from our API lazily. We ensure this by writing a custom Writable stream that only asks for the first two customers from our stream and verifies that it has only called the API once to retrieve them. Finally, we pipe the result of `customer.all()` into our Writable stream to kick things off.
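A consumer like the one in the second test can be sketched as an object-mode Writable that accepts a fixed number of objects and then withholds its write callback, so backpressure pauses whatever is piped into it. `takeFirst` and its `onLimit` callback are hypothetical names; in a test like the one above, the assertion on the API call count would go where `onLimit` is invoked.

```javascript
const { Writable } = require('stream');

// Hypothetical helper: accept only the first `limit` objects. After the last
// one, call onLimit(items) and never invoke the write callback, so a piped
// source is paused by backpressure instead of delivering more.
function takeFirst(limit, onLimit) {
  const items = [];
  return new Writable({
    objectMode: true,
    highWaterMark: 1, // buffer at most one object, so reads stay minimal
    write(chunk, encoding, callback) {
      items.push(chunk);
      if (items.length < limit) {
        callback(); // ready for the next object
        return;
      }
      onLimit(items); // deliberately withhold callback()
    },
  });
}
```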
Implementing the Stream
We know what our API looks like, and we have tests to verify that our stream will work correctly. Now we need to implement it. First, let’s take a look at the `customer` module. It’s extremely simple because it just returns a new instance of the stream class we’re about to write.
Now we’ve arrived at the real meat of this post. It’s time to dive into the implementation of our stream itself. Before doing so, let’s talk a bit about how to implement a Readable stream.
First, we need to inherit from the `Readable` stream base class. Next, we need to implement a `_read` method, which is called each time someone requests data from our stream. Each time `_read` is called, we’re expected to call the `push` method (provided by the base class) at least once. In fact, `_read` will not be called again until we’ve pushed at least one value. Calling `push` places a value on the stream so that our consumers can access it.
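Here is a minimal sketch of those rules outside of any API: a `Readable` subclass in object mode that pushes one object per `_read` call and ends the stream with `push(null)`. The `Counter` class is purely illustrative, and `class ... extends Readable` is the modern equivalent of setting up inheritance with `util.inherits`.

```javascript
const { Readable } = require('stream');

// Illustrative object-mode Readable that emits { n: 1 } .. { n: max }.
class Counter extends Readable {
  constructor(max) {
    super({ objectMode: true }); // allow pushing plain objects
    this.max = max;
    this.current = 0;
  }

  // Each call pushes exactly one value, so _read will be called again.
  _read() {
    this.current += 1;
    if (this.current > this.max) {
      this.push(null); // signal the end of the stream
    } else {
      this.push({ n: this.current });
    }
  }
}
```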
There are a few other things we need to know about the `push` method. First, if `push` returns `false`, that tells us we should not push any more values until `_read` is called again. Second, when we’re done pushing all our data, we need to call `push(null)` to signal the end of the stream.
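The return value of `push` is easy to observe directly. In this sketch nothing consumes the stream, and `highWaterMark` (the buffer limit, counted in objects in object mode) is set to 2, so the second push already reports that the buffer is full. The value is still buffered; `false` is only a signal to stop pushing until `_read` is called again.

```javascript
const { Readable } = require('stream');

// An unconsumed Readable with room for two buffered objects.
const source = new Readable({
  objectMode: true,
  highWaterMark: 2,
  read() {}, // no-op: this demo pushes from outside _read
});

// push() buffers every value; its return value says whether the buffer is
// still below highWaterMark.
const results = ['a', 'b', 'c'].map((value) => source.push(value));
source.push(null); // no more data after this
```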
So what does our `_read` method need to do? The high-level steps are as follows:
- If we have any customers in memory that we’ve fetched previously, we should push them.
- If we don’t have any customers and we’re on the last page, it’s time to stop.
- If neither of the previous two statements is true, we need to fetch new results from the server, buffer them, and push them.
Let’s take a look at the code. I’ll discuss a few pieces of it in detail.
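Here is a minimal sketch of a stream that follows those steps. It differs from the post’s setup in a few assumed ways: it uses an ES class instead of `util.inherits`, and rather than calling the `request` library it is handed a hypothetical `fetchPage(url, callback)` function (the callback receives `(err, body)`), which keeps it testable without a server. The response fields `customers`, `nextPage` and `isLastPage`, and the first-page URL, are also assumptions.

```javascript
const { Readable } = require('stream');

// Sketch of a Readable that lazily walks a paginated customers API.
// fetchPage(url, callback) is a hypothetical injected function that calls
// back with (err, body), where body is { customers, nextPage, isLastPage }.
class CustomerStream extends Readable {
  constructor(fetchPage, firstPageUrl) {
    super({ objectMode: true }); // we push plain JS objects
    this.fetchPage = fetchPage;
    this.customers = [];          // fetched but not yet pushed
    this.nextPage = firstPageUrl; // where the next request goes
    this.isLastPage = false;      // updated from each response
  }

  // Called by the Readable machinery whenever a consumer wants more data.
  _read() {
    if (this.customers.length > 0) {
      this.pushBufferedCustomers();
    } else if (this.isLastPage) {
      this.push(null); // nothing buffered and no more pages: end the stream
    } else {
      this.fetchNextPage();
    }
  }

  // Push buffered customers in order, stopping as soon as push() returns false.
  pushBufferedCustomers() {
    while (this.customers.length > 0) {
      if (!this.push(this.customers.shift())) return;
    }
  }

  fetchNextPage() {
    this.fetchPage(this.nextPage, (err, body) => {
      if (err) {
        this.destroy(err); // surface the failure as an 'error' event
        return;
      }
      this.customers.push(...body.customers);
      this.nextPage = body.nextPage;
      this.isLastPage = body.isLastPage;
      // _read is still waiting for a push: re-run it so we push the new
      // customers (or end, or fetch again if this page came back empty).
      this._read();
    });
  }
}

// Roughly what customer.all() returns; the first-page URL is hypothetical.
function all(fetchPage) {
  return new CustomerStream(fetchPage, '/customers?page=1');
}

module.exports = { CustomerStream, all };
```

Two small design choices here: buffered customers are taken with `shift()` so they come out in API order, and after a fetch the sketch re-runs `_read()` rather than only pushing buffered customers, which also covers a page that comes back empty.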
Let’s first look at the constructor. We call the `Readable` stream’s constructor, making sure it is in object mode, which allows us to push objects to the stream. Next we initialize our state variables, including `isLastPage`. Finally, we make sure `CustomerStream` inherits from `Readable`.
We’ll examine the `_read` method next. Notice that the body of the method closely follows the steps we described above. If we have any customers available, we push them. If we’re on the last page, we push `null` to indicate that the stream is complete. Otherwise, we fetch our next page of results.
The `pushBufferedCustomers` method pops customers off the `customers` array one at a time and pushes them. If any of those pushes returns `false`, it stops.
The `fetchNextPage` method uses the `request` library to call our API. It then parses the body, adds the customers to the array, and updates `nextPage` and `isLastPage`. The call to `pushBufferedCustomers` after the new customers are buffered is very important. As I said earlier, each call to `_read` expects at least one call to `push`. If we didn’t push any results after fetching customers from the API, our stream would hang indefinitely waiting on results.
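The `request` library has since been deprecated. If you’d rather avoid it, a fetch helper using only Node’s built-in `http` module might look like the sketch below. `fetchJson` is a hypothetical name; it handles plain `http://` URLs only, treats any non-200 status as an error, and calls back with `(err, parsedBody)` rather than `request`’s `(error, response, body)`.

```javascript
const http = require('http');

// Hypothetical stand-in for the request call: GET `url`, parse the body as
// JSON, and call back with (err, body). Plain http:// URLs only.
function fetchJson(url, callback) {
  http.get(url, (res) => {
    if (res.statusCode !== 200) {
      res.resume(); // discard the body so the socket is released
      callback(new Error(`Unexpected status ${res.statusCode}`));
      return;
    }
    res.setEncoding('utf8');
    let raw = '';
    res.on('data', (chunk) => { raw += chunk; });
    res.on('end', () => {
      let body;
      try {
        body = JSON.parse(raw);
      } catch (err) {
        callback(err); // malformed JSON
        return;
      }
      callback(null, body);
    });
  }).on('error', callback); // connection-level failures
}
```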
We now have a working stream that wraps our paginated API. If you’re interested in the code used for this blog post, it can be found here.