目錄

廣告 AD

Python 中 Process 之間的通訊與同步:探索多行程間的互動

Process 之間的溝通、同步、資料共享都是個大學問

這邊就整理一下 Python 裡面 Process 互動的方法!

廣告 AD

我們都知道要在 Python 開啟一個 Process 就像這樣:

Python

Process(target=f, args=(value)).start()

但你知道要如何讓多個 Process 之間作互動嗎?接著看下去吧。


有時候我們會需要兩個或多個 Process 之間互相傳遞資料,在 Python 中 Process 之間的資訊傳遞有很多種方法,下面就整理一下我收集到的資訊,並且寫了個簡單的小範例。


multiprocessing.Queue 是 thread safe 和 process safe,功能跟一般的 Queue 很像,可以傳入容量的大小,預設為無限。

Python Document - Queue

Python

queue = multiprocessing.Queue(5)

以下為範例:

Python

from multiprocessing import Process, Queue

def f(queue):
    val = queue.get()
    queue.put(val + 1)

if __name__ == '__main__':
    queue = Queue()
    queue.put(3)
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()
    print(queue.get())

輸出:

Shell

4

Simple Queue 和 Queue 很像,一樣是 thread safe 和 process safe,有點像是有 lock 的 Pipe,但缺少了部分的功能,也無法設置容量上限,但效率上會優於 Queue,以下為範例:

Python Document - SimpleQueue

Python

from multiprocessing import Process, SimpleQueue

def f(queue):
    val = queue.get()
    queue.put(val + 1)

if __name__ == '__main__':
    queue = SimpleQueue()
    queue.put(3)
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()
    print(queue.get())

輸出:

Shell

4

multiprocessing.Pipe() 會回傳一對的連線 object,用來表示一個管道的兩端,預設上會是雙向的溝通,可以傳參數改成單向的傳遞。

Python Document - Pipe

Python

# 單向溝通
pipe_recv, pipe_send = multiprocessing.Pipe(duplex = False)

另外 Pipe 可以傳遞不同類型的資訊,以下為範例:

Python

from multiprocessing import Process, Pipe

def f(pipe_child):
    pipe_child.send(pipe_child.recv() + 1)
    pipe_child.send('test')

if __name__ == '__main__':
    pipe_parent, pipe_child = Pipe()
    pipe_parent.send(2)
    p = Process(target=f, args=(pipe_child,))
    p.start()
    p.join()
    print(pipe_parent.recv())
    print(pipe_parent.recv())

輸出:

Shell

3
test
Note
要注意不能從 Pipe 的同一邊同時讀和寫,有可能會造成資料的損壞。

如果說要控制 Process 之間的順序,或是限制 Process 之間對檔案存取的數量,你可以透過 Lock、Condition 之類的類型來達成。


Lock 用 acquire 來鎖定,用 release 來釋放,或是可以使用 with 來自動處理鎖定和釋放。

Python Document - Lock

Python

from multiprocessing import Process, Lock, Value

def f(lock, total, val):
    with lock:
        total.value += val

if __name__ == '__main__':
    lock = Lock()
    total = Value('i', 0)

    Proc = []
    for num in range(5):
        p = Process(target=f, args=(lock, total, num))
        Proc.append(p)
        p.start()
    for p in Proc:
        p.join()
    print(total.value)

輸出:

Shell

10

Semaphore 可以設定初始數值,並且透過 acquirerelease 來減少數值或是釋放,也可以用 with

Python Document - Semaphore

Python

from multiprocessing import Process, Semaphore
import time

def f(sem, val):
    with sem:
        print(val)
        time.sleep(1)

if __name__ == '__main__':
    sem = Semaphore(3)
    sem.acquire()
    for num in range(5):
        Process(target=f, args=(sem, num)).start()
    time.sleep(1)
    print('start')
    sem.release()

輸出:

Shell

0
1
start
3
2
4

Barrier 會先設定總共有幾個 Process,每個 Process 會呼叫 .wait() 並等待,直到所有的 Process 都呼叫完後,會統一 release 所有 Process 並繼續執行。通常都是拿來等待所有的 Process 執行完畢後,繼續執行下一步驟。

Python Document - Barrier

Python

from multiprocessing import Process, Barrier
import time

def f(barrier, val):
    start = time.time()
    time.sleep(val)
    barrier.wait()
    end = time.time()
    print(f'{val} - wait for {end - start} seconds')

if __name__ == '__main__':
    barrier = Barrier(5)

    for i in range(5):
        Process(target=f, args=(barrier, i)).start()

輸出:

Shell

0 - wait for 4.1258227825164795 seconds
2 - wait for 4.119823455810547 seconds
1 - wait for 4.124821662902832 seconds
3 - wait for 4.118821859359741 seconds
4 - wait for 4.117821931838989 seconds

Condition 透過 .wait().notify() 來控制 Process 之間的同步,通常用於 producer 和 consumer 的情境。在使用 .wait().notify() 時都要獲取到底層的 lock,可以透過 .acquire() 鎖定和 .release() 釋放,也可以使用 with.wait() 會等待其他 Process 呼叫 .notify(),在等待期間會自動釋放鎖,並在其他 Process 呼叫 .notify() 後,重新鎖定回去。.notify() 可以指定通知幾個 Process,或是使用 .notify_all() 通知全部在等待的 Process。

Python Document - Condition

Python

from multiprocessing import Process, Condition
import time

def f(cond, val):
    cond.acquire()
    cond.wait()
    print(val)
    cond.release()

if __name__ == '__main__':
    cond = Condition()

    for i in range(5):
        Process(target=f, args=(cond, i)).start()
    
    time.sleep(1)
    cond.acquire()
    print('Release 1 process')
    cond.notify(1)
    cond.release()

    time.sleep(1)
    cond.acquire()
    print('Release 2 processes')
    cond.notify(2)
    cond.release()

    time.sleep(1)
    cond.acquire()
    print('Release all processes')
    cond.notify_all()
    cond.release()

輸出:

Shell

Release 1 process
0
Release 2 processes
1
2
Release all processes
3
4

Event 通常拿來通知其他 Process 要等待,直到 flag 被設為 True,等待的話使用 .wait(),設定 flag 為 True 使用 .set(),清除則為 .clear()

Python Document - Event

Python

from multiprocessing import Process, Event
import time

def f(event, val):
    event.wait()
    print(val)

if __name__ == '__main__':
    event = Event()

    Process(target=f, args=(event, 0)).start()
    print('set')
    event.set()
    Process(target=f, args=(event, 1)).start()
    Process(target=f, args=(event, 2)).start()
    time.sleep(1)
    print('clear')
    event.clear()
    Process(target=f, args=(event, 3)).start()
    Process(target=f, args=(event, 4)).start()
    print('set')
    event.set()

輸出:

Shell

set
1
0
2
clear
set
3
4

除了資料傳遞之外,有時候 Process 之間會想要共同存取某個變數,這時候就會需要用到這個功能,以下也有幾個方法供參考,也都有附上小範例。


使用 Shared Memory 來與其他 process 分享 objects。

  • Value: 儲存在 shared memory 的 ctypes object
  • Array: 儲存在 shared memory 的 ctypes array

使用上需要傳入指定的型態,可以是 Type code 或是 ctype 的型態,Type code 的定義如下:

Source

Type codeC Type
‘b’signed char
‘B’unsigned char
‘u’wchar_t
‘h’signed short
‘H’unsigned short
‘i’signed int
‘I’unsigned int
’l'signed long
‘L’unsigned long
‘q’signed long long
‘Q’unsigned long long
‘f’float
’d'double

或是直接給予 ctype 的型態:

ctpyes list

Python

from ctypes import c_double
Array(c_double, range(5))

Python Document - Value

  • 取數值的話要用 .value
  • 可以在型態參數後面給予初始值

Python Document - Array

  • Array 存取與一般 list 無異,使用 []
  • 可以在型態參數後面給予長度或是 initializer

範例:

Python

from multiprocessing import Process, Value, Array
from ctypes import c_double

def f(num, arr):
    with num.get_lock():
        num.value += 10
    with arr.get_lock():
        arr[2] -= -1

if __name__ == '__main__':
    num = Value('i', 0)
    arr = Array(c_double, range(5))
    proc = Process(target=f, args=(num, arr))
    proc.start()
    proc.join()
    print(num.value)
    print(arr[:])

輸出:

Shell

10
[0.0, 1.0, 3.0, 3.0, 4.0]

Manager object 會控制一個 server process 來管理所有的 shared objects,並且讓其他的 process 可以操作這些 shared objects,甚至可以與不同機器上的 process 共享這些 objects。

目前 Manager 支持多種類型:list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value, Array。

Python Document - Manager

範例:

Python

from multiprocessing import Process, Manager

def f(d, l):
    d['test'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))

        p = Process(target=f, args=(d, l))
        p.start()
        p.join()

        print(d)
        print(l)

輸出:

Shell

{'test': 2, 0.25: None}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

以上都是使用 ctypes 的型態或是 Python 內建的型態,這邊介紹一個如何使用自訂的 class 的方法,透過繼承 BaseManager,和使用 register(),來得以創建自訂的型態,以下為範例:

Python

from multiprocessing import Process, Lock
from multiprocessing.managers import BaseManager, NamespaceProxy
import types

class SimpleClass(object):
    def __init__(self):
        self.var = 0
    def set(self, val):
        self.var = val
    def get(self):
        return self.var

class MyManager(BaseManager):
    pass

class SimpleProxy(NamespaceProxy):
    _exposed_ = tuple(dir(SimpleClass))

    def __getattr__(self, name):
        result = super().__getattr__(name)
        if isinstance(result, types.MethodType):
            def wrapper(*args, **kwargs):
                return self._callmethod(name, args, kwargs)
            return wrapper
        return result

def f(obj, val, lock):
    with lock:
        obj.set(obj.get() + val)

if __name__ == '__main__':
    MyManager.register('SimpleClass', SimpleClass, SimpleProxy)
    with MyManager() as manager:
        obj = manager.SimpleClass()
        lock = Lock()

        procs = [None] * 10
        for i in range(10):
            procs[i] = Process(target=f, args=[obj, i, lock])
            procs[i].start()
        for i in range(10):
            procs[i].join()

        print(obj.var)

輸出:

Shell

45

我們先是自訂一個 class - SimpleClass,就是我們用來共享的 class,大家可以自由定義,範例中 class 裡面包含了一個 attribute 和 set 和 get 的 method。

Python

class SimpleClass(object):
    def __init__(self):
        self.var = 0
    def set(self, val):
        self.var = val
    def get(self):
        return self.var

接著自訂一個 MyManager 並繼承自 BaseManager,我們自訂的 SimpleClass 等等就會註冊在 MyManager 上面。

Python

class MyManager(BaseManager):
    pass

f 函式則是其他 Process 會執行的函示,負責將 SimpleClass 裡面的 var 數值加上 val,val 則是每個 Process 都不同,為了防止 race condition,另外加上了 lock。

Python

def f(obj, val, lock):
    with lock:
        obj.set(obj.get() + val)

有了 Manager 管理共享的 object,我們需要透過 Proxy 來存取共享的 object,但由於共享的 object 預設只會提供我們存取該 object 上的 method,無法直接存取 attribute,因此特別撰寫自訂的 Proxy - SimpleProxy,能讓我們透過 `obj.var` 來存取 attribute。

Python

class SimpleProxy(NamespaceProxy):
    _exposed_ = tuple(dir(SimpleClass))

    def __getattr__(self, name):
        result = super().__getattr__(name)
        if isinstance(result, types.MethodType):
            def wrapper(*args, **kwargs):
                return self._callmethod(name, args, kwargs)
            return wrapper
        return result

透過 `MyManager.register` 向 MyManager 註冊我們自訂的 SimpleClass 和自訂的 SimpleProxy,接著使用 MyManager 創建我們自訂的 SimpleClass 並得到控制的 Proxy (obj)。

接著我們開啟 10 個 Process,並傳入所需的參數,最後等待所有的 Process 都執行完畢,並印出結果 45。

Note
如果沒有加上 lock 會導致輸出結果可能不是 45。

Python

if __name__ == '__main__':
    MyManager.register('SimpleClass', SimpleClass, SimpleProxy)
    with MyManager() as manager:
        obj = manager.SimpleClass()
        lock = Lock()

        procs = [None] * 10
        for i in range(10):
            procs[i] = Process(target=f, args=[obj, i, lock])
            procs[i].start()
        for i in range(10):
            procs[i].join()

        print(obj.var)


廣告 AD