Python 并发编程小结1

KeyWords:

  • threading.Thread
  • threading.Event
  • 使用队列queue.Queue,task_down()、join()实现线程通信
  • Lock的上下文管理、RLock、信号量Semaphore
  • 死锁
  • 创建对其他线程不可见的线程状态:threading.local()
  • 线程池: concurrent.futures.ThreadPoolExecutor
  • 并行编程:concurrent.futures.ProcessPoolExecutor、multiprocessing
  • GIL全局解释器的影响和解决方案
  • Actor模式
  • (本文未提到的)生成器、协程与asyncio
  • (本文未提到的)消息发布/订阅模型、多线程轮询

启动和停止线程

threading​ 库可以在单独的线程中执行任何的在 Python 中可以调用的对象。你可以创建一个 Thread​ 对象并将你要执行的对象以 target 参数的形式提供给该对象。 下面是一个简单的例子:

# Code to execute in an independent thread
import time
def countdown(n):
while n > 0:
print('T-minus', n)
n -= 1
time.sleep(5)

# Create and launch a thread
from threading import Thread
t = Thread(target=countdown, args=(10,))
t.start()

当你创建好一个线程对象后,该对象并不会立即执行,除非你调用它的 <span class="pre">start()</span>​ 方法(当你调用 <span class="pre">start()</span>​ 方法时,它会调用你传递进来的函数,并把你传递进来的参数传递给该函数)。Python中的线程会在一个单独的系统级线程中执行(比如说一个 POSIX 线程或者一个 Windows 线程),这些线程将由操作系统来全权管理。线程一旦启动,将独立执行直到目标函数返回。你可以查询一个线程对象的状态,看它是否还在执行:

if t.is_alive():
print('Still running')
else:
print('Completed')

你也可以将一个线程加入到当前线程,并等待它终止:

t.join()

Python解释器直到所有线程都终止前仍保持运行。对于需要长时间运行的线程或者需要一直运行的后台任务,你应当考虑使用后台线程。 例如:

t = Thread(target=countdown, args=(10,), daemon=True)
t.start()

后台线程无法等待,不过,这些线程会在主线程终止时自动销毁。 除了如上所示的两个操作,并没有太多可以对线程做的事情。你无法结束一个线程,无法给它发送信号,无法调整它的调度,也无法执行其他高级操作。如果需要这些特性,你需要自己添加。比如说,如果你需要终止线程,那么这个线程必须通过编程在某个特定点轮询来退出。你可以像下边这样把线程放入一个类中:

class CountdownTask:
def __init__(self):
self._running = True

def terminate(self):
self._running = False

def run(self, n):
while self._running and n > 0:
print('T-minus', n)
n -= 1
time.sleep(5)

c = CountdownTask()
t = Thread(target=c.run, args=(10,))
t.start()
c.terminate() # Signal termination
t.join() # Wait for actual termination (if needed)

如果线程执行一些像I/O这样的阻塞操作,那么通过轮询来终止线程将使得线程之间的协调变得非常棘手。比如,如果一个线程一直阻塞在一个I/O操作上,它就永远无法返回,也就无法检查自己是否已经被结束了。要正确处理这些问题,你需要利用超时循环来小心操作线程。 例子如下:

class IOTask:
def terminate(self):
self._running = False

def run(self, sock):
# sock is a socket
sock.settimeout(5) # Set timeout period
while self._running:
# Perform a blocking I/O operation w/ timeout
try:
data = sock.recv(8192)
break
except socket.timeout:
continue
# Continued processing
...
# Terminated
return

讨论

由于全局解释锁(GIL)的原因,Python 的线程被限制到同一时刻只允许一个线程执行这样一个执行模型。所以,Python 的线程更适用于处理I/O和其他需要并发执行的阻塞操作(比如等待I/O、等待从数据库获取数据等等),而不是需要多处理器并行的计算密集型任务。

有时你会看到下边这种通过继承 Thread​ 类来实现的线程:

from threading import Thread

class CountdownThread(Thread):
def __init__(self, n):
super().__init__()
self.n = n
def run(self):
while self.n > 0:

print('T-minus', self.n)
self.n -= 1
time.sleep(5)

c = CountdownThread(5)
c.start()

尽管这样也可以工作,但这使得你的代码依赖于 threading​ 库,所以你的这些代码只能在线程上下文中使用。上文所写的那些代码、函数都是与 threading​ 库无关的,这样就使得这些代码可以被用在其他的上下文中,可能与线程有关,也可能与线程无关。比如,你可以通过 multiprocessing​ 模块在一个单独的进程中执行你的代码:

import multiprocessing
c = CountdownTask(5)
p = multiprocessing.Process(target=c.run)
p.start()

再次重申,这段代码仅适用于 CountdownTask 类是以独立于实际的并发手段(多线程、多进程等等)实现的情况。

判断线程是否启动

你已经启动了一个线程,但是你想知道它是不是真的已经开始运行了。

线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用 threading​ 库中的 Event​ 对象。 Event​ 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在初始情况下,event 对象中的信号标志被设置为假。如果有线程等待一个 event 对象,而这个 event 对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个 event 对象的信号标志设置为真,它将唤醒所有等待这个 event 对象的线程。如果一个线程等待一个已经被设置为真的 event 对象,那么它将忽略这个事件,继续执行。 下边的代码展示了如何使用 Event​ 来协调线程的启动:

from threading import Thread, Event
import time

# Code to execute in an independent thread
def countdown(n, started_evt):
print('countdown starting')
started_evt.set()
while n > 0:
print('T-minus', n)
n -= 1
time.sleep(5)

# Create the event object that will be used to signal startup
started_evt = Event()

# Launch the thread and pass the startup event
print('Launching countdown')
t = Thread(target=countdown, args=(10,started_evt))
t.start()

# Wait for the thread to start
started_evt.wait()
print('countdown is running')

当你执行这段代码,“countdown is running” 总是显示在 “countdown starting” 之后显示。这是由于使用 event 来协调线程,使得主线程要等到 countdown()​ 函数输出启动信息后,才能继续执行。

讨论

event 对象最好单次使用,就是说,你创建一个 event 对象,让某个线程等待这个对象,一旦这个对象被设置为真,你就应该丢弃它。尽管可以通过 clear()​ 方法来重置 event 对象,但是很难确保安全地清理 event 对象并对它重新赋值。很可能会发生错过事件、死锁或者其他问题(特别是,你无法保证重置 event 对象的代码会在线程再次等待这个 event 对象之前执行)。如果一个线程需要不停地重复使用 event 对象,你最好使用 Condition​ 对象来代替。下面的代码使用 Condition​ 对象实现了一个周期定时器,每当定时器超时的时候,其他线程都可以监测到:

