Non-Blocking I/O and I/O Multiplexing
最近花了不少時間在研究 Python concurrency。但大概是以前 OS 沒學好,發現自己對 non-blocking I/O 跟 I/O multiplexing 之類的東西都不是很熟悉。雖然看來都不是太難理解的概念,但在釐清背景知識時還是花了不少時間。為了避免我過一陣子又忘記了,便趁著我記憶還清楚的時候,把我好不容易學會的粗淺知識整理下來。雖然我已經盡可能地多參考一些資料,但由於內容有許多我原本不會或是不熟的東西,如果有任何寫得不正確的部分,還煩請各位不吝指正。
Example: Echo Server
為了方便解釋,就用一個簡單的 echo server 作為例子吧。
首先是 server 的部分:
# file: echo_server.py
from __future__ import print_function
import socket
HOST = 'localhost'
PORT = 1357
BUF_SIZE = 1024
def main():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind((HOST, PORT))
sock.listen(1)
print('* Running on {}:{}'.format(HOST, PORT))
conn, addr = sock.accept()
print('* Connected by {}'.format(addr))
while True:
data = conn.recv(BUF_SIZE)
print('* Received "{}" from {}:{}'.format(data, *addr))
if data:
print(' Sending data back to client')
conn.sendall(data)
else:
print(' Closing connection')
conn.close()
break
if __name__ == '__main__':
main()
先等待一個 client 連上,然後從 client 收到的訊息,就原封不動地丟回去。
接著是 client 的部分:
# file: echo_client.py
from __future__ import print_function
import socket
HOST = 'localhost'
PORT = 1357
BUF_SIZE = 1024
def main():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print('* Connecting to {}:{}'.format(HOST, PORT))
sock.connect((HOST, PORT))
while True:
msg = raw_input('> ')
if not msg:
break
print('* Sending {}'.format(msg))
sock.sendall(msg)
recv_size = 0
expect_size = len(msg)
while recv_size < expect_size:
data = sock.recv(BUF_SIZE)
recv_size += len(data)
print('* Received {}'.format(data))
print('* Closing socket')
sock.close()
if __name__ == '__main__':
main()
連上 server 之後,每次從 standard input 接收的訊息都會先發送給 server,再接收 server 回傳的訊息。
先執行 echo_server.py
,再執行 echo_client.py
並輸入幾行訊息之後,結果應該會像這樣:
$ python echo_server.py
* Running on localhost:1357
* Connected by ('127.0.0.1', 40404)
* Received "hello" from 127.0.0.1:40404
Sending data back to client
* Received "world" from 127.0.0.1:40404
Sending data back to client
* Received "" from 127.0.0.1:40404
Closing connection
$ python echo_client.py
* Connecting to localhost:1357
> hello
* Sending hello
* Received hello
> world
* Sending world
* Received world
>
* Closing socket
Supporting Multiple Clients
到目前為止,我們的 echo server 只能支援一個 client 連進來。如果要讓我們的 server 能同時處理多個 client 要怎麼做呢?不如就把 sock.accept()
包進 loop 裡,讓 server 可以一直接受新的 connection:
# file: echo_server.py
# ...
def main():
# create socket
# ...
clients = {}
while True:
conn, addr = sock.accept()
print('* Connected by {}'.format(addr))
clients[conn.fileno()] = conn
for fd, conn in clients.items():
addr = conn.getpeername()
data = conn.recv(BUF_SIZE)
print('* Received "{}" from {}:{}'.format(data, *addr))
if data:
print(' Sending data back to client')
conn.sendall(data)
else:
print(' Closing connection')
del clients[fd]
conn.close()
這裡用 clients
記錄所有連線的 clients,然後透過 loop 從每個 client 接收訊息,並且跟 sock.accept()
一起包進 while loop 裡,我們的 echo server 就可以接受多個 connection 了。
但只要你曾經寫過 multi-client socket server,肯定知道這樣寫根本行不通(XD)。問題在於:上面程式的 sock.accept()
、conn.recv()
跟 conn.sendall()
都是 blocking I/O。也就是說,程式會停在這個地方,直到 I/O 操作順利完成為止。
當程式執行到 sock.accept()
這行時,如果沒有任何 client 連入,程式就會卡在這邊,等到有新的 connection 才會繼續往下執行,這段時間就無法處理從任何已連線 client 發來的訊息。同樣的,當程式執行到 conn.recv()
這行時,程式也會卡在這邊等待這個 client 發送訊息過來,在這時也是無法接受新的 connection、或是處理其它 client 發來的訊息。
Thread-per-Connection
最直觀的做法,就是用 multi-threading 來解決這個問題:
# file: echo_server.py
# ...
import threading
# ...
def handle_client(conn, addr):
while True:
data = conn.recv(BUF_SIZE)
print('* Received "{}" from {}:{}'.format(data, *addr))
if data:
print(' Sending data back to client')
conn.sendall(data)
else:
print(' Closing connection')
conn.close()
break
def main():
# create socket
# ...
while True:
conn, addr = sock.accept()
print('* Connected by {}'.format(addr))
worker = threading.Thread(target=handle_client, args=(conn, addr))
worker.start()
main thread 只負責接受新的 connection。每當有一個 client 連進來,就產生一條 thread 專門處理這個 connection。即使 main thread 卡在 sock.accept()
等待 connection,或是其它 thread 被卡在 conn.recv()
等待接收訊息,其它的 thread 都還能夠各自運作。
Thread Pool
現在每個 I/O 操作都只會 block 處理當前那個 connection 的 thread。雖然看起來問題是解決了,但當連線數相當多的時候,為每一個 connection 分配一條 thread 其實會吃掉不少系統資源。
那麼,採用 thread pool 呢?我們一方面可以用它控制 thread 的數量,另一方面則可以讓完成工作的 thread 繼續處理下一個 connection,省去不斷建立、銷毀 thread 的成本:
# file: echo_server.py
# ...
from multiprocessing.pool import ThreadPool
# ...
def main():
# create socket
# ...
pool = ThreadPool(10)
while True:
# accept new connection
# ...
pool.apply_async(handle_client, args=(conn, addr))
但能同時處理的連線數量卻因此而受限了。
此外,multi-threading 也可能會因為需要在不同 connection thread 之間 context switch 導致其效能不彰。更別提在複雜一點的系統中,還得小心避免發生 race condition。為此加上 lock 的同時,也要注意不會因而產生 deadlock 讓程式動彈不得。
Non-Blocking I/O
唔,所以看起來 thread-per-connection 並不是個經濟實惠的做法。那麼,有沒有辦法讓一條 thread 就能夠同時處理多個 connection,又不會被卡在 I/O 操作呢?可以的,就是透過 non-blocking I/O 來做到。
Python 內建的 socket 可以利用 socket.setblocking(0)
使得對 socket 的 I/O 操作變成 non-blocking:如果操作無法立刻完成,會立刻丟出 error,使得程式不會卡在那邊,能夠繼續往下執行。
但這時 client 的處理就比較複雜了。因為 non-blocking 機制的關係,回覆訊息給 client 的操作有可能無法成功(或是只發送一部分),所以每個 connection 都需要一個 message buffer 來存放還沒發送出去的訊息。為了方便管理每個 connection 的 message buffer,我們要把 client socket 連同它的 buffer 一起包進 Connection
object 裡頭:
# file: echo_server.py
# ...
class Connection(object):
def __init__(self, sock):
sock.setblocking(0)
self._sock = sock
self._connected = True
self._buf = ''
def connected(self):
return self._connected
def has_data(self):
return bool(self._buf)
def close(self):
self._connected = False
self._sock.close()
# ...
可以看到它會先把丟進來的 socket 設成 non-blocking mode。這裡的 self._buf
就是這個 connection 的 message buffer。
接著,try_recv()
會嘗試從 client 接收訊息,並 append 到 self._buf
裡:
# file: echo_server.py
# ...
class Connection(object):
# ...
def try_recv(self):
try:
addr = self._sock.getpeername()
data = self._sock.recv(BUF_SIZE)
print('* Received "{}" from {}:{}'.format(data, *addr))
if data:
print(' Sending data back to client')
self._buf += data
self.try_send()
else:
print(' Closing connection')
self.close()
except socket.error as ex:
if ex.errno not in (errno.EWOULDBLOCK, errno.EAGAIN):
raise
# ...
注意到現在 self._sock.recv()
被包在 try-except statement 裡頭。在這裡如果發生 socket.error
,且 error code 是 EWOULDBLOCK
或 EAGAIN
(在許多系統上,這兩個 error code 通常是同一個值,但保險起見還是兩個一起檢查),就代表 self._sock.recv()
會被 block。這種情況下我們還要讓程式繼續執行,因此 catch 到之後就不多做處理。
最後,try_send()
會嘗試把存在 self._buf
的訊息傳到 client:
# file: echo_server.py
# ...
class Connection(object):
# ...
def try_send(self):
try:
sent = self._sock.send(self._buf[:BUF_SIZE])
self._buf = self._buf[sent:]
except socket.error as ex:
if ex.errno not in (errno.EWOULDBLOCK, errno.EAGAIN):
raise
這裡對 self._sock.send()
的 try-except 處理與 try_recv()
相同。
於是,現在 main()
變成這樣:
# file: echo_server.py
# ...
def main():
# create socket
# ...
sock.setblocking(0)
clients = {}
while True:
try:
conn, addr = sock.accept()
clients[conn.fileno()] = Connection(conn)
print('* Connected by {}'.format(addr))
except socket.error as ex:
if ex.errno not in (errno.EWOULDBLOCK, errno.EAGAIN):
raise
for fd, conn in clients.items():
conn.try_recv()
if not conn.connected():
del clients[fd]
elif conn.has_data():
conn.try_send()
先嘗試看看是否能接受新的 connection,再一一嘗試是否能夠從 client 接收訊息、或是回覆訊息給 client。
但這種寫法其實就是在 loop 中瘋狂地嘗試進行 I/O,造成了 busy-waiting。若是一段時間都沒有任何新的 connection,也沒有任何 client 發送訊息給 server,這段程式也會繼續佔著寶貴的 CPU 資源,卻什麼事也沒做。
I/O Multiplexing
為了避免 busy-waiting 的問題,我們需要一種可以在「有某個 socket 能夠順利進行 I/O 操作(而不會被 block)」的時候通知我們的機制。於是,這裡就輪到 I/O Multiplexing 出馬了。它可以讓我們同時監聽多個 file descriptors 是否 ready。這其中最通用的就是 select
了:
# file: echo_server.py
# ...
from select import select
# ...
def main():
# create socket
# ...
clients = {}
read_waiting = set([sock.fileno()])
write_waiting = set()
while True:
rs, ws, es = select(read_waiting, write_waiting, [])
for fd in rs:
if fd == sock.fileno():
try:
conn, addr = sock.accept()
new_fd = conn.fileno()
clients[new_fd] = Connection(conn)
read_waiting.add(new_fd)
print('* Connected by {}'.format(addr))
except socket.error as ex:
if ex.errno not in (errno.EWOULDBLOCK, errno.EAGAIN):
raise
else:
conn = clients[fd]
conn.try_recv()
if not conn.connected():
del clients[fd]
read_waiting.remove(fd)
if conn.has_data():
write_waiting.remove(fd)
elif conn.has_data():
write_waiting.add(fd)
for fd in ws:
conn = clients[fd]
conn.try_send()
if not conn.has_data():
write_waiting.remove(fd)
這裡宣告了 read_waiting
與 write_waiting
代表想要監聽 ready for reading/writing event 的 file descriptors。接著在最外層的 while loop(event loop)裡,透過 select
找出發生 event 的 file descriptors(rs
跟 ws
),再對每個 file descriptor 做相應的處理。
在 rs
中,假如這個 file descriptor 是 sock
,代表有新的 client 連進來了;否則,代表它是個 client,而且它有新的訊息可以接收。另外,如果 connection 的 buffer 有資料,我們需要把這個 client 加到 write_waiting
裡頭,以在 client ready for writing 的時候把訊息傳送給它。而在 ws
中的 file descriptor 一定是 client,且代表這個 client 能夠接收訊息了。在嘗試發送訊息之後,假如 buffer 已經沒有訊息要發給這個 client,就把它從 write_waiting
移除。
由於 select
實際上是個 blocking system call,因此在沒有任何新的 connection、也沒有任何 client 發送訊息給 server 的時候,程式就會停在這邊不繼續執行。這樣跟 busy-waiting 相比有什麼好處?如果當前的 select
操作需要等待,那麼 operating system 會知道當前這個 thread/process 無事可做,便能夠主動介入換成其它 thread/process 使用 CPU,因而避免了 busy-waiting 會佔著 CPU 不放,卻什麼也不做的問題。
Summary
在這篇筆記裡,我們以一個簡單的 concurrent socket server 為例,展示了 single thread 進行 blocking I/O 時,會導致程式卡在 I/O 操作無法處理其它工作的問題。雖然 thread-per-connection 看似可以解決這個問題,但在大量 connection 的情況下,context switching 與多個 connection thread 的資源使用對系統都是不小的開銷。
因此,為了能夠使 single thread 能夠處理多個 connection,我們利用 non-blocking mode 避免 I/O 操作 block 整個程式的進行。此外,為了避免 busy-waiting 霸佔 CPU 的問題,我們透過 I/O multiplexing 達成在 ready for reading/writing 才進行處理的通知機制。
到目前為止,我們需要的功能都已經完成了,而且只需要 single thread 就足以應付新的 connection 與多個 clients 的訊息收發。但現在的問題是,在上面的 event loop 裡,一一判斷 file descriptor 做對應處理的方法實在不是很明智。假設現在有多個 file descriptor 都需要各自不同的處理,就得在 loop 裡寫下一長串的 if conditions.....。
本來想繼續寫下去,但發現文章的篇幅好像已經太長了,因此就先在這個段落停筆。這個問題就讓我們暫且銘記於心,留待下一篇筆記再來討論。