How to use Google Pubsub Emulator with Elixir

Google Pub/Sub is a fully-managed real-time messaging service that is part of gcloud. Google Cloud SDK comes with a pubsub emulator, which is very handy to use for local development and testing, and with most clients it’s very easy to switch between the Google service and the emulator, e.g. with ruby’s google-cloud-pubsub gem all you have to do is set the PUBSUB_EMULATOR_HOST environment variable that points to your local host/port, for example localhost:8918.

Elixir has two clients to communicate with Google Pub/Sub: Kane and google_api_pub_sub (auto-generated from protocol definition) while the Goth package is responsible for authentication, and there doesn’t seem to be a built-in support for the emulator.

Regular (non-emulator) connection configuration

Using the google_api_pub_sub package for examples, here is how one could configure and get a topic info from pubsub.

Mix.exs:

    [
      {:google_api_pub_sub, "~> 0.29"},
      {:goth, "~> 1.1.0"}
    ]`

Environment variable pointing to Gcloud credentials file:

export GOOGLE_APPLICATION_CREDENTIALS="/var/secrets/google-api-key.son"

Code:

defmodule Pubsub.Client do
  @scope "https://www.googleapis.com/auth/cloud-platform"
  alias GoogleApi.PubSub.V1.{Api.Projects, Connection}

  def get_topics(project_id) do
    response = Projects.pubsub_projects_topics_list(
      connection(),
      project_id
    )

    case response do
      {:ok, topics} -> topics
      {:error, _} -> nil
    end
  end

  defp connection do
    {:ok, token} = Goth.Token.for_scope(@scope)
     Connection.new(token.token)
  end
end

project_id = "my-google-project"
topics = Pubsub.Client.get_topics(project_id)

How to connect to PubSub emulator

Assuming you started the emulator on localhost:9818:

gcloud beta emulators pubsub start --host-port=localhost:9818

Set the environment variable to point to the emulator endpoint:

export PUBSUB_EMULATOR_HOST="localhost:9818"

Add this to environment configuration file, e.g. dev.exs:

config :goth, json: nil, project_id: "emulator-project-id"

And add some runtime initialization, like so:

defmodule Pubsub.Emulator do
  def detect() do
    emulator_host = System.get_env("PUBSUB_EMULATOR_HOST")

    unless is_nil(emulator_host) do
      Application.put_env(:google_api_pub_sub, :base_url, "http://" <> emulator_host)
    end
  end
end

Pubsub.Emulator.detect()

The above code will work fine with both emulator and real setup, just make sure the environment variable and goth config are setup correctly for both cases.

When using Broadway

If you’ve seen the light and are using the excellent Broadway package for talking to Google PubSub, then the setup is slightly different but no more complicated. Here is a regular (non-emulator) setup.

Set the google credentials via the environment, just as above:

export GOOGLE_APPLICATION_CREDENTIALS="/var/secrets/google-api-key.son"

And then setup somewhere in the app initialization, e.g.:

defmodule Pubsub.Client do
  use Broadway
  
  def start_link({%{project_id: project_id, subscription: subscription}}) do
    Broadway.start_link(MyBroadway,
      name: MyBroadway,
      producer: [
        module: {BroadwayCloudPubSub.Producer,
          subscription: "projects/#{project_id}/subscriptions/#{subscription}"
        }
      ]
    )
  end
end

Broadway uses the google_api_pub_sub package underneath.

To be able to use the emulator with Broadway, we need to provide a dummy token generator, and change the base url according to the environment variable PUBSUB_EMULATOR_HOST:

export PUBSUB_EMULATOR_HOST="localhost:9818"

config/dev.exs:

config :goth, json: nil, project_id: "emulator-project-id"

The following code detects whether it needs to connect to the emulator or the google real PubSub:


defmodule Pubsub.Client do
  use Broadway
  
  @emulator_project_id "emulator-project-id"
  @emulator_host_env "PUBSUB_EMULATOR_HOST"

  def start_link(args = %{project_id: project_id, subscription_name: subscription_name}) do
    setup_base_url(args)
    
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {
          BroadwayCloudPubSub.Producer,
          producer_options(args)
        }
      ],
      processors: [
        default: []
      ]
    )
  end
  
  def emulator do
    {:ok, "dummy token"}
  end

  defp setup_base_url(%{project_id: @emulator_project_id}) do
    Application.put_env(
      :google_api_pub_sub,
      :base_url,
      "http://" <> System.get_env(@emulator_host_env)
    )
  end
  defp setup_base_url(_), do: nil
  
  defp producer_options(%{project_id: project_id, subscription_name: subscription_name}) do
    options =
      case project_id do
        @emulator_project_id ->
          [
            token_generator: {__MODULE__, :emulator, []}
          ]

        _ ->
          []
      end

    options ++
      [subscription: "projects/#{project_id}/subscriptions/#{subscription_name}"]
  end
end 

Finally, set the configs to configure the PubSub client (production config will probably read in environment variables) via System.get_env:

config :app, :pubsub_client,
  project_id: "emulator-project-id",
  subscription_name: "my-subscription"

And add a supervised PubSub client to start up (application.ex):

  def start(_type, _args) do
    children = [
      ...
      {Pubsub.Client, Enum.into(Application.fetch_env!(:app, :pubsub_client), %{})}
    ]

At this point the code will work with both emulator and regular PubSub setups.

Caution: some versions of the emulator don’t seem to work with http

Google PubSub supports grpc and http protocols, and it appears the http side gets a bit less love than the grpc side, and I’ve seen versions of the emulator when any http request will result in the Internal error message, while grpc-based clients keep working just fine. Elixir clients only support http at the point of writing, so this is something to be aware of. If you happen to come across a broken version of the emulator (can be easily confirmed via curl -X GET "http://localhost:9818/v1/projects/emulator-project-id/topics" - this should return valid JSON), feel free to download this working jar and replace the one provided with your Google SDK.