從零構建一個簡單的 Python 框架
我希望這篇文章可以鼓勵更多的人來嘗試,因為這確實很有趣。它讓我知道了 web 應用是如何工作的,而且這比我想的要容易的多!
範圍
框架可以處理請求-響應周期、身份認證、資料庫訪問、模板生成等部分工作。Web 開發者使用框架是因為,大多數的 web 應用擁有大量相同的功能,而對每個項目都重新實現同樣的功能意義不大。
比較大的的框架如 Rails 和 Django 實現了高層次的抽象,或者說「自備電池」(「batteries-included」,這是 Python 的口號之一,意即所有功能都自足。)。而實現所有的這些功能可能要花費數千小時,因此在這個項目上,我們重點完成其中的一小部分。在開始寫代碼前,我先列舉一下所需的功能以及限制。
功能:
- 處理 HTTP 的 GET 和 POST 請求。你可以在這篇 wiki 中對 HTTP 有個大致的了解。
- 實現非同步操作(我喜歡 Python 3 的 asyncio 模塊)。
- 簡單的路由邏輯以及參數擷取。
- 像其他微型框架一樣,提供一個簡單的用戶級 API 。
- 支持身份認證,因為學會這個很酷啊(微笑)。
限制:
- 將只支持 HTTP 1.1 的一個小子集,不支持 傳輸編碼 、 HTTP 認證 、 內容編碼 (如 gzip)以及持久化連接等功能。
- 不支持對響應內容的 MIME 判斷 - 用戶需要手動指定。
- 不支持 WSGI - 僅能處理簡單的 TCP 連接。
- 不支持資料庫。
我覺得一個小的用例可以讓上述內容更加具體,也可以用來演示這個框架的 API:
from diy_framework import App, Router
from diy_framework.http_utils import Response
# GET simple route
async def home(r):
rsp = Response()
rsp.set_header('Content-Type', 'text/html')
rsp.body = '<html><body><b>test</b></body></html>'
return rsp
# GET route + params
async def welcome(r, name):
return "Welcome {}".format(name)
# POST route + body param
async def parse_form(r):
if r.method == 'GET':
return 'form'
else:
name = r.body.get('name', '')[0]
password = r.body.get('password', '')[0]
return "{0}:{1}".format(name, password)
# application = router + http server
router = Router()
router.add_routes({
r'/welcome/{name}': welcome,
r'/': home,
r'/login': parse_form,})
app = App(router)
app.start_server()
' 用戶需要定義一些能夠返回字元串或 Response
對象的非同步函數,然後將這些函數與表示路由的字元串配對,最後通過一個函數調用(start_server
)開始處理請求。
完成設計之後,我將它抽象為幾個我需要編碼的部分:
- 接受 TCP 連接以及調度一個非同步函數來處理這些連接的部分
- 將原始文本解析成某種抽象容器的部分
- 對於每個請求,用來決定調用哪個函數的部分
- 將上述部分集中到一起,並為開發者提供一個簡單介面的部分
我先編寫一些測試,這些測試被用來描述每個部分的功能。幾次重構後,整個設計被分成若干部分,每個部分之間是相對解耦的。這樣就非常好,因為每個部分可以被獨立地研究學習。以下是我上文列出的抽象的具體體現:
- 一個 HTTPServer 對象,需要一個 Router 對象和一個 http_parser 模塊,並使用它們來初始化。
- HTTPConnection 對象,每一個對象表示一個單獨的客戶端 HTTP 連接,並且處理其請求-響應周期:使用 http_parser 模塊將收到的位元組流解析為一個 Request 對象;使用一個 Router 實例尋找並調用正確的函數來生成一個響應;最後將這個響應發送回客戶端。
- 一對 Request 和 Response 對象為用戶提供了一種友好的方式,來處理實質上是位元組流的字元串。用戶不需要知道正確的消息格式和分隔符是怎樣的。
- 一個包含「路由:函數」對應關係的 Router 對象。它提供一個添加配對的方法,可以根據 URL 路徑查找到相應的函數。
- 最後,一個 App 對象。它包含配置信息,並使用它們實例化一個 HTTPServer 實例。
讓我們從 HTTPConnection
開始來講解各個部分。
模擬非同步連接
為了滿足上述約束條件,每一個 HTTP 請求都是一個單獨的 TCP 連接。這使得處理請求的速度變慢了,因為建立多個 TCP 連接需要相對高的花銷(DNS 查詢,TCP 三次握手,慢啟動等等的花銷),不過這樣更加容易模擬。對於這一任務,我選擇相對高級的 asyncio-stream 模塊,它建立在 asyncio 的傳輸和協議的基礎之上。我強烈推薦你讀一讀標準庫中的相應代碼,很有意思!
一個 HTTPConnection
的實例能夠處理多個任務。首先,它使用 asyncio.StreamReader
對象以增量的方式從 TCP 連接中讀取數據,並存儲在緩存中。每一個讀取操作完成後,它會嘗試解析緩存中的數據,並生成一個 Request
對象。一旦收到了這個完整的請求,它就生成一個回復,並通過 asyncio.StreamWriter
對象發送回客戶端。當然,它還有兩個任務:超時連接以及錯誤處理。
你可以在這裡瀏覽這個類的完整代碼。我將分別介紹代碼的每一部分。為了簡單起見,我移除了代碼文檔。
class HTTPConnection(object):
def init(self, http_server, reader, writer):
self.router = http_server.router
self.http_parser = http_server.http_parser
self.loop = http_server.loop
self._reader = reader
self._writer = writer
self._buffer = bytearray()
self._conn_timeout = None
self.request = Request()
這個 init
方法沒啥意思,它僅僅是收集了一些對象以供後面使用。它存儲了一個 router
對象、一個 http_parser
對象以及 loop
對象,分別用來生成響應、解析請求以及在事件循環中調度任務。
然後,它存儲了代表一個 TCP 連接的讀寫對,和一個充當原始位元組緩衝區的空位元組數組。_conn_timeout
存儲了一個 asyncio.Handle 的實例,用來管理超時邏輯。最後,它還存儲了 Request
對象的一個單一實例。
下面的代碼是用來接受和發送數據的核心功能:
async def handle_request(self):
try:
while not self.request.finished and not self._reader.at_eof():
data = await self._reader.read(1024)
if data:
self._reset_conn_timeout()
await self.process_data(data)
if self.request.finished:
await self.reply()
elif self._reader.at_eof():
raise BadRequestException()
except (NotFoundException,
BadRequestException) as e:
self.error_reply(e.code, body=Response.reason_phrases[e.code])
except Exception as e:
self.error_reply(500, body=Response.reason_phrases[500])
self.close_connection()
所有內容被包含在 try-except
代碼塊中,這樣在解析請求或響應期間拋出的異常可以被捕獲到,然後一個錯誤響應會發送回客戶端。
在 while
循環中不斷讀取請求,直到解析器將 self.request.finished
設置為 True ,或者客戶端關閉連接所觸發的信號使得 self._reader_at_eof()
函數返回值為 True 為止。這段代碼嘗試在每次循環迭代中從 StreamReader
中讀取數據,並通過調用 self.process_data(data)
函數以增量方式生成 self.request
。每次循環讀取數據時,連接超時計數器被重置。
這兒有個錯誤,你發現了嗎?稍後我們會再討論這個。需要注意的是,這個循環可能會耗盡 CPU 資源,因為如果沒有讀取到東西 self._reader.read()
函數將會返回一個空的位元組對象 b''
。這就意味著循環將會不斷運行,卻什麼也不做。一個可能的解決方法是,用非阻塞的方式等待一小段時間:await asyncio.sleep(0.1)
。我們暫且不對它做優化。
還記得上一段我提到的那個錯誤嗎?只有從 StreamReader
讀取數據時,self._reset_conn_timeout()
函數才會被調用。這就意味著,直到第一個位元組到達時,timeout
才被初始化。如果有一個客戶端建立了與伺服器的連接卻不發送任何數據,那就永遠不會超時。這可能被用來消耗系統資源,從而導致拒絕服務式攻擊(DoS)。修復方法就是在 init
函數中調用 self._reset_conn_timeout()
函數。
當請求接受完成或連接中斷時,程序將運行到 if-else
代碼塊。這部分代碼會判斷解析器收到完整的數據後是否完成了解析。如果是,好,生成一個回復並發送回客戶端。如果不是,那麼請求信息可能有錯誤,拋出一個異常!最後,我們調用 self.close_connection
執行清理工作。
解析請求的部分在 self.process_data
方法中。這個方法非常簡短,也易於測試:
async def process_data(self, data):
self._buffer.extend(data)
self._buffer = self.http_parser.parse_into(
self.request, self._buffer)
每一次調用都將數據累積到 self._buffer
中,然後試著用 self.http_parser
來解析已經收集的數據。這裡需要指出的是,這段代碼展示了一種稱為依賴注入(Dependency Injection)的模式。如果你還記得 init
函數的話,應該知道我們傳入了一個包含 http_parser
對象的 http_server
對象。在這個例子里,http_parser
對象是 diy_framework
包中的一個模塊。不過它也可以是任何含有 parse_into
函數的類,這個 parse_into
函數接受一個 Request
對象以及位元組數組作為參數。這很有用,原因有二:一是,這意味著這段代碼更易擴展。如果有人想通過一個不同的解析器來使用 HTTPConnection
,沒問題,只需將它作為參數傳入即可。二是,這使得測試更加容易,因為 http_parser
不是硬編碼的,所以使用虛假數據或者 mock 對象來替代是很容易的。
下一段有趣的部分就是 reply
方法了:
async def reply(self):
request = self.request
handler = self.router.get_handler(request.path)
response = await handler.handle(request)
if not isinstance(response, Response):
response = Response(code=200, body=response)
self._writer.write(response.to_bytes())
await self._writer.drain()
這裡,一個 HTTPConnection
的實例使用了 HTTPServer
中的 router
對象來得到一個生成響應的對象。一個路由可以是任何一個擁有 get_handler
方法的對象,這個方法接收一個字元串作為參數,返回一個可調用的對象或者拋出 NotFoundException
異常。而這個可調用的對象被用來處理請求以及生成響應。處理程序由框架的使用者編寫,如上文所說的那樣,應該返回字元串或者 Response
對象。Response
對象提供了一個友好的介面,因此這個簡單的 if 語句保證了無論處理程序返回什麼,代碼最終都得到一個統一的 Response
對象。
接下來,被賦值給 self._writer
的 StreamWriter
實例被調用,將位元組字元串發送回客戶端。函數返回前,程序在 await self._writer.drain()
處等待,以確保所有的數據被發送給客戶端。只要緩存中還有未發送的數據,self._writer.close()
方法就不會執行。
HTTPConnection
類還有兩個更加有趣的部分:一個用於關閉連接的方法,以及一組用來處理超時機制的方法。首先,關閉一條連接由下面這個小函數完成:
def close_connection(self):
self._cancel_conn_timeout()
self._writer.close()
每當一條連接將被關閉時,這段代碼首先取消超時,然後把連接從事件循環中清除。
超時機制由三個相關的函數組成:第一個函數在超時後給客戶端發送錯誤消息並關閉連接;第二個函數用於取消當前的超時;第三個函數調度超時功能。前兩個函數比較簡單,我將詳細解釋第三個函數 _reset_cpmm_timeout()
。
def _conn_timeout_close(self):
self.error_reply(500, 'timeout')
self.close_connection()
def _cancel_conn_timeout(self):
if self._conn_timeout:
self._conn_timeout.cancel()
def _reset_conn_timeout(self, timeout=TIMEOUT):
self._cancel_conn_timeout()
self._conn_timeout = self.loop.call_later(
timeout, self._conn_timeout_close)
每當 _reset_conn_timeout
函數被調用時,它會先取消之前所有賦值給 self._conn_timeout
的 asyncio.Handle
對象。然後,使用 BaseEventLoop.call_later 函數讓 _conn_timeout_close
函數在超時數秒(timeout
)後執行。如果你還記得 handle_request
函數的內容,就知道每當接收到數據時,這個函數就會被調用。這就取消了當前的超時並且重新安排 _conn_timeout_close
函數在超時數秒(timeout
)後執行。只要接收到數據,這個循環就會不斷地重置超時回調。如果在超時時間內沒有接收到數據,最後函數 _conn_timeout_close
就會被調用。
創建連接
我們需要創建 HTTPConnection
對象,並且正確地使用它們。這一任務由 HTTPServer
類完成。HTTPServer
類是一個簡單的容器,可以存儲著一些配置信息(解析器,路由和事件循環實例),並使用這些配置來創建 HTTPConnection
實例:
class HTTPServer(object):
def init(self, router, http_parser, loop):
self.router = router
self.http_parser = http_parser
self.loop = loop
async def handle_connection(self, reader, writer):
connection = HTTPConnection(self, reader, writer)
asyncio.ensure_future(connection.handle_request(), loop=self.loop)
HTTPServer
的每一個實例能夠監聽一個埠。它有一個 handle_connection
的非同步方法來創建 HTTPConnection
的實例,並安排它們在事件循環中運行。這個方法被傳遞給 asyncio.start_server 作為一個回調函數。也就是說,每當一個 TCP 連接初始化時(以 StreamReader
和 StreamWriter
為參數),它就會被調用。
self._server = HTTPServer(self.router, self.http_parser, self.loop)
self._connection_handler = asyncio.start_server(
self._server.handle_connection,
host=self.host,
port=self.port,
reuse_address=True,
reuse_port=True,
loop=self.loop)
這就是構成整個應用程序工作原理的核心:asyncio.start_server
接受 TCP 連接,然後在一個預配置的 HTTPServer
對象上調用一個方法。這個方法將處理一條 TCP 連接的所有邏輯:讀取、解析、生成響應並發送回客戶端、以及關閉連接。它的重點是 IO 邏輯、解析和生成響應。
講解了核心的 IO 部分,讓我們繼續。
解析請求
這個微型框架的使用者被寵壞了,不願意和位元組打交道。它們想要一個更高層次的抽象 —— 一種更加簡單的方法來處理請求。這個微型框架就包含了一個簡單的 HTTP 解析器,能夠將位元組流轉化為 Request 對象。
這些 Request 對象是像這樣的容器:
class Request(object):
def init(self):
self.method = None
self.path = None
self.query_params = {}
self.path_params = {}
self.headers = {}
self.body = None
self.body_raw = None
self.finished = False
它包含了所有需要的數據,可以用一種容易理解的方法從客戶端接受數據。哦,不包括 cookie ,它對身份認證是非常重要的,我會將它留在第二部分。
每一個 HTTP 請求都包含了一些必需的內容,如請求路徑和請求方法。它們也包含了一些可選的內容,如請求體、請求頭,或是 URL 參數。隨著 REST 的流行,除了 URL 參數,URL 本身會包含一些信息。比如,"/user/1/edit" 包含了用戶的 id 。
一個請求的每個部分都必須被識別、解析,並正確地賦值給 Request 對象的對應屬性。HTTP/1.1 是一個文本協議,事實上這簡化了很多東西。(HTTP/2 是一個二進位協議,這又是另一種樂趣了)
解析器不需要跟蹤狀態,因此 http_parser
模塊其實就是一組函數。調用函數需要用到 Request
對象,並將它連同一個包含原始請求信息的位元組數組傳遞給 parse_into
函數。然後解析器會修改 Request
對象以及充當緩存的位元組數組。位元組數組的信息被逐漸地解析到 request 對象中。
http_parser
模塊的核心功能就是下面這個 parse_into
函數:
def parse_into(request, buffer):
_buffer = buffer[:]
if not request.method and can_parse_request_line(_buffer):
(request.method, request.path,
request.query_params) = parse_request_line(_buffer)
remove_request_line(_buffer)
if not request.headers and can_parse_headers(_buffer):
request.headers = parse_headers(_buffer)
if not has_body(request.headers):
request.finished = True
remove_intro(_buffer)
if not request.finished and can_parse_body(request.headers, _buffer):
request.body_raw, request.body = parse_body(request.headers, _buffer)
clear_buffer(_buffer)
request.finished = True
return _buffer
從上面的代碼中可以看到,我把解析的過程分為三個部分:解析請求行(這行像這樣:GET /resource HTTP/1.1
),解析請求頭以及解析請求體。
請求行包含了 HTTP 請求方法以及 URL 地址。而 URL 地址則包含了更多的信息:路徑、url 參數和開發者自定義的 url 參數。解析請求方法和 URL 還是很容易的 - 合適地分割字元串就好了。函數 urlparse.parse
可以用來解析 URL 參數。開發者自定義的 URL 參數可以通過正則表達式來解析。
接下來是 HTTP 頭部。它們是一行行由鍵值對組成的簡單文本。問題在於,可能有多個 HTTP 頭有相同的名字,卻有不同的值。一個值得關注的 HTTP 頭部是 Content-Length
,它描述了請求體的位元組長度(不是整個請求,僅僅是請求體)。這對於決定是否解析請求體有很重要的作用。
最後,解析器根據 HTTP 方法和頭部來決定是否解析請求體。
路由!
在某種意義上,路由就像是連接框架和用戶的橋樑,用戶用合適的方法創建 Router
對象並為其設置路徑/函數對,然後將它賦值給 App 對象。而 App 對象依次調用 get_handler
函數生成相應的回調函數。簡單來說,路由就負責兩件事,一是存儲路徑/函數對,二是返回需要的路徑/函數對
Router
類中有兩個允許最終開發者添加路由的方法,分別是 add_routes
和 add_route
。因為 add_routes
就是 add_route
函數的一層封裝,我們將主要講解 add_route
函數:
def add_route(self, path, handler):
compiled_route = self.class.build_route_regexp(path)
if compiled_route not in self.routes:
self.routes[compiled_route] = handler
else:
raise DuplicateRoute
首先,這個函數使用 Router.build_router_regexp
的類方法,將一條路由規則(如 '/cars/{id}' 這樣的字元串),「編譯」到一個已編譯的正則表達式對象。這些已編譯的正則表達式用來匹配請求路徑,以及解析開發者自定義的 URL 參數。如果已經存在一個相同的路由,程序就會拋出一個異常。最後,這個路由/處理程序對被添加到一個簡單的字典self.routes
中。
下面展示 Router 是如何「編譯」路由的:
@classmethod
def build_route_regexp(cls, regexp_str):
"""
Turns a string into a compiled regular expression. Parses '{}' into
named groups ie. '/path/{variable}' is turned into
'/path/(?P<variable>[a-zA-Z0-9_-]+)'.
:param regexp_str: a string representing a URL path.
:return: a compiled regular expression.
"""
def named_groups(matchobj):
return '(?P<{0}>[a-zA-Z0-9_-]+)'.format(matchobj.group(1))
re_str = re.sub(r'{([a-zA-Z0-9_-]+)}', named_groups, regexp_str)
re_str = ''.join(('^', re_str, '$',))
return re.compile(re_str)
這個方法使用正則表達式將所有出現的 {variable}
替換為 (?P<variable>)
。然後在字元串頭尾分別添加 ^
和 $
標記,最後編譯正則表達式對象。
完成了路由存儲僅成功了一半,下面是如何得到路由對應的函數:
def get_handler(self, path):
logger.debug('Getting handler for: {0}'.format(path))
for route, handler in self.routes.items():
path_params = self.class.match_path(route, path)
if path_params is not None:
logger.debug('Got handler for: {0}'.format(path))
wrapped_handler = HandlerWrapper(handler, path_params)
return wrapped_handler
raise NotFoundException()
一旦 App
對象獲得一個 Request
對象,也就獲得了 URL 的路徑部分(如 /users/15/edit)。然後,我們需要匹配函數來生成一個響應或者 404 錯誤。get_handler
函數將路徑作為參數,循環遍歷路由,對每條路由調用 Router.match_path
類方法檢查是否有已編譯的正則對象與這個請求路徑匹配。如果存在,我們就調用 HandleWrapper
來包裝路由對應的函數。path_params
字典包含了路徑變數(如 '/users/15/edit' 中的 '15'),若路由沒有指定變數,字典就為空。最後,我們將包裝好的函數返回給 App
對象。
如果遍歷了所有的路由都找不到與路徑匹配的,函數就會拋出 NotFoundException
異常。
這個 Route.match
類方法挺簡單:
def match_path(cls, route, path):
match = route.match(path)
try:
return match.groupdict()
except AttributeError:
return None
它使用正則對象的 match 方法來檢查路由是否與路徑匹配。若果不匹配,則返回 None 。
最後,我們有 HandleWraapper
類。它的唯一任務就是封裝一個非同步函數,存儲 path_params
字典,並通過 handle
方法對外提供一個統一的介面。
class HandlerWrapper(object):
def init(self, handler, path_params):
self.handler = handler
self.path_params = path_params
self.request = None
async def handle(self, request):
return await self.handler(request, **self.path_params)
組合到一起
框架的最後部分就是用 App
類把所有的部分聯繫起來。
App
類用於集中所有的配置細節。一個 App
對象通過其 start_server
方法,使用一些配置數據創建一個 HTTPServer
的實例,然後將它傳遞給 asyncio.start_server 函數。asyncio.start_server
函數會對每一個 TCP 連接調用 HTTPServer
對象的 handle_connection
方法。
def start_server(self):
if not self._server:
self.loop = asyncio.get_event_loop()
self._server = HTTPServer(self.router, self.http_parser, self.loop)
self._connection_handler = asyncio.start_server(
self._server.handle_connection,
host=self.host,
port=self.port,
reuse_address=True,
reuse_port=True,
loop=self.loop)
logger.info('Starting server on {0}:{1}'.format(
self.host, self.port))
self.loop.run_until_complete(self._connection_handler)
try:
self.loop.run_forever()
except KeyboardInterrupt:
logger.info('Got signal, killing server')
except DiyFrameworkException as e:
logger.error('Critical framework failure:')
logger.error(e.traceback)
finally:
self.loop.close()
else:
logger.info('Server already started - {0}'.format(self))
總結
如果你查看源碼,就會發現所有的代碼僅 320 余行(包括測試代碼的話共 540 余行)。這麼少的代碼實現了這麼多的功能,讓我有點驚訝。這個框架沒有提供模板、身份認證以及資料庫訪問等功能(這些內容也很有趣哦)。這也讓我知道,像 Django 和 Tornado 這樣的框架是如何工作的,而且我能夠快速地調試它們了。
這也是我按照測試驅動開發完成的第一個項目,整個過程有趣而有意義。先編寫測試用例迫使我思考設計和架構,而不僅僅是把代碼放到一起,讓它們可以運行。不要誤解我的意思,有很多時候,後者的方式更好。不過如果你想給確保這些不怎麼維護的代碼在之後的幾周甚至幾個月依然工作,那麼測試驅動開發正是你需要的。
我研究了下整潔架構以及依賴注入模式,這些充分體現在 Router
類是如何作為一個更高層次的抽象的(實體?)。Router
類是比較接近核心的,像 http_parser
和 App
的內容比較邊緣化,因為它們只是完成了極小的字元串和位元組流、或是中層 IO 的工作。測試驅動開發(TDD)迫使我獨立思考每個小部分,這使我問自己這樣的問題:方法調用的組合是否易於理解?類名是否準確地反映了我正在解決的問題?我的代碼中是否很容易區分出不同的抽象層?
來吧,寫個小框架,真的很有趣:)
via: http://mattscodecave.com/posts/simple-python-framework-from-scratch.html
(題圖來自:es-static.us)
本文轉載來自 Linux 中國: https://github.com/Linux-CN/archive