February 19, 2015 · Python Concurrency Design Pattern

Reactor Pattern in Python

上一篇筆記中,提到了如何利用 non-blocking I/O 與 I/O multiplexing 來實作一個 concurrent socket server。但文末也提到,如果要根據不同 file descriptor 做不同的處理,必須在 event loop 裡寫下一長串的 if conditions 的問題:

while read_waiting or write_waiting:  # event loop  
    rs, ws, es = select(read_waiting, write_waiting, [])
    for fd in rs:
        if fd == ...:
            # ...
        elif fd == ...:
            # ...
        elif fd == ...:
            # ...
        else:
            # ...

    for fd in ws:
        if fd == ...:
            # ...
        elif fd == ...:
            # ...
        elif fd == ...:
            # ...
        else:
            # ...

這是由於 event loop 跟多個 file descriptors 的處理邏輯混雜在一起的緣故。為了解決這個問題,我們要把執行 event loop 跟處理 event 的職責分開。

Implementing Dispatcher

首先,讓我們把 event loop 的執行者稱作 Dispatcher 吧。因為它的職責是負責在 event 發生時,交給適當的 event handler 處理。另外,它也要提供註冊、註銷 event handler 的 interface:

# file: dispatcher.py

from itertools import imap  
from select import select


class Dispatcher(object):

    _instance = None

    def __new__(cls, *args, **kwargs):
        if cls._instance is None:
            cls._instance = object.__new__(cls, *args, **kwargs)
            cls._instance._handlers = {'read': {}, 'write': {}}

        return cls._instance

    def register(self, handler, event):
        fd = handler.get_handle().fileno()
        assert fd not in self._handlers[event]
        self._handlers[event][fd] = handler

    def unregister(self, handler, event):
        fd = handler.get_handle().fileno()
        del self._handlers[event][fd]

    def unregister_all(self, handler):
        fd = handler.get_handle().fileno()
        for handlers in self._handlers.itervalues():
            try:
                del handlers[fd]
            except KeyError:
                pass

    def run(self):
        notify = self._notify
        read_handlers = self._handlers['read']
        write_handlers = self._handlers['write']
        while read_handlers or write_handlers:
            rs, ws, es = select(read_handlers, write_handlers, [])
            notify(rs, 'read')
            notify(ws, 'write')

    def _notify(self, fds, event):
        for handler in imap(self._handlers[event].get, fds):
            handler.handle_event(event)

這裡 Dispatcher 利用 __new__() 實作成一個簡易的 singleton

register()unregister() 分別是註冊與註銷 event 與對應 handler 的操作。在註冊的同時也會檢查這個 file descriptor 是不是只由這個 handler 負責處理當前註冊的 event。而 unregister_all() 則是用來清掉所有 handler 註冊的 events。這在某個 file descriptor 已經不再需要被監聽的時候很好用。

最後,run() 則是用來啟動 event loop 的。在 event loop 中,會利用 select 找出 ready for reading/writing 的 file descriptors,並利用 _notify() 通知對應的 handler。

Defining Event Handler

Dispatcher 的定義可以看到,event handler 需要提供 get_handle() 來取得它想監聽的 handle,也需要提供 handle_event(event) 來通知 handler:

# file: handler.py

import abc

from dispatcher import Dispatcher


class EventHandler(object):

    __metaclass__ = abc.ABCMeta

    def __init__(self, handle):
        self._handle = handle
        self._dispatcher = Dispatcher()

    def get_handle(self):
        return self._handle

    @abc.abstractmethod
    def handle_event(self, event):
        pass

    def _register(self, event):
        self._dispatcher.register(self, event)

    def _unregister(self, event):
        self._dispatcher.unregister(self, event)

    def _close(self):
        self._dispatcher.unregister_all(self)
        self._handle.close()

這裡定義了三個 helper methods 供 subclass 使用。_register()_unregister() 是用來方便把自己註冊/註銷給 dispatcher 的。而 _close() 則會關閉 handler 持有的 handle,並向 dispatcher 註銷相關的所有 events。

Implementing the Echo Server

於是,我們可以來實作我們自己的 handler 了。這裡依舊以先前的 multi-client echo server 為例。

首先是 ClientHandler,它會負責在 client ready for reading 的時候從 client 接收訊息(_try_recv()),並在 ready for writing 的時候把訊息回覆給 client(_try_send()):

# file: echo_server.py

# ...

class ClientHandler(EventHandler):

    def __init__(self, sock):
        self._buf = ''
        sock.setblocking(0)

        super(ClientHandler, self).__init__(sock)
        self._register('read')

    def handle_event(self, event):
        assert event in ('read', 'write')

        if event == 'read':
            self._try_recv()
        elif event == 'write':
            self._try_send()

    def _try_recv(self):
        try:
            sock = self._handle
            addr = sock.getpeername()
            data = sock.recv(BUF_SIZE)
            print('* Received "{}" from {}:{}'.format(data, *addr))
            if data:
                print('  Sending data back to client')
                if not self._buf:
                    self._register('write')
                self._buf += data
                self._try_send()
            else:
                print('  Closing connection')
                self._close()
        except socket.error as ex:
            if ex.errno not in (errno.EWOULDBLOCK, errno.EAGAIN):
                raise

    def _try_send(self):
        try:
            sent = self._handle.send(self._buf[:BUF_SIZE])
            self._buf = self._buf[sent:]
            if not self._buf:
                self._unregister('write')
        except socket.error as ex:
            if ex.errno not in (errno.EWOULDBLOCK, errno.EAGAIN):
                raise

# ...

可以看到它在 __init__() 裡把自己註冊為 client ready for reading 的 handler,並會在連線中斷的時候把自己註銷。_try_recv() 會在 buffer 有資料時註冊監聽 client ready for writing 的 event,而 _try_send() 則會在 buffer 中的訊息已全部送出的情況下註銷 ready for writing event。

然後是 ClientAcceptor,它會在 ready for reading 的時候呼叫 socket.accept() 接受新的 connection:

# file: echo_server.py

# ...

class ClientAcceptor(EventHandler):

    def __init__(self, host, port):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.bind((host, port))
        sock.listen(1)
        sock.setblocking(0)
        print('* Running on {}:{}'.format(host, port))

        super(ClientAcceptor, self).__init__(sock)
        self._register('read')

    def handle_event(self, event):
        assert event == 'read'

        try:
            conn, addr = self._handle.accept()
            ClientHandler(conn)
            print('* Connected by {}'.format(addr))
        except socket.error as ex:
            if ex.errno not in (errno.EWOULDBLOCK, errno.EAGAIN):
                raise

# ...

同樣的,它會在 __init__() 裡把自己註冊給 dispatcher。

另外,可以看到在接受新的連線之後,它只用 conn 建立一個 ClientHandler 就丟著不管它了。這是因為 ClientHandler 會自行註冊給 dispatcher 的緣故。

於是現在的 main() 就變得很簡單了:

# file: echo_server.py

# ...

def main():  
    ClientAcceptor(HOST, PORT)
    Dispatcher().run()

# ...

Reactor Pattern

這種將 event demultiplexing/dispatching 與 event handling 分離的方式被稱為 Reactor pattern。從程式結構來看,它包含以下幾個 components:

Reactor - Class Diagram

而這些 components 是這樣運作的:

  1. [step 1-3] client 為特定 event(如 ready for reading/writing)將 handler 註冊給 dispatcher。註冊時,dispatcher 會跟 handler 要求 handler 想監聽的 handle。
  2. [step 4-5] 註冊完所有的 handler 之後,dispatcher 啟動它的 event loop。在 event loop 中,會透過 demultiplexer 監聽所有已註冊對應 handler 的 handle 是否有 event 發生。
  3. [step 6-7] 當 event 發生時,dispatcher 會通知對應的 handler 進行處理。

Reactor - Sequence Diagram

實際上,Python 內建的 asyncore module 已經提供了 socket 的 Reactor pattern 實作。只不過,它的名稱有些混淆:它的 loop() 實際上是上述的 dispatcher,而它的 dispatcher 則比較接近上述 event handler 的角色。雖然它的實作方式跟這篇筆記不大相同,但基本上觀念是一樣的。使用方式可以參考它 echo server 的範例。

Implementation Considerations

Single or Multiple Dispatchers

在上面的範例中,dispatcher 被實作成一個 singleton。這樣的設計在大多情況下應該就足夠了,但有時候還是會有需要多個 dispatcher 的情況。譬如說,每個 dispatcher 都可以在其專屬的 process 或 thread 上執行,以盡可能地善用多核心資源。

這時也可以考慮效仿 Event Handling with Python 裡頭提到的 HandlerManager:宣告一個 global 層級的 instance,只需要一個 dispatcher 的情況就使用這個 instance,有需要的時候再自行建立新的 instance 使用。

class Dispatcher(object):

    def __init__(self):
        # ...

    # ...

dispatcher = Dispatcher()  

Event Handling Interface

在實作 echo server 時,我們的 event handlers 全都是 abstract class EventHandler 的 subclasses。這樣做是為了把 subclasses 共有的 methods(get_handle()_register()_unregister()_close())抽出來,還有確保所有 handler 都有實作 handle_event()。當然在 Python 這種 dynamic typing 的語言,實際上不需要強制 handlers 一定要是某個 class 的 descendants,只要 interface 接得上去就可以了。

另外,這裡的 handler 都是以 handle_event(event) 的形式接受所有 event 的通知(single-method interface)。另一種可能的實作方式是以不同的 method 處理不同的 event(multi-method interface):

class MyHandler(object):

    # ... 

    def handle_read(self):
        # perform read operation
        # ...

    def handle_write(self):
        # perform write operation
        # ...

    # ...

這可以避免 single-method interface 還要自己根據 event 決定對應的處理方式。內建的 asyncore module 就是採用 multi-method interface 的實作。

event handler 也不見得要設計成 class,也可以只是一個 callback function。但因為 callback function 自身並不知道自己監聽的 handle 是什麼,因此註冊/註銷、以及通知 handler 的時候也要一併提供 handle 或 file descriptor 給 dispatcher:

def handle_event(event, sock):  
    # ...


def main():  
    # ...
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # ...
    dispatcher = Dispatcher()
    dispatcher.register(handle_event, 'read', sock)
    # ...

Handler Registration

在我們實作的版本中,註冊 handler 時都要一併傳遞想要監聽的 event type。但是在 asyncore 的實作中,註冊 handler 時則不需指定監聽的 event type。它會在 event loop 的每一個 iteration 都去詢問 handler 是否要監聽 ready for reading/writing 的 event,並加入對應的 waiting list 中。以下節錄其原始碼

def poll(timeout=0.0, map=None):  
    if map is None:
        map = socket_map
    if map:
        r = []; w = []; e = []
        for fd, obj in map.items():
            is_r = obj.readable()
            is_w = obj.writable()
            if is_r:
                r.append(fd)
            # accepting sockets should not be writable
            if is_w and not obj.accepting:
                w.append(fd)
            if is_r or is_w:
                e.append(fd)
        if [] == r == w == e:
            time.sleep(timeout)
            return

        try:
            r, w, e = select.select(r, w, e, timeout)
        except select.error, err:
            if err.args[0] != EINTR:
                raise
            else:
                return

        # ...

這個 poll() 的內容就是 event loop 每一個 iteration 實際執行的程式碼。這裡的 map 存的是以 file descriptor(fd)為 key、以 event handler(obj)為 value 的 dictionary。可以看到它分別利用 obj.readable()obj.writable() 來判斷是否將 handler 加入 reading 或 writing 的 waiting list(rw)裡頭。

Synchronous Event Demultiplexer

最後要提的是,現有的 Reactor 實作所採用的 synchronous event demultiplexer 其實通常不是 select。這是因為在不同 operating system 上,都各自有更有效率的做法。譬如說,Linux 通常用的是 epoll,而 BSD 則是 kqueue。因此 Reactor 的實作通常會把這部分封裝起來,在不同平台上自動選擇效率最好的 demultiplexer。

至於這些方法有什麼差別,就等之後有機會再來討論吧。

Summary

延續上一篇筆記,這篇筆記介紹了如何以 Reactor pattern 將 event handling 的處理邏輯從 event demultiplexing/dispatching 的部分分離。於是,現在 event loop 不必知道針對不同 handle 的處理邏輯,也能夠將不同 handles 切割成獨立的 event handlers 處理,使得擴充或修改功能只會影響到個別的 handlers。

這個 pattern 現在已經被廣泛利用在 event-driven applications 中,各個程式語言也都有許多成熟的實作品可供使用,如 C 語言的 libeventlibev、C++ 的 ACE Reactor Framework、Python 內建的 asyncoretwisted 等等。

本篇的討論主要基於 Douglas C. Schmidt 所撰寫的 Reactor 這篇近二十年前的論文,裡頭的範例採用 C++ 實作(作者本人就是 ACE 的開發者)。想看看更詳細的討論可以找來翻翻。

  • LinkedIn
  • Tumblr
  • Reddit
  • Google+
  • Pinterest
  • Pocket
Comments powered by Disqus