Python 中 Process 之間的通訊與同步:探索多行程間的互動

Process 之間的溝通、同步、資料共享都是個大學問
這邊就整理一下 Python 裡面 Process 互動的方法!
Python Process 之間的溝通
我們都知道要在 Python 開啟一個 Process 就像這樣:
Process(target=f, args=(value)).start()
但你知道要如何讓多個 Process 之間作互動嗎?接著看下去吧。
資訊傳遞
有時候我們會需要兩個或多個 Process 之間互相傳遞資料,在 Python 中 Process 之間的資訊傳遞有很多種方法,下面就整理一下我收集到的資訊,並且寫了個簡單的小範例。
Queue
multiprocessing.Queue 是 thread safe 和 process safe,功能跟一般的 Queue 很像,可以傳入容量的大小,預設為無限。
queue = multiprocessing.Queue(5)
以下為範例:
from multiprocessing import Process, Queue
def f(queue):
val = queue.get()
queue.put(val + 1)
if __name__ == '__main__':
queue = Queue()
queue.put(3)
p = Process(target=f, args=(queue,))
p.start()
p.join()
print(queue.get())
輸出:
4
Simple Queue
Simple Queue 和 Queue 很像,一樣是 thread safe 和 process safe,有點像是有 lock 的 Pipe,但缺少了部分的功能,也無法設置容量上限,但效率上會優於 Queue,以下為範例:
from multiprocessing import Process, SimpleQueue
def f(queue):
val = queue.get()
queue.put(val + 1)
if __name__ == '__main__':
queue = SimpleQueue()
queue.put(3)
p = Process(target=f, args=(queue,))
p.start()
p.join()
print(queue.get())
輸出:
4
Pipe
multiprocessing.Pipe()
會回傳一對的連線 object,用來表示一個管道的兩端,預設上會是雙向的溝通,可以傳參數改成單向的傳遞。
# 單向溝通
pipe_recv, pipe_send = multiprocessing.Pipe(duplex = False)
另外 Pipe 可以傳遞不同類型的資訊,以下為範例:
from multiprocessing import Process, Pipe
def f(pipe_child):
pipe_child.send(pipe_child.recv() + 1)
pipe_child.send('test')
if __name__ == '__main__':
pipe_parent, pipe_child = Pipe()
pipe_parent.send(2)
p = Process(target=f, args=(pipe_child,))
p.start()
p.join()
print(pipe_parent.recv())
print(pipe_parent.recv())
輸出:
3
test
Process 之間的同步
如果說要控制 Process 之間的順序,或是限制 Process 之間對檔案存取的數量,你可以透過 Lock、Condition 之類的類型來達成。
Lock
Lock 用 acquire 來鎖定,用 release 來釋放,或是可以使用 with 來自動處理鎖定和釋放。
from multiprocessing import Process, Lock, Value
def f(lock, total, val):
with lock:
total.value += val
if __name__ == '__main__':
lock = Lock()
total = Value('i', 0)
Proc = []
for num in range(5):
p = Process(target=f, args=(lock, total, num))
Proc.append(p)
p.start()
for p in Proc:
p.join()
print(total.value)
輸出:
10
Semaphore
Semaphore 可以設定初始數值,並且透過 acquire
和 release
來減少數值或是釋放,也可以用 with
。
from multiprocessing import Process, Semaphore
import time
def f(sem, val):
with sem:
print(val)
time.sleep(1)
if __name__ == '__main__':
sem = Semaphore(3)
sem.acquire()
for num in range(5):
Process(target=f, args=(sem, num)).start()
time.sleep(1)
print('start')
sem.release()
輸出:
0
1
start
3
2
4
Barrier
Barrier 會先設定總共有幾個 Process,每個 Process 會呼叫 .wait()
並等待,直到所有的 Process 都呼叫完後,會統一 release 所有 Process 並繼續執行。通常都是拿來等待所有的 Process 執行完畢後,繼續執行下一步驟。
from multiprocessing import Process, Barrier
import time
def f(barrier, val):
start = time.time()
time.sleep(val)
barrier.wait()
end = time.time()
print(f'{val} - wait for {end - start} seconds')
if __name__ == '__main__':
barrier = Barrier(5)
for i in range(5):
Process(target=f, args=(barrier, i)).start()
輸出:
0 - wait for 4.1258227825164795 seconds
2 - wait for 4.119823455810547 seconds
1 - wait for 4.124821662902832 seconds
3 - wait for 4.118821859359741 seconds
4 - wait for 4.117821931838989 seconds
Condition
Condition 透過 .wait()
和 .notify()
來控制 Process 之間的同步,通常用於 producer 和 consumer 的情境。在使用 .wait()
和 .notify()
時都要獲取到底層的 lock,可以透過 .acquire()
鎖定和 .release()
釋放,也可以使用 with
。.wait()
會等待其他 Process 呼叫 .notify()
,在等待期間會自動釋放鎖,並在其他 Process 呼叫 .notify()
後,重新鎖定回去。.notify()
可以指定通知幾個 Process,或是使用 .notify_all()
通知全部在等待的 Process。
from multiprocessing import Process, Condition
import time
def f(cond, val):
cond.acquire()
cond.wait()
print(val)
cond.release()
if __name__ == '__main__':
cond = Condition()
for i in range(5):
Process(target=f, args=(cond, i)).start()
time.sleep(1)
cond.acquire()
print('Release 1 process')
cond.notify(1)
cond.release()
time.sleep(1)
cond.acquire()
print('Release 2 processes')
cond.notify(2)
cond.release()
time.sleep(1)
cond.acquire()
print('Release all processes')
cond.notify_all()
cond.release()
輸出:
Release 1 process
0
Release 2 processes
1
2
Release all processes
3
4
Event
Event 通常拿來通知其他 Process 要等待,直到 flag 被設為 True,等待的話使用 .wait()
,設定 flag 為 True 使用 .set()
,清除則為 .clear()
。
from multiprocessing import Process, Event
import time
def f(event, val):
event.wait()
print(val)
if __name__ == '__main__':
event = Event()
Process(target=f, args=(event, 0)).start()
print('set')
event.set()
Process(target=f, args=(event, 1)).start()
Process(target=f, args=(event, 2)).start()
time.sleep(1)
print('clear')
event.clear()
Process(target=f, args=(event, 3)).start()
Process(target=f, args=(event, 4)).start()
print('set')
event.set()
輸出:
set
1
0
2
clear
set
3
4
資料共用
除了資料傳遞之外,有時候 Process 之間會想要共同存取某個變數,這時候就會需要用到這個功能,以下也有幾個方法供參考,也都有附上小範例。
Value & Array
使用 Shared Memory 來與其他 process 分享 objects。
- Value: 儲存在 shared memory 的 ctypes object
- Array: 儲存在 shared memory 的 ctypes array
使用上需要傳入指定的型態,可以是 Type code 或是 ctype 的型態,Type code 的定義如下:
Type code | C Type |
---|---|
‘b’ | signed char |
‘B’ | unsigned char |
‘u’ | wchar_t |
‘h’ | signed short |
‘H’ | unsigned short |
‘i’ | signed int |
‘I’ | unsigned int |
’l' | signed long |
‘L’ | unsigned long |
‘q’ | signed long long |
‘Q’ | unsigned long long |
‘f’ | float |
’d' | double |
或是直接給予 ctype 的型態:
from ctypes import c_double
Array(c_double, range(5))
Value
- 取數值的話要用
.value
- 可以在型態參數後面給予初始值
Array
- Array 存取與一般 list 無異,使用
[]
- 可以在型態參數後面給予長度或是 initializer
範例:
from multiprocessing import Process, Value, Array
from ctypes import c_double
def f(num, arr):
with num.get_lock():
num.value += 10
with arr.get_lock():
arr[2] -= -1
if __name__ == '__main__':
num = Value('i', 0)
arr = Array(c_double, range(5))
proc = Process(target=f, args=(num, arr))
proc.start()
proc.join()
print(num.value)
print(arr[:])
輸出:
10
[0.0, 1.0, 3.0, 3.0, 4.0]
Manager
Manager object 會控制一個 server process 來管理所有的 shared objects,並且讓其他的 process 可以操作這些 shared objects,甚至可以與不同機器上的 process 共享這些 objects。
目前 Manager 支持多種類型:list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value, Array。
範例:
from multiprocessing import Process, Manager
def f(d, l):
d['test'] = 2
d[0.25] = None
l.reverse()
if __name__ == '__main__':
with Manager() as manager:
d = manager.dict()
l = manager.list(range(10))
p = Process(target=f, args=(d, l))
p.start()
p.join()
print(d)
print(l)
輸出:
{'test': 2, 0.25: None}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
Custom Object
以上都是使用 ctypes 的型態或是 Python 內建的型態,這邊介紹一個如何使用自訂的 class 的方法,透過繼承 BaseManager
,和使用 register()
,來得以創建自訂的型態,以下為範例:
from multiprocessing import Process, Lock
from multiprocessing.managers import BaseManager, NamespaceProxy
import types
class SimpleClass(object):
def __init__(self):
self.var = 0
def set(self, val):
self.var = val
def get(self):
return self.var
class MyManager(BaseManager):
pass
class SimpleProxy(NamespaceProxy):
_exposed_ = tuple(dir(SimpleClass))
def __getattr__(self, name):
result = super().__getattr__(name)
if isinstance(result, types.MethodType):
def wrapper(*args, **kwargs):
return self._callmethod(name, args, kwargs)
return wrapper
return result
def f(obj, val, lock):
with lock:
obj.set(obj.get() + val)
if __name__ == '__main__':
MyManager.register('SimpleClass', SimpleClass, SimpleProxy)
with MyManager() as manager:
obj = manager.SimpleClass()
lock = Lock()
procs = [None] * 10
for i in range(10):
procs[i] = Process(target=f, args=[obj, i, lock])
procs[i].start()
for i in range(10):
procs[i].join()
print(obj.var)
輸出:
45
我們先是自訂一個 class - SimpleClass,就是我們用來共享的 class,大家可以自由定義,範例中 class 裡面包含了一個 attribute 和 set 和 get 的 method。
class SimpleClass(object):
def __init__(self):
self.var = 0
def set(self, val):
self.var = val
def get(self):
return self.var
接著自訂一個 MyManager 並繼承自 BaseManager,我們自訂的 SimpleClass 等等就會註冊在 MyManager 上面。
class MyManager(BaseManager):
pass
f 函式則是其他 Process 會執行的函示,負責將 SimpleClass 裡面的 var 數值加上 val,val 則是每個 Process 都不同,為了防止 race condition,另外加上了 lock。
def f(obj, val, lock):
with lock:
obj.set(obj.get() + val)
有了 Manager 管理共享的 object,我們需要透過 Proxy 來存取共享的 object,但由於共享的 object 預設只會提供我們存取該 object 上的 method,無法直接存取 attribute,因此特別撰寫自訂的 Proxy - SimpleProxy,能讓我們透過 `obj.var` 來存取 attribute。
class SimpleProxy(NamespaceProxy):
_exposed_ = tuple(dir(SimpleClass))
def __getattr__(self, name):
result = super().__getattr__(name)
if isinstance(result, types.MethodType):
def wrapper(*args, **kwargs):
return self._callmethod(name, args, kwargs)
return wrapper
return result
透過 `MyManager.register` 向 MyManager 註冊我們自訂的 SimpleClass 和自訂的 SimpleProxy,接著使用 MyManager 創建我們自訂的 SimpleClass 並得到控制的 Proxy (obj)。
接著我們開啟 10 個 Process,並傳入所需的參數,最後等待所有的 Process 都執行完畢,並印出結果 45。
if __name__ == '__main__':
MyManager.register('SimpleClass', SimpleClass, SimpleProxy)
with MyManager() as manager:
obj = manager.SimpleClass()
lock = Lock()
procs = [None] * 10
for i in range(10):
procs[i] = Process(target=f, args=[obj, i, lock])
procs[i].start()
for i in range(10):
procs[i].join()
print(obj.var)
Reference
- https://stackoverflow.com/questions/62223424/simplequeue-vs-queue-in-python-what-is-the-advantage-of-using-simplequeue
- https://stackoverflow.com/questions/28267972/python-multiprocessing-locks
- https://stackoverflow.com/questions/10415028/how-can-i-get-the-return-value-of-a-function-passed-to-multiprocessing-process
- https://docs.python.org/zh-tw/3/library/ctypes.html#fundamental-data-types
- https://docs.python.org/3/library/threading.html
- https://docs.python.org/3/library/multiprocessing.html
- https://stackoverflow.com/questions/74773700/is-there-a-way-to-sync-a-serializable-structure-with-python-multiprocessing
- https://stackoverflow.com/questions/26499548/accessing-an-attribute-of-a-multiprocessing-proxy-of-a-class
- https://stackoverflow.com/questions/3671666/sharing-a-complex-object-between-processes
- https://stackoverflow.com/questions/29788809/python-how-to-pass-an-autoproxy-object
- https://www.studytonight.com/python/python-threading-condition-object
如果你覺得這篇文章有用 可以考慮贊助飲料給大貓咪