Skip to content

Commit

Permalink
Added in new leaky bucket rate limiting functionality + unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Kevin Yang committed Dec 10, 2023
1 parent c104a40 commit 2c4540f
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 13 deletions.
9 changes: 6 additions & 3 deletions hyx/ratelimit/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from hyx.ratelimit.api import ratelimiter, tokenbucket
from hyx.ratelimit.buckets import TokenBucket
from hyx.ratelimit.managers import TokenBucketLimiter
from hyx.ratelimit.api import leakybucket, ratelimiter, tokenbucket
from hyx.ratelimit.buckets import LeakyBucket, TokenBucket
from hyx.ratelimit.managers import LeakyBucketLimiter, TokenBucketLimiter

__all__ = (
"leakybucket",
"LeakyBucketLimiter",
"LeakyBucket",
"ratelimiter",
"tokenbucket",
"TokenBucketLimiter",
Expand Down
47 changes: 46 additions & 1 deletion hyx/ratelimit/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from types import TracebackType
from typing import Any, Optional, Type, cast

from hyx.ratelimit.managers import RateLimiter, TokenBucketLimiter
from hyx.ratelimit.managers import LeakyBucketLimiter, RateLimiter, TokenBucketLimiter
from hyx.typing import FuncT


Expand Down Expand Up @@ -93,3 +93,48 @@ async def _wrapper(*args: Any, **kwargs: Any) -> Any:
_wrapper._manager = self._limiter # type: ignore[attr-defined]

return cast(FuncT, _wrapper)


class leakybucket:
"""
Constant Rate Limiting based on the Leaky Bucket algorithm.
**Parameters**
* **max_executions** *(float)* - How many executions are permitted?
* **per_time_secs** *(float)* - Per what time span? (in seconds)
"""

__slots__ = ("_limiter",)

def __init__(self, max_executions: float, per_time_secs: float, bucket_size: Optional[float] = None) -> None:
self._limiter = LeakyBucketLimiter(max_executions=max_executions, per_time_secs=per_time_secs)

async def __aenter__(self) -> "leakybucket":
await self._limiter.acquire()

return self

async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> Optional[bool]:
return None

def __call__(self, func: FuncT) -> FuncT:
"""
Apply ratelimiter as a decorator
"""

@functools.wraps(func)
async def _wrapper(*args: Any, **kwargs: Any) -> Any:
await self._limiter.acquire()

return await func(*args, **kwargs)

_wrapper._original = func # type: ignore[attr-defined]
_wrapper._manager = self._limiter # type: ignore[attr-defined]

return cast(FuncT, _wrapper)
55 changes: 54 additions & 1 deletion hyx/ratelimit/buckets.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import asyncio
import math
from typing import Optional

from hyx.ratelimit.exceptions import EmptyBucket
from hyx.ratelimit.exceptions import EmptyBucket, FilledBucket


class TokenBucket:
Expand Down Expand Up @@ -82,3 +83,55 @@ def _replenish(self) -> None:
)
self._tokens = tokens_to_add
return


class LeakyBucket:
"""
Leaky Bucket Logic
Leak tokens as time passes on. If there is space in the bucket, executions can be allowed.
Otherwise, it's going to be rejected with an FilledBucket error
"""

__slots__ = (
"_max_executions",
"_per_time_secs",
"_rate",
"_loop",
"_tokens",
"_last_bucket_check",
)

def __init__(self, max_executions: float, per_time_secs: float) -> None:
self._max_executions = max_executions
self._per_time_secs = per_time_secs
self._rate = self._max_executions / self._per_time_secs

self._loop = asyncio.get_running_loop()
self._tokens = 0.0

self._last_bucket_check = self._loop.time()

@property
def tokens(self) -> float:
self._leak_check()
return self._tokens

@property
def full(self) -> bool:
self._leak_check()
return math.ceil(self._tokens) >= self._max_executions

async def fill(self) -> None:
self._leak_check()
if self._tokens + 1 <= self._max_executions:
self._tokens += 1
return
else:
raise FilledBucket

def _leak_check(self) -> None:
now = self._loop.time()
time_elapsed = now - self._last_bucket_check
self._tokens = max(0, self._tokens - time_elapsed * self._rate)
self._last_bucket_check = now
return
12 changes: 10 additions & 2 deletions hyx/ratelimit/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,19 @@

class RateLimitExceeded(HyxError):
"""
Occurs when requester have exceeded the rate limit
Occurs when requester has exceeded the rate limit
"""


class EmptyBucket(HyxError):
"""
Occurs when requester have exceeded the rate limit
Occurs when requester has exceeded the rate limit and the bucket is empty
Exception is thrown in the token bucket rate limiter
"""


