Elixir Streams

I previously wrote about explicitness in Elixir. One of my favorite ways the language embraces explicitness is in its distinction between eager and lazy operations on collections. Any time you use the Enum module, you're performing an eager operation. Your collection will be transformed/mapped/enumerated immediately. When you use the Stream module, you're performing lazy operations.

The Stream module provides many of the same operations as Enum, but the Stream versions describe future computations rather than performing them immediately. Conveniently, all streams also implement the Enumerable protocol, meaning any function in Enum can consume them.

You can realize a stream (eagerly forcing the computation to take place) by using any of the functions inside of Enum. Let's see some examples. The examples will be a mix of iex sessions and Elixir files to help demonstrate the concepts.

A Few Examples

Here's an extremely simple example of using the Stream.map function.

iex(1)> [1, 2, 3] |> Stream.map(&(&1 + 1))  
#Stream<[enum: [1, 2, 3], funs: [#Function<45.29647706/1 in Stream.map/2>]]>

I've used an iex session because it helps to understand the return value. As you can see, a Stream struct is returned. The details are not important now, but you can see that it is storing functions to be applied in the future rather than actually performing any computation on the data immediately.

We can force the computation using the Enum.to_list function.

iex(2)> [1, 2, 3]  
|> Stream.map(&(&1 + 1))
|> Enum.to_list
[2, 3, 4]

Let's look at another example that should give us more of an idea of the advantages streams provide over eager collections. This time, we'll operate on a Range. In Elixir, ranges are lazy enumerables: building a range doesn't allocate its elements, so we can describe very large ranges without immediately realizing them.

iex(1)> 1..1000000  
|> Stream.map(&("This is number #{&1}"))
|> Enum.take(2)
["This is number 1", "This is number 2"]

The important thing to understand from this example is that we're using Stream.map to represent a transformation of a range with 1,000,000 items, but we only ever perform the computation on 2 of them. Enum.take asks for only 2 items, so the stream computes only the first 2 items in the range.

This can be an extremely useful way to represent computation on very large (or even infinitely large) sets of data in a tractable way.
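One way to see this laziness in action is to add a visible side effect to the mapping function; the message prints only for the items actually demanded:

```elixir
result =
  1..1_000_000
  |> Stream.map(fn n ->
    IO.puts("computing #{n}")  # visible side effect per computed item
    n + 1
  end)
  |> Enum.take(2)

# Prints "computing 1" and "computing 2" — the remaining
# 999,998 items are never touched.
# result => [2, 3]
```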

Streams are composable, meaning we can perform an arbitrary number of computation stages efficiently on large collections.

iex(2)> 1..1000000  
|> Stream.filter(&(rem(&1, 2) == 0))
|> Stream.map(&("This is number #{&1}"))
|> Enum.take(2)
["This is number 2", "This is number 4"]

Standard Library Streams

Elixir's standard library provides several streams out of the box. We've already seen Range. Let's quickly discuss some other commonly used streams.

The File.stream! function provides the lines (by default) or the bytes of a file in the form of a stream. This is a pleasant way to consume file data.

"./my_file.txt"
|> File.stream!
|> Stream.map(&String.trim/1)
|> Stream.with_index
|> Stream.map(fn {line, i} -> "#{i}: #{line}" end)
|> Enum.take(1)
|> IO.inspect

# => ["0: first line"]

This program lazily trims the newline from the end of each line of the file, adds the index at the beginning of each line, takes the first line, and prints it. Because we've only asked for the first line, the file could be extremely large and our program would still be efficient.

Another commonly used function that returns a stream is IO.stream. As you might expect, this provides a stream interface to devices such as standard input. Let's make our previous program operate on standard input rather than a file.

IO.stream(:stdio, :line)  
|> Stream.map(&String.trim/1)
|> Stream.with_index
|> Stream.map(fn {line, i} -> "#{i}: #{line}" end)
|> Enum.take(1)
|> IO.inspect

# $ elixir foo.ex < my_file.txt
# ["0: first line"]

As a final example, the GenEvent.stream function creates a stream of events from a GenEvent manager (note that GenEvent has since been deprecated in more recent Elixir versions). The details of GenEvent are beyond the scope of this post, so that's all we'll say on the subject of this function. If you're interested, you can read more in the docs.

Stream Construction

We've just seen a few of the streams provided by Elixir's standard library. Elixir also provides several functions in the Stream module that allow you to construct your own streams. Let's take a look at a few of them. We'll start with simple constructors and get progressively more complex.

Stream.repeatedly is a simple stream constructor that builds an infinite stream by calling the provided function each time an item is requested from the stream. Let's make an infinite stream of all 1s.

iex(1)> Stream.repeatedly(fn -> 1 end) |> Enum.take(5)  
[1, 1, 1, 1, 1]

Next, there's Stream.iterate. This takes an initial value and a "generator" function. This function will be called with the previous item in the stream (starting with the initial value) and is expected to return the next item. We can make a stream of all the positive integers.

iex(1)> Stream.iterate(1, &(&1 + 1)) |> Enum.take(5)  
[1, 2, 3, 4, 5]

Now, let's talk about Stream.unfold. This function is more complex but also allows creating more flexible streams. Unfold takes two arguments: an initial value for the "accumulator" and a generator function. The generator function will be called with the accumulator and is expected to return a tuple of {next_element, new_accumulator}. The next_element value represents the next item in the stream, and the new_accumulator will be passed to the generator function on the subsequent call. Let's use Stream.unfold to make an infinite stream of the Fibonacci sequence.

fibs = Stream.unfold({1, 1}, fn {a, b} ->  
  {a, {b, a + b}}
end)

fibs  
|> Enum.take(10)
|> IO.inspect

# => [1, 1, 2, 3, 5, 8, 13, 21, 34, 55]

Note that our accumulator represents two numbers in the sequence because we need both of these numbers to generate the next number.
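Stream.unfold can also build finite streams: returning nil from the generator function ends the stream. As a quick sketch, here's a countdown:

```elixir
countdown =
  Stream.unfold(5, fn
    0 -> nil          # returning nil terminates the stream
    n -> {n, n - 1}   # emit n, count down in the accumulator
  end)

countdown |> Enum.to_list() |> IO.inspect()
# => [5, 4, 3, 2, 1]
```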

The last constructor function we'll look at is Stream.resource. This function is well suited to building streams around external resources. You call it with a "start function", a "next function", and an "after function". It's best understood with an example, so let's use it to wrap Github's API.
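Before diving into the Github example, here's a minimal sketch of the shape Stream.resource expects, re-implementing a line-by-line file stream (File.stream!/1 already does this for real; the file name is just an illustration):

```elixir
lines =
  Stream.resource(
    fn -> File.open!("my_file.txt") end,   # start: acquire the resource
    fn device ->
      case IO.read(device, :line) do
        :eof -> {:halt, device}            # nothing left: stop the stream
        line -> {[line], device}           # emit one line, keep the device
      end
    end,
    fn device -> File.close(device) end    # after: release the resource
  )
```

Nothing happens at construction time; the file is only opened once something enumerates `lines`.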

Building an API with Streams

Many of the resources provided by Github's API return paginated results. We want to make the pagination simple but efficient for the consumers of our library. It should be invisible, and it should only happen if we actually need the results on a given page. First, let's create a Github gateway module that will be responsible for communicating with the Github API. We'll use HTTPoison for the HTTP communication.

defmodule Github.Gateway do  
  use HTTPoison.Base

  @endpoint "https://api.github.com"

  def endpoint do
    @endpoint
  end

  defp process_url(url) do
    @endpoint <> url
  end

  defp process_request_headers(headers) do
    headers ++ [
      {"Authorization", "Basic #{:base64.encode(credentials)}"}
    ]
  end

  defp credentials do
    "#{config[:access_token]}:x-oauth-basic"
  end

  defp config do
    Application.get_env(:house_keeper, __MODULE__)
  end
end  

Our gateway module assumes that we've configured our application with a personal access token.

Responses from Github contain two pieces of information relevant to our API: the Link header and the response body. The Link header tells us about the next page of results (if one exists). The response body is a JSON-serialized collection of results. Let's create a module that generates a Stream around the API response for a given URL. We'll use Poison for JSON parsing.

defmodule Github.ResultStream do  
  alias Github.Gateway

  def new(url) do
    Stream.resource(
      fn -> fetch_page(url) end,
      &process_page/1,
      fn _ -> nil end
    )
  end

  defp fetch_page(url) do
    response = Gateway.get!(url)
    items = Poison.decode!(response.body)
    # HTTPoison returns headers as a list of {name, value} tuples
    headers = Enum.into(response.headers, %{})
    links = parse_links(headers["Link"])

    {items, links["next"]}
  end

  def parse_links(nil) do
    %{}
  end

  def parse_links(links_string) do
    links = String.split(links_string, ", ")

    Enum.map(links, fn link ->
      [_, name] = Regex.run(~r{rel="([a-z]+)"}, link)
      [_, url] = Regex.run(~r{<([^>]+)>}, link)
      short_url = String.replace(url, Gateway.endpoint, "")

      {name, short_url}
    end) |> Enum.into(%{})
  end

  defp process_page({nil, nil}) do
    {:halt, nil}
  end

  defp process_page({nil, next_page_url}) do
    next_page_url
    |> fetch_page
    |> process_page
  end

  defp process_page({items, next_page_url}) do
    {items, {nil, next_page_url}}
  end
end  

There's a lot of code here, so let's focus on it piece by piece.

  def new(url) do
    Stream.resource(
      fn -> fetch_page(url) end,
      &process_page/1,
      fn _ -> nil end
    )
  end

The new function returns a Stream built using the Stream.resource function. We provide it a "start function" that fetches the first page, a "next function" that processes each page and fetches a new page when necessary, and an empty "after function". Our "after function" is empty because there's nothing to clean up.

Let's look at fetch_page next.

  defp fetch_page(url) do
    response = Gateway.get!(url)
    items = Poison.decode!(response.body)
    # HTTPoison returns headers as a list of {name, value} tuples
    headers = Enum.into(response.headers, %{})
    links = parse_links(headers["Link"])

    {items, links["next"]}
  end

fetch_page is our "start function". We use the HTTPoison-based gateway shown earlier to fetch the provided URL. We parse the body's JSON with Poison and then parse the Link header into a map of link name => URL. The details of the link parsing aren't very important.
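Still, it helps to see the input and output side by side. A Link header looks roughly like the following (the URLs here are abbreviated and hypothetical), and parse_links turns it into a map of relative URLs by stripping the endpoint:

```elixir
# An example Link header (URL shapes hypothetical):
#
#   <https://api.github.com/orgs/foo/repos?page=2>; rel="next",
#   <https://api.github.com/orgs/foo/repos?page=34>; rel="last"
#
# parse_links/1 would return:
#
#   %{"next" => "/orgs/foo/repos?page=2", "last" => "/orgs/foo/repos?page=34"}
```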

Finally, let's look at our "next function", process_page.

  defp process_page({nil, nil}) do
    {:halt, nil}
  end

  defp process_page({nil, next_page_url}) do
    next_page_url
    |> fetch_page
    |> process_page
  end

  defp process_page({items, next_page_url}) do
    {items, {nil, next_page_url}}
  end

The process_page function has three clauses. The clause to be executed is determined by the tuple passed to the function. This is known as pattern matching. Each clause is expected to return a tuple where the first element contains a list of items to add to the stream and the second item is the value of the accumulator for the next call to process_page. We'll look at each clause individually.

  defp process_page({nil, nil}) do
    {:halt, nil}
  end

In the first clause, we handle the case where there are no items and no next link. This means we've reached the last page and added all the items to the stream. We should stop paginating, so we return the special value :halt that tells Stream.resource our stream has ended.

  defp process_page({nil, next_page_url}) do
    next_page_url
    |> fetch_page
    |> process_page
  end

The second clause is invoked in the case when we've consumed the items on a page and there is a non-nil next page link to follow. In this case, we fetch the next page and process it.

  defp process_page({items, next_page_url}) do
    {items, {nil, next_page_url}}
  end

The third and final clause is the "normal" case because it is invoked when we have items to add to the stream. We return the items as the first element of the tuple, adding them to the stream. We then return the tuple {nil, next_page_url} as our accumulator, saying "I need more items and here's the next url to fetch".
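To make the flow concrete, here's roughly how the accumulator evolves over a hypothetical two-page result set (the URLs are made up):

```elixir
# start:  fetch_page("/orgs/foo/repos")            => {page_1_items, "/page_2"}
# next:   process_page({page_1_items, "/page_2"})  => {page_1_items, {nil, "/page_2"}}
# next:   process_page({nil, "/page_2"})
#           -> fetch_page("/page_2")               => {page_2_items, nil}
#           -> process_page({page_2_items, nil})   => {page_2_items, {nil, nil}}
# next:   process_page({nil, nil})                 => {:halt, nil}
```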

Ok, we've looked at all the code required to paginate Github API responses! This example is a little more advanced than the others we've seen, but it is a remarkably small amount of code to provide invisible, lazy pagination of arbitrary responses from the Github API.

Finally, we'll use our Github.ResultStream module to expose a nice API to our users for fetching the repositories for an organization.

defmodule Github do  
  alias Github.ResultStream

  def repos(organization) do
    ResultStream.new("/orgs/#{organization}/repos")
  end
end  

We can use our new API to fetch the names of all the repositories for the elixir-lang organization.

"elixir-lang"
|> Github.repos
|> Stream.map(fn repo -> repo["full_name"] end)
|> Enum.take(1)

# => ["elixir-lang/elixir"]

Remind yourself when reading this code that it is lazy, meaning we only consume the first page of results to get the first repository's name.

This enumerable-like API allows our users to do familiar things with the results. For example, we could build a stream of the names of all the repositories for both the elixir-lang and tryghost organizations using Stream.flat_map.

["elixir-lang", "tryghost"]
|> Stream.flat_map(&Github.repos/1)
|> Stream.map(fn repo -> repo["full_name"] end)
|> Enum.take(1)

# => ["elixir-lang/elixir"]

Again, this is lazy and efficient in that it only paginates over the results as we need them. That's pretty amazing.

Wrap Up

I hope this gives you a sense of the power and flexibility provided by Elixir's Stream facilities. Use them in your libraries!