Python/Process
Python中的進程類使用的是 multiprocessing 模塊,該模塊的API大部分複製了 threading 模塊的API。
每一個進程啟動時都會最先產生一個線程,即主線程。主線程所在的進程為主進程。使用多進程的時候需要特別注意,必須要有 if __name__ == '__main__':, 該語句下的代碼相當於主進程,沒有該語句會報錯。
multiprocessing 模塊的API
編輯import multiprocessing
print('子进程的列表:{}'.format(multiprocessing.active_children()))
print('电脑的CPU数量:{}'.format(multiprocessing.cpu_count()))
print('现在运行的进程:{}'.format(multiprocessing.current_process()))
創建線程
編輯創建線程的方法有兩種,一種是直接使用 multiprocessing 模塊裡面的類來進行創建,一種是繼承 multiprocessing 模塊的類寫一個類來對線程進行創建。
直接創建線程
編輯直接從 multiprocessing.Process 繼承創建一個新的子類,並實例化後調用 start() 方法啟動新進程,即相當於它調用了進程的 run() 方法。
該方法的參數如下:
Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
- target 指要創建的進程的方法名
- name 指給此進程命名,命名後可以調用 multiprocessing.current_process().name 方法輸出該進程的名字
- args/kwargs 指 target 指向的方法需要傳遞的參數,必須是元組形式,如果只有一個參數,需要以添加逗號。
- daemon參數設置為True,則進程為守護進程。如果主進程結束了但該守護進程沒有運行完,守護進程就會被強制結束。
創建一個新進程,每隔兩秒對傳入的數加2,而主進程每隔1秒對傳入的數加1,代碼示例如下:
import multiprocessing
from multiprocessing import Process
import time
def process1(num):
while True:
num += 2
print('{} is running >> {}'.format(multiprocessing.current_process().name, num))
time.sleep(2)
if __name__ == '__main__':
# 设置进程
new_pro = Process(target=process1, name='Add2', args=(100,))
# 进程开始
new_pro.start()
n = 0
while True:
n += 1
print('{} is running >> {}'.format(multiprocessing.current_process().name, n))
time.sleep(1)
繼承創建
編輯繼承Process定義進程類,重寫run方法。
import multiprocessing
from multiprocessing import Process
import time
# 继承进程类
class MyProcess(Process):
def __init__(self, num):
super().__init__() # 必须调用父类的初始化方法
self.num = num
def run(self) -> None:
while True:
self.num += 2
print('{} is running >> {}'.format(multiprocessing.current_process().name, self.num))
time.sleep(2)
if __name__ == '__main__':
new_pro = MyProcess(100)
# 设置进程名字
new_pro.name = "Add2"
new_pro.start()
n = 0
while True:
n += 1
print('{} is running >> {}'.format(multiprocessing.current_process().name, n))
time.sleep(1)
Process的方法
編輯join方法會阻塞除了被join外的所有進程。參數為阻塞timeout秒數。如果參數為None,阻塞至被join的進程終止。
進程鎖
編輯lock = multiprocessing.Lock()
進程通信
編輯進程之間不共享數據。如果進程之間需要通信,則要用到 Queue 模塊或者 Pipe 模塊或共享內存來實現。
Queue
編輯任何一個子進程都可以對Queue進行存(put)和取(get)。
- multiprocessing.Queue([maxsize])
- put(obj[, block[, timeout]]) 將 obj 放入隊列。如果可選參數 block 是 True (默認值) 而且 timeout 是 None (默認值), 將會阻塞當前進程,直到有空的緩衝槽。如果 timeout 是正數,將會在阻塞了最多 timeout 秒之後還是沒有可用的緩衝槽時拋出 queue.Full 異常。反之 (block 是 False 時),僅當有可用緩衝槽時才放入對象,否則拋出 queue.Full 異常 (在這種情形下 timeout 參數會被忽略)。
- get([block[, timeout]]) 從隊列中取出並返回對象。如果可選參數 block 是 True (默認值) 而且 timeout 是 None (默認值), 將會阻塞當前進程,直到隊列中出現可用的對象。如果 timeout 是正數,將會在阻塞了最多 timeout 秒之後還是沒有可用的對象時拋出 queue.Empty 異常。反之 (block 是 False 時),僅當有可用對象能夠取出時返回,否則拋出 queue.Empty 異常 (在這種情形下 timeout 參數會被忽略)。
Pipe
編輯Pipe只提供兩個端點,只允許兩個子進程進行存(send)和取(recv)。Pipe實現了兩個子進程之間的通信。
from multiprocessing import Pool
def main():
with (Pool() as pool):
pool.starmap(process_function, [param for param in range(25)])
Value
編輯Value數據共享類最多能夠共享一個值,該函數的參數如下:
Value(typecode_or_type, args, lock=True)
上述方法中,參數 typecode_or_type 定義 ctypes() 對象的類型,可以傳 Type code 或 C Type,具體對照表見下文。args 傳遞給 typecode_or_type 構造函數的參數,lock 默認為True,創建一個互斥鎖來限制對Value對象的訪問,如果傳入一個鎖,如Lock或RLock的實例,將用於同步。如果傳入False,Value的實例就不會被鎖保護,它將不是進程安全的。
Type code | C Type | Python Type | Minimum size in bytes |
---|---|---|---|
'b' | signed char | int | 1 |
'B' | unsigned char | int | 1 |
'u' | Py_UNICODE | Unicode character | 2 |
'h' | signed short | int | 2 |
'H' | unsigned short | int | 2 |
'i' | signed int | int | 2 |
'I' | unsigned int | int | 2 |
'l' | signed long | int | 4 |
'L' | unsigned long | int | 4 |
'q' | signed long long | int | 8 |
'Q' | unsigned long long | int | 8 |
'f' | float | float | 4 |
'd' | double | float | 8 |
例如下面我們在一個進程中傳入值並對其進行改動,另一個進程輸出傳入的值,代碼如下:
from multiprocessing import Process, Value
def process1(n):
n.value = 101
def process2(n):
print('改变后的参数 = {}'.format(n.value))
if __name__ == '__main__':
# 初始化value
num = Value('d', 0)
new_pro1 = Process(target=process1, args=(num,))
new_pro1.start()
new_pro1.join()
new_pro2 = Process(target=process2, args=(num,))
new_pro2.start()
Array
編輯上述的 Value 類只能傳遞一個參數,但是 Array 可以傳遞很多的參數,該方法參數如下:
Array(typecode_or_type, size_or_initializer, **kwds[, lock])
typecode_or_type 參數與上面的相同,同樣參照上表,size_or_initializer 如果它是一個整數,那麼它確定數組的長度,並且數組將被初始化為0。否則,size_or_initializer 是用於初始化數組的序列,其長度決定數組的長度。kwds 是傳遞給 typecode_or_type 構造函數的參數,lock 參數與上面的相同。
例如,我們傳入一個0數組給一個進程,然後在另一個進程中計算其和,代碼如下:
from multiprocessing import Process, Array
def process1(n):
n[3] = 0
n[4] = 0
def process2(n):
# 从队列中得到数据
print('数组的和为 = {}'.format(sum(n)))
if __name__ == '__main__':
# 初始化数组
num = Array('d', range(5))
new_pro1 = Process(target=process1, args=(num,))
new_pro1.start()
new_pro1.join()
new_pro2 = Process(target=process2, args=(num,))
new_pro2.start()
進程池
編輯進程池可以提供指定數量的進程給用戶使用。即當有新的請求提交到進程池中時,如果池未滿,則會創建一個新的進程用來執行該請求;反之,如果池中的進程數已經達到規定最大值,那麼該請求就會等待,只要池中有進程空閒下來,該請求就能得到執行。
進程池的話Python中有兩個方法可以實現,一個是 multiprocessing 模塊自帶的 Pool 類;還有一個是 concurrent.futures.ProcessPoolExecutor 進行實現,其中後者的API與線程池的API一模一樣。下面介紹前一種方式。
- Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]]) 該類一般初始化使用的是processes ,該參數是要使用的工作進程數目。如果 processes 為 None,則使用 os.cpu_count() 返回的值。
- apply(func[, args[, kwds]]) 使用阻塞的方式調用func,必須等待上⼀個進程執行完任務後才能執行下一個進程,了解即可,幾乎不用
- apply_async(func[, args[, kwds[, callback[, error_callback]]]]) 使用非阻塞的方式調用 func(任務並行執行),args 為傳遞給 func 的參數列表,kwds 為傳遞給 func 的關鍵字參數列表。
- terminate() 不管任務是否完成,立即終止
- close() 關閉Pool,使其不再接受新的任務。
- join()主進程阻塞,等待子進程的退出,必須在 close 或 terminate 之後使用。
下面創建一個容量為4的進程池,並讓10個進程都停留三秒輸出進程名,代碼如下:
import multiprocessing
from multiprocessing import Process, Pool
import time
def proc():
time.sleep(3)
print('{} Already Endding >> {}'.format(multiprocessing.current_process().name,
time.strftime('%H:%M:%S',time.localtime(time.time()))))
if __name__ == '__main__':
# 创建容量为4的进程池
pool = Pool(4)
for i in range(10):
pool.apply_async(proc)
pool.close()
# 阻塞主进程,等所有子进程运行完后再通过
pool.join()
print('{} Already Endding >> {}'.format(multiprocessing.current_process().name,
time.strftime('%H:%M:%S', time.localtime(time.time()))))