Python 多进程解析:Multiprocessing 高效并行处理的奥秘
在 Python 编程中,多进程(Multiprocessing)是一种进步步伐实行服从的重要本领。本文深入解析了多进程的概念与应用,帮助开辟者充分使用多核处理器的盘算本领。我们从根本的进程创建与启动开始,解说了如何通过 Queue 实现进程间的数据传递,并通过对比多进程与多线程的性能差异,展现了多进程在处理 CPU 密集型任务时的明显优势。文章还详细先容了进程池(Pool)的使用方法,包括 map 和 apply_async 的不同应用场景。末了,我们探究了共享内存和进程锁的使用,确保多进程在并发操作中的数据安全性。本文为希望掌握多进程编程的读者提供了全面且易懂的实践指导。
一 多进程
Multiprocessing 是一种编程和实行模式,它允很多个进程同时运行,以此进步应用步伐的服从和性能。在 Python 中,multiprocessing 模块可以帮助你创建多个进程,使得每个进程都可以并行处理任务,从而有效使用多核处理器的本领。
1 导入进程标准模块
- import multiprocessing as mp
复制代码 2 界说调用函数
- def job(a, d):
- print('你好 世界')
复制代码 3 创建和启动进程
- # 创建进程
- p1 = mp.Process(target=job, args=(1, 2))
- # 启动进程
- p1.start()
- # 连接进程
- p1.join()
复制代码 二 存储进程结果 Queue
1 存入输出到 Queue
- # 该函数没有返回值!!!
- def job02(q):
- res = 0
- for i in range(1000):
- res += i + i ** 2 + i ** 3
- q.put(res) #
- def my_result_process02():
- q = mp.Queue()
- p1 = mp.Process(target=job02, args=(q,))
- p2 = mp.Process(target=job02, args=(q,))
- p1.start()
- p2.start()
- p1.join()
- p2.join()
- res1 = q.get()
- res2 = q.get()
- print(res1)
- print(res2)
- print(res1 + res2)
复制代码 三 threading & multiprocessing 对比
1 创建多进程 multiprocessing
- def job03(q):
- res = 0
- for i in range(1000000):
- res += i + i ** 2 + i ** 3
- # 结果加 queue
- q.put(res)
- # 多核运算多进程
- def multicore03():
- q = mp.Queue()
- p1 = mp.Process(target=job03, args=(q,))
- p2 = mp.Process(target=job03, args=(q,))
- p1.start()
- p2.start()
- p1.join()
- p2.join()
- res1 = q.get()
- res2 = q.get()
- print('multicore:', res1 + res2)
复制代码 2 创建多线程 multithread
- # 单核运算多线程
- def multithread03():
- # thread可放入process同样的queue中
- q = mp.Queue()
- t1 = td.Thread(target=job03, args=(q,))
- t2 = td.Thread(target=job03, args=(q,))
- t1.start()
- t2.start()
- t1.join()
- t2.join()
- res1 = q.get()
- res2 = q.get()
- print('multithread:', res1 + res2)
复制代码 3 创建普通函数
- def normal03():
- res = 0
- for _ in range(2):
- for i in range(1000000):
- res += i + i ** 2 + i ** 3
- print('normal:', res)
复制代码 4 创建对比时间函数
- def time_result03():
- st = time.time()
- normal03()
- st1 = time.time()
- print('normal time:', st1 - st)
- multithread03()
- st2 = time.time()
- print('multithread time:', st2 - st1)
- multicore03()
- print('multicore time:', time.time() - st2)
复制代码 5 运行结果
- normal03: 499999666667166666000000
- normal03 time: 0.6855959892272949
- multithread03: 499999666667166666000000
- multithread03 time: 0.6804449558258057
- multicore03: 499999666667166666000000
- multicore03 time: 0.38849496841430664
复制代码 我运行的是 normal03 > multithread03 > multicore03,normal03 和 multithread03 相差不大,multicore03 比 normal03 和 multithread03 快将近一倍。
四 进程池 Pool
使用进程池 Pool ,Python 会自行解决多进程问题。
1 进程池 Pool() 和 map()
map() 返回的是多结果。
- def job04(x):
- # Pool的函数有返回值
- return x * x
- def multicore04():
- # Pool的函数有返回值
- pool = mp.Pool()
- # 自分配 CPU 计算
- res = pool.map(job04, range(10))
- print(res)
复制代码 2 自界说核数量
Pool 默认大小是 CPU的核数,传入 processes 参数自界说须要的核数量。
- def multicore05():
- # 定义CPU核数量为3
- pool = mp.Pool(processes=3)
- res = pool.map(job04, range(10))
- print(res)
复制代码 3 apply_async 单结果返回
apply_async() 中只能传递一个值,它只会放入一个核举行运算,但是传入值时要注意是可迭代的, 所以在传入值后须要加逗号, 同时须要用 get() 方法获取返回值。
- def multicore06():
- pool = mp.Pool()
- res = pool.apply_async(job04, (2,))
- # 用get获得结果
- print(res.get())
复制代码 4 apply_async 多结果返回
- def multicore07():
- pool = mp.Pool()
- multi_res = [pool.apply_async(job04, (i,)) for i in range(10)]
- # 用get获得结果
- print([res.get() for res in multi_res])
复制代码 5 划重点
- Pool 默认调用是 CPU 的核数,传入 processes 参数可自界说CPU核数。
- map() 放入迭代参数,返回多个结果。
- apply_async() 只能放入一组参数,并返回一个结果,如果想得到 map() 的效果须要通过迭代。
五 共享内存 shared memory
1 界说 Shared Value
- value1 = mp.Value('i', 0)
- value2 = mp.Value('d', 3.14)
复制代码 2 界说 Shared Array
它只能是一维数组
- array = mp.Array('i', [1, 2, 3, 4])
复制代码 其中 d 和 i 参数用来设置数据类型的,d 体现一个双精浮点类型,i 体现一个带符号的整型,参考数据类型如下:
Type codeC TypePython TypeMinimum size in bytesNotes'b'signed charint1'B'unsigned charint1'u'wchar_tUnicode character2(1)'h'signed shortint2'H'unsigned shortint2'i'signed intint2'I'unsigned intint2'l'signed longint4'L'unsigned longint4'q'signed long longint8'Q'unsigned long longint8'f'floatfloat4'd'doublefloat8 具体链接:Efficient arrays of numeric values
六 进程锁 Lock
1 不加进程锁
争抢共享内存
- def job08(v, num):
- for _ in range(5):
- time.sleep(0.1) # 暂停0.1秒,让输出效果更明显
- v.value += num # v.value获取共享变量值
- print(v.value, end="\n")
- def multicore08():
- v = mp.Value('i', 0) # 定义共享变量
- p1 = mp.Process(target=job08, args=(v, 1))
- p2 = mp.Process(target=job08, args=(v, 3)) # 设定不同的number看如何抢夺内存
- p1.start()
- p2.start()
- p1.join()
- p2.join()
复制代码 2 加进程锁
- def job09(v, num, l):
- l.acquire() # 锁住
- for _ in range(5):
- # print(v.value, num)
- time.sleep(0.1)
- v.value = v.value + num # 获取共享内存
- print(v.value)
- l.release() # 释放
- def multicore09():
- l = mp.Lock() # 定义一个进程锁
- v = mp.Value('i', 0) # 定义共享内存
-
- p1 = mp.Process(target=job09, args=(v, 1, l)) # 需要将lock传入
- p1.start()
- p1.join()
- p2 = mp.Process(target=job09, args=(v, 3, l))
- p2.start()
- p2.join()
- # def multicore10():
- # l = mp.Lock() # 定义一个进程锁
- # v = mp.Value('i', 0) # 定义共享内存
- # p1 = mp.Process(target=job09, args=(v, 1, l)) # 需要将lock传入
- # p2 = mp.Process(target=job09, args=(v, 3, l))
- # p1.start()
- # p2.start()
- # p1.join()
- # p2.join()
复制代码 在这个示例中,必须先实行 p1 以到达预期效果。分别运行 multicore09 和 multicore10 会发现一些有意思的情况。
七 完备代码示例
注:建议在运行 main.py 对应的代码功能时,逐利用用注释举行操作。
- # This is a sample Python script.# Press ⌃R to execute it or replace it with your code.# Press Double ⇧ to search everywhere for classes, files, tool windows, actions, and settings.import multiprocessing as mp
- import threading as tdimport time as timedef print_hi(name): # Use a breakpoint in the code line below to debug your script. print(f'Hi, {name}') # Press ⌘F8 to toggle the breakpoint. # 创建进程 p1 = mp.Process(target=job, args=(1, 2)) # 启动进程 p1.start() # Shared Value value1 = mp.Value('i', 0) value2 = mp.Value('d', 3.14) # Shared Array,只能是一维数组 array = mp.Array('i', [1, 2, 3, 4])
- def job(a, d):
- print('你好 世界')
- # 该函数没有返回值!!!
- def job02(q):
- res = 0
- for i in range(1000):
- res += i + i ** 2 + i ** 3
- q.put(res) #
- def my_result_process02():
- q = mp.Queue()
- p1 = mp.Process(target=job02, args=(q,))
- p2 = mp.Process(target=job02, args=(q,))
- p1.start()
- p2.start()
- p1.join()
- p2.join()
- res1 = q.get()
- res2 = q.get()
- print(res1)
- print(res2)
- print(res1 + res2)
- def job03(q): res = 0 for i in range(1000000): res += i + i ** 2 + i ** 3 # 结果加 queue q.put(res)# 多核运算多进程def multicore03(): q = mp.Queue() p1 = mp.Process(target=job03, args=(q,)) p2 = mp.Process(target=job03, args=(q,)) p1.start() p2.start() p1.join() p2.join() res1 = q.get() res2 = q.get() print('multicore03:', res1 + res2)# 单核运算多线程def multithread03(): # thread可放入process同样的queue中 q = mp.Queue() t1 = td.Thread(target=job03, args=(q,)) t2 = td.Thread(target=job03, args=(q,)) t1.start() t2.start() t1.join() t2.join() res1 = q.get() res2 = q.get() print('multithread03:', res1 + res2)def normal03(): res = 0 for _ in range(2): for i in range(1000000): res += i + i ** 2 + i ** 3 print('normal03:', res)def time_result03(): st = time.time() normal03() st1 = time.time() print('normal03 time:', st1 - st) multithread03() st2 = time.time() print('multithread03 time:', st2 - st1) multicore03() print('multicore03 time:', time.time() - st2)def job04(x):
- # Pool的函数有返回值
- return x * x
- def multicore04():
- # Pool的函数有返回值
- pool = mp.Pool()
- # 自分配 CPU 计算
- res = pool.map(job04, range(10))
- print(res)
- def multicore05(): pool = mp.Pool(processes=3) # 界说CPU核数量为3 res = pool.map(job04, range(10)) print(res)def multicore06(): pool = mp.Pool() # apply_async() 中只能传递一个值,它只会放入一个核举行运算,但是传入值时要注意是可迭代的, # 所以在传入值后须要加逗号, 同时须要用get()方法获取返回值 res = pool.apply_async(job04, (2,)) # 用get得到结果 print(res.get())def multicore07():
- pool = mp.Pool()
- multi_res = [pool.apply_async(job04, (i,)) for i in range(10)]
- # 用get获得结果
- print([res.get() for res in multi_res])
- def job08(v, num):
- for _ in range(5):
- time.sleep(0.1) # 暂停0.1秒,让输出效果更明显
- v.value += num # v.value获取共享变量值
- print(v.value, end="\n")
- def multicore08():
- v = mp.Value('i', 0) # 定义共享变量
- p1 = mp.Process(target=job08, args=(v, 1))
- p2 = mp.Process(target=job08, args=(v, 3)) # 设定不同的number看如何抢夺内存
- p1.start()
- p2.start()
- p1.join()
- p2.join()
- def job09(v, num, l): l.acquire() # 锁住 for _ in range(5): # print(v.value, num) time.sleep(0.1) v.value = v.value + num # 获取共享内存 print(v.value) l.release() # 释放def multicore09(): l = mp.Lock() # 界说一个进程锁 v = mp.Value('i', 0) # 界说共享内存 p1 = mp.Process(target=job09, args=(v, 1, l)) # 须要将lock传入 p1.start() p1.join() p2 = mp.Process(target=job09, args=(v, 3, l)) p2.start() p2.join()def multicore10(): l = mp.Lock() # 界说一个进程锁 v = mp.Value('i', 0) # 界说共享内存 p1 = mp.Process(target=job09, args=(v, 1, l)) # 须要将lock传入 p2 = mp.Process(target=job09, args=(v, 3, l)) p1.start() p2.start() p1.join() p2.join()# Press the green button in the gutter to run the script.if __name__ == '__main__': print_hi('什么是 Multiprocessing') my_result_process02() time_result03() multicore04() multicore05() multicore06() multicore07() multicore08() multicore09() # multicore10()# See PyCharm help at https://www.jetbrains.com/help/pycharm/
复制代码 复制粘贴并覆盖到你的 main.py 中运行,运行结果如下。
- Hi, 什么是 Multiprocessing
- 你好 世界
- 249833583000
- 249833583000
- 499667166000
- normal03: 499999666667166666000000
- normal03 time: 0.7139420509338379
- multithread03: 499999666667166666000000
- multithread03 time: 0.6696178913116455
- multicore03: 499999666667166666000000
- multicore03 time: 0.3917398452758789
- [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
- [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
- 4
- [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
- 3
- 4
- 7
- 8
- 11
- 12
- 1515
- 16
- 19
- 1
- 2
- 3
- 4
- 5
- 8
- 11
- 14
- 17
- 20
复制代码 八 源码地址
代码地址:
国内看 Gitee 之 什么是 Multiprocessing.py
国外看 GitHub 之 什么是 Multiprocessing.py
引用 莫烦 Python
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |