怎样用 Java 来构建一个简单的速率限制器?








[*]如果我们的TPS为5,而且在其中一个1秒的时段中,我们在下一秒只使用3个代币,那么我们应该可以大概提供5+2 = 7个代币作为嘉奖。但速率为每个令牌1/7(142.28ms)。奖金不应结转到下一个插槽。
让我们起首定义我们的 速率限制器:
* Rate limiter helps in limiting the rate of execution of a piece of code. The rate is defined in terms of
* TPS(Transactions per second). Rate of 5 would suggest, 5 transactions/second. Transaction could be a DB call, API call,
* or a simple function call.
* <p>
* Every {@link RateLimiter} implementation should implement either {@link RateLimiter#throttle(Code)} or, {@link RateLimiter#enter()}.
* They can also choose to implement all.
* <p>
* {@link Code} represents a piece of code that needs to be rate limited. It could be a function call, if the code to be rate limited
* spreads across multiple functions, we need to use entry() and exit() contract.
public interface RateLimiter {

   * Rate limits the code passed inside as an argument.
   * @param code representation of the piece of code that needs to be rate limited.
   * @return true if executed, false otherwise.
    boolean throttle(Code code);
   * When the piece of code that needs to be rate limited cannot be represented as a contiguous
   * code, then entry() should be used before we start executing the code. This brings the code inside the rate
   * limiting boundaries.
   * @return true if the code will execute and false if rate limited.
   * <p
    boolean enter();
   * Interface to represent a contiguous piece of code that needs to be rate limited.
    interface Code {
         * Calling this function should execute the code that is delegated to this interface.
      void invoke();
复制代码 我们的 RateLimit有两组API:一个是throttle(code),另一个是enter()。这两种方法都满足相同的功能,但采用以下两种方式:

[*]boolean throttle(代码)-如果我们有连续的代码,可以用来通报一个代码块。
[*]布尔输入() - 通常可以在API、DB或任何我们想要节省的调用之前使用。如果实行此代码后面的代码,则将返回 真 ,以及 假的如果它是速率受限的话。您可以将这些哀求排队或拒绝。

考虑我们进行第一笔交易的时间t0。 t0 .所以,
直到(t0 + 1)s,我们只答应进行N次交易。 (t0 + 1)s , we are allowed to make only N transactions.怎样确保这一点?在下次交易时,我们将查抄
当前时间≤(t0 + 1)。.如果没有,那么这意味着我们进入了不同的秒,而且我们被答应进行N次交易。 N transactions.让我们看一小段代码,它演示了:
long now = System.nanoTime();
if (now <= mNextSecondBoundary) { // If we are within the time limit of current second
    if (mCounter < N) { // If token available
      mLastExecutionNanos = now;
      mCounter++; // Allocate token
      invoke(code); // Invoke the code passed the throttle method.
复制代码 那么,我们怎样定义mNextSecondBoundary呢?这将在我们进行第一个事务时完成,如前所述,我们将在完成第一个事务的时间增加一秒。
if (mLastExecutionNanos == 0L) {
    mCounter++; // Allocate the very first token here.
    mLastExecutionNanos = System.nanoTime();
    mNextSecondBoundary = mLastExecutionNanos + NANO_PER_SEC;// (10^9)
复制代码 如今,如果我们实行代码并看到我们进入了不同的秒,我们应该怎么做?我们将通过重置上次实行时间、可用令牌数来增强前面的代码,并通过调用 节省阀()再一次。我们的方法已经知道怎样处理新的秒。
public boolean throttle(Code code) {
    if (mTPS <= 0) {
      // We do not want anything to pass.
      return false;

synchronized (mLock) {
      if (mLastExecutionNanos == 0L) {
            mLastExecutionNanos = System.nanoTime();
            mNextSecondBoundary = mLastExecutionNanos + NANO_PER_SEC;
            return true;
      } else {
            long now = System.nanoTime();
            if (now <= mNextSecondBoundary) {
                if (mCounter < mTPS) {
                  mLastExecutionNanos = now;
                  return true;
                } else {
                  return false;
            } else {
                // Reset the counter as we in a different second now.
                mCounter = 0;
                mLastExecutionNanos = 0L;
                mNextSecondBoundary = 0L;
                return throttle(code);
复制代码 在这个实现中,我们可以通报需要节省的代码块,但是这个代码有一个问题。这将工作,但它会表现不佳。不保举,但为什么呢?请在评论中告诉我。
public boolean enter() {
    if (mTPS == 0L) {
      return false;

synchronized (mBoundaryLock) {
      if (mLastExecutionNanos == 0L) {
            mLastExecutionNanos = System.nanoTime();
            mNextSecondBoundary = mLastExecutionNanos + NANO_PER_SEC;
            return true;
      } else {
            long now = System.nanoTime();
            if (now <= mNextSecondBoundary) {
                if (mCounter < mTPS) {
                  mLastExecutionNanos = now;
                  return true;
                } else return false;
            } else {
                // Reset the counter as we in a different second now.
                mCounter = 0;
                mLastExecutionNanos = 0L;
                mNextSecondBoundary = 0L;
                return enter();
复制代码 如今,我们简单的速率限制器已经可以使用了。您可以检察完整的代码 这里。

public static void main(String[] args) {
    RateLimiter limiter = new SimpleTokenBucketRateLimiter(1);
    Thread[] group = new Thread;
    Runnable r = () -> {
      for (int i = 0; i < 100; i++) {
            try {
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            if (limiter.enter()) {
                System.out.println("Values:- " + Thread.currentThread().getName() + ": " + i);

for (int i = 0; i < 6; i++) {
      group = new Thread(r);
复制代码 我们的API不支持平滑事务,而是让事务等待下一个令牌被分配,而不是丢弃哀求。在拒绝它们之后,它返回false,所以如果我们真的想的话,我们可以把它们排队。
if (limiter.enter()) {
                System.out.println("Values:- " + Thread.currentThread().getName() + ": " + i);
} else { // queue the work again }
当我们尝试将TPS设置为 2我们将看到以下输出:

今天先到这里吧。 我们将在后续文章中构建一个更复杂的速率限制器。

