Python 中最快解壓 zip 文件的方法
假設現在的上下文(LCTT 譯註:context,計算機術語,此處意為業務情景)是這樣的:一個 zip 文件被上傳到一個Web 服務中,然後 Python 需要解壓這個 zip 文件然後分析和處理其中的每個文件。這個特殊的應用查看每個文件各自的名稱和大小,並和已經上傳到 AWS S3 上的文件進行比較,如果文件(和 AWS S3 上的相比)有所不同或者文件本身更新,那麼就將它上傳到 AWS S3。
挑戰在於這些 zip 文件太大了。它們的平均大小是 560MB 但是其中一些大於 1GB。這些文件中大多數是文本文件,但是其中同樣也有一些巨大的二進位文件。不同尋常的是,每個 zip 文件包含 100 個文件但是其中 1-3 個文件卻佔據了多達 95% 的 zip 文件大小。
最開始我嘗試在內存中解壓文件,並且每次只處理一個文件。在各種內存爆炸和 EC2 耗盡內存的情況下,這個方法壯烈失敗了。我覺得這個原因是這樣的。最開始你有 1GB 文件在內存中,然後你現在解壓每個文件,在內存中大約就要佔用 2-3GB。所以,在很多次測試之後,解決方案是將這些 zip 文件複製到磁碟上(在臨時目錄 /tmp
中),然後遍歷這些文件。這次情況好多了但是我仍然注意到了整個解壓過程花費了巨量的時間。是否可能有方法優化呢?
原始函數
首先是下面這些模擬對 zip 文件中文件實際操作的普通函數:
def _count_file(fn):
with open(fn, 'rb') as f:
return _count_file_object(f)
def _count_file_object(f):
# Note that this iterates on 'f'.
# You *could* do 'return len(f.read())'
# which would be faster but potentially memory
# inefficient and unrealistic in terms of this
# benchmark experiment.
total = 0
for line in f:
total += len(line)
return total
這裡是可能最簡單的另一個函數:
def f1(fn, dest):
with open(fn, 'rb') as f:
zf = zipfile.ZipFile(f)
zf.extractall(dest)
total = 0
for root, dirs, files in os.walk(dest):
for file_ in files:
fn = os.path.join(root, file_)
total += _count_file(fn)
return total
如果我更仔細地分析一下,我將會發現這個函數花費時間 40% 運行 extractall
,60% 的時間在遍歷各個文件並讀取其長度。
第一步嘗試
我的第一步嘗試是使用線程。先創建一個 zipfile.ZipFile
的實例,展開其中的每個文件名,然後為每一個文件開始一個線程。每個線程都給它一個函數來做「實質工作」(在這個基準測試中,就是遍歷每個文件然後獲取它的名稱)。實際業務中的函數進行的工作是複雜的 S3、Redis 和 PostgreSQL 操作,但是在我的基準測試中我只需要製作一個可以找出文件長度的函數就好了。線程池函數:
def f2(fn, dest):
def unzip_member(zf, member, dest):
zf.extract(member, dest)
fn = os.path.join(dest, member.filename)
return _count_file(fn)
with open(fn, 'rb') as f:
zf = zipfile.ZipFile(f)
futures = []
with concurrent.futures.ThreadPoolExecutor() as executor:
for member in zf.infolist():
futures.append(
executor.submit(
unzip_member,
zf,
member,
dest,
)
)
total = 0
for future in concurrent.futures.as_completed(futures):
total += future.result()
return total
結果:加速 ~10%
第二步嘗試
所以可能是 GIL(LCTT 譯註:Global Interpreter Lock,一種全局鎖,CPython 中的一個概念)阻礙了我。最自然的想法是嘗試使用多線程在多個 CPU 上分配工作。但是這樣做有缺點,那就是你不能傳遞一個非可 pickle 序列化的對象(LCTT 譯註:意為只有可 pickle 序列化的對象可以被傳遞),所以你只能發送文件名到之後的函數中:
def unzip_member_f3(zip_filepath, filename, dest):
with open(zip_filepath, 'rb') as f:
zf = zipfile.ZipFile(f)
zf.extract(filename, dest)
fn = os.path.join(dest, filename)
return _count_file(fn)
def f3(fn, dest):
with open(fn, 'rb') as f:
zf = zipfile.ZipFile(f)
futures = []
with concurrent.futures.ProcessPoolExecutor() as executor:
for member in zf.infolist():
futures.append(
executor.submit(
unzip_member_f3,
fn,
member.filename,
dest,
)
)
total = 0
for future in concurrent.futures.as_completed(futures):
total += future.result()
return total
結果: 加速 ~300%
這是作弊
使用處理器池的問題是這樣需要存儲在磁碟上的原始 .zip
文件。所以為了在我的 web 伺服器上使用這個解決方案,我首先得要將內存中的 zip 文件保存到磁碟,然後調用這個函數。這樣做的代價我不是很清楚但是應該不低。
好吧,再翻翻看又沒有損失。可能,解壓過程加速到足以彌補這樣做的損失了吧。
但是一定記住!這個優化取決於使用所有可用的 CPU。如果一些其它的 CPU 需要執行在 gunicorn
中的其它事務呢?這時,這些其它進程必須等待,直到有 CPU 可用。由於在這個伺服器上有其他的事務正在進行,我不是很確定我想要在進程中接管所有其他 CPU。
結論
一步一步地做這個任務的這個過程感覺挺好的。你被限制在一個 CPU 上但是表現仍然特別好。同樣地,一定要看看在f1
和 f2
兩段代碼之間的不同之處!利用 concurrent.futures
池類你可以獲取到允許使用的 CPU 的個數,但是這樣做同樣給人感覺不是很好。如果你在虛擬環境中獲取的個數是錯的呢?或者可用的個數太低以致無法從負載分配獲取好處並且現在你僅僅是為了移動負載而支付營運開支呢?
我將會繼續使用 zipfile.ZipFile(file_buffer).extractall(temp_dir)
。這個工作這樣做已經足夠好了。
想試試手嗎?
我使用一個 c5.4xlarge
EC2 伺服器來進行我的基準測試。文件可以從此處下載:
wget https://www.peterbe.com/unzip-in-parallel/hack.unzip-in-parallel.py
wget https://www.peterbe.com/unzip-in-parallel/symbols-2017-11-27T14_15_30.zip
這裡的 .zip
文件有 34MB。和在伺服器上的相比已經小了很多。
hack.unzip-in-parallel.py
文件里是一團糟。它包含了大量可怕的修正和醜陋的代碼,但是這只是一個開始。
via: https://www.peterbe.com/plog/fastest-way-to-unzip-a-zip-file-in-python
本文轉載來自 Linux 中國: https://github.com/Linux-CN/archive