Python 中,GIL(全局解釋器鎖)導(dǎo)致單進程無法充分利用多核 CPU,而多進程通過獨立解釋器與內(nèi)存空間,可實現(xiàn)真正并行計算,適用于數(shù)據(jù)加密、模型訓(xùn)練等 CPU 密集型任務(wù)。小編將詳解多進程創(chuàng)建方法與進程間數(shù)據(jù)共享方案,助你高效處理高負載任務(wù)。
一、Python 多進程的 3 種創(chuàng)建方法
Python 主要通過multiprocessing庫與concurrent.futures模塊實現(xiàn)多進程,不同方法適配不同任務(wù)規(guī)模。
(一)Process 類:創(chuàng)建單個進程
適用于任務(wù)數(shù)量少的場景,需實例化Process指定目標函數(shù)與參數(shù),調(diào)用start()啟動,join()等待結(jié)束。Windows 系統(tǒng)需在if __name__ == "__main__":下執(zhí)行,避免進程重復(fù)創(chuàng)建。
示例:
TypeScript取消自動換行復(fù)制
(二)Pool 類:進程池批量處理
任務(wù)量大時,Pool預(yù)先創(chuàng)建固定數(shù)量進程(默認等于 CPU 核心數(shù)),循環(huán)復(fù)用減少資源消耗。通過apply_async()異步提交任務(wù),close()關(guān)閉池后用join()等待結(jié)果。
示例:
TypeScript取消自動換行復(fù)制
def square(num):
return num * num
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=4) # 4核CPU設(shè)4進程
tasks = range(1, 11)
results = [pool.apply_async(square, (n,)) for n in tasks]
pool.close()
pool.join()
print([r.get() for r in results]) # 獲取結(jié)果
(三)ProcessPoolExecutor:簡潔管理(推薦)
Python 3.2 + 引入的ProcessPoolExecutor封裝底層細節(jié),支持with語句自動管理池生命周期,map()批量獲取結(jié)果,代碼更簡潔。
示例:
TypeScript取消自動換行復(fù)制
from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor(max_workers=4) as executor:
tasks = range(1, 11)
results = list(executor.map(square, tasks)) # 直接獲取結(jié)果
print(results)
二、進程間數(shù)據(jù)共享的 3 種方案
進程內(nèi)存隔離,需通過專門機制實現(xiàn)數(shù)據(jù)交互,不同方案適配不同通信場景。
(一)隊列(Queue):安全的多對多通信
基于消息傳遞,支持多進程并發(fā)讀寫,自動處理同步,適合 “生產(chǎn)者 - 消費者” 場景。put()添加數(shù)據(jù),get()獲取數(shù)據(jù),避免數(shù)據(jù)競爭。
示例:
TypeScript取消自動換行復(fù)制
queue = multiprocessing.Queue()
def producer(queue):
for i in range(3):
queue.put(f"數(shù)據(jù){i}")
def consumer(queue):
while not queue.empty():
print(f"處理:{queue.get()}")
if __name__ == "__main__":
p1 = multiprocessing.Process(target=producer, args=(queue,))
p2 = multiprocessing.Process(target=consumer, args=(queue,))
p1.start()
p1.join()
p2.start()
p2.join()
(二)管道(Pipe):高效的一對一通信
創(chuàng)建兩個連接對象實現(xiàn)雙向通信,速度比隊列快,但僅支持兩個進程交互。send()發(fā)送數(shù)據(jù),recv()接收數(shù)據(jù),適合簡單雙向通信。
示例:
TypeScript取消自動換行復(fù)制
conn_a, conn_b = multiprocessing.Pipe()
def process1(conn):
conn.send("請求數(shù)據(jù)")
print(f"接收響應(yīng):{conn.recv()}")
def process2(conn):
print(f"接收請求:{conn.recv()}")
conn.send("返回數(shù)據(jù)")
if __name__ == "__main__":
p1 = multiprocessing.Process(target=process1, args=(conn_a,))
p2 = multiprocessing.Process(target=process2, args=(conn_b,))
p1.start()
p2.start()
(三)共享內(nèi)存(Value/Array):高效共享簡單數(shù)據(jù)
Value共享單個值(如計數(shù)器),Array共享數(shù)組,基于操作系統(tǒng)共享內(nèi)存機制,效率最高。需用Lock加鎖避免多進程同時修改導(dǎo)致的數(shù)據(jù)競爭。
示例:
TypeScript取消自動換行復(fù)制
counter = multiprocessing.Value('i', 0) # 'i'表示int類型
lock = multiprocessing.Lock()
def increment(counter, lock):
with lock: # 加鎖確保原子操作
counter.value += 1
if __name__ == "__main__":
processes = [multiprocessing.Process(target=increment, args=(counter, lock)) for _ in range(5)]
[p.start() for p in processes]
[p.join() for p in processes]
print(f"最終計數(shù):{counter.value}") # 輸出5
多進程創(chuàng)建需按需選擇:單進程用Process,批量任務(wù)用Pool或ProcessPoolExecutor(推薦);數(shù)據(jù)共享中,隊列安全、管道高效、共享內(nèi)存適合簡單數(shù)據(jù)。實際開發(fā)中,需結(jié)合任務(wù)規(guī)模與通信需求選擇方案,確保兼顧效率與安全性,充分發(fā)揮多核 CPU 優(yōu)勢。