Google Pubsub gRPC-based client

Google Pubsub supports both REST and gRPC endpoints, and the latter is clearly the winner in terms of performance. For instance, it takes about 1 ms to receive a REST message with the pubsub emulator on my laptop but I can get out 50 of them via the gRPC endpoint in the same single millisecond (using HTTP 2 streaming over the same TCP connection of course so not a completely fair comparison but at the same time it’s not like I am forcing the REST endpoint to stick with HTTP 1.1).

Currently (January of 2021) all Elixir Google pubsub clients (kane, google_api_pub_sub) only support REST, but we already have an excellent elixir-grpc package and it’s easy to use it for communication to Google Pubsub. I added a thin layer on top of it to provide some extra niceties like emulator support, and released as a hex package, google_pubsub_grpc. It’s a beta package for now, - once the grpc package is “properly” released and loses its beta version, this one will follow.

Installation

Add dependencies to mix.exs:

        {:goth, "~> 1.2.0"},
        {:cowlib, "~> 2.9.0", override: true},
        {:google_protos, "~> 0.1.0"},
        {:google_pubsub_grpc, "~> 0.2.1-beta.1"}

Configuration

For emulator, set environment variable PUBSUB_EMULATOR_HOST to the emulator (e.g. 0.0.0.0:8786) and set the project id in Goth’s config:

  config :goth, json: nil, project_id: "emulator-project-id"

For google service, configure Goth accordingly, for instance by simply setting the GOOGLE_APPLICATION_CREDENTIALS environment variable to point to the json key file.

Sample code

To get a channel (connection to server):

iex(1)> {:ok, channel} = Google.Pubsub.GRPC.channel()
{:ok,
 %GRPC.Channel{
   accepted_compressors: [],
   adapter: GRPC.Adapter.Gun,
   adapter_payload: %{conn_pid: #PID<0.279.0>},
   codec: GRPC.Codec.Proto,
   compressor: nil,
   cred: nil,
   headers: [authorization: "Dummy token"],
   host: "0.0.0.0",
   interceptors: [],
   port: 8786,
   scheme: "http"
 }}

To get the list of all topics:

iex(5)> request = Google.Pubsub.V1.ListTopicsRequest.new(project: Google.Pubsub.GRPC.full_project_id())
%Google.Pubsub.V1.ListTopicsRequest{
  page_size: 0,
  page_token: "",
  project: "projects/emulator-project-id"
}
iex(6)> {:ok, response} = channel |> Google.Pubsub.V1.Publisher.Stub.list_topics(request)
{:ok,
 %Google.Pubsub.V1.ListTopicsResponse{
   next_page_token: "",
   topics: [
     %Google.Pubsub.V1.Topic{
       kms_key_name: "",
       labels: %{},
       message_storage_policy: nil,
       name: "projects/emulator-project-id/topics/actions"
     },
     %Google.Pubsub.V1.Topic{
       kms_key_name: "",
       labels: %{},
       message_storage_policy: nil,
       name: "projects/emulator-project-id/topics/messages"
     }
   ]
 }}

To create a new subscription on a topic:

iex(8)> {:ok, subscription} =                                                                          
...(8)>   channel |> Google.Pubsub.V1.Subscriber.Stub.create_subscription(%Google.Pubsub.V1.Subscription{
...(8)>     topic: Google.Pubsub.GRPC.full_topic_name("my-topic"),
...(8)>     name: Google.Pubsub.GRPC.full_subscription_name("my-subscription")
...(8)>   })
{:ok,
 %Google.Pubsub.V1.Subscription{
   ack_deadline_seconds: 10,
   dead_letter_policy: nil,
   detached: false,
   enable_message_ordering: false, 
   expiration_policy: nil,
   filter: "",
   labels: %{},
   message_retention_duration: %Google.Protobuf.Duration{
     nanos: 0,
     seconds: 604800
   },
   name: "projects/emulator-project-id/subscriptions/my-subscription",
   push_config: %Google.Pubsub.V1.PushConfig{
     attributes: %{},
     authentication_method: nil,
     push_endpoint: ""
   },
   retain_acked_messages: false,
   retry_policy: nil,
   topic: "projects/emulator-project-id/topics/my-topic"
 }}

To publish a message:

iex(10)> message = %Google.Pubsub.V1.PubsubMessage{data: "string"}
%Google.Pubsub.V1.PubsubMessage{
  attributes: nil,
  data: "string",
  message_id: nil,
  ordering_key: nil,
  publish_time: nil
}
iex(12)> request = %Google.Pubsub.V1.PublishRequest{
...(12)>   topic: Google.Pubsub.GRPC.full_topic_name("my-topic"),
...(12)>   messages: [message]
...(12)> }
%Google.Pubsub.V1.PublishRequest{
  messages: [
    %Google.Pubsub.V1.PubsubMessage{
      attributes: nil,
      data: "string",
      message_id: nil,
      ordering_key: nil,
      publish_time: nil
    }
  ],
  topic: "projects/emulator-project-id/topics/my-topic"
}
iex(14)> {:ok, response} = channel |> Google.Pubsub.V1.Publisher.Stub.publish(request)
{:ok, %Google.Pubsub.V1.PublishResponse{message_ids: ["1"]}}

To pull messages from a subscription:

iex(16)> request = %Google.Pubsub.V1.PullRequest{
...(16)>   subscription: Google.Pubsub.GRPC.full_subscription_name("my-subscription"),
...(16)>   max_messages: 10
...(16)> }
%Google.Pubsub.V1.PullRequest{
  max_messages: 10,
  return_immediately: nil,
  subscription: "projects/emulator-project-id/subscriptions/my-subscription"
}
iex(17)> {:ok, response} = channel |> Google.Pubsub.V1.Subscriber.Stub.pull(request)
{:ok,
 %Google.Pubsub.V1.PullResponse{
   received_messages: [
     %Google.Pubsub.V1.ReceivedMessage{
       ack_id: "projects/emulator-project-id/subscriptions/my-subscription:1",
       delivery_attempt: 0,
       message: %Google.Pubsub.V1.PubsubMessage{
         attributes: %{},
         data: "string", 
         message_id: "1",
         ordering_key: "",
         publish_time: %Google.Protobuf.Timestamp{nanos: 0, seconds: 1610913863}
       }
     }
   ]
 }}

To acknowledge received messages:

iex(20)> request = %Google.Pubsub.V1.AcknowledgeRequest{
...(20)>   subscription: Google.Pubsub.GRPC.full_subscription_name("my-subscription"),
...(20)>   ack_ids: Enum.map(response.received_messages, fn m -> m.ack_id end)
...(20)> }
%Google.Pubsub.V1.AcknowledgeRequest{
  ack_ids: ["projects/emulator-project-id/subscriptions/my-subscription:1"],
  subscription: "projects/emulator-project-id/subscriptions/my-subscription"
}
iex(21)> {:ok, response} = channel |> Google.Pubsub.V1.Subscriber.Stub.acknowledge(request)
{:ok, %Google.Protobuf.Empty{}}

More usage examples on https://github.com/vjebelev/google_pubsub_grpc. Will update the broadway google adapter to work via gRPC as well, as time allows.