Reactor Pattern in Python
在上一篇筆記中,提到了如何利用 non-blocking I/O 與 I/O multiplexing 來實作一個 concurrent socket server。但文末也提到,如果要根據不同 file descriptor 做不同的處理,必須在 event loop 裡寫下一長串的 if conditions 的問題:
while read_waiting or write_waiting: # event loop
rs, ws, es = select(read_waiting, write_waiting, [])
for fd in rs:
if fd == ...:
# ...
elif fd == ...:
# ...
elif fd == ...:
# ...
else:
# ...
for fd in ws:
if fd == ...:
# ...
elif fd == ...:
# ...
elif fd == ...:
# ...
else:
# ...
這是由於 event loop 跟多個 file descriptors 的處理邏輯混雜在一起的緣故。為了解決這個問題,我們要把執行 event loop 跟處理 event 的職責分開。
Implementing Dispatcher
首先,讓我們把 event loop 的執行者稱作 Dispatcher
吧。因為它的職責是負責在 event 發生時,交給適當的 event handler 處理。另外,它也要提供註冊、註銷 event handler 的 interface:
# file: dispatcher.py
from itertools import imap
from select import select
class Dispatcher(object):
_instance = None
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = object.__new__(cls, *args, **kwargs)
cls._instance._handlers = {'read': {}, 'write': {}}
return cls._instance
def register(self, handler, event):
fd = handler.get_handle().fileno()
assert fd not in self._handlers[event]
self._handlers[event][fd] = handler
def unregister(self, handler, event):
fd = handler.get_handle().fileno()
del self._handlers[event][fd]
def unregister_all(self, handler):
fd = handler.get_handle().fileno()
for handlers in self._handlers.itervalues():
try:
del handlers[fd]
except KeyError:
pass
def run(self):
notify = self._notify
read_handlers = self._handlers['read']
write_handlers = self._handlers['write']
while read_handlers or write_handlers:
rs, ws, es = select(read_handlers, write_handlers, [])
notify(rs, 'read')
notify(ws, 'write')
def _notify(self, fds, event):
for handler in imap(self._handlers[event].get, fds):
handler.handle_event(event)
這裡 Dispatcher
利用 __new__()
實作成一個簡易的 singleton。
register()
與 unregister()
分別是註冊與註銷 event 與對應 handler 的操作。在註冊的同時也會檢查這個 file descriptor 是不是只由這個 handler 負責處理當前註冊的 event。而 unregister_all()
則是用來清掉所有 handler 註冊的 events。這在某個 file descriptor 已經不再需要被監聽的時候很好用。
最後,run()
則是用來啟動 event loop 的。在 event loop 中,會利用 select
找出 ready for reading/writing 的 file descriptors,並利用 _notify()
通知對應的 handler。
Defining Event Handler
從 Dispatcher
的定義可以看到,event handler 需要提供 get_handle()
來取得它想監聽的 handle,也需要提供 handle_event(event)
來通知 handler:
# file: handler.py
import abc
from dispatcher import Dispatcher
class EventHandler(object):
__metaclass__ = abc.ABCMeta
def __init__(self, handle):
self._handle = handle
self._dispatcher = Dispatcher()
def get_handle(self):
return self._handle
@abc.abstractmethod
def handle_event(self, event):
pass
def _register(self, event):
self._dispatcher.register(self, event)
def _unregister(self, event):
self._dispatcher.unregister(self, event)
def _close(self):
self._dispatcher.unregister_all(self)
self._handle.close()
這裡定義了三個 helper methods 供 subclass 使用。_register()
跟 _unregister()
是用來方便把自己註冊/註銷給 dispatcher 的。而 _close()
則會關閉 handler 持有的 handle,並向 dispatcher 註銷相關的所有 events。
Implementing the Echo Server
於是,我們可以來實作我們自己的 handler 了。這裡依舊以先前的 multi-client echo server 為例。
首先是 ClientHandler
,它會負責在 client ready for reading 的時候從 client 接收訊息(_try_recv()
),並在 ready for writing 的時候把訊息回覆給 client(_try_send()
):
# file: echo_server.py
# ...
class ClientHandler(EventHandler):
def __init__(self, sock):
self._buf = ''
sock.setblocking(0)
super(ClientHandler, self).__init__(sock)
self._register('read')
def handle_event(self, event):
assert event in ('read', 'write')
if event == 'read':
self._try_recv()
elif event == 'write':
self._try_send()
def _try_recv(self):
try:
sock = self._handle
addr = sock.getpeername()
data = sock.recv(BUF_SIZE)
print('* Received "{}" from {}:{}'.format(data, *addr))
if data:
print(' Sending data back to client')
if not self._buf:
self._register('write')
self._buf += data
self._try_send()
else:
print(' Closing connection')
self._close()
except socket.error as ex:
if ex.errno not in (errno.EWOULDBLOCK, errno.EAGAIN):
raise
def _try_send(self):
try:
sent = self._handle.send(self._buf[:BUF_SIZE])
self._buf = self._buf[sent:]
if not self._buf:
self._unregister('write')
except socket.error as ex:
if ex.errno not in (errno.EWOULDBLOCK, errno.EAGAIN):
raise
# ...
可以看到它在 __init__()
裡把自己註冊為 client ready for reading 的 handler,並會在連線中斷的時候把自己註銷。_try_recv()
會在 buffer 有資料時註冊監聽 client ready for writing 的 event,而 _try_send()
則會在 buffer 中的訊息已全部送出的情況下註銷 ready for writing event。
然後是 ClientAcceptor
,它會在 ready for reading 的時候呼叫 socket.accept()
接受新的 connection:
# file: echo_server.py
# ...
class ClientAcceptor(EventHandler):
def __init__(self, host, port):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind((host, port))
sock.listen(1)
sock.setblocking(0)
print('* Running on {}:{}'.format(host, port))
super(ClientAcceptor, self).__init__(sock)
self._register('read')
def handle_event(self, event):
assert event == 'read'
try:
conn, addr = self._handle.accept()
ClientHandler(conn)
print('* Connected by {}'.format(addr))
except socket.error as ex:
if ex.errno not in (errno.EWOULDBLOCK, errno.EAGAIN):
raise
# ...
同樣的,它會在 __init__()
裡把自己註冊給 dispatcher。
另外,可以看到在接受新的連線之後,它只用 conn
建立一個 ClientHandler
就丟著不管它了。這是因為 ClientHandler
會自行註冊給 dispatcher 的緣故。
於是現在的 main()
就變得很簡單了:
# file: echo_server.py
# ...
def main():
ClientAcceptor(HOST, PORT)
Dispatcher().run()
# ...
Reactor Pattern
這種將 event demultiplexing/dispatching 與 event handling 分離的方式被稱為 Reactor pattern。從程式結構來看,它包含以下幾個 components:
- Handle:由 OS 管理的 resource references。如 socket、file、timer 等等。
- Synchronous Event Demultiplexer:用以等候一組 handles 的 event 發生。在這篇筆記的範例中用的是
select
。 - Initiation Dispatcher (Reactor):定義註冊/註銷 event handler 的 interface,並負責在 event 發生時通知適當的 event handler。即是範例中的
Dispatcher
。 - Event Handler:定義處理 event 的 interface。即是範例的
EventHandler
。 - Concrete Event Handler:負責實現 event 發生時的對應處理。即是範例的
ClientAcceptor
與ClientHandler
。
而這些 components 是這樣運作的:
- [step 1-3] client 為特定 event(如 ready for reading/writing)將 handler 註冊給 dispatcher。註冊時,dispatcher 會跟 handler 要求 handler 想監聽的 handle。
- [step 4-5] 註冊完所有的 handler 之後,dispatcher 啟動它的 event loop。在 event loop 中,會透過 demultiplexer 監聽所有已註冊對應 handler 的 handle 是否有 event 發生。
- [step 6-7] 當 event 發生時,dispatcher 會通知對應的 handler 進行處理。
實際上,Python 內建的 asyncore
module 已經提供了 socket 的 Reactor pattern 實作。只不過,它的名稱有些混淆:它的 loop()
實際上是上述的 dispatcher,而它的 dispatcher
則比較接近上述 event handler 的角色。雖然它的實作方式跟這篇筆記不大相同,但基本上觀念是一樣的。使用方式可以參考它 echo server 的範例。
Implementation Considerations
Single or Multiple Dispatchers
在上面的範例中,dispatcher 被實作成一個 singleton。這樣的設計在大多情況下應該就足夠了,但有時候還是會有需要多個 dispatcher 的情況。譬如說,每個 dispatcher 都可以在其專屬的 process 或 thread 上執行,以盡可能地善用多核心資源。
這時也可以考慮效仿 Event Handling with Python 裡頭提到的 HandlerManager
:宣告一個 global 層級的 instance,只需要一個 dispatcher 的情況就使用這個 instance,有需要的時候再自行建立新的 instance 使用。
class Dispatcher(object):
def __init__(self):
# ...
# ...
dispatcher = Dispatcher()
Event Handling Interface
在實作 echo server 時,我們的 event handlers 全都是 abstract class EventHandler
的 subclasses。這樣做是為了把 subclasses 共有的 methods(get_handle()
、_register()
、_unregister()
跟 _close()
)抽出來,還有確保所有 handler 都有實作 handle_event()
。當然在 Python 這種 dynamic typing 的語言,實際上不需要強制 handlers 一定要是某個 class 的 descendants,只要 interface 接得上去就可以了。
另外,這裡的 handler 都是以 handle_event(event)
的形式接受所有 event 的通知(single-method interface)。另一種可能的實作方式是以不同的 method 處理不同的 event(multi-method interface):
class MyHandler(object):
# ...
def handle_read(self):
# perform read operation
# ...
def handle_write(self):
# perform write operation
# ...
# ...
這可以避免 single-method interface 還要自己根據 event 決定對應的處理方式。內建的 asyncore
module 就是採用 multi-method interface 的實作。
event handler 也不見得要設計成 class,也可以只是一個 callback function。但因為 callback function 自身並不知道自己監聽的 handle 是什麼,因此註冊/註銷、以及通知 handler 的時候也要一併提供 handle 或 file descriptor 給 dispatcher:
def handle_event(event, sock):
# ...
def main():
# ...
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# ...
dispatcher = Dispatcher()
dispatcher.register(handle_event, 'read', sock)
# ...
Handler Registration
在我們實作的版本中,註冊 handler 時都要一併傳遞想要監聽的 event type。但是在 asyncore
的實作中,註冊 handler 時則不需指定監聽的 event type。它會在 event loop 的每一個 iteration 都去詢問 handler 是否要監聽 ready for reading/writing 的 event,並加入對應的 waiting list 中。以下節錄其原始碼:
def poll(timeout=0.0, map=None):
if map is None:
map = socket_map
if map:
r = []; w = []; e = []
for fd, obj in map.items():
is_r = obj.readable()
is_w = obj.writable()
if is_r:
r.append(fd)
# accepting sockets should not be writable
if is_w and not obj.accepting:
w.append(fd)
if is_r or is_w:
e.append(fd)
if [] == r == w == e:
time.sleep(timeout)
return
try:
r, w, e = select.select(r, w, e, timeout)
except select.error, err:
if err.args[0] != EINTR:
raise
else:
return
# ...
這個 poll()
的內容就是 event loop 每一個 iteration 實際執行的程式碼。這裡的 map
存的是以 file descriptor(fd
)為 key、以 event handler(obj
)為 value 的 dictionary。可以看到它分別利用 obj.readable()
與 obj.writable()
來判斷是否將 handler 加入 reading 或 writing 的 waiting list(r
跟 w
)裡頭。
Synchronous Event Demultiplexer
最後要提的是,現有的 Reactor 實作所採用的 synchronous event demultiplexer 其實通常不是 select
。這是因為在不同 operating system 上,都各自有更有效率的做法。譬如說,Linux 通常用的是 epoll
,而 BSD 則是 kqueue
。因此 Reactor 的實作通常會把這部分封裝起來,在不同平台上自動選擇效率最好的 demultiplexer。
至於這些方法有什麼差別,就等之後有機會再來討論吧。
Summary
延續上一篇筆記,這篇筆記介紹了如何以 Reactor pattern 將 event handling 的處理邏輯從 event demultiplexing/dispatching 的部分分離。於是,現在 event loop 不必知道針對不同 handle 的處理邏輯,也能夠將不同 handles 切割成獨立的 event handlers 處理,使得擴充或修改功能只會影響到個別的 handlers。
這個 pattern 現在已經被廣泛利用在 event-driven applications 中,各個程式語言也都有許多成熟的實作品可供使用,如 C 語言的 libevent 跟 libev、C++ 的 ACE Reactor Framework、Python 內建的 asyncore
與 twisted 等等。
本篇的討論主要基於 Douglas C. Schmidt 所撰寫的 Reactor 這篇近二十年前的論文,裡頭的範例採用 C++ 實作(作者本人就是 ACE 的開發者)。想看看更詳細的討論可以找來翻翻。