February 14, 2015 · Python Concurrency

Non-Blocking I/O and I/O Multiplexing

最近花了不少時間在研究 Python concurrency。但大概是以前 OS 沒學好,發現自己對 non-blocking I/O 跟 I/O multiplexing 之類的東西都不是很熟悉。雖然看來都不是太難理解的概念,但在釐清背景知識時還是花了不少時間。為了避免我過一陣子又忘記了,便趁著我記憶還清楚的時候,把我好不容易學會的粗淺知識整理下來。雖然我已經盡可能地多參考一些資料,但由於內容有許多我原本不會或是不熟的東西,如果有任何寫得不正確的部分,還煩請各位不吝指正。

Example: Echo Server

為了方便解釋,就用一個簡單的 echo server 作為例子吧。

首先是 server 的部分:

# file: echo_server.py

from __future__ import print_function

import socket

HOST = 'localhost'  
PORT = 1357  
BUF_SIZE = 1024


def main():  
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind((HOST, PORT))
    sock.listen(1)
    print('* Running on {}:{}'.format(HOST, PORT))

    conn, addr = sock.accept()
    print('* Connected by {}'.format(addr))

    while True:
        data = conn.recv(BUF_SIZE)
        print('* Received "{}" from {}:{}'.format(data, *addr))

        if data:
            print('  Sending data back to client')
            conn.sendall(data)
        else:
            print('  Closing connection')
            conn.close()
            break


if __name__ == '__main__':  
    main()

先等待一個 client 連上,然後從 client 收到的訊息,就原封不動地丟回去。

接著是 client 的部分:

# file: echo_client.py

from __future__ import print_function

import socket

HOST = 'localhost'  
PORT = 1357  
BUF_SIZE = 1024


def main():  
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    print('* Connecting to {}:{}'.format(HOST, PORT))
    sock.connect((HOST, PORT))

    while True:
        msg = raw_input('> ')
        if not msg:
            break

        print('* Sending {}'.format(msg))
        sock.sendall(msg)

        recv_size = 0
        expect_size = len(msg)
        while recv_size < expect_size:
            data = sock.recv(BUF_SIZE)
            recv_size += len(data)
            print('* Received {}'.format(data))

    print('* Closing socket')
    sock.close()


if __name__ == '__main__':  
    main()

連上 server 之後,每次從 standard input 接收的訊息都會先發送給 server,再接收 server 回傳的訊息。

先執行 echo_server.py,再執行 echo_client.py 並輸入幾行訊息之後,結果應該會像這樣:

$ python echo_server.py 
* Running on localhost:1357
* Connected by ('127.0.0.1', 40404)
* Received "hello" from 127.0.0.1:40404
  Sending data back to client
* Received "world" from 127.0.0.1:40404
  Sending data back to client
* Received "" from 127.0.0.1:40404
  Closing connection
$ python echo_client.py 
* Connecting to localhost:1357
> hello
* Sending hello
* Received hello
> world
* Sending world
* Received world
> 
* Closing socket

Supporting Multiple Clients

到目前為止,我們的 echo server 只能支援一個 client 連進來。如果要讓我們的 server 能同時處理多個 client 要怎麼做呢?不如就把 sock.accept() 包進 loop 裡,讓 server 可以一直接受新的 connection:

# file: echo_server.py

# ...

def main():  
    # create socket
    # ...

    clients = {}
    while True:
        conn, addr = sock.accept()
        print('* Connected by {}'.format(addr))

        clients[conn.fileno()] = conn
        for fd, conn in clients.items():
            addr = conn.getpeername()
            data = conn.recv(BUF_SIZE)
            print('* Received "{}" from {}:{}'.format(data, *addr))

            if data:
                print('  Sending data back to client')
                conn.sendall(data)
            else:
                print('  Closing connection')
                del clients[fd]
                conn.close()

這裡用 clients 記錄所有連線的 clients,然後透過 loop 從每個 client 接收訊息,並且跟 sock.accept() 一起包進 while loop 裡,我們的 echo server 就可以接受多個 connection 了。

但只要你曾經寫過 multi-client socket server,肯定知道這樣寫根本行不通(XD)。問題在於:上面程式的 sock.accept()conn.recv()conn.sendall() 都是 blocking I/O。也就是說,程式會停在這個地方,直到 I/O 操作順利完成為止。

當程式執行到 sock.accept() 這行時,如果沒有任何 client 連入,程式就會卡在這邊,等到有新的 connection 才會繼續往下執行,這段時間就無法處理從任何已連線 client 發來的訊息。同樣的,當程式執行到 conn.recv() 這行時,程式也會卡在這邊等待這個 client 發送訊息過來,在這時也是無法接受新的 connection、或是處理其它 client 發來的訊息。

Thread-per-Connection

最直觀的做法,就是用 multi-threading 來解決這個問題:

# file: echo_server.py

# ...
import threading

# ...