import threading
import time

class PeriodicTimer:
def __init__(self, interval):
self._interval = interval
self._flag = 0
self._cv = threading.Condition()

def start(self):
t = threading.Thread(target=self.run)
t.daemon = True

t.start()

def run(self):
'''
Run the timer and notify waiting threads after each interval
'''
while True:
time.sleep(self._interval)
with self._cv:
self._flag ^= 1
self._cv.notify_all()

def wait_for_tick(self):
'''
Wait for the next tick of the timer
'''
with self._cv:
last_flag = self._flag
while last_flag == self._flag:
self._cv.wait()

# Example use of the timer
ptimer = PeriodicTimer(5)
ptimer.start()

# Two threads that synchronize on the timer
def countdown(nticks):
while nticks > 0:
ptimer.wait_for_tick()
print('T-minus', nticks)
nticks -= 1

def countup(last):
n = 0
while n < last:
ptimer.wait_for_tick()
print('Counting', n)
n += 1

threading.Thread(target=countdown, args=(10,)).start()
threading.Thread(target=countup, args=(5,)).start()

event对象的一个重要特点是当它被设置为真时会唤醒所有等待它的线程。如果你只想唤醒单个线程,最好是使用信号量或者 Condition​ 对象来替代。考虑一下这段使用信号量实现的代码:

# Worker thread
def worker(n, sema):
# Wait to be signaled
sema.acquire()

# Do some work
print('Working', n)

# Create some threads
sema = threading.Semaphore(0)
nworkers = 10
for n in range(nworkers):
t = threading.Thread(target=worker, args=(n, sema,))
t.start()

运行上边的代码将会启动一个线程池,但是并没有什么事情发生。这是因为所有的线程都在等待获取信号量。每次信号量被释放,只有一个线程会被唤醒并执行,示例如下:

>>> sema.release()
Working 0
>>> sema.release()
Working 1
>>>

编写涉及到大量的线程间同步问题的代码会让你痛不欲生。比较合适的方式是使用队列来进行线程间通信或者每个把线程当作一个Actor,利用Actor模型来控制并发。下一节将会介绍到队列,而Actor模型将在12.10节介绍。

线程间通信

从一个线程向另一个线程发送数据最安全的方式可能就是使用 queue​ 库中的队列了。创建一个被多个线程共享的 Queue​ 对象,这些线程通过使用 put()​ 和 get()​ 操作来向队列中添加或者删除元素。 例如:

from queue import Queue
from threading import Thread

# A thread that produces data
def producer(out_q):
while True:
# Produce some data
...
out_q.put(data)

# A thread that consumes data
def consumer(in_q):
while True:
# Get some data
data = in_q.get()
# Process the data
...

# Create the shared queue and launch both threads
q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()

Queue​ 对象已经包含了必要的锁,所以你可以通过它在多个线程间多安全地共享数据。 当使用队列时,协调生产者和消费者的关闭问题可能会有一些麻烦。一个通用的解决方法是在队列中放置一个特殊的值,当消费者读到这个值的时候,终止执行。

from queue import Queue
from threading import Thread

# Object that signals shutdown
_sentinel = object()

# A thread that produces data
def producer(out_q):
while running:
# Produce some data
...
out_q.put(data)

# Put the sentinel on the queue to indicate completion
out_q.put(_sentinel)

# A thread that consumes data
def consumer(in_q):
while True:
# Get some data
data = in_q.get()

# Check for termination
if data is _sentinel:
in_q.put(_sentinel)
break

# Process the data
...

本例中有一个特殊的地方:消费者在读到这个特殊值之后立即又把它放回到队列中,将之传递下去。这样,所有监听这个队列的消费者线程就可以全部关闭了。 尽管队列是最常见的线程间通信机制,但是仍然可以自己通过创建自己的数据结构并添加所需的锁和同步机制来实现线程间通信。最常见的方法是使用 Condition​ 变量来包装你的数据结构。下边这个例子演示了如何创建一个线程安全的优先级队列,如同1.5节中介绍的那样。

import heapq
import threading

class PriorityQueue:
def __init__(self):
self._queue = []
self._count = 0
self._cv = threading.Condition()
def put(self, item, priority):
with self._cv:
heapq.heappush(self._queue, (-priority, self._count, item))
self._count += 1
self._cv.notify()

def get(self):
with self._cv:
while len(self._queue) == 0:
self._cv.wait()
return heapq.heappop(self._queue)[-1]

使用队列来进行线程间通信是一个单向、不确定的过程。通常情况下,你没有办法知道接收数据的线程是什么时候接收到的数据并开始工作的。不过队列对象提供一些基本完成的特性,比如下边这个例子中的 **task_done()** ​ 和 **join()** ​ :

from queue import Queue
from threading import Thread

# A thread that produces data
def producer(out_q):
while running:
# Produce some data
...
out_q.put(data)

# A thread that consumes data
def consumer(in_q):
while True:
# Get some data
data = in_q.get()

# Process the data
...
# Indicate completion
in_q.task_done()

# Create the shared queue and launch both threads
q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()

# Wait for all produced items to be consumed
q.join()

如果一个线程需要在一个“消费者”线程处理完特定的数据项时立即得到通知,你可以把要发送的数据和一个 Event​ 放到一起使用,这样“生产者”就可以通过这个Event对象来监测处理的过程了。示例如下:

from queue import Queue
from threading import Thread, Event

# A thread that produces data
def producer(out_q):
while running:
# Produce some data
...
# Make an (data, event) pair and hand it to the consumer
evt = Event()
out_q.put((data, evt))
...
# Wait for the consumer to process the item
evt.wait()

# A thread that consumes data
def consumer(in_q):
while True:
# Get some data
data, evt = in_q.get()
# Process the data
...
# Indicate completion
evt.set()

讨论

基于简单队列编写多线程程序在多数情况下是一个比较明智的选择。从线程安全队列的底层实现来看,你无需在你的代码中使用锁和其他底层的同步机制,这些只会把你的程序弄得乱七八糟。此外,使用队列这种基于消息的通信机制可以被扩展到更大的应用范畴,比如,你可以把你的程序放入多个进程甚至是分布式系统而无需改变底层的队列结构。 使用线程队列有一个要注意的问题是,向队列中添加数据项时并不会复制此数据项,线程间通信实际上是在线程间传递对象引用。如果你担心对象的共享状态,那你最好只传递不可修改的数据结构(如:整型、字符串或者元组)或者一个对象的深拷贝。例如:

from queue import Queue
from threading import Thread
import copy

# A thread that produces data
def producer(out_q):
while True:
# Produce some data
...
out_q.put(copy.deepcopy(data))

# A thread that consumes data
def consumer(in_q):
while True:
# Get some data
data = in_q.get()
# Process the data
...

Queue​ 对象提供一些在当前上下文很有用的附加特性。比如在创建 Queue 对象时提供可选的 size​ 参数来限制可以添加到队列中的元素数量。对于“生产者”与“消费者”速度有差异的情况,为队列中的元素数量添加上限是有意义的。比如,一个“生产者”产生项目的速度比“消费者” “消费”的速度快,那么使用固定大小的队列就可以在队列已满的时候阻塞队列,以免未预期的连锁效应扩散整个程序造成死锁或者程序运行失常。在通信的线程之间进行“流量控制”是一个看起来容易实现起来困难的问题。如果你发现自己曾经试图通过摆弄队列大小来解决一个问题,这也许就标志着你的程序可能存在脆弱设计或者固有的可伸缩问题。 get()​ 和 put()​ 方法都支持非阻塞方式和设定超时,例如:

import queue
q = queue.Queue()

try:
data = q.get(block=False)
except queue.Empty:
...

try:
q.put(item, block=False)
except queue.Full:
...

try:
data = q.get(timeout=5.0)
except queue.Empty:
...

这些操作都可以用来避免当执行某些特定队列操作时发生无限阻塞的情况,比如,一个非阻塞的 put()​ 方法和一个固定大小的队列一起使用,这样当队列已满时就可以执行不同的代码。比如输出一条日志信息并丢弃。

def producer(q):
...
try:
q.put(item, block=False)
except queue.Full:
log.warning('queued item %r discarded!', item)

如果你试图让消费者线程在执行像 q.get()​ 这样的操作时,超时自动终止以便检查终止标志,你应该使用 q.get()​ 的可选参数 timeout​ ,如下:

_running = True

def consumer(q):
while _running:
try:
item = q.get(timeout=5.0)
# Process item
...
except queue.Empty:
pass

最后,有 q.qsize()​ , q.full()​ , q.empty()​ 等实用方法可以获取一个队列的当前大小和状态。但要注意,这些方法都不是线程安全的。可能你对一个队列使用 empty()​ 判断出这个队列为空,但同时另外一个线程可能已经向这个队列中插入一个数据项。所以,你最好不要在你的代码中使用这些方法。

加锁

要在多线程程序中安全使用可变对象,你需要使用 threading 库中的 <span class="pre">Lock</span>​ 对象,就像下边这个例子这样:

import threading

class SharedCounter:
'''
A counter object that can be shared by multiple threads.
'''
def __init__(self, initial_value = 0):
self._value = initial_value
self._value_lock = threading.Lock()

def incr(self,delta=1):
'''
Increment the counter with locking
'''
with self._value_lock:
self._value += delta

def decr(self,delta=1):
'''
Decrement the counter with locking
'''
with self._value_lock:
self._value -= delta

<span class="pre">Lock</span>​ 对象和 <span class="pre">with</span>​ 语句块一起使用可以保证互斥执行,就是每次只有一个线程可以执行 with 语句包含的代码块。with 语句会在这个代码块执行前自动获取锁,在执行结束后自动释放锁。

讨论

线程调度本质上是不确定的,因此,在多线程程序中错误地使用锁机制可能会导致随机数据损坏或者其他的异常行为,我们称之为竞争条件。为了避免竞争条件,最好只在临界区(对临界资源进行操作的那部分代码)使用锁。 在一些“老的” Python 代码中,显式获取和释放锁是很常见的。下边是一个上一个例子的变种:

import threading

class SharedCounter:
'''
A counter object that can be shared by multiple threads.
'''
def __init__(self, initial_value = 0):
self._value = initial_value
self._value_lock = threading.Lock()

def incr(self,delta=1):
'''
Increment the counter with locking
'''
self._value_lock.acquire()
self._value += delta
self._value_lock.release()

def decr(self,delta=1):
'''
Decrement the counter with locking
'''
self._value_lock.acquire()
self._value -= delta
self._value_lock.release()

相比于这种显式调用的方法,with 语句更加优雅,也更不容易出错,特别是程序员可能会忘记调用 release() 方法或者程序在获得锁之后产生异常这两种情况(使用 with 语句可以保证在这两种情况下仍能正确释放锁)。 为了避免出现死锁的情况,使用锁机制的程序应该设定为每个线程一次只允许获取一个锁。如果不能这样做的话,你就需要更高级的死锁避免机制,我们将在12.5节介绍。 在 threading​ 库中还提供了其他的同步原语,比如 RLock​ 和 Semaphore​ 对象。但是根据以往经验,这些原语是用于一些特殊的情况,如果你只是需要简单地对可变对象进行锁定,那就不应该使用它们。一个 RLock​ (可重入锁)可以被同一个线程多次获取,主要用来实现基于监测对象模式的锁定和同步。在使用这种锁的情况下,当锁被持有时,只有一个线程可以使用完整的函数或者类中的方法。比如,你可以实现一个这样的 SharedCounter 类:

import threading

class SharedCounter:
'''
A counter object that can be shared by multiple threads.
'''
_lock = threading.RLock()
def __init__(self, initial_value = 0):
self._value = initial_value

def incr(self,delta=1):
'''
Increment the counter with locking
'''
with SharedCounter._lock:
self._value += delta

def decr(self,delta=1):
'''
Decrement the counter with locking
'''
with SharedCounter._lock:
self.incr(-delta)

在上边这个例子中,没有对每一个实例中的可变对象加锁,取而代之的是一个被所有实例共享的类级锁。这个锁用来同步类方法,具体来说就是,这个锁可以保证一次只有一个线程可以调用这个类方法。不过,与一个标准的锁不同的是,已经持有这个锁的方法在调用同样使用这个锁的方法时,无需再次获取锁。比如 decr 方法。 这种实现方式的一个特点是,无论这个类有多少个实例都只用一个锁。因此在需要大量使用计数器的情况下内存效率更高。不过这样做也有缺点,就是在程序中使用大量线程并频繁更新计数器时会有争用锁的问题。 信号量对象是一个建立在共享计数器基础上的同步原语。如果计数器不为0,with 语句将计数器减1,线程被允许执行。with 语句执行结束后,计数器加1。如果计数器为0,线程将被阻塞,直到其他线程结束将计数器加1。尽管你可以在程序中像标准锁一样使用信号量来做线程同步,但是这种方式并不被推荐,因为使用信号量为程序增加的复杂性会影响程序性能。 相对于简单地作为锁使用,信号量更适用于那些需要在线程之间引入信号或者限制的程序。比如,你需要限制一段代码的并发访问量,你就可以像下面这样使用信号量完成:

