Server-Sent Events (SSE) with Elixir

Server-Sent Events is an efficient way to stream data from a web server to client, supported by all modern browsers. It is a great fit for many use cases where one-way communication is sufficient, and is much simpler and has less overhead than web sockets: any sort of status updates, streaming quotes etc. This post will show how to use SSE with Elixir-based Phoenix Web Server.

Demo

I will demo an app with a single web page streaming server’s current time. While this is a simplistic use case, the demo uses proper plumbing suitable for a more realistic scenario which could, for instance, be some sort of a data ingestion pipeline using Broadway, sending thousands of messages per second to thousands of clients. Full source code can be found here. I am using Elixir 1.11.2 and Phoenix 1.5.7 for this demo.

Generate a new app:

$ mix phx.new sse_demo --no-ecto --install

Add a dependency in mix.exs and run mix deps.get:

{:sse_phoenix_pubsub, "~> 1.0"}

Package sse_phoenix_pubsub provides a bridge between any sort of custom event generator and a streaming http endpoint and utilizes highly efficient Phoenix.PubSub library that is already used for Phoenix channel communication.

Sending Server-Sent Events

lib/sse_demo/time_events_generator.ex

defmodule SseDemo.TimeEventsGenerator do
  use GenServer
  require Logger
  
  alias Phoenix.PubSub

  @default_interval 1_000

  def start_link(opts) do
    pubsub_name = Keyword.fetch!(opts, :pubsub_name)
    topic_name = Keyword.fetch!(opts, :topic_name)
    interval = Keyword.get(opts, :interval, @default_interval)

    GenServer.start_link(__MODULE__, {pubsub_name, topic_name, interval})
  end

  def init({pubsub_name, topic_name, interval}) do
    Process.send_after(self(), :send_time_event, interval)

    {:ok, %{pubsub_name: pubsub_name, topic_name: topic_name, interval: interval, last_run_at: nil}}
  end

  def handle_info(:send_time_event, %{pubsub_name: pubsub_name, topic_name: topic_name, interval: interval} = state) do
    message = Time.utc_now() |> Time.to_string
    PubSub.broadcast(pubsub_name, topic_name, {pubsub_name, message})
    Logger.debug(fn -> "Broadcast to topic #{topic_name}, message: #{message}" end)

    Process.send_after(self(), :send_time_event, interval)
    {:noreply, %{state | last_run_at: :calendar.local_time()}}
  end
end

It is a basic GenServer-based module, which takes some options on startup and periodically sends current time to SSE clients. The event generator is launched on the application startup:

--- a/lib/sse_demo/application.ex
+++ b/lib/sse_demo/application.ex
@@ -12,7 +12,9 @@ defmodule SseDemo.Application do
       # Start the PubSub system
       {Phoenix.PubSub, name: SseDemo.PubSub},
       # Start the Endpoint (http/https)
-      SseDemoWeb.Endpoint
+      SseDemoWeb.Endpoint,
+      # Start Time Events Generator
+      {SseDemo.TimeEventsGenerator, [pubsub_name: SseDemo.PubSub, topic_name: "time"]},
       # Start a worker by calling: SseDemo.Worker.start_link(arg)
       # {SseDemo.Worker, arg}

You can configure SSE clients to send a list of desired topics to subscribe to, and event generators likewise choose which topics to publish their events on, and Phoenix Pubsub will only route messages to subscribers that are subscribed to the topic the messages is being published to. Very efficient.

Receiving Server-Sent Events

I will be using the same endpoint for SSE for this demo but this certainly doesn’t have to be the case. Make sure the SSE endpoint is configured with a high value for timeout, for example 1 hour as below:

--- a/config/dev.exs
+++ b/config/dev.exs
@@ -7,7 +7,12 @@ use Mix.Config
 # watchers to your application. For example, we use it
 # with webpack to recompile .js and .css sources.
 config :sse_demo, SseDemoWeb.Endpoint,
-  http: [port: 4000],
+  http: [
+    port: 4000,
+    protocol_options: [
+      idle_timeout: 3_600_000
+    ]
+  ],
   debug_errors: true,
   code_reloader: true,
   check_origin: false,

A nice benefit of using SSE is that the browser will automatically reconnect in case of a dropped connection.