class FilledBucket(HyxError):
"""
Occurs when requester has exceeded the rate limit and the bucket is full
Exception is thrown in the leaky bucket rate limiter
"""
19 changes: 17 additions & 2 deletions hyx/ratelimit/managers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Optional

from hyx.ratelimit.buckets import TokenBucket
from hyx.ratelimit.exceptions import EmptyBucket, RateLimitExceeded
from hyx.ratelimit.buckets import LeakyBucket, TokenBucket
from hyx.ratelimit.exceptions import EmptyBucket, FilledBucket, RateLimitExceeded


class RateLimiter:
Expand Down Expand Up @@ -38,3 +38,18 @@ def __init__(self) -> None:

async def acquire(self) -> None:
...


class LeakyBucketLimiter(RateLimiter):
def __init__(self, max_executions: float, per_time_secs: float) -> None:
self._leaky_bucket = LeakyBucket(max_executions, per_time_secs)

@property
def bucket(self) -> LeakyBucket:
return self._leaky_bucket

async def acquire(self) -> None:
try:
await self._leaky_bucket.fill()
except FilledBucket as e:
raise RateLimitExceeded from e
47 changes: 45 additions & 2 deletions tests/test_ratelimiter/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import pytest

from hyx.ratelimit import TokenBucketLimiter, ratelimiter, tokenbucket
from hyx.ratelimit import LeakyBucketLimiter, TokenBucketLimiter, leakybucket, ratelimiter, tokenbucket
from hyx.ratelimit.exceptions import RateLimitExceeded


Expand All @@ -24,6 +24,15 @@ async def calc() -> float:
assert await calc() == 42


async def test__ratelimiter__leaky_bucket_decorator() -> None:
@leakybucket(max_executions=4, per_time_secs=1, bucket_size=4)
async def calc() -> float:
return 42

for _ in range(4):
assert await calc() == 42


async def test__ratelimiter__context_manager() -> None:
limiter = ratelimiter(limiter=TokenBucketLimiter(max_executions=4, per_time_secs=1, bucket_size=4))

Expand All @@ -40,7 +49,27 @@ async def test__ratelimiter__token_bucket_context_manager() -> None:
assert True


async def test__ratelimiter__limit_exceeded() -> None:
async def test__ratelimiter__leaky_bucket_context_manager() -> None:
leakybucket(max_executions=4, per_time_secs=1, bucket_size=4)

async def calc() -> float:
return 42

for _ in range(4):
assert await calc() == 42


async def test__ratelimiter__leaky_bucket_limit_exceeded() -> None:
@ratelimiter(limiter=LeakyBucketLimiter(max_executions=3, per_time_secs=1))
async def calc() -> float:
return 42

with pytest.raises(RateLimitExceeded):
for _ in range(4):
assert await calc() == 42


async def test__ratelimiter__token_bucket_limit_exceeded() -> None:
@ratelimiter(limiter=TokenBucketLimiter(max_executions=3, per_time_secs=1, bucket_size=3))
async def calc() -> float:
return 42
Expand Down Expand Up @@ -75,3 +104,17 @@ async def calc() -> float:

for _ in range(3):
assert await calc() == 42


async def test__ratelimiter__token_bucket_leak_after_time_period() -> None:
@ratelimiter(limiter=LeakyBucketLimiter(max_executions=3, per_time_secs=1))
async def calc() -> float:
return 42

for _ in range(3):
assert await calc() == 42

await asyncio.sleep(1)

for _ in range(3):
assert await calc() == 42
33 changes: 31 additions & 2 deletions tests/test_ratelimiter/test_buckets.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import pytest

from hyx.ratelimit.buckets import TokenBucket
from hyx.ratelimit.exceptions import EmptyBucket
from hyx.ratelimit.buckets import LeakyBucket, TokenBucket
from hyx.ratelimit.exceptions import EmptyBucket, FilledBucket


async def test__token_bucket_success() -> None:
Expand Down Expand Up @@ -33,3 +33,32 @@ async def test__token_bucket__fully_replenish_after_time_period() -> None:

assert bucket.tokens == 3
assert bucket.empty is False


async def test__leaky_bucket_success() -> None:
bucket = LeakyBucket(3, 1)

for i in range(3):
assert round(bucket.tokens) == i
await bucket.fill()
assert bucket.full is True


async def test__leaky_bucket_limit_exceeded() -> None:
bucket = LeakyBucket(3, 1)

with pytest.raises(FilledBucket):
for _ in range(4):
await bucket.fill()


async def test__leaky_bucket__fully_replenish_after_time_period() -> None:
bucket = LeakyBucket(3, 1)

for _ in range(3):
await bucket.fill()

await asyncio.sleep(3)

assert bucket.tokens == 0
assert bucket.full is False

0 comments on commit 2c4540f

Please sign in to comment.