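This article is shared from the Tianyi Cloud (天翼云) developer community post "Implementing a Consumer Priority Queue in Python" (python 实现消费者优先级队列), by Frost.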
Keywords
Condition variable, semaphore, consumer priority, fairness, heap queue algorithm
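Background
Many message queues let you assign a priority to messages, but support for assigning priorities to consumers is rare. Searching online, I mostly found material on RabbitMQ's consumer priorities and nothing comparable for other languages.
All the queues in the Python standard library are fair and offer no option for non-fair behavior, so they may not meet the needs of some scenarios.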
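What do "fair" and "non-fair" mean? The terms usually describe a property of mutexes. The threads trying to acquire a mutex are much like the consumers of a queue, so we refer to both as waiters.
Suppose four waiters A, B, C and D call acquire()/get() in alphabetical order.
When a thread releases the lock, or a message is put on the queue, the waiters are woken in arrival order: A first, then B -> C -> D. In short, the policy is FIFO.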
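FIFO is a good general-purpose policy because it keeps any consumer from starving. In some scenarios, however, we want to give the queue's consumers priorities, so that each time an item arrives, the highest-priority consumer that is still waiting is woken first.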
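The difference between the two wake-up policies can be sketched without any threads. The simulation below is not from the original article: it assumes that a larger number means a higher priority and that ties are broken by arrival order, and it uses the standard `heapq` module (the heap queue algorithm from the keywords). The names `fifo_order`, `priority_order` and the sample waiters are hypothetical.

```python
import heapq
from collections import deque
from itertools import count

# Hypothetical waiters in arrival order: (name, priority).
# Assumption for this sketch: a larger number means a higher priority.
arrivals = [("A", 0), ("B", 5), ("C", 1), ("D", 5)]


def fifo_order(waiters):
    """Fair policy: wake waiters strictly in arrival order."""
    pending = deque(name for name, _ in waiters)
    return [pending.popleft() for _ in range(len(pending))]


def priority_order(waiters):
    """Consumer-priority policy: highest priority first, ties in arrival order."""
    heap = []
    seq = count()  # arrival counter used as the tie-breaker
    for name, prio in waiters:
        # heapq is a min-heap, so push -prio to pop the largest priority first.
        heapq.heappush(heap, (-prio, next(seq), name))
    return [heapq.heappop(heap)[2] for _ in range(len(heap))]


print(fifo_order(arrivals))      # ['A', 'B', 'C', 'D']
print(priority_order(arrivals))  # ['B', 'D', 'C', 'A']
```

Keying the heap on `(-priority, arrival_seq)` means that when every waiter has the same priority the order collapses back to plain FIFO.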
The following describes a pure-Python implementation.
How it works
First, read the source of Python's built-in SimpleQueue (the pure-Python version, in Lib\queue.py):

```python
# Module-level imports that Lib\queue.py provides for this class:
import threading
import types
from collections import deque
from queue import Empty


class _PySimpleQueue:
    '''Simple, unbounded FIFO queue.

    This pure Python implementation is not reentrant.
    '''
    # Note: while this pure Python version provides fairness
    # (by using a threading.Semaphore which is itself fair, being based
    # on threading.Condition), fairness is not part of the API contract.
    # This allows the C version to use a different implementation.

    def __init__(self):
        self._queue = deque()
        self._count = threading.Semaphore(0)

    def put(self, item, block=True, timeout=None):
        '''Put the item on the queue.

        The optional 'block' and 'timeout' arguments are ignored, as this method
        never blocks. They are provided for compatibility with the Queue class.
        '''
        self._queue.append(item)
        self._count.release()

    def get(self, block=True, timeout=None):
        '''Remove and return an item from the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until an item is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Empty exception if no item was available within that time.
        Otherwise ('block' is false), return an item if one is immediately
        available, else raise the Empty exception ('timeout' is ignored
        in that case).
        '''
        if timeout is not None and timeout < 0:
            raise ValueError("'timeout' must be a non-negative number")
        if not self._count.acquire(block, timeout):
            raise Empty
        return self._queue.popleft()

    def put_nowait(self, item):
        '''Put an item into the queue without blocking.

        This is exactly equivalent to `put(item, block=False)` and is only provided
        for compatibility with the Queue class.
        '''
        return self.put(item, block=False)

    def get_nowait(self):
        '''Remove and return an item from the queue without blocking.

        Only get an item if one is immediately available. Otherwise
        raise the Empty exception.
        '''
        return self.get(block=False)

    def empty(self):
        '''Return True if the queue is empty, False otherwise (not reliable!).'''
        return len(self._queue) == 0

    def qsize(self):
        '''Return the approximate size of the queue (not reliable!).'''
        return len(self._queue)

    __class_getitem__ = classmethod(types.GenericAlias)
```

The note at the top of the class explains that this queue is fair because the semaphore it uses is itself fair (Semaphore is built on Condition). It also stresses that fairness is not part of the API contract, so the C implementation is free to behave differently.
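The fairness described in that note can be observed on `threading.Semaphore` directly. This small experiment is not from the original article: four threads block on a `Semaphore(0)` in the order A, B, C, D, and permits are then released one at a time. The `settle` sleep between thread starts is a timing assumption used to fix the blocking order, and the Semaphore docstring itself says the wake order should not be relied on, so this shows current CPython behavior rather than a guarantee. The experiment uses the semaphore rather than `queue.SimpleQueue`, because `queue.SimpleQueue` is normally the C implementation. The helper name `semaphore_wake_order` is hypothetical.

```python
import queue
import threading
import time


def semaphore_wake_order(names, settle=0.1):
    """Return the order in which threads blocked on one Semaphore get permits."""
    sem = threading.Semaphore(0)
    woken = queue.Queue()  # thread-safe channel: who obtained a permit

    def waiter(name):
        sem.acquire()      # blocks: the counter starts at 0
        woken.put(name)

    threads = []
    for name in names:
        t = threading.Thread(target=waiter, args=(name,))
        t.start()
        threads.append(t)
        time.sleep(settle)  # timing assumption: let it block before the next starts

    order = []
    for _ in names:
        sem.release()                       # hand out exactly one permit
        order.append(woken.get(timeout=5))  # wait until a waiter has taken it
    for t in threads:
        t.join()
    return order


print(semaphore_wake_order("ABCD"))
```

On CPython the threads receive their permits in the order they started waiting, which is the FIFO behavior described in the background section.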
The intuitive approach is to add a numeric priority parameter to the get method and to the semaphore's acquire method. So let's look at the semaphore implementation to see whether that can be done:

```python
# Excerpt from Lib\threading.py. In that module Condition and Lock are defined
# alongside Semaphore and _time is time.monotonic; imported here to run standalone.
from threading import Condition, Lock
from time import monotonic as _time


class Semaphore:
    """This class implements semaphore objects.

    Semaphores manage a counter representing the number of release() calls minus
    the number of acquire() calls, plus an initial value. The acquire() method
    blocks if necessary until it can return without making the counter
    negative. If not given, value defaults to 1.
    """

    # After Tim Peters' semaphore class, but not quite the same (no maximum)

    def __init__(self, value=1):
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        self._cond = Condition(Lock())
        self._value = value

    def acquire(self, blocking=True, timeout=None):
        """Acquire a semaphore, decrementing the internal counter by one.

        When invoked without arguments: if the internal counter is larger than
        zero on entry, decrement it by one and return immediately. If it is zero
        on entry, block, waiting until some other thread has called release() to
        make it larger than zero. This is done with proper interlocking so that
        if multiple acquire() calls are blocked, release() will wake exactly one
        of them up. The implementation may pick one at random, so the order in
        which blocked threads are awakened should not be relied on. There is no
        return value in this case.

        When invoked with blocking set to true, do the same thing as when called
        without arguments, and return true.

        When invoked with blocking set to false, do not block. If a call without
        an argument would block, return false immediately; otherwise, do the
        same thing as when called without arguments, and return true.

        When invoked with a timeout other than None, it will block for at
        most timeout seconds. If acquire does not complete successfully in
        that interval, return false. Return true otherwise.
        """
        if not blocking and timeout is not None:
            raise ValueError("can't specify timeout for non-blocking acquire")
        rc = False
        endtime = None
        with self._cond:
            while self._value == 0:
                if not blocking:
                    break
                if timeout is not None:
                    if endtime is None:
                        endtime = _time() + timeout
                    else:
                        timeout = endtime - _time()
                        if timeout <= 0:
                            break
                self._cond.wait(timeout)
            else:
                self._value -= 1
                rc = True
        return rc

    __enter__ = acquire

    def release(self, n=1):
        """Release a semaphore, incrementing the internal counter by one or more.

        When the counter is zero on entry and another thread is waiting for it
        to become larger than zero again, wake up that thread.
        """
        if n < 1:
            raise ValueError('n must be one or more')
        with self._cond:
            self._value += n
            for i in range(n):
                self._cond.notify()

    def __exit__(self, t, v, tb):
        self.release()
```
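Semaphore.acquire blocks in self._cond.wait(timeout), and release() wakes waiters with self._cond.notify(), so the wake order, and therefore the priority, is decided inside Condition. Once Condition.wait accepts a priority and notify() wakes the highest-priority waiter, the most critical part of the implementation is done. What remains is to add a priority parameter to _PySimpleQueue.get and pass it to Semaphore.acquire, and to give Semaphore.acquire a priority parameter that it passes on to Condition.wait. For reasons of space, the full code is not reproduced here.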
Note that although a priority parameter is added, code that never uses it behaves exactly like the original version: waiters are still woken in FIFO order.
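Since the complete code is omitted, here is one way the whole chain could look. This is a sketch under stated assumptions, not the author's code: `PriorityCondition`, `PrioritySemaphore` and `PrioritySimpleQueue` are hypothetical names. The condition keeps its waiters in a `heapq` heap ordered by `(-priority, arrival sequence)` instead of a FIFO deque, a larger number means a higher priority, and the default `priority=0` keeps FIFO order. As with the standard library version, a thread that is not yet waiting can still grab a permit before the woken waiter reacquires the lock, so priority only orders threads that are already blocked.

```python
# Hypothetical sketch: names and design are assumptions, not the original code.
import heapq
import itertools
import queue
import threading
import time
from collections import deque


class PriorityCondition:
    """Condition variant: notify() wakes the highest-priority waiter first."""

    def __init__(self):
        self._lock = threading.Lock()
        # Heap of (-priority, arrival_seq, waiter_lock); equal priority -> FIFO.
        self._waiters = []
        self._seq = itertools.count()

    def __enter__(self):
        self._lock.acquire()
        return self

    def __exit__(self, *exc_info):
        self._lock.release()

    def wait(self, timeout=None, priority=0):
        # Caller must hold the lock, as with threading.Condition.wait().
        waiter = threading.Lock()
        waiter.acquire()
        entry = (-priority, next(self._seq), waiter)
        heapq.heappush(self._waiters, entry)
        self._lock.release()
        gotit = False
        try:
            if timeout is None:
                gotit = waiter.acquire()  # blocks until notify() releases it
            else:
                gotit = waiter.acquire(True, max(timeout, 0))
            return gotit
        finally:
            self._lock.acquire()
            if not gotit and entry in self._waiters:
                self._waiters.remove(entry)  # timed out: withdraw our entry
                heapq.heapify(self._waiters)

    def notify(self, n=1):
        # Caller must hold the lock; wakes the n best-ranked waiters.
        for _ in range(min(n, len(self._waiters))):
            heapq.heappop(self._waiters)[2].release()


class PrioritySemaphore:
    """threading.Semaphore's acquire loop, forwarding `priority` to wait()."""

    def __init__(self, value=0):
        self._cond = PriorityCondition()
        self._value = value

    def acquire(self, blocking=True, timeout=None, priority=0):
        endtime = None
        with self._cond:
            while self._value == 0:
                if not blocking:
                    return False
                if timeout is not None:
                    if endtime is None:
                        endtime = time.monotonic() + timeout
                    else:
                        timeout = endtime - time.monotonic()
                        if timeout <= 0:
                            return False
                self._cond.wait(timeout, priority)
            self._value -= 1
            return True

    def release(self, n=1):
        with self._cond:
            self._value += n
            self._cond.notify(n)


class PrioritySimpleQueue:
    """_PySimpleQueue with an optional consumer `priority` on get()."""

    def __init__(self):
        self._queue = deque()
        self._count = PrioritySemaphore(0)

    def put(self, item):
        self._queue.append(item)
        self._count.release()

    def get(self, block=True, timeout=None, priority=0):
        if not self._count.acquire(block, timeout, priority):
            raise queue.Empty
        return self._queue.popleft()
```

A consumer thread that calls `q.get(priority=10)` is served before consumers already blocked in `q.get(priority=5)` or plain `q.get()`; if every consumer uses the default priority, blocked consumers are served first come, first served, matching the paragraph above.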