Python多线程队列的应用总结

最近在工作中频繁发现一些多线程的BUG,这类BUG往往难以稳定复现,分析困难。除了某些业务逻辑本身比较复杂之外,还有部分原因是开发人员对并发模型理解不到位,使用错误的同步操作导致。

线程并发模型本身很复杂,本文不表。仅从应用层面回答几个常见问题,如有纰漏,还望赐教!

什么时候用多线程

多线程能够并发的执行任务,从而加速程序的处理速度。因此当任务规模比较大或者单线程处理比较慢的时候,适合使用多线程来处理问题。比如口令暴破、爬虫等等。

值得注意的是Python的线程和Linux标准线程有所区别,由于GIL的存在,Python多线程不能运行在不同的CPU核心上,因此对于计算密集型的任务,不适合使用Python多线程去解决,因为这类任务你不管开多少线程,都是消耗单个核心的CPU资源,反而还会增加线程切换的开销,降低效率。

同时,线程也不是越多越好,因为线程切换是由操作系统调度的,具体到Python的运行时上还额外加了GIL,也就是说Python线程切换实际上涉及上锁、解锁,内核态、用户态切换多种操作开销,如果数量太多,反正会降低程序运行效率。

什么时候用多线程队列

我们先看下标准库是如何介绍Queue模块的:

The Queue module implements multi-producer, multi-consumer queues. It is especially useful in threaded programming when information must be exchanged safely between multiple threads. The Queue class in this module implements all the required locking semantics.

可以看到这个模块本身就是为生产者-消费者的多线程模型设计的,实现了全部要求锁语义,也就是说队列其实也是一种线程同步工具。因此,当我们的任务符合生产者-消费者模型,或者数据可靠性要求比较高的情况下适合用队列来同步多线程任务。
队列还有一个好处,相比常规的多线程任务处理模型,多线程能够提高并发效率和稳定性。
举个例子,假设我们有100000个密码,开启10个线程暴破目标网站A。有这么几种做法:

  1. 每次启动10个线程,每个线程取一个密码尝试登录,等待10个线程结束。如此循环往复直至处理完所有密码
  2. 把100000个密码划分为10个List,每个List有10000个密码,启动10个线程,每个线程处理一个List
  3. 使用队列,启动10个消费者线程等待队列,然后在主线程中同步地把所有密码入队,等待队列中任务处理完成

第一种做法共计启动了100000个线程,显然有太多不必要的开销,而且耗时会更长(为什么?)。
第二种做法效率比第一种高,但是不稳定,试想某个线程因为外部操作阻塞了,那么它这个List上后续的任务就全部在等待。
第三种是解决了前两种做法带来的问题,首先它不会启动过多的线程,造成无谓的开销,其次它的稳定性更高,即便其中某一两个线程无法工作,只要不是全部出问题,其余线程仍然能够处理完所有任务。

一般情况下,只要我们的多线程任务是一个常驻程序,或者任务量比较大,最好使用队列方式进行线程同步。

Python标准库Queue模块的实现分析


下面是官方文档给的简单例子:

def worker():
while True:
item = q.get()
do_work(item)
q.task_done()

q = Queue()
for i in range(num_worker_threads):
t = Thread(target=worker)
t.daemon = True
t.start()

for item in source():
q.put(item)

q.join() # block until all tasks are done

这里先启动一批worker线程,阻塞在q.get()方法上,每添加一个任务,unfinished_tasks计数加1,并且唤醒一个阻塞的worker。
当worker执行到q.task_done()时,unfinished_tasks计数减1,当unfinished_tasks减到0时唤醒q.join()
此时在q.join()内部发现unfinished_tasks0,跳出循环,整个工作流结束。

多线程队列的局限性及解决方法

上面的例子描述了Queue的工作原理,但是在实际的工作中如果我们完全照搬这种写法可能会引入一些问题。
便于大家理解,我们仍然从这段代码出发,通过迭代代码的方式来说明以上示例可能会遇到的问题及解决方法。

问题一:如何关闭Worker?

示例中的Worker处于一个死循环当中,要么执行任务,要么永远阻塞在q.get()上,只能随着主线程的退出而结束。
但是很多时候主线程不能退出,这种情况下worker线程就永远不会退出。因此我们需要一种机制能够通知到Worker退出。

有两种方法处理让Worker退出:

非阻塞get

"""
非阻塞的get方法
这种方法令Worker轮询检测退出标志位,浪费了部分性能,但是避免了死锁等待的问题,出了问题比较好定位。
"""

class Worker(Thread):
def __init__(self, queue):
super(Worker, self).__init__()
self.q = queue
self._stop = False

def run(self):
while not self._stop:
try:
item = self.q.get(timeout=1)
except Empty:
continue
try:
print("Worker {} do_work {}".format(self.ident, item))
time.sleep(random.random())
finally:
self.q.task_done()
print("Worker {} exit".format(self.ident))

def stop(self):
self._stop = True


q = Queue()
threads = []
for i in range(num_worker_threads):
t = Worker(q)
t.daemon = True
t.start()
threads.append(t)

for item in source():
q.put(item)

q.join() # block until all tasks are done

# stop all workers
for t in threads:
t.stop()

# wait all workers to exit
for t in threads:
t.join()

队列通知

将Worker退出信号发送到队列当中,Worker收到信号之后自行退出。

"""
通过队列通知Woker退出
这种方法向队列中发送Worker的退出信号,Worker收到信号之后自行退出。
避免轮询,充分利用Queue的同步机制,也是Python3中标准库concurrent.futures.ThreadPoolExecutor模块的实现。
"""

sentinel = None # or object(), anything can't exist in queue is ok


class Worker(Thread):
def __init__(self, queue):
super(Worker, self).__init__()
self.q = queue

def run(self):
while True:
try:
item = self.q.get()
if item is sentinel:
self.q.put(sentinel)
break
print("Worker {} do_work {}".format(self.ident, item))
time.sleep(random.random())
finally:
self.q.task_done()
print("Worker {} exit".format(self.ident))


q = Queue()
threads = []
for i in range(num_worker_threads):
t = Worker(q)
t.daemon = True
t.start()
threads.append(t)

for item in source():
q.put(item)

q.join() # block until all tasks are done
q.put(sentinel) # notice worker to exit

# wait all workers to exit
for t in threads:
t.join()

一般情况下,优先使用第二种方法。

问题二:队列等待有超时机制吗?

当我们执行到q.join()方法的时候,除非队列中的任务全部执行完成,否则我们是无法进行下一个操作的。标准库中join()方法没有超时参数,怎么办呢?

分析join()的标准库实现,其实我们自己可以给join函数加一个超时参数:

class MyQueue(Queue):
def join(self, timeout=None):
if timeout is None:
super(MyQueue).join()
elif timeout > 0:
t = time.time() + timeout
self.all_tasks_done.acquire()
try:
while self.unfinished_tasks:
remaining = t - time.time()
if remaining <= 0:
break
self.all_tasks_done.wait(remaining)
finally:
self.all_tasks_done.release()
else:
raise Exception("timeout must be positive number")

问题三:如何立即停止所有任务?

如果使用的非阻塞的get方法,那么直接调用Worker线程的stop方法即可,每个Worker处理完当前的任务之后会立即结束,即便Queue里还有任务也没有影响,直接删除队列对象就可以释放内存了。
但如果使用的是队列通知Worker退出,就没有那么简单了。当我们向Queue发送Worker的退出信号时,Queue中可能还有其他的任务,由于Queue底层是一个FIFO的队列,Worker必须把Queue中任务处理完,才能收到退出信号正常退出。
在这种情况下,我们必须先清空队列中的任务,再发送退出信号。这样就可以立即停止当前任务了。

问题四:如何在Worker完成目标之后停止所有任务?

这个场景类似于暴力破解中暴破成功之后立即停止。完成这个需求其实和队列没有太多关系,只需要在Worker之间共享一个event即可。

class Worker(Thread):
def __init__(self, queue, stop_event):
super(Worker, self).__init__()
self.q = queue
self.stop_event = stop_event

def run(self):
while not self.stop_event.is_set():
try:
item = self.q.get()
if item is sentinel:
self.q.put(sentinel)
break
if some_condition:
self.stop_event.set()
finally:
self.q.task_done()

利用event还可以很容易的实现超时等待的功能,只需要调用event.wait(timeout=60)即可。

问题五:为什么Queue.join()之后任务却没有执行完?

看下面的代码:

def worker():
while True:
item = q.get()
do_work(item)
q.task_done()


def puter():
for item in source():
q.put(item)
time.sleep(random.random())


q = Queue()
for i in range(num_worker_threads):
t = Thread(target=worker)
t.daemon = True
t.start()

t = Thread(target=puter)
t.daemon = True
t.start()

q.join() # block until all tasks are done
print("all tasks are done??????")
print("wait 5 seconds...")
time.sleep(5)

代码执行结果如下:

do_work 0                                                                                                               do_work 1
all tasks are done??????
wait 5 seconds...
do_work 2
do_work 3
do_work 4
do_work 5
do_work 6
do_work 7
do_work 8
do_work 9
do_work 10

Worker执行2个任务之后join就结束了,后续等待的过程中可以看到Worker还在执行任务,除非主线程退出。
这里要明确一个概念,queue.join()的阻塞条件是队列不为空,这个条件和任务全部完成并不是等价的。
在上面的例子当中,生产者-消费者都工作在子线程中,很容易存在某个时间点,worker消费完队列中所有的任务,但是puter还没有把新的任务入队,此时队列为空,queue.join()退出。

在任务已经全部入队的情况下,队列为空等价于任务全部执行。
但是当任务是动态添加的时候,很容易出现队列中没有任务的情况,此时我们不能使用queue.join()来等待队列。这种情况下,一般需要我们自己设置类似stop_event的同步变量,然后在主线程等待它。

总结

多线程是个老生常态的话题,虽然Python API已经封装得非常好了,Python3中的concurrent.futures.ThreadPoolExecutor能够满足绝大多数并发需求,但是现实的复杂需求需要我们使用threading、Queue这些底层模块编写复杂的多线程模型,如果平常理解不到位,很容易就会写出低效、死锁的代码。
而往往这类问题在debug时又非常难以发现,多半是偶现问题,非常浪费时间。因此推荐大家熟练掌握几种常用的模型,结合业务需求分析,在逐步去扩展并发模型的功能,写出更可靠稳定的代码。