Source code for pytokenbucket.tokenbucket

# -*- coding: utf-8 -*-
"""Implementation of a thread-safe token bucket using threads and queues."""

from __future__ import print_function, division

try:
    import queue
except ImportError:
    import Queue as queue

import logging
import threading
import time
log = logging.getLogger(__name__)


[docs]class TokenBucket(object): """A token bucket. Tokens can be requested using `get_token`.""" def __init__(self, bucket_size=10, refresh_amount=1, refresh_period_ms=1000, start_filled=True): """Construct a TokenBucket instance. Args: bucket_size (int): The maximum number of tokens this bucket can hold. Extra tokens are thrown away. refresh_amount (int): The number of tokens to refresh each period. refresh_period_ms (int): The duration between token refreshes in milliseconds. start_filled (bool): Whether the bucket starts filled or empty. """ super(TokenBucket, self).__init__() self.bucket_size = bucket_size self.refresh_amount = refresh_amount self.refresh_period_ms = refresh_period_ms self.timer_queue = queue.Queue(self.bucket_size) self.stopper = threading.Event() if start_filled: # Fill the queue. for _i in range(self.bucket_size): self.timer_queue.put(True) # Create a thread which handles the filling of the queue. self.token_thread = threading.Thread(name="tokenbucket", target=self._token_filler) self.token_thread.setDaemon(True) self.token_thread.start() def _token_filler(self): log.debug("Starting token filler thread") while not self.stopper.is_set(): added = 0 for _i in range(self.refresh_amount): try: self.timer_queue.put_nowait(True) added += 1 except queue.Full: pass log.debug("Added %d/%d tokens to bucket (approx size %d), sleeping %f ms", added, self.refresh_amount, self.timer_queue.qsize(), self.refresh_period_ms) time.sleep(self.refresh_period_ms / 1000)
[docs] def stop(self): """Stop the token bucket pending a shutdown.""" log.debug("Stopping token filler thread") self.stopper.set() self.token_thread.join() log.debug("Stopped token filler thread")
[docs] def get_token(self): """Get a token. Blocks until a token is retrieved or the token bucket is stopped. Returns: True if a token was retrieved. False if the token bucket was stopped. """ while True: try: token = self.timer_queue.get(timeout=self.refresh_period_ms / 1000) self.timer_queue.task_done() return token except queue.Empty: # The queue is empty when there are no more tokens. Wait for more tokens. if self.stopper.is_set(): # The token bucket is stopping. Return false for this request. return False
[docs] def deferred_call(self, callable): """Return a callable which calls the argument when a token is available. This method can be used to make a wrapper for a callable which can then be used by the multiprocessing.dummy Pool with the `apply_async` method. It also abstracts away the internals of getting a token. Args: callable (Callable): a callable which is called when a token is available. If the token bucket is stopped before a token is available, the callable is not called. Returns: A callable which waits for a token to become available before calling the inner callable. """ def _proxy(*args, **kwargs): if self.get_token(): return callable(*args, **kwargs) else: log.info("Did not call %r as a token was not available", callable) return _proxy