lib/sse_demo_web/controllers/sse_controller.ex

defmodule SseDemoWeb.SseController do
  use SseDemoWeb, :controller
  require Logger

  def subscribe(conn, params) do
    case get_topics(params) do
      topics when is_list(topics) ->
        Logger.debug(fn -> "Subscribed to topics #{inspect(topics)}" end)
        SsePhoenixPubsub.stream(conn, {SseDemo.PubSub, topics})
      _ ->
        Logger.error("No topics provided")
    end
  end

  defp get_topics(params) do
    case params["topics"] do
      str when is_binary(str) -> String.split(str, ",")
      nil -> []
    end
  end
end

lib/sse_demo_web/router.ex

--- a/lib/sse_demo_web/router.ex
+++ b/lib/sse_demo_web/router.ex
@@ -19,6 +19,17 @@ defmodule SseDemoWeb.Router do
     get "/", PageController, :index
   end
 
+  pipeline :sse do
+    plug :put_format, "text/event-stream"
+    plug :fetch_session
+  end
+
+  scope "/sse", SseDemoWeb do
+    pipe_through :sse
+
+    get "/", SseController, :subscribe
+  end
+
   # Other scopes may use custom stacks.
   # scope "/api", SseDemoWeb do
   #   pipe_through :api

Restart server and test SSE stream

$ curl http://localhost:4000/sse?topics=time
retry: 2000

data: 05:44:05.849153

data: 05:44:06.850070

data: 05:44:07.851028

^C

At this point the server is fully functional, lets add a simple web page.

Web Page Integration

Add a small javascript library to set up SSE - I will just keep it in static assets for this demo.

assets/static/js/sse.js

function ready() {
  if (!!window.EventSource) {
    setupEventSource();
  } else {
    document.getElementById('status').innerHTML = "Your browser doesn't support the EventSource API";
  }
}

function setupEventSource() {
  var source = new EventSource('/sse?topics=time');

  source.addEventListener('message', function(event) {
    updateStatus("Server sent: '" + event.data + "'");
  }, false);
  source.addEventListener('open', function(event) {
    updateStatus("Eventsource connected.");
  }, false);
  source.addEventListener('error', function(event) {
    if (event.eventPhase == EventSource.CLOSED) {
      updateStatus("Eventsource is closed.");
    }
  }, false);
}

function updateStatus(status) {
  var date = new Date;
  document.getElementById('status').innerHTML = status + "<br/>";
}

Add a reference to the javascript file in the application layout, as well a div element for placement and a body onload attribute.

lib/sse_demo_web/templates/layout/app.html.eex

-- a/lib/sse_demo_web/templates/layout/app.html.eex
+++ b/lib/sse_demo_web/templates/layout/app.html.eex
@@ -7,8 +7,9 @@
     <title>SseDemo ยท Phoenix Framework</title>
     <link rel="stylesheet" href="<%= Routes.static_path(@conn, "/css/app.css") %>"/>
     <script defer type="text/javascript" src="<%= Routes.static_path(@conn, "/js/app.js") %>"></script>
+    <script type="text/javascript" src="<%= Routes.static_path(@conn, "/js/sse.js") %>"></script>
   </head>
-  <body>
+  <body onload="ready();">
     <header>
       <section class="container">
         <nav role="navigation">
@@ -27,6 +28,7 @@
     <main role="main" class="container">
       <p class="alert alert-info" role="alert"><%= get_flash(@conn, :info) %></p>
       <p class="alert alert-danger" role="alert"><%= get_flash(@conn, :error) %></p>
+      <div id="status"></div>
       <%= @inner_content %>
     </main>
   </body>

I also simplified the index page (not material to this demo), and here is the how the time stream shows up on the web page (http://localhost:4000/):

The SSE stream is updating about every second. Just to show that we can, we can just as easily stream these updates once every millisecond - change the interval option value to 1ms in lib/sse_demo/application.ex:

{SseDemo.TimeEventsGenerator, [pubsub_name: SseDemo.PubSub, topic_name: "time", interval: 1]},

Hopefully this was enough to convince you to give Server-Sent Events a chance on your project; there are many scenarios where SSE is a great fit, without the overhead of web sockets.