from threading import Semaphore
import urllib.request

# At most, five threads allowed to run at once
_fetch_url_sema = Semaphore(5)

def fetch_url(url):
with _fetch_url_sema:
return urllib.request.urlopen(url)

如果你对线程同步原语的底层理论和实现感兴趣,可以参考操作系统相关书籍,绝大多数都有提及。

防止死锁的加锁机制

(这一章节我没看)

在多线程程序中,死锁问题很大一部分是由于线程同时获取多个锁造成的。举个例子:一个线程获取了第一个锁,然后在获取第二个锁的 时候发生阻塞,那么这个线程就可能阻塞其他线程的执行,从而导致整个程序假死。 解决死锁问题的一种方案是为程序中的每一个锁分配一个唯一的id,然后只允许按照升序规则来使用多个锁,这个规则使用上下文管理器 是非常容易实现的,示例如下:

import threading
from contextlib import contextmanager

# Thread-local state to stored information on locks already acquired
_local = threading.local()

@contextmanager
def acquire(*locks):
# Sort locks by object identifier
locks = sorted(locks, key=lambda x: id(x))

# Make sure lock order of previously acquired locks is not violated
acquired = getattr(_local,'acquired',[])
if acquired and max(id(lock) for lock in acquired) >= id(locks[0]):
raise RuntimeError('Lock Order Violation')

# Acquire all of the locks
acquired.extend(locks)
_local.acquired = acquired

try:
for lock in locks:
lock.acquire()
yield
finally:
# Release locks in reverse order of acquisition
for lock in reversed(locks):
lock.release()
del acquired[-len(locks):]

如何使用这个上下文管理器呢?你可以按照正常途径创建一个锁对象,但不论是单个锁还是多个锁中都使用 <span class="pre">acquire()</span>​ 函数来申请锁, 示例如下:

import threading
x_lock = threading.Lock()
y_lock = threading.Lock()

def thread_1():
while True:
with acquire(x_lock, y_lock):
print('Thread-1')

def thread_2():
while True:
with acquire(y_lock, x_lock):
print('Thread-2')

t1 = threading.Thread(target=thread_1)
t1.daemon = True
t1.start()

t2 = threading.Thread(target=thread_2)
t2.daemon = True
t2.start()

如果你执行这段代码,你会发现它即使在不同的函数中以不同的顺序获取锁也没有发生死锁。 其关键在于,在第一段代码中,我们对这些锁进行了排序。通过排序,使得不管用户以什么样的顺序来请求锁,这些锁都会按照固定的顺序被获取。 如果有多个 <span class="pre">acquire()</span>​ 操作被嵌套调用,可以通过线程本地存储(TLS)来检测潜在的死锁问题。 假设你的代码是这样写的:

import threading
x_lock = threading.Lock()
y_lock = threading.Lock()

def thread_1():

while True:
with acquire(x_lock):
with acquire(y_lock):
print('Thread-1')

def thread_2():
while True:
with acquire(y_lock):
with acquire(x_lock):
print('Thread-2')

t1 = threading.Thread(target=thread_1)
t1.daemon = True
t1.start()

t2 = threading.Thread(target=thread_2)
t2.daemon = True
t2.start()

如果你运行这个版本的代码,必定会有一个线程发生崩溃,异常信息可能像这样:

Exception in thread Thread-1:
Traceback (most recent call last):
File "/usr/local/lib/python3.3/threading.py", line 639, in _bootstrap_inner
self.run()
File "/usr/local/lib/python3.3/threading.py", line 596, in run
self._target(*self._args, **self._kwargs)
File "deadlock.py", line 49, in thread_1
with acquire(y_lock):
File "/usr/local/lib/python3.3/contextlib.py", line 48, in __enter__
return next(self.gen)
File "deadlock.py", line 15, in acquire
raise RuntimeError("Lock Order Violation")
RuntimeError: Lock Order Violation
>>>

发生崩溃的原因在于,每个线程都记录着自己已经获取到的锁。 <span class="pre">acquire()</span>​ 函数会检查之前已经获取的锁列表, 由于锁是按照升序排列获取的,所以函数会认为之前已获取的锁的id必定小于新申请到的锁,这时就会触发异常。

讨论

死锁是每一个多线程程序都会面临的一个问题(就像它是每一本操作系统课本的共同话题一样)。根据经验来讲,尽可能保证每一个 线程只能同时保持一个锁,这样程序就不会被死锁问题所困扰。一旦有线程同时申请多个锁,一切就不可预料了。

死锁的检测与恢复是一个几乎没有优雅的解决方案的扩展话题。一个比较常用的死锁检测与恢复的方案是引入看门狗计数器。当线程正常 运行的时候会每隔一段时间重置计数器,在没有发生死锁的情况下,一切都正常进行。一旦发生死锁,由于无法重置计数器导致定时器 超时,这时程序会通过重启自身恢复到正常状态。

避免死锁是另外一种解决死锁问题的方式,在进程获取锁的时候会严格按照对象id升序排列获取,经过数学证明,这样保证程序不会进入 死锁状态。证明就留给读者作为练习了。避免死锁的主要思想是,单纯地按照对象id递增的顺序加锁不会产生循环依赖,而循环依赖是 死锁的一个必要条件,从而避免程序进入死锁状态。

下面以一个关于线程死锁的经典问题:“哲学家就餐问题”,作为本节最后一个例子。题目是这样的:五位哲学家围坐在一张桌子前,每个人 面前有一碗饭和一只筷子。在这里每个哲学家可以看做是一个独立的线程,而每只筷子可以看做是一个锁。每个哲学家可以处在静坐、 思考、吃饭三种状态中的一个。需要注意的是,每个哲学家吃饭是需要两只筷子的,这样问题就来了:如果每个哲学家都拿起自己左边的筷子, 那么他们五个都只能拿着一只筷子坐在那儿,直到饿死。此时他们就进入了死锁状态。 下面是一个简单的使用死锁避免机制解决“哲学家就餐问题”的实现:

import threading

# The philosopher thread
def philosopher(left, right):
while True:
with acquire(left,right):
print(threading.currentThread(), 'eating')

# The chopsticks (represented by locks)
NSTICKS = 5
chopsticks = [threading.Lock() for n in range(NSTICKS)]

# Create all of the philosophers
for n in range(NSTICKS):
t = threading.Thread(target=philosopher,
args=(chopsticks[n],chopsticks[(n+1) % NSTICKS]))
t.start()

最后,要特别注意到,为了避免死锁,所有的加锁操作必须使用 <span class="pre">acquire()</span>​ 函数。如果代码中的某部分绕过acquire 函数直接申请锁,那么整个死锁避免机制就不起作用了。

保存线程的状态信息

和Java ThreadLocal类似,介绍了怎么用

问题

你需要保存正在运行线程的状态,这个状态对于其他的线程是不可见的。

解决方案

有时在多线程编程中,你需要只保存当前运行线程的状态。 要这么做,可使用 <span class="pre">thread.local()</span>​ 创建一个本地线程存储对象。 对这个对象的属性的保存和读取操作都只会对执行线程可见,而其他线程并不可见。

作为使用本地存储的一个有趣的实际例子, 考虑在8.3小节定义过的 <span class="pre">LazyConnection</span>​ 上下文管理器类。 下面我们对它进行一些小的修改使得它可以适用于多线程:

from socket import socket, AF_INET, SOCK_STREAM
import threading

class LazyConnection:
def __init__(self, address, family=AF_INET, type=SOCK_STREAM):
self.address = address
self.family = AF_INET
self.type = SOCK_STREAM
self.local = threading.local()

def __enter__(self):
if hasattr(self.local, 'sock'):
raise RuntimeError('Already connected')
self.local.sock = socket(self.family, self.type)
self.local.sock.connect(self.address)
return self.local.sock

def __exit__(self, exc_ty, exc_val, tb):
self.local.sock.close()
del self.local.sock

代码中,自己观察对于 <span class="pre">self.local</span>​ 属性的使用。 它被初始化为一个 <span class="pre">threading.local()</span>​ 实例。 其他方法操作被存储为 <span class="pre">self.local.sock</span>​ 的套接字对象。 有了这些就可以在多线程中安全的使用 <span class="pre">LazyConnection</span>​ 实例了。例如:

from functools import partial
def test(conn):
with conn as s:
s.send(b'GET /index.html HTTP/1.0\r\n')
s.send(b'Host: www.python.org\r\n')

s.send(b'\r\n')
resp = b''.join(iter(partial(s.recv, 8192), b''))

print('Got {} bytes'.format(len(resp)))

if __name__ == '__main__':
conn = LazyConnection(('www.python.org', 80))

t1 = threading.Thread(target=test, args=(conn,))
t2 = threading.Thread(target=test, args=(conn,))
t1.start()
t2.start()
t1.join()
t2.join()

它之所以行得通的原因是每个线程会创建一个自己专属的套接字连接(存储为self.local.sock)。 因此,当不同的线程执行套接字操作时,由于操作的是不同的套接字,因此它们不会相互影响。

讨论

在大部分程序中创建和操作线程特定状态并不会有什么问题。 不过,当出了问题的时候,通常是因为某个对象被多个线程使用到,用来操作一些专用的系统资源, 比如一个套接字或文件。你不能让所有线程共享一个单独对象, 因为多个线程同时读和写的时候会产生混乱。 本地线程存储通过让这些资源只能在被使用的线程中可见来解决这个问题。

本节中,使用 <span class="pre">thread.local()</span>​ 可以让 <span class="pre">LazyConnection</span>​ 类支持一个线程一个连接, 而不是对于所有的进程都只有一个连接。

其原理是,每个 <span class="pre">threading.local()</span>​ 实例为每个线程维护着一个单独的实例字典。 所有普通实例操作比如获取、修改和删除值仅仅操作这个字典。 每个线程使用一个独立的字典就可以保证数据的隔离了。

创建线程池

concurrent.futures​ 函数库有一个 ThreadPoolExecutor​ 类可以被用来完成这个任务。 下面是一个简单的TCP服务器,使用了一个线程池来响应客户端:

from socket import AF_INET, SOCK_STREAM, socket
from concurrent.futures import ThreadPoolExecutor

def echo_client(sock, client_addr):
'''
Handle a client connection
'''
print('Got connection from', client_addr)
while True:
msg = sock.recv(65536)
if not msg:
break
sock.sendall(msg)
print('Client closed connection')
sock.close()

def echo_server(addr):
pool = ThreadPoolExecutor(128)
sock = socket(AF_INET, SOCK_STREAM)
sock.bind(addr)
sock.listen(5)
while True:
client_sock, client_addr = sock.accept()
pool.submit(echo_client, client_sock, client_addr)

echo_server(('',15000))

如果你想手动创建你自己的线程池, 通常可以使用一个Queue来轻松实现。下面是一个稍微不同但是手动实现的例子:

from socket import socket, AF_INET, SOCK_STREAM
from threading import Thread
from queue import Queue

def echo_client(q):
'''
Handle a client connection
'''
sock, client_addr = q.get()
print('Got connection from', client_addr)
while True:
msg = sock.recv(65536)
if not msg:
break
sock.sendall(msg)
print('Client closed connection')

sock.close()

def echo_server(addr, nworkers):
# Launch the client workers
q = Queue()
for n in range(nworkers):
t = Thread(target=echo_client, args=(q,))
t.daemon = True
t.start()

# Run the server
sock = socket(AF_INET, SOCK_STREAM)
sock.bind(addr)
sock.listen(5)
while True:
client_sock, client_addr = sock.accept()
q.put((client_sock, client_addr))

echo_server(('',15000), 128)

使用 <span class="pre">ThreadPoolExecutor</span>​ 相对于手动实现的一个好处在于它使得 任务提交者更方便的从被调用函数中获取返回值。例如,你可能会像下面这样写:

from concurrent.futures import ThreadPoolExecutor
import urllib.request

def fetch_url(url):
u = urllib.request.urlopen(url)
data = u.read()
return data

pool = ThreadPoolExecutor(10)
# Submit work to the pool
a = pool.submit(fetch_url, 'http://www.python.org')
b = pool.submit(fetch_url, 'http://www.pypy.org')

# Get the results back
x = a.result()
y = b.result()

