feat: concurrent connections limiter #696
Conversation
Thanks for taking a look at this! question: did you consider using the existing concurrency tracking in RequestTrack? question: rather than nesting data in Memcache, did you consider using Memcache's incr/decr commands for managing the count? That's what the existing rate limit implementation does.
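(For reference, a minimal sketch of what incr/decr-based counting could look like with a memcachex-style client; the module, function, and key names here are illustrative assumptions, not the existing rate limiter code.)

```elixir
# Hypothetical sketch: track concurrent connections per user with Memcache's
# atomic incr/decr instead of storing a nested map. TTL/expiry handling omitted.
defmodule ConcurrentCountSketch do
  # Called when a connection is accepted; creates the counter at 1 if missing.
  def checkout(memcache_pid, user_id) do
    Memcache.incr(memcache_pid, key(user_id), by: 1, default: 1)
  end

  # Called when the connection finishes or its process exits.
  def checkin(memcache_pid, user_id) do
    Memcache.decr(memcache_pid, key(user_id), by: 1)
  end

  defp key(user_id), do: "concurrent:" <> user_id
end
```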
I tried this just now and it seems to only register an increment (no decrement) for streaming requests. Let me know if you think this is worth further poking 🙂
Perhaps it's overly complex, but I wanted to be able to track at a per-process level rather than just a sum, and I couldn't see a way to ask Memcache for that without nesting the data. Do you have an opinion on switching the rate limit state over to Redis or similar in a future task? I don't think the dev time would be that high, and it would give us more flexibility for shared state between the ECS tasks (mainly atomic transactions and querying against key prefixes for this particular feature).
Decrementing for streaming processes is handled by the process monitors (api/apps/api_web/lib/request_track.ex, lines 77 to 89 at bbfa321).
Can you say more about why you wanted to do that? There's a separate process for each connection, so the per-process count is either 0 or 1.
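(For context, a schematic of the monitor-based decrement referenced above; this is an illustrative sketch, not the actual contents of request_track.ex.)

```elixir
# Illustrative only: a tracker GenServer monitors each request process and
# decrements that user's count when the process goes away (e.g. a streaming
# connection closes or crashes).
def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
  case Map.pop(state.monitored, pid) do
    {nil, _monitored} ->
      {:noreply, state}

    {user_key, monitored} ->
      counts = Map.update(state.counts, user_key, 0, &max(&1 - 1, 0))
      {:noreply, %{state | monitored: monitored, counts: counts}}
  end
end
```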
Sure! Let's say a user has two active streaming connections, each one happening to be to a separate ECS task. These are long-running connections (spawned at application startup and, in theory, only recreated on crash or restart) - maybe for an application like the one that caused the issue behind this task, but implemented on the backend so that it doesn't scale per client. When their app starts up, the counter gets bumped to 2 for that user. After n minutes / hours / etc., when the Memcache TTL hits, the entry would be cleared while the connection(s) are still open. I can also imagine a situation where someone has an app in a degraded state that keeps opening connections after some error condition is reached, without closing the old ones; eventually, the rate limiter would allow it to keep opening connections. I could very well be missing something here, but that's the scenario I was considering. We could potentially add a loop somewhere to update a simpler Memcache value that's just {counter, timestamp} whenever an outstanding connection is present (see the sketch below), if you think that is more straightforward, but I'm not sure if you can do that with the increment / decrement commands.
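(A rough sketch of the heartbeat idea mentioned at the end of the comment above; the module name, interval, and injected functions are all made up for illustration.)

```elixir
# Hypothetical heartbeat loop: while this node has open connections, periodically
# rewrite a simple {count, timestamp} entry per user so a stale entry can be told
# apart from a live one, without relying on incr/decr.
defmodule HeartbeatSketch do
  use GenServer

  @interval :timer.seconds(30)

  # counts_fun: zero-arity fun returning a %{user_key => open_connection_count} map
  # write_fun: two-arity fun that persists {count, unix_timestamp} for a user key
  def start_link({counts_fun, write_fun}) do
    GenServer.start_link(__MODULE__, {counts_fun, write_fun})
  end

  @impl true
  def init(state) do
    schedule()
    {:ok, state}
  end

  @impl true
  def handle_info(:heartbeat, {counts_fun, write_fun} = state) do
    now = System.system_time(:second)

    for {user_key, count} <- counts_fun.(), count > 0 do
      write_fun.(user_key, {count, now})
    end

    schedule()
    {:noreply, state}
  end

  defp schedule, do: Process.send_after(self(), :heartbeat, @interval)
end
```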
OK got it, thanks! I should be able to update to use this for detecting the streaming disconnect at the very least, which was the one event not handled by the plug. |
TBH, I was expecting something a little simpler, more like:

```elixir
# in apps/api_web/lib/api_web/plugs/request_track.ex
def call(conn, table_name) do
  key = conn.assigns.api_user
  RequestTrack.increment(table_name, key)
  count = RequestTrack.count(table_name, key)
  _ = Logger.metadata(concurrent: count)

  conn =
    register_before_send(conn, fn conn ->
      # don't decrement if we're using chunked requests (streaming)
      if conn.state == :set do
        RequestTrack.decrement(table_name)
      end

      conn
    end)

  if count > max_concurrent_for_user(key) do
    conn
    |> put_status(429)
    |> put_view(ApiWeb.ErrorView)
    |> render("429.json-api", [])
    |> halt()
  else
    conn
  end
end
```
This would only be per ECS task, though, right? So whenever that scales, so would our rate limit. I'm cool with that if you are, but I assumed we would want the limit to apply across all of the prod deployments.
I'd prefer to start with something simpler and expand it if necessary, rather than start with something complex, but I'll defer to you and @bklebe on the final approach.
@paulswartz I think we may want a meeting to discuss this — broadly I agree with you though. @nlwstein I think that with streaming connections, sharing the limit across the cluster is perhaps not quite as important, mostly because streaming connections impose more resource consumption on the client side compared to one-shot requests. The problem with not sharing the streaming connection limit across the cluster would be that clients could cause the cluster to scale up by maxing out streaming connections. @paulswartz do you think that's acceptable, at least to start with?
I do think that's acceptable, but happy to discuss it more!
@paulswartz @bklebe As I mentioned on the call, I intend to move my hooks for detecting streaming open / close to the existing logger |
```elixir
current_timestamp = get_current_unix_ts()
heartbeat_tolerance = get_heartbeat_tolerance()
{_type, key} = lookup(user, event_stream?)
{:ok, locks} = GenServer.call(__MODULE__, {:memcache_get, key, %{}})
```
suggestion: I think that having a GenServer.call/2 here is going to mean we won't be able to have concurrent API calls: they'll be serialized as messages to this single GenServer. Could we share the pooled Memcache connections we're already using for the regular rate limiter?
Are you saying that calling GenServer.call makes the entire call chain synchronous, or just the writes to Memcache?
I'm not entirely sure we would want asynchronous writes to Memcache in this particular case, because it doesn't support transactions - this might inadvertently prevent the race mentioned earlier, at least on a per-instance basis 🤔
I have some test code locally that we may be able to use to take a similar approach w/r/t the Memcache supervisor if this doesn't work as expected.
Consider this example (using Agent, which is a GenServer under the hood):
```elixir
{:ok, pid} = Agent.start_link(fn -> :ok end)

0..4
|> Enum.map(fn _ ->
  Task.async(fn -> Agent.get(pid, fn _ -> Process.sleep(1_000) end) end)
end)
|> Task.yield_many()
```
Even though the tasks are running in "parallel", only one can be executing the Agent call at once (so the last one times out). That'll be the same here: only one request will be able to access Memcached at once, so we'll naturally limit our throughput to how fast this single process can communicate with Memcached. It also doesn't prevent a race between the get and set calls.
Could you use ApiWeb.RateLimiter.Memcache.Supervisor.random_child/0 to use one of the existing workers in parallel, and then have the call to Memcache happen in the calling process?
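(A sketch of what the suggested change could look like; it assumes random_child/0 returns the pid of a pooled Memcache connection, which is an assumption on my part rather than something confirmed in the thread.)

```elixir
# Sketch: do the Memcache round trip in the calling (request) process, using one
# of the already-pooled connections, rather than funnelling every request through
# a single GenServer.
defp memcache_get(key) do
  # Assumption: random_child/0 returns the pid of a pooled Memcache connection.
  child = ApiWeb.RateLimiter.Memcache.Supervisor.random_child()

  case Memcache.get(child, key) do
    {:ok, value} -> value
    {:error, _reason} -> %{}
  end
end
```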
Thank you, that makes sense! I have updated to share the pool with RateLimiter.
This is out-of-date with master and needs to be updated.
@nlwstein, thanks for the PR. Would you mind cleaning up the commit history or squashing all these WIPs down if you only intend to merge one?
Force-pushed from 5f88ba4 to f39f14d.
@bklebe All set! In the future, would you prefer a squash prior to review? I had been following a pattern of squashing prior to merge. I have no strong opinion, though if I had to pick one, I would go with squashing prior to merge, because it can be helpful to step through intermediate commits to see someone's thought process during code review.
```elixir
defp lookup(%ApiWeb.User{} = user, event_stream?) do
  type = if event_stream?, do: "event_stream", else: "static"
```
question: it doesn't look like type is ever used?
Removed
GitHub wackiness, already addressed!
* Revert "Revert "feat: concurrent connections limiter (#696)" (#750)" This reverts commit 1598989. * fix: don't override connection opts * fix: only build memcache config if memcache is actually required * fix: keep a consistent connection_opts throughout various environments * chore: resolve credo complaint --------- Co-authored-by: Paul Swartz <paul@paulswartz.net>
Summary of changes
Asana Ticket: 🍎 Reject API requests when there are too many concurrent requests
This provides a new rate limiter that is mainly useful for clamping down on too many active streaming connections. It is also able to track and control long-running "static" requests (for example, trying to load all of the predictions at once). It is configurable on a per-user basis in the admin panel (-1 to disable); the largest value between the user config and the base config is used. The static concurrency locks are released in real time, whereas the streaming ones depend on the hibernate loop cycle, so there can be a latency of under a minute when releasing those. The limits I have added to the config file are guidance for testing, not necessarily for production.
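(To make the limit-selection rule concrete, a hedged sketch; the function names below are illustrative, not the actual code.)

```elixir
# Illustrative only: combine the per-user setting and the base setting by taking
# the larger of the two; a resulting limit of -1 means the check is disabled.
defp max_concurrent_limit(user_limit, base_limit), do: max(user_limit, base_limit)

defp at_limit?(current_count, user_limit, base_limit) do
  limit = max_concurrent_limit(user_limit, base_limit)
  limit != -1 and current_count >= limit
end
```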
Opinion: I am not sure if memcached is the ideal tool for this, but I used it because I didn't want to introduce new infrastructure if possible and it was already in use for a similar job. Changing out the storage mechanism in the future should be relatively easy. The whole rate limiting system seems like a good use case for Redis (or perhaps there's something newer that's even better), primarily because: