Skip to content

Commit

Permalink
Merge pull request #3 from tejanium/infinite-timeout
Browse files Browse the repository at this point in the history
Infinite timeout
  • Loading branch information
cjab authored Dec 3, 2018
2 parents 66ad69d + 09480b0 commit 347e48d
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 7 deletions.
7 changes: 5 additions & 2 deletions lib/weddell/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,18 @@ defmodule Weddell.Client do
host = Keyword.get(opts, :host, @default_host)
port = Keyword.get(opts, :port, @default_port)
{:ok, channel} =
Stub.connect("#{host}:#{port}", cred: cred)
Stub.connect("#{host}:#{port}", cred: cred, adapter_opts: %{
http2_opts: %{keepalive: :infinity}
})
{:ok, %__MODULE__{channel: channel,
project: project}}
end

@doc false
@spec request_opts() :: Keyword.t
def request_opts() do
def request_opts(extra_opts \\ []) do
[metadata: auth_header(), content_type: "application/grpc"]
|> Enum.concat(extra_opts)
end

@doc false
Expand Down
9 changes: 4 additions & 5 deletions lib/weddell/client/subscriber/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ defmodule Weddell.Client.Subscriber.Stream do
defstruct [:client, :subscription, :grpc_stream]

@default_ack_deadline 10
@deadline_expired 4
@unavailable 14

@doc """
Open a new stream on a subscription.
Expand All @@ -37,7 +37,7 @@ defmodule Weddell.Client.Subscriber.Stream do
stream =
%__MODULE__{client: client,
subscription: subscription,
grpc_stream: Stub.streaming_pull(client.channel, Client.request_opts())}
grpc_stream: Stub.streaming_pull(client.channel, Client.request_opts(timeout: :infinity))}
request =
StreamingPullRequest.new(subscription: Util.full_subscription(client.project, subscription),
stream_ack_deadline_seconds: @default_ack_deadline)
Expand Down Expand Up @@ -134,14 +134,13 @@ defmodule Weddell.Client.Subscriber.Stream do
"""
@spec recv(stream :: t) :: Enumerable.t
def recv(stream) do
case GRPCStub.recv(stream.grpc_stream) do
case GRPCStub.recv(stream.grpc_stream, timeout: :infinity) do
{:ok, recv} ->
recv
|> Stream.map(fn
{:ok, response} ->
Enum.map(response.received_messages, &Message.new/1)
{:error, %RPCError{status: @deadline_expired}} ->
# Deadline expired and stream ended, this is expected
{:error, %RPCError{status: @unavailable}} ->
[]
{:error, e} ->
raise e
Expand Down

0 comments on commit 347e48d

Please sign in to comment.