This document describes the current stable version of Kombu (5.0). For development docs, go here.

Source code for kombu.utils.limits

"""Token bucket implementation for rate limiting."""

from collections import deque
from time import monotonic

__all__ = ('TokenBucket',)


[docs]class TokenBucket: """Token Bucket Algorithm. See Also: https://en.wikipedia.org/wiki/Token_Bucket Most of this code was stolen from an entry in the ASPN Python Cookbook: https://code.activestate.com/recipes/511490/ Warning: Thread Safety: This implementation is not thread safe. Access to a `TokenBucket` instance should occur within the critical section of any multithreaded code. """ #: The rate in tokens/second that the bucket will be refilled. fill_rate = None #: Maximum number of tokens in the bucket. capacity = 1 #: Timestamp of the last time a token was taken out of the bucket. timestamp = None def __init__(self, fill_rate, capacity=1): self.capacity = float(capacity) self._tokens = capacity self.fill_rate = float(fill_rate) self.timestamp = monotonic() self.contents = deque()
[docs] def add(self, item): self.contents.append(item)
[docs] def pop(self): return self.contents.popleft()
[docs] def clear_pending(self): self.contents.clear()
[docs] def can_consume(self, tokens=1): """Check if one or more tokens can be consumed. Returns: bool: true if the number of tokens can be consumed from the bucket. If they can be consumed, a call will also consume the requested number of tokens from the bucket. Calls will only consume `tokens` (the number requested) or zero tokens -- it will never consume a partial number of tokens. """ if tokens <= self._get_tokens(): self._tokens -= tokens return True return False
[docs] def expected_time(self, tokens=1): """Return estimated time of token availability. Returns: float: the time in seconds. """ _tokens = self._get_tokens() tokens = max(tokens, _tokens) return (tokens - _tokens) / self.fill_rate
def _get_tokens(self): if self._tokens < self.capacity: now = monotonic() delta = self.fill_rate * (now - self.timestamp) self._tokens = min(self.capacity, self._tokens + delta) self.timestamp = now return self._tokens