def handle_client(conn, addr):  
    while True:
        data = conn.recv(BUF_SIZE)
        print('* Received "{}" from {}:{}'.format(data, *addr))

        if data:
            print('  Sending data back to client')
            conn.sendall(data)
        else:
            print('  Closing connection')
            conn.close()
            break


def main():  
    # create socket
    # ...

    while True:
        conn, addr = sock.accept()
        print('* Connected by {}'.format(addr))

        worker = threading.Thread(target=handle_client, args=(conn, addr))
        worker.start()

main thread 只負責接受新的 connection。每當有一個 client 連進來,就產生一條 thread 專門處理這個 connection。即使 main thread 卡在 sock.accept() 等待 connection,或是其它 thread 被卡在 conn.recv() 等待接收訊息,其它的 thread 都還能夠各自運作。

Thread Pool

現在每個 I/O 操作都只會 block 處理當前那個 connection 的 thread。雖然看起來問題是解決了,但當連線數相當多的時候,為每一個 connection 分配一條 thread 其實會吃掉不少系統資源。

那麼,採用 thread pool 呢?我們一方面可以用它控制 thread 的數量,另一方面則可以讓完成工作的 thread 繼續處理下一個 connection,省去不斷建立、銷毀 thread 的成本:

# file: echo_server.py

# ...
from multiprocessing.pool import ThreadPool

# ...

def main():  
    # create socket
    # ...

    pool = ThreadPool(10)
    while True:
        # accept new connection
        # ...
        pool.apply_async(handle_client, args=(conn, addr))

但能同時處理的連線數量卻因此而受限了。

此外,multi-threading 也可能會因為需要在不同 connection thread 之間 context switch 導致其效能不彰。更別提在複雜一點的系統中,還得小心避免發生 race condition。為此加上 lock 的同時,也要注意不會因而產生 deadlock 讓程式動彈不得。

Non-Blocking I/O

唔,所以看起來 thread-per-connection 並不是個經濟實惠的做法。那麼,有沒有辦法讓一條 thread 就能夠同時處理多個 connection,又不會被卡在 I/O 操作呢?可以的,就是透過 non-blocking I/O 來做到。

Python 內建的 socket 可以利用 socket.setblocking(0) 使得對 socket 的 I/O 操作變成 non-blocking:如果操作無法立刻完成,會立刻丟出 error,使得程式不會卡在那邊,能夠繼續往下執行。

但這時 client 的處理就比較複雜了。因為 non-blocking 機制的關係,回覆訊息給 client 的操作有可能無法成功(或是只發送一部分),所以每個 connection 都需要一個 message buffer 來存放還沒發送出去的訊息。為了方便管理每個 connection 的 message buffer,我們要把 client socket 連同它的 buffer 一起包進 Connection object 裡頭:

# file: echo_server.py

# ...

class Connection(object):

    def __init__(self, sock):
        sock.setblocking(0)
        self._sock = sock
        self._connected = True
        self._buf = ''

    def connected(self):
        return self._connected

    def has_data(self):
        return bool(self._buf)

    def close(self):
        self._connected = False
        self._sock.close()

    # ...

可以看到它會先把丟進來的 socket 設成 non-blocking mode。這裡的 self._buf 就是這個 connection 的 message buffer。

接著,try_recv() 會嘗試從 client 接收訊息,並 append 到 self._buf 裡:

# file: echo_server.py

# ...

class Connection(object):

    # ...

    def try_recv(self):
        try:
            addr = self._sock.getpeername()
            data = self._sock.recv(BUF_SIZE)
            print('* Received "{}" from {}:{}'.format(data, *addr))

            if data:
                print('  Sending data back to client')
                self._buf += data
                self.try_send()
            else:
                print('  Closing connection')
                self.close()
        except socket.error as ex:
            if ex.errno not in (errno.EWOULDBLOCK, errno.EAGAIN):
                raise

    # ...

注意到現在 self._sock.recv() 被包在 try-except statement 裡頭。在這裡如果發生 socket.error,且 error code 是 EWOULDBLOCKEAGAIN(在許多系統上,這兩個 error code 通常是同一個值,但保險起見還是兩個一起檢查),就代表 self._sock.recv() 會被 block。這種情況下我們還要讓程式繼續執行,因此 catch 到之後就不多做處理。

最後,try_send() 會嘗試把存在 self._buf 的訊息傳到 client:

# file: echo_server.py

# ...

class Connection(object):

    # ...

    def try_send(self):
        try:
            sent = self._sock.send(self._buf[:BUF_SIZE])
            self._buf = self._buf[sent:]
        except socket.error as ex:
            if ex.errno not in (errno.EWOULDBLOCK, errno.EAGAIN):
                raise

這裡對 self._sock.send() 的 try-except 處理與 try_recv() 相同。

於是,現在 main() 變成這樣:

# file: echo_server.py

# ...

