Python/Process
Python中的进程类使用的是 multiprocessing 模块,该模块的API大部分复制了 threading 模块的API。
每一个进程启动时都会最先产生一个线程,即主线程。主线程所在的进程为主进程。使用多进程的时候需要特别注意,必须要有 if __name__ == '__main__':, 该语句下的代码相当于主进程,没有该语句会报错。
multiprocessing 模块的API
编辑import multiprocessing
print('子进程的列表:{}'.format(multiprocessing.active_children()))
print('电脑的CPU数量:{}'.format(multiprocessing.cpu_count()))
print('现在运行的进程:{}'.format(multiprocessing.current_process()))
创建线程
编辑创建线程的方法有两种,一种是直接使用 multiprocessing 模块里面的类来进行创建,一种是继承 multiprocessing 模块的类写一个类来对线程进行创建。
直接创建线程
编辑直接从 multiprocessing.Process 继承创建一个新的子类,并实例化后调用 start() 方法启动新进程,即相当于它调用了进程的 run() 方法。
该方法的参数如下:
Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
- target 指要创建的进程的方法名
- name 指给此进程命名,命名后可以调用 multiprocessing.current_process().name 方法输出该进程的名字
- args/kwargs 指 target 指向的方法需要传递的参数,必须是元组形式,如果只有一个参数,需要以添加逗号。
- daemon参数设置为True,则进程为守护进程。如果主进程结束了但该守护进程没有运行完,守护进程就会被强制结束。
创建一个新进程,每隔两秒对传入的数加2,而主进程每隔1秒对传入的数加1,代码示例如下:
import multiprocessing
from multiprocessing import Process
import time
def process1(num):
while True:
num += 2
print('{} is running >> {}'.format(multiprocessing.current_process().name, num))
time.sleep(2)
if __name__ == '__main__':
# 设置进程
new_pro = Process(target=process1, name='Add2', args=(100,))
# 进程开始
new_pro.start()
n = 0
while True:
n += 1
print('{} is running >> {}'.format(multiprocessing.current_process().name, n))
time.sleep(1)
继承创建
编辑继承Process定义进程类,重写run方法。
import multiprocessing
from multiprocessing import Process
import time
# 继承进程类
class MyProcess(Process):
def __init__(self, num):
super().__init__() # 必须调用父类的初始化方法
self.num = num
def run(self) -> None:
while True:
self.num += 2
print('{} is running >> {}'.format(multiprocessing.current_process().name, self.num))
time.sleep(2)
if __name__ == '__main__':
new_pro = MyProcess(100)
# 设置进程名字
new_pro.name = "Add2"
new_pro.start()
n = 0
while True:
n += 1
print('{} is running >> {}'.format(multiprocessing.current_process().name, n))
time.sleep(1)
Process的方法
编辑join方法会阻塞除了被join外的所有进程。参数为阻塞timeout秒数。如果参数为None,阻塞至被join的进程终止。
进程锁
编辑lock = multiprocessing.Lock()
进程通信
编辑进程之间不共享数据。如果进程之间需要通信,则要用到 Queue 模块或者 Pipe 模块或共享内存来实现。
Queue
编辑任何一个子进程都可以对Queue进行存(put)和取(get)。
- multiprocessing.Queue([maxsize])
- put(obj[, block[, timeout]]) 将 obj 放入队列。如果可选参数 block 是 True (默认值) 而且 timeout 是 None (默认值), 将会阻塞当前进程,直到有空的缓冲槽。如果 timeout 是正数,将会在阻塞了最多 timeout 秒之后还是没有可用的缓冲槽时抛出 queue.Full 异常。反之 (block 是 False 时),仅当有可用缓冲槽时才放入对象,否则抛出 queue.Full 异常 (在这种情形下 timeout 参数会被忽略)。
- get([block[, timeout]]) 从队列中取出并返回对象。如果可选参数 block 是 True (默认值) 而且 timeout 是 None (默认值), 将会阻塞当前进程,直到队列中出现可用的对象。如果 timeout 是正数,将会在阻塞了最多 timeout 秒之后还是没有可用的对象时抛出 queue.Empty 异常。反之 (block 是 False 时),仅当有可用对象能够取出时返回,否则抛出 queue.Empty 异常 (在这种情形下 timeout 参数会被忽略)。
Pipe
编辑Pipe只提供两个端点,只允许两个子进程进行存(send)和取(recv)。Pipe实现了两个子进程之间的通信。
from multiprocessing import Pool
def main():
with (Pool() as pool):
pool.starmap(process_function, [param for param in range(25)])
Value
编辑Value数据共享类最多能够共享一个值,该函数的参数如下:
Value(typecode_or_type, args, lock=True)
上述方法中,参数 typecode_or_type 定义 ctypes() 对象的类型,可以传 Type code 或 C Type,具体对照表见下文。args 传递给 typecode_or_type 构造函数的参数,lock 默认为True,创建一个互斥锁来限制对Value对象的访问,如果传入一个锁,如Lock或RLock的实例,将用于同步。如果传入False,Value的实例就不会被锁保护,它将不是进程安全的。
Type code | C Type | Python Type | Minimum size in bytes |
---|---|---|---|
'b' | signed char | int | 1 |
'B' | unsigned char | int | 1 |
'u' | Py_UNICODE | Unicode character | 2 |
'h' | signed short | int | 2 |
'H' | unsigned short | int | 2 |
'i' | signed int | int | 2 |
'I' | unsigned int | int | 2 |
'l' | signed long | int | 4 |
'L' | unsigned long | int | 4 |
'q' | signed long long | int | 8 |
'Q' | unsigned long long | int | 8 |
'f' | float | float | 4 |
'd' | double | float | 8 |
例如下面我们在一个进程中传入值并对其进行改动,另一个进程输出传入的值,代码如下:
from multiprocessing import Process, Value
def process1(n):
n.value = 101
def process2(n):
print('改变后的参数 = {}'.format(n.value))
if __name__ == '__main__':
# 初始化value
num = Value('d', 0)
new_pro1 = Process(target=process1, args=(num,))
new_pro1.start()
new_pro1.join()
new_pro2 = Process(target=process2, args=(num,))
new_pro2.start()
Array
编辑上述的 Value 类只能传递一个参数,但是 Array 可以传递很多的参数,该方法参数如下:
Array(typecode_or_type, size_or_initializer, **kwds[, lock])
typecode_or_type 参数与上面的相同,同样参照上表,size_or_initializer 如果它是一个整数,那么它确定数组的长度,并且数组将被初始化为0。否则,size_or_initializer 是用于初始化数组的序列,其长度决定数组的长度。kwds 是传递给 typecode_or_type 构造函数的参数,lock 参数与上面的相同。
例如,我们传入一个0数组给一个进程,然后在另一个进程中计算其和,代码如下:
from multiprocessing import Process, Array
def process1(n):
n[3] = 0
n[4] = 0
def process2(n):
# 从队列中得到数据
print('数组的和为 = {}'.format(sum(n)))
if __name__ == '__main__':
# 初始化数组
num = Array('d', range(5))
new_pro1 = Process(target=process1, args=(num,))
new_pro1.start()
new_pro1.join()
new_pro2 = Process(target=process2, args=(num,))
new_pro2.start()
进程池
编辑进程池可以提供指定数量的进程给用户使用。即当有新的请求提交到进程池中时,如果池未满,则会创建一个新的进程用来执行该请求;反之,如果池中的进程数已经达到规定最大值,那么该请求就会等待,只要池中有进程空闲下来,该请求就能得到执行。
进程池的话Python中有两个方法可以实现,一个是 multiprocessing 模块自带的 Pool 类;还有一个是 concurrent.futures.ProcessPoolExecutor 进行实现,其中后者的API与线程池的API一模一样。下面介绍前一种方式。
- Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]]) 该类一般初始化使用的是processes ,该参数是要使用的工作进程数目。如果 processes 为 None,则使用 os.cpu_count() 返回的值。
- apply(func[, args[, kwds]]) 使用阻塞的方式调用func,必须等待上⼀个进程执行完任务后才能执行下一个进程,了解即可,几乎不用
- apply_async(func[, args[, kwds[, callback[, error_callback]]]]) 使用非阻塞的方式调用 func(任务并行执行),args 为传递给 func 的参数列表,kwds 为传递给 func 的关键字参数列表。
- terminate() 不管任务是否完成,立即终止
- close() 关闭Pool,使其不再接受新的任务。
- join()主进程阻塞,等待子进程的退出,必须在 close 或 terminate 之后使用。
下面创建一个容量为4的进程池,并让10个进程都停留三秒输出进程名,代码如下:
import multiprocessing
from multiprocessing import Process, Pool
import time
def proc():
time.sleep(3)
print('{} Already Endding >> {}'.format(multiprocessing.current_process().name,
time.strftime('%H:%M:%S',time.localtime(time.time()))))
if __name__ == '__main__':
# 创建容量为4的进程池
pool = Pool(4)
for i in range(10):
pool.apply_async(proc)
pool.close()
# 阻塞主进程,等所有子进程运行完后再通过
pool.join()
print('{} Already Endding >> {}'.format(multiprocessing.current_process().name,
time.strftime('%H:%M:%S', time.localtime(time.time()))))