例子中返回的handle对象会帮你处理所有的阻塞与协作,然后从工作线程中返回数据给你。 特别的,<span class="pre">a.result()</span>​ 操作会阻塞进程直到对应的函数执行完成并返回一个结果。

讨论

通常来讲,你应该避免编写线程数量可以无限制增长的程序。例如,看看下面这个服务器:

from threading import Thread
from socket import socket, AF_INET, SOCK_STREAM

def echo_client(sock, client_addr):
'''
Handle a client connection
'''
print('Got connection from', client_addr)
while True:
msg = sock.recv(65536)
if not msg:
break
sock.sendall(msg)
print('Client closed connection')
sock.close()

def echo_server(addr, nworkers):
# Run the server
sock = socket(AF_INET, SOCK_STREAM)
sock.bind(addr)
sock.listen(5)
while True:
client_sock, client_addr = sock.accept()
t = Thread(target=echo_client, args=(client_sock, client_addr))
t.daemon = True
t.start()

echo_server(('',15000))

尽管这个也可以工作, 但是它不能抵御有人试图通过创建大量线程让你服务器资源枯竭而崩溃的攻击行为。 通过使用预先初始化的线程池,你可以设置同时运行线程的上限数量。

你可能会关心创建大量线程会有什么后果。 现代操作系统可以很轻松的创建几千个线程的线程池。 甚至,同时几千个线程等待工作并不会对其他代码产生性能影响。 当然了,如果所有线程同时被唤醒并立即在CPU上执行,那就不同了——特别是有了全局解释器锁GIL。 通常,你应该只在I/O处理相关代码中使用线程池。

创建大的线程池的一个可能需要关注的问题是内存的使用。 例如,如果你在OS X系统上面创建2000个线程,系统显示Python进程使用了超过9GB的虚拟内存。 不过,这个计算通常是有误差的。当创建一个线程时,操作系统会预留一个虚拟内存区域来 放置线程的执行栈(通常是8MB大小)。但是这个内存只有一小片段被实际映射到真实内存中。 因此,Python进程使用到的真实内存其实很小 (比如,对于2000个线程来讲,只使用到了70MB的真实内存,而不是9GB)。 如果你担心虚拟内存大小,可以使用 <span class="pre">threading.stack_size()</span>​ 函数来降低它。例如:

import threading
threading.stack_size(65536)

如果你加上这条语句并再次运行前面的创建2000个线程试验, 你会发现Python进程只使用到了大概210MB的虚拟内存,而真实内存使用量没有变。 注意线程栈大小必须至少为32768字节,通常是系统内存页大小(4096、8192等)的整数倍。

简单并行编程

你有个程序要执行CPU密集型工作,你想让他利用多核CPU的优势来运行的快一点。

解决方案

concurrent.futures​ 库提供了一个 ProcessPoolExecutor​ 类, 可被用来在一个单独的Python解释器中执行计算密集型函数。 不过,要使用它,你首先要有一些计算密集型的任务。 我们通过一个简单而实际的例子来演示它。假定你有个Apache web服务器日志目录的gzip压缩包:

logs/
20120701.log.gz
20120702.log.gz
20120703.log.gz
20120704.log.gz
20120705.log.gz
20120706.log.gz
...

进一步假设每个日志文件内容类似下面这样:

124.115.6.12 - - [10/Jul/2012:00:18:50 -0500] "GET /robots.txt ..." 200 71
210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /ply/ ..." 200 11875
210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /favicon.ico ..." 404 369
61.135.216.105 - - [10/Jul/2012:00:20:04 -0500] "GET /blog/atom.xml ..." 304 -
...

下面是一个脚本,在这些日志文件中查找出所有访问过robots.txt文件的主机:

# findrobots.py

import gzip
import io
import glob

def find_robots(filename):
'''
Find all of the hosts that access robots.txt in a single log file
'''
robots = set()
with gzip.open(filename) as f:
for line in io.TextIOWrapper(f,encoding='ascii'):
fields = line.split()
if fields[6] == '/robots.txt':
robots.add(fields[0])
return robots

def find_all_robots(logdir):
'''
Find all hosts across and entire sequence of files
'''
files = glob.glob(logdir+'/*.log.gz')
all_robots = set()
for robots in map(find_robots, files):
all_robots.update(robots)
return all_robots

if __name__ == '__main__':
robots = find_all_robots('logs')
for ipaddr in robots:
print(ipaddr)

前面的程序使用了通常的map-reduce风格来编写。 函数 <span class="pre">find_robots()</span>​ 在一个文件名集合上做map操作,并将结果汇总为一个单独的结果, 也就是 <span class="pre">find_all_robots()</span>​ 函数中的 <span class="pre">all_robots</span>​ 集合。 现在,假设你想要修改这个程序让它使用多核CPU。 很简单——只需要将map()操作替换为一个 <span class="pre">concurrent.futures</span>​ 库中生成的类似操作即可。 下面是一个简单修改版本:

# findrobots.py

import gzip
import io
import glob
from concurrent import futures

def find_robots(filename):
'''
Find all of the hosts that access robots.txt in a single log file

'''
robots = set()
with gzip.open(filename) as f:
for line in io.TextIOWrapper(f,encoding='ascii'):
fields = line.split()
if fields[6] == '/robots.txt':
robots.add(fields[0])
return robots

def find_all_robots(logdir):
'''
Find all hosts across and entire sequence of files
'''
files = glob.glob(logdir+'/*.log.gz')
all_robots = set()
with futures.ProcessPoolExecutor() as pool:
for robots in pool.map(find_robots, files):
all_robots.update(robots)
return all_robots

if __name__ == '__main__':
robots = find_all_robots('logs')
for ipaddr in robots:
print(ipaddr)

通过这个修改后,运行这个脚本产生同样的结果,但是在四核机器上面比之前快了3.5倍。 实际的性能优化效果根据你的机器CPU数量的不同而不同。

讨论

<span class="pre">ProcessPoolExecutor</span>​ 的典型用法如下:

from concurrent.futures import ProcessPoolExecutor

with ProcessPoolExecutor() as pool:
...
do work in parallel using pool
...

其原理是,一个 <span class="pre">ProcessPoolExecutor</span>​ 创建N个独立的Python解释器, N是系统上面可用CPU的个数。你可以通过提供可选参数给 <span class="pre">ProcessPoolExecutor(N)</span>​ 来修改 处理器数量。这个处理池会一直运行到with块中最后一个语句执行完成, 然后处理池被关闭。不过,程序会一直等待直到所有提交的工作被处理完成。

被提交到池中的工作必须被定义为一个函数。有两种方法去提交。 如果你想让一个列表推导或一个 <span class="pre">map()</span>​ 操作并行执行的话,可使用 <span class="pre">pool.map()</span>​ :

# A function that performs a lot of work
def work(x):
...
return result

# Nonparallel code
results = map(work, data)

# Parallel implementation
with ProcessPoolExecutor() as pool:
results = pool.map(work, data)

另外,你可以使用 <span class="pre">pool.submit()</span>​ 来手动的提交单个任务:

# Some function
def work(x):
...
return result

with ProcessPoolExecutor() as pool:
...
# Example of submitting work to the pool
future_result = pool.submit(work, arg)

# Obtaining the result (blocks until done)
r = future_result.result()
...

如果你手动提交一个任务,结果是一个 <span class="pre">Future</span>​ 实例。 要获取最终结果,你需要调用它的 <span class="pre">result()</span>​ 方法。 它会阻塞进程直到结果被返回来。

如果不想阻塞,你还可以使用一个回调函数,例如:

def when_done(r):
print('Got:', r.result())

with ProcessPoolExecutor() as pool:
future_result = pool.submit(work, arg)
future_result.add_done_callback(when_done)

回调函数接受一个 <span class="pre">Future</span>​ 实例,被用来获取最终的结果(比如通过调用它的result()方法)。 尽管处理池很容易使用,在设计大程序的时候还是有很多需要注意的地方,如下几点:

  • 这种并行处理技术只适用于那些可以被分解为互相独立部分的问题。
  • 被提交的任务必须是简单函数形式。对于方法、闭包和其他类型的并行执行还不支持。
  • 函数参数和返回值必须兼容pickle,因为要使用到进程间的通信,所有解释器之间的交换数据必须被序列化
  • 被提交的任务函数不应保留状态或有副作用。除了打印日志之类简单的事情,

一旦启动你不能控制子进程的任何行为,因此最好保持简单和纯洁——函数不要去修改环境。

  • 在Unix上进程池通过调用 <span class="pre">fork()</span>​ 系统调用被创建,

它会克隆Python解释器,包括fork时的所有程序状态。 而在Windows上,克隆解释器时不会克隆状态。 实际的fork操作会在第一次调用 <span class="pre">pool.map()</span>​ 或 <span class="pre">pool.submit()</span>​ 后发生。

  • 当你混合使用进程池和多线程的时候要特别小心。

你应该在创建任何线程之前先创建并激活进程池(比如在程序启动的main线程中创建进程池)。

Py的全局锁问题

尽管Python完全支持多线程编程, 但是解释器的C语言实现部分在完全并行执行时并不是线程安全的。 实际上,解释器被一个全局解释器锁保护着,它确保任何时候都只有一个Python线程执行。 GIL最大的问题就是Python的多线程程序并不能利用多核CPU的优势 (比如一个使用了多个线程的计算密集型程序只会在一个单CPU上面运行)。

在讨论普通的GIL之前,有一点要强调的是GIL只会影响到那些严重依赖CPU的程序(比如计算型的)。 如果你的程序大部分只会涉及到I/O,比如网络交互,那么使用多线程就很合适, 因为它们大部分时间都在等待。实际上,你完全可以放心的创建几千个Python线程, 现代操作系统运行这么多线程没有任何压力,没啥可担心的。

而对于依赖CPU的程序,你需要弄清楚执行的计算的特点。 例如,优化底层算法要比使用多线程运行快得多。 类似的,由于Python是解释执行的,如果你将那些性能瓶颈代码移到一个C语言扩展模块中, 速度也会提升的很快。如果你要操作数组,那么使用NumPy这样的扩展会非常的高效。 最后,你还可以考虑下其他可选实现方案,比如PyPy,它通过一个JIT编译器来优化执行效率 (不过在写这本书的时候它还不能支持Python 3)。

还有一点要注意的是,线程不是专门用来优化性能的。 一个CPU依赖型程序可能会使用线程来管理一个图形用户界面、一个网络连接或其他服务。 这时候,GIL会产生一些问题,因为如果一个线程长期持有GIL的话会导致其他非CPU型线程一直等待。 事实上,一个写的不好的C语言扩展会导致这个问题更加严重, 尽管代码的计算部分会比之前运行的更快些。

说了这么多,现在想说的是我们有两种策略来解决GIL的缺点。 首先,如果你完全工作于Python环境中,你可以使用 <span class="pre">multiprocessing</span>​ 模块来创建一个进程池, 并像协同处理器一样的使用它。例如,假如你有如下的线程代码:

# Performs a large calculation (CPU bound)
def some_work(args):
...
return result

# A thread that calls the above function
def some_thread():
while True:
...
r = some_work(args)
...

修改代码,使用进程池:

# Processing pool (see below for initiazation)
pool = None

# Performs a large calculation (CPU bound)
def some_work(args):
...
return result

# A thread that calls the above function
def some_thread():
while True:
...
r = pool.apply(some_work, (args))
...

# Initiaze the pool
if __name__ == '__main__':
import multiprocessing
pool = multiprocessing.Pool()

这个通过使用一个技巧利用进程池解决了GIL的问题。 当一个线程想要执行CPU密集型工作时,会将任务发给进程池。 然后进程池会在另外一个进程中启动一个单独的Python解释器来工作。 当线程等待结果的时候会释放GIL。 并且,由于计算任务在单独解释器中执行,那么就不会受限于GIL了。 在一个多核系统上面,你会发现这个技术可以让你很好的利用多CPU的优势。

另外一个解决GIL的策略是使用C扩展编程技术。 主要思想是将计算密集型任务转移给C,跟Python独立,在工作的时候在C代码中释放GIL。 这可以通过在C代码中插入下面这样的特殊宏来完成:

#include "Python.h"
...

PyObject *pyfunc(PyObject *self, PyObject *args) {
...
Py_BEGIN_ALLOW_THREADS
// Threaded C code
...
Py_END_ALLOW_THREADS
...
}

如果你使用其他工具访问C语言,比如对于Cython的ctypes库,你不需要做任何事。 例如,ctypes在调用C时会自动释放GIL。

讨论

许多程序员在面对线程性能问题的时候,马上就会怪罪GIL,什么都是它的问题。 其实这样子太不厚道也太天真了点。 作为一个真实的例子,在多线程的网络编程中神秘的 <span class="pre">stalls</span>​ 可能是因为其他原因比如一个DNS查找延时,而跟GIL毫无关系。 最后你真的需要先去搞懂你的代码是否真的被GIL影响到。 同时还要明白GIL大部分都应该只关注CPU的处理而不是I/O.

如果你准备使用一个处理器池,注意的是这样做涉及到数据序列化和在不同Python解释器通信。 被执行的操作需要放在一个通过def语句定义的Python函数中,不能是lambda、闭包可调用实例等, 并且函数参数和返回值必须要兼容pickle。 同样,要执行的任务量必须足够大以弥补额外的通信开销。

另外一个难点是当混合使用线程和进程池的时候会让你很头疼。 如果你要同时使用两者,最好在程序启动时,创建任何线程之前先创建一个单例的进程池。 然后线程使用同样的进程池来进行它们的计算密集型工作。

C扩展最重要的特征是它们和Python解释器是保持独立的。 也就是说,如果你准备将Python中的任务分配到C中去执行, 你需要确保C代码的操作跟Python保持独立, 这就意味着不要使用Python数据结构以及不要调用Python的C API。 另外一个就是你要确保C扩展所做的工作是足够的,值得你这样做。 也就是说C扩展担负起了大量的计算任务,而不是少数几个计算。

这些解决GIL的方案并不能适用于所有问题。 例如,某些类型的应用程序如果被分解为多个进程处理的话并不能很好的工作, 也不能将它的部分代码改成C语言执行。 对于这些应用程序,你就要自己需求解决方案了 (比如多进程访问共享内存区,多解析器运行于同一个进程等)。 或者,你还可以考虑下其他的解释器实现,比如PyPy。

了解更多关于在C扩展中释放GIL,请参考15.7和15.10小节。

Actor模式

actor模式是一种最古老的也是最简单的并行和分布式计算解决方案。 事实上,它天生的简单性是它如此受欢迎的重要原因之一。 简单来讲,一个actor就是一个并发执行的任务,只是简单的执行发送给它的消息任务。 响应这些消息时,它可能还会给其他actor发送更进一步的消息。 actor之间的通信是单向和异步的。因此,消息发送者不知道消息是什么时候被发送, 也不会接收到一个消息已被处理的回应或通知。

结合使用一个线程和一个队列可以很容易的定义actor,例如:

from queue import Queue
from threading import Thread, Event

# Sentinel used for shutdown
class ActorExit(Exception):
pass

class Actor:
def __init__(self):
self._mailbox = Queue()

def send(self, msg):
'''
Send a message to the actor
'''
self._mailbox.put(msg)

def recv(self):
'''
Receive an incoming message
'''
msg = self._mailbox.get()
if msg is ActorExit:
raise ActorExit()
return msg

def close(self):
'''
Close the actor, thus shutting it down
'''
self.send(ActorExit)

def start(self):
'''
Start concurrent execution
'''
self._terminated = Event()
t = Thread(target=self._bootstrap)

t.daemon = True
t.start()

def _bootstrap(self):
try:
self.run()
except ActorExit:
pass
finally:
self._terminated.set()

def join(self):
self._terminated.wait()

def run(self):
'''
Run method to be implemented by the user
'''
while True:
msg = self.recv()

# Sample ActorTask
class PrintActor(Actor):
def run(self):
while True:
msg = self.recv()
print('Got:', msg)

# Sample use
p = PrintActor()
p.start()
p.send('Hello')
p.send('World')
p.close()
p.join()

这个例子中,你使用actor实例的 <span class="pre">send()</span>​ 方法发送消息给它们。 其机制是,这个方法会将消息放入一个队里中, 然后将其转交给处理被接受消息的一个内部线程。 <span class="pre">close()</span>​ 方法通过在队列中放入一个特殊的哨兵值(ActorExit)来关闭这个actor。 用户可以通过继承Actor并定义实现自己处理逻辑run()方法来定义新的actor。 <span class="pre">ActorExit</span>​ 异常的使用就是用户自定义代码可以在需要的时候来捕获终止请求 (异常被get()方法抛出并传播出去)。

如果你放宽对于同步和异步消息发送的要求, 类actor对象还可以通过生成器来简化定义。例如:

def print_actor():
while True:

try:
msg = yield # Get a message
print('Got:', msg)
except GeneratorExit:
print('Actor terminating')

# Sample use
p = print_actor()
next(p) # Advance to the yield (ready to receive)
p.send('Hello')
p.send('World')
p.close()

讨论

actor模式的魅力就在于它的简单性。 实际上,这里仅仅只有一个核心操作 <span class="pre">send()</span>​ . 甚至,对于在基于actor系统中的“消息”的泛化概念可以已多种方式被扩展。 例如,你可以以元组形式传递标签消息,让actor执行不同的操作,如下:

class TaggedActor(Actor):
def run(self):
while True:
tag, *payload = self.recv()
getattr(self,'do_'+tag)(*payload)

# Methods correponding to different message tags
def do_A(self, x):
print('Running A', x)

def do_B(self, x, y):
print('Running B', x, y)

# Example
a = TaggedActor()
a.start()
a.send(('A', 1)) # Invokes do_A(1)
a.send(('B', 2, 3)) # Invokes do_B(2,3)
a.close()
a.join()

作为另外一个例子,下面的actor允许在一个工作者中运行任意的函数, 并且通过一个特殊的Result对象返回结果:

from threading import Event
class Result:
def __init__(self):
self._evt = Event()
self._result = None

def set_result(self, value):
self._result = value

self._evt.set()

def result(self):
self._evt.wait()
return self._result

class Worker(Actor):
def submit(self, func, *args, **kwargs):
r = Result()
self.send((func, args, kwargs, r))
return r

def run(self):
while True:
func, args, kwargs, r = self.recv()
r.set_result(func(*args, **kwargs))

# Example use
worker = Worker()
worker.start()
r = worker.submit(pow, 2, 3)
worker.close()
worker.join()
print(r.result())

最后,“发送”一个任务消息的概念可以被扩展到多进程甚至是大型分布式系统中去。 例如,一个类actor对象的 send()​ 方法可以被编程让它能在一个套接字连接上传输数据 或通过某些消息中间件(比如AMQP、ZMQ等)来发送。

实现消息发布/订阅模型

TODO

使用生成器代替线程

TODO

多个线程队列轮询

TODO

在Unix系统上面启动守护线程

TODO

Ref

Python CookBook

文章作者: Met Guo
文章链接: https://guoyujian.github.io/2024/03/08/Python-%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B%E5%B0%8F%E7%BB%931/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Gmet's Blog