def main():  
    # create socket
    # ...
    sock.setblocking(0)

    clients = {}
    while True:
        try:
            conn, addr = sock.accept()
            clients[conn.fileno()] = Connection(conn)
            print('* Connected by {}'.format(addr))
        except socket.error as ex:
            if ex.errno not in (errno.EWOULDBLOCK, errno.EAGAIN):
                raise

        for fd, conn in clients.items():
            conn.try_recv()
            if not conn.connected():
                del clients[fd]
            elif conn.has_data():
                conn.try_send()

先嘗試看看是否能接受新的 connection,再一一嘗試是否能夠從 client 接收訊息、或是回覆訊息給 client。

但這種寫法其實就是在 loop 中瘋狂地嘗試進行 I/O,造成了 busy-waiting。若是一段時間都沒有任何新的 connection,也沒有任何 client 發送訊息給 server,這段程式也會繼續佔著寶貴的 CPU 資源,卻什麼事也沒做。

I/O Multiplexing

為了避免 busy-waiting 的問題,我們需要一種可以在「有某個 socket 能夠順利進行 I/O 操作(而不會被 block)」的時候通知我們的機制。於是,這裡就輪到 I/O Multiplexing 出馬了。它可以讓我們同時監聽多個 file descriptors 是否 ready。這其中最通用的就是 select 了:

# file: echo_server.py

# ...
from select import select

# ...

def main():  
    # create socket
    # ...

    clients = {}
    read_waiting = set([sock.fileno()])
    write_waiting = set()
    while True:
        rs, ws, es = select(read_waiting, write_waiting, [])
        for fd in rs:
            if fd == sock.fileno():
                try:
                    conn, addr = sock.accept()
                    new_fd = conn.fileno()
                    clients[new_fd] = Connection(conn)
                    read_waiting.add(new_fd)
                    print('* Connected by {}'.format(addr))
                except socket.error as ex:
                    if ex.errno not in (errno.EWOULDBLOCK, errno.EAGAIN):
                        raise
            else:
                conn = clients[fd]
                conn.try_recv()
                if not conn.connected():
                    del clients[fd]
                    read_waiting.remove(fd)
                    if conn.has_data():
                        write_waiting.remove(fd)
                elif conn.has_data():
                    write_waiting.add(fd)

        for fd in ws:
            conn = clients[fd]
            conn.try_send()
            if not conn.has_data():
                write_waiting.remove(fd)

這裡宣告了 read_waitingwrite_waiting 代表想要監聽 ready for reading/writing event 的 file descriptors。接著在最外層的 while loop(event loop)裡,透過 select 找出發生 event 的 file descriptors(rsws),再對每個 file descriptor 做相應的處理。

rs 中,假如這個 file descriptor 是 sock,代表有新的 client 連進來了;否則,代表它是個 client,而且它有新的訊息可以接收。另外,如果 connection 的 buffer 有資料,我們需要把這個 client 加到 write_waiting 裡頭,以在 client ready for writing 的時候把訊息傳送給它。而在 ws 中的 file descriptor 一定是 client,且代表這個 client 能夠接收訊息了。在嘗試發送訊息之後,假如 buffer 已經沒有訊息要發給這個 client,就把它從 write_waiting 移除。

由於 select 實際上是個 blocking system call,因此在沒有任何新的 connection、也沒有任何 client 發送訊息給 server 的時候,程式就會停在這邊不繼續執行。這樣跟 busy-waiting 相比有什麼好處?如果當前的 select 操作需要等待,那麼 operating system 會知道當前這個 thread/process 無事可做,便能夠主動介入換成其它 thread/process 使用 CPU,因而避免了 busy-waiting 會佔著 CPU 不放,卻什麼也不做的問題。

Summary

在這篇筆記裡,我們以一個簡單的 concurrent socket server 為例,展示了 single thread 進行 blocking I/O 時,會導致程式卡在 I/O 操作無法處理其它工作的問題。雖然 thread-per-connection 看似可以解決這個問題,但在大量 connection 的情況下,context switching 與多個 connection thread 的資源使用對系統都是不小的開銷。

因此,為了能夠使 single thread 能夠處理多個 connection,我們利用 non-blocking mode 避免 I/O 操作 block 整個程式的進行。此外,為了避免 busy-waiting 霸佔 CPU 的問題,我們透過 I/O multiplexing 達成在 ready for reading/writing 才進行處理的通知機制。

到目前為止,我們需要的功能都已經完成了,而且只需要 single thread 就足以應付新的 connection 與多個 clients 的訊息收發。但現在的問題是,在上面的 event loop 裡,一一判斷 file descriptor 做對應處理的方法實在不是很明智。假設現在有多個 file descriptor 都需要各自不同的處理,就得在 loop 裡寫下一長串的 if conditions.....。

本來想繼續寫下去,但發現文章的篇幅好像已經太長了,因此就先在這個段落停筆。這個問題就讓我們暫且銘記於心,留待下一篇筆記再來討論。

Further Reading

  • LinkedIn
  • Tumblr
  • Reddit
  • Google+
  • Pinterest
  • Pocket
Comments powered by Disqus