Google Pubsub gRPC-based client
Google Pubsub supports both REST and gRPC endpoints, and gRPC is clearly the winner on performance. For instance, with the Pubsub emulator on my laptop it takes about 1 ms to receive a single message over REST, but I can pull about 50 messages through the gRPC endpoint in that same millisecond. That is not a completely fair comparison, since the gRPC numbers come from HTTP/2 streaming over a single TCP connection, but it’s not like I am forcing the REST endpoint to stick with HTTP/1.1 either.
Currently (January 2021) all Elixir Google Pubsub clients (kane, google_api_pub_sub) only support REST, but we already have an excellent elixir-grpc package, and it’s easy to use it to talk to Google Pubsub. I added a thin layer on top of it to provide some extra niceties like emulator support, and released it as a hex package, google_pubsub_grpc. It’s a beta package for now: once the grpc package is “properly” released and loses its beta version, this one will follow.
Installation
Add dependencies to mix.exs:
{:goth, "~> 1.2.0"},
{:cowlib, "~> 2.9.0", override: true},
{:google_protos, "~> 0.1.0"},
{:google_pubsub_grpc, "~> 0.2.1-beta.1"}
Configuration
For the emulator, set the environment variable PUBSUB_EMULATOR_HOST to the emulator’s address (e.g. 0.0.0.0:8786) and set the project id in Goth’s config:
config :goth, json: nil, project_id: "emulator-project-id"
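To make the expected shape of PUBSUB_EMULATOR_HOST concrete, here is a minimal sketch of how a host:port value could be split into its parts. EmulatorHost is a hypothetical module written for illustration; it is not the code google_pubsub_grpc uses internally.

```elixir
defmodule EmulatorHost do
  @moduledoc """
  Hypothetical helper (not part of google_pubsub_grpc): turns a
  PUBSUB_EMULATOR_HOST value such as "0.0.0.0:8786" into a host and a port.
  """

  @spec parse(String.t() | nil) :: {:ok, {String.t(), pos_integer()}} | :error
  def parse(nil), do: :error

  def parse(value) when is_binary(value) do
    # Split on the first ":" only, then require a non-empty host and a
    # port string that parses completely as a positive integer.
    with [host, port_string] when host != "" <- String.split(value, ":", parts: 2),
         {port, ""} when port > 0 <- Integer.parse(port_string) do
      {:ok, {host, port}}
    else
      _ -> :error
    end
  end

  # Reads the variable from the environment; :error means it is unset or malformed.
  def from_env, do: parse(System.get_env("PUBSUB_EMULATOR_HOST"))
end
```

A check like this at application start can catch a missing port early, before the first connection attempt fails.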
For the Google service, configure Goth accordingly, for instance by simply setting the GOOGLE_APPLICATION_CREDENTIALS environment variable to point to the JSON key file.
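As an alternative to the environment variable, Goth 1.x can also take the key file contents directly in its config. The fragment below is a sketch; the file path is a placeholder, not a real location.

```elixir
# config/config.exs (sketch; replace the placeholder path with your key file)
import Config

# Goth 1.x expects the JSON contents here, not the path, hence File.read!/1.
config :goth, json: File.read!("path/to/service-account-key.json")
```

Note that File.read!/1 raises if the file is missing, so this variant fails loudly when the config is evaluated rather than at the first Pubsub call.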
Sample code
To get a channel (connection to server):
iex(1)> {:ok, channel} = Google.Pubsub.GRPC.channel()
{:ok,
 %GRPC.Channel{
   accepted_compressors: [],
   adapter: GRPC.Adapter.Gun,
   adapter_payload: %{conn_pid: #PID<0.279.0>},
   codec: GRPC.Codec.Proto,
   compressor: nil,
   cred: nil,
   headers: [authorization: "Dummy token"],
   host: "0.0.0.0",
   interceptors: [],
   port: 8786,
   scheme: "http"
 }}
To get the list of all topics:
iex(5)> request = Google.Pubsub.V1.ListTopicsRequest.new(project: Google.Pubsub.GRPC.full_project_id())
%Google.Pubsub.V1.ListTopicsRequest{
  page_size: 0,
  page_token: "",
  project: "projects/emulator-project-id"
}
iex(6)> {:ok, response} = channel |> Google.Pubsub.V1.Publisher.Stub.list_topics(request)
{:ok,
 %Google.Pubsub.V1.ListTopicsResponse{
   next_page_token: "",
   topics: [
     %Google.Pubsub.V1.Topic{
       kms_key_name: "",
       labels: %{},
       message_storage_policy: nil,
       name: "projects/emulator-project-id/topics/actions"
     },
     %Google.Pubsub.V1.Topic{
       kms_key_name: "",
       labels: %{},
       message_storage_policy: nil,
       name: "projects/emulator-project-id/topics/messages"
     }
   ]
 }}
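The response carries a next_page_token, which is empty here because everything fit on one page. With many topics you would keep requesting pages until the token comes back empty. Below is a minimal sketch of that loop; TopicPages and its fetch_page argument are hypothetical names, and the page-fetching function is injected so the loop itself stays independent of the gRPC stub.

```elixir
defmodule TopicPages do
  @moduledoc """
  Hypothetical helper: collects topics across pages by following
  next_page_token. `fetch_page` is a 1-arity function that takes a page
  token and returns {:ok, %{topics: list, next_page_token: binary}}
  or {:error, reason}.
  """

  def all(fetch_page, token \\ "", acc \\ []) when is_function(fetch_page, 1) do
    case fetch_page.(token) do
      # An empty token marks the last page.
      {:ok, %{topics: topics, next_page_token: ""}} ->
        {:ok, acc ++ topics}

      # Otherwise request the next page with the returned token.
      {:ok, %{topics: topics, next_page_token: next}} ->
        all(fetch_page, next, acc ++ topics)

      {:error, reason} ->
        {:error, reason}
    end
  end
end
```

With the real client, fetch_page would build a ListTopicsRequest with the given page_token and pass it to Google.Pubsub.V1.Publisher.Stub.list_topics, as in the session above; the map patterns also match the returned ListTopicsResponse struct.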
To create a new subscription on a topic:
iex(8)> {:ok, subscription} =
...(8)>   channel |> Google.Pubsub.V1.Subscriber.Stub.create_subscription(%Google.Pubsub.V1.Subscription{
...(8)>     topic: Google.Pubsub.GRPC.full_topic_name("my-topic"),
...(8)>     name: Google.Pubsub.GRPC.full_subscription_name("my-subscription")
...(8)>   })
{:ok,
 %Google.Pubsub.V1.Subscription{
   ack_deadline_seconds: 10,
   dead_letter_policy: nil,
   detached: false,
   enable_message_ordering: false,
   expiration_policy: nil,
   filter: "",
   labels: %{},
   message_retention_duration: %Google.Protobuf.Duration{
     nanos: 0,
     seconds: 604800
   },
   name: "projects/emulator-project-id/subscriptions/my-subscription",
   push_config: %Google.Pubsub.V1.PushConfig{
     attributes: %{},
     authentication_method: nil,
     push_endpoint: ""
   },
   retain_acked_messages: false,
   retry_policy: nil,
   topic: "projects/emulator-project-id/topics/my-topic"
 }}
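The full_topic_name and full_subscription_name helpers expand short names into the fully qualified resource names visible in the output above. The sketch below reproduces that naming scheme in plain Elixir to show the format; ResourceNames is a hypothetical stand-in, not the package's implementation, and it takes the project id as an explicit argument instead of reading it from Goth.

```elixir
defmodule ResourceNames do
  @moduledoc "Hypothetical stand-in that mirrors the Pubsub resource name format."

  def project(project_id), do: "projects/" <> project_id

  def topic(project_id, topic), do: project(project_id) <> "/topics/" <> topic

  def subscription(project_id, subscription),
    do: project(project_id) <> "/subscriptions/" <> subscription
end

IO.puts(ResourceNames.topic("emulator-project-id", "my-topic"))
# prints: projects/emulator-project-id/topics/my-topic
```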
To publish a message:
iex(10)> message = %Google.Pubsub.V1.PubsubMessage{data: "string"}
%Google.Pubsub.V1.PubsubMessage{
  attributes: nil,
  data: "string",
  message_id: nil,
  ordering_key: nil,
  publish_time: nil
}
iex(12)> request = %Google.Pubsub.V1.PublishRequest{
...(12)>   topic: Google.Pubsub.GRPC.full_topic_name("my-topic"),
...(12)>   messages: [message]
...(12)> }
%Google.Pubsub.V1.PublishRequest{
  messages: [
    %Google.Pubsub.V1.PubsubMessage{
      attributes: nil,
      data: "string",
      message_id: nil,
      ordering_key: nil,
      publish_time: nil
    }
  ],
  topic: "projects/emulator-project-id/topics/my-topic"
}
iex(14)> {:ok, response} = channel |> Google.Pubsub.V1.Publisher.Stub.publish(request)
{:ok, %Google.Pubsub.V1.PublishResponse{message_ids: ["1"]}}
To pull messages from a subscription:
iex(16)> request = %Google.Pubsub.V1.PullRequest{
...(16)>   subscription: Google.Pubsub.GRPC.full_subscription_name("my-subscription"),
...(16)>   max_messages: 10
...(16)> }
%Google.Pubsub.V1.PullRequest{
  max_messages: 10,
  return_immediately: nil,
  subscription: "projects/emulator-project-id/subscriptions/my-subscription"
}
iex(17)> {:ok, response} = channel |> Google.Pubsub.V1.Subscriber.Stub.pull(request)
{:ok,
 %Google.Pubsub.V1.PullResponse{
   received_messages: [
     %Google.Pubsub.V1.ReceivedMessage{
       ack_id: "projects/emulator-project-id/subscriptions/my-subscription:1",
       delivery_attempt: 0,
       message: %Google.Pubsub.V1.PubsubMessage{
         attributes: %{},
         data: "string",
         message_id: "1",
         ordering_key: "",
         publish_time: %Google.Protobuf.Timestamp{nanos: 0, seconds: 1610913863}
       }
     }
   ]
 }}
To acknowledge received messages:
iex(20)> request = %Google.Pubsub.V1.AcknowledgeRequest{
...(20)>   subscription: Google.Pubsub.GRPC.full_subscription_name("my-subscription"),
...(20)>   ack_ids: Enum.map(response.received_messages, fn m -> m.ack_id end)
...(20)> }
%Google.Pubsub.V1.AcknowledgeRequest{
  ack_ids: ["projects/emulator-project-id/subscriptions/my-subscription:1"],
  subscription: "projects/emulator-project-id/subscriptions/my-subscription"
}
iex(21)> {:ok, response} = channel |> Google.Pubsub.V1.Subscriber.Stub.acknowledge(request)
{:ok, %Google.Protobuf.Empty{}}
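In an application, pulling and acknowledging usually go together: pull a batch, process each message, and acknowledge only the ones that were handled. The following is a minimal sketch of that pattern. PullAndAck is a hypothetical module, and the pull and acknowledge calls are passed in as functions (an assumption made so the example runs without a Pubsub connection).

```elixir
defmodule PullAndAck do
  @moduledoc """
  Hypothetical helper: pull once, run a handler on every message, and
  acknowledge only those messages for which the handler returned :ok.
  """

  def run(pull_fun, ack_fun, handler)
      when is_function(pull_fun, 0) and is_function(ack_fun, 1) and is_function(handler, 1) do
    # A failed pull falls through `with` and is returned unchanged.
    with {:ok, %{received_messages: received}} <- pull_fun.() do
      # Keep the ack_id of every message the handler accepted.
      ack_ids =
        for %{ack_id: ack_id, message: message} <- received,
            handler.(message) == :ok,
            do: ack_id

      case ack_ids do
        # Nothing was handled, so skip the acknowledge call.
        [] ->
          {:ok, 0}

        _ ->
          with {:ok, _} <- ack_fun.(ack_ids), do: {:ok, length(ack_ids)}
      end
    end
  end
end
```

With the real client, pull_fun would wrap the Subscriber.Stub.pull call shown earlier, and ack_fun would build an AcknowledgeRequest from the given ack ids and pass it to Subscriber.Stub.acknowledge. Messages that are not acknowledged are redelivered by Pubsub once the ack deadline expires, so a failing handler gets another chance.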
More usage examples are at https://github.com/vjebelev/google_pubsub_grpc. I will update the Broadway Google adapter to work via gRPC as well, as time allows.