Python 多进程解析:Multiprocessing 高效并行处理的奥秘

打印 上一主题 下一主题

主题 894|帖子 894|积分 2682

Python 多进程解析:Multiprocessing 高效并行处理的奥秘


  
在 Python 编程中,多进程(Multiprocessing)是一种进步步伐实行服从的重要本领。本文深入解析了多进程的概念与应用,帮助开辟者充分使用多核处理器的盘算本领。我们从根本的进程创建与启动开始,解说了如何通过 Queue 实现进程间的数据传递,并通过对比多进程与多线程的性能差异,展现了多进程在处理 CPU 密集型任务时的明显优势。文章还详细先容了进程池(Pool)的使用方法,包括 map 和 apply_async 的不同应用场景。末了,我们探究了共享内存和进程锁的使用,确保多进程在并发操作中的数据安全性。本文为希望掌握多进程编程的读者提供了全面且易懂的实践指导。
一 多进程

Multiprocessing 是一种编程和实行模式,它允很多个进程同时运行,以此进步应用步伐的服从和性能。在 Python 中,multiprocessing 模块可以帮助你创建多个进程,使得每个进程都可以并行处理任务,从而有效使用多核处理器的本领。
1 导入进程标准模块

  1. import multiprocessing as mp
复制代码
2 界说调用函数

  1. def job(a, d):
  2.     print('你好 世界')
复制代码
3 创建和启动进程

  1. # 创建进程
  2. p1 = mp.Process(target=job, args=(1, 2))
  3. # 启动进程
  4. p1.start()
  5. # 连接进程
  6. p1.join()
复制代码
二 存储进程结果 Queue

1 存入输出到 Queue
  1. # 该函数没有返回值!!!
  2. def job02(q):
  3.     res = 0
  4.     for i in range(1000):
  5.         res += i + i ** 2 + i ** 3
  6.     q.put(res)  #
  7. def my_result_process02():
  8.     q = mp.Queue()
  9.     p1 = mp.Process(target=job02, args=(q,))
  10.     p2 = mp.Process(target=job02, args=(q,))
  11.     p1.start()
  12.     p2.start()
  13.     p1.join()
  14.     p2.join()
  15.     res1 = q.get()
  16.     res2 = q.get()
  17.     print(res1)
  18.     print(res2)
  19.     print(res1 + res2)
复制代码
三 threading & multiprocessing 对比

1 创建多进程 multiprocessing

  1. def job03(q):
  2.     res = 0
  3.     for i in range(1000000):
  4.         res += i + i ** 2 + i ** 3
  5.     # 结果加 queue
  6.     q.put(res)
  7. # 多核运算多进程
  8. def multicore03():
  9.     q = mp.Queue()
  10.     p1 = mp.Process(target=job03, args=(q,))
  11.     p2 = mp.Process(target=job03, args=(q,))
  12.     p1.start()
  13.     p2.start()
  14.     p1.join()
  15.     p2.join()
  16.     res1 = q.get()
  17.     res2 = q.get()
  18.     print('multicore:', res1 + res2)
复制代码
2 创建多线程 multithread

  1. # 单核运算多线程
  2. def multithread03():
  3.     # thread可放入process同样的queue中
  4.     q = mp.Queue()
  5.     t1 = td.Thread(target=job03, args=(q,))
  6.     t2 = td.Thread(target=job03, args=(q,))
  7.     t1.start()
  8.     t2.start()
  9.     t1.join()
  10.     t2.join()
  11.     res1 = q.get()
  12.     res2 = q.get()
  13.     print('multithread:', res1 + res2)
复制代码
3 创建普通函数

  1. def normal03():
  2.     res = 0
  3.     for _ in range(2):
  4.         for i in range(1000000):
  5.             res += i + i ** 2 + i ** 3
  6.     print('normal:', res)
复制代码
4 创建对比时间函数

  1. def time_result03():
  2.     st = time.time()
  3.     normal03()
  4.     st1 = time.time()
  5.     print('normal time:', st1 - st)
  6.     multithread03()
  7.     st2 = time.time()
  8.     print('multithread time:', st2 - st1)
  9.     multicore03()
  10.     print('multicore time:', time.time() - st2)
复制代码
5 运行结果

  1. normal03: 499999666667166666000000
  2. normal03 time: 0.6855959892272949
  3. multithread03: 499999666667166666000000
  4. multithread03 time: 0.6804449558258057
  5. multicore03: 499999666667166666000000
  6. multicore03 time: 0.38849496841430664
复制代码
我运行的是 normal03 > multithread03 > multicore03,normal03 和 multithread03 相差不大,multicore03 比 normal03 和 multithread03 快将近一倍。
四 进程池 Pool

使用进程池 Pool ,Python 会自行解决多进程问题。
1 进程池 Pool() 和 map()

map()  返回的是多结果。
  1. def job04(x):
  2.     # Pool的函数有返回值
  3.     return x * x
  4. def multicore04():
  5.     # Pool的函数有返回值
  6.     pool = mp.Pool()
  7.     # 自分配 CPU 计算
  8.     res = pool.map(job04, range(10))
  9.     print(res)
复制代码
2 自界说核数量

Pool 默认大小是 CPU的核数,传入 processes 参数自界说须要的核数量。
  1. def multicore05():
  2.           # 定义CPU核数量为3
  3.     pool = mp.Pool(processes=3)  
  4.     res = pool.map(job04, range(10))
  5.     print(res)
复制代码
3 apply_async 单结果返回

apply_async() 中只能传递一个值,它只会放入一个核举行运算,但是传入值时要注意是可迭代的, 所以在传入值后须要加逗号, 同时须要用 get() 方法获取返回值。
  1. def multicore06():
  2.     pool = mp.Pool()
  3.     res = pool.apply_async(job04, (2,))
  4.     # 用get获得结果
  5.     print(res.get())
复制代码
4 apply_async 多结果返回

  1. def multicore07():
  2.     pool = mp.Pool()
  3.     multi_res = [pool.apply_async(job04, (i,)) for i in range(10)]
  4.     # 用get获得结果
  5.     print([res.get() for res in multi_res])
复制代码
5 划重点



  • Pool 默认调用是 CPU 的核数,传入 processes 参数可自界说CPU核数。
  • map() 放入迭代参数,返回多个结果。
  • apply_async() 只能放入一组参数,并返回一个结果,如果想得到 map() 的效果须要通过迭代。
五 共享内存 shared memory

1 界说 Shared Value
  1. value1 = mp.Value('i', 0)
  2. value2 = mp.Value('d', 3.14)
复制代码
2 界说 Shared Array
它只能是一维数组
  1. array = mp.Array('i', [1, 2, 3, 4])
复制代码
其中 d 和 i 参数用来设置数据类型的,d 体现一个双精浮点类型,i 体现一个带符号的整型,参考数据类型如下:
Type codeC TypePython TypeMinimum size in bytesNotes'b'signed charint1'B'unsigned charint1'u'wchar_tUnicode character2(1)'h'signed shortint2'H'unsigned shortint2'i'signed intint2'I'unsigned intint2'l'signed longint4'L'unsigned longint4'q'signed long longint8'Q'unsigned long longint8'f'floatfloat4'd'doublefloat8 具体链接:Efficient arrays of numeric values
六 进程锁 Lock

1 不加进程锁

争抢共享内存
  1. def job08(v, num):
  2.     for _ in range(5):
  3.         time.sleep(0.1)  # 暂停0.1秒,让输出效果更明显
  4.         v.value += num  # v.value获取共享变量值
  5.         print(v.value, end="\n")
  6. def multicore08():
  7.     v = mp.Value('i', 0)  # 定义共享变量
  8.     p1 = mp.Process(target=job08, args=(v, 1))
  9.     p2 = mp.Process(target=job08, args=(v, 3))  # 设定不同的number看如何抢夺内存
  10.     p1.start()
  11.     p2.start()
  12.     p1.join()
  13.     p2.join()
复制代码
2 加进程锁

  1. def job09(v, num, l):
  2.     l.acquire()  # 锁住
  3.     for _ in range(5):
  4.         # print(v.value, num)
  5.         time.sleep(0.1)
  6.         v.value = v.value + num  # 获取共享内存
  7.         print(v.value)
  8.     l.release()  # 释放
  9. def multicore09():
  10.     l = mp.Lock()  # 定义一个进程锁
  11.     v = mp.Value('i', 0)  # 定义共享内存
  12.    
  13.     p1 = mp.Process(target=job09, args=(v, 1, l))  # 需要将lock传入
  14.     p1.start()
  15.     p1.join()
  16.     p2 = mp.Process(target=job09, args=(v, 3, l))
  17.     p2.start()
  18.     p2.join()
  19. # def multicore10():
  20. #     l = mp.Lock()  # 定义一个进程锁
  21. #     v = mp.Value('i', 0)  # 定义共享内存
  22. #     p1 = mp.Process(target=job09, args=(v, 1, l))  # 需要将lock传入
  23. #     p2 = mp.Process(target=job09, args=(v, 3, l))
  24. #     p1.start()
  25. #     p2.start()
  26. #     p1.join()
  27. #     p2.join()
复制代码
在这个示例中,必须先实行 p1 以到达预期效果。分别运行 multicore09 和 multicore10 会发现一些有意思的情况。
七 完备代码示例

:建议在运行 main.py 对应的代码功能时,逐利用用注释举行操作。
  1. # This is a sample Python script.# Press ⌃R to execute it or replace it with your code.# Press Double ⇧ to search everywhere for classes, files, tool windows, actions, and settings.import multiprocessing as mp
  2. import threading as tdimport time as timedef print_hi(name):    # Use a breakpoint in the code line below to debug your script.    print(f'Hi, {name}')  # Press ⌘F8 to toggle the breakpoint.    # 创建进程    p1 = mp.Process(target=job, args=(1, 2))    # 启动进程    p1.start()    # Shared Value    value1 = mp.Value('i', 0)    value2 = mp.Value('d', 3.14)    # Shared Array,只能是一维数组    array = mp.Array('i', [1, 2, 3, 4])
  3. def job(a, d):
  4.     print('你好 世界')
  5. # 该函数没有返回值!!!
  6. def job02(q):
  7.     res = 0
  8.     for i in range(1000):
  9.         res += i + i ** 2 + i ** 3
  10.     q.put(res)  #
  11. def my_result_process02():
  12.     q = mp.Queue()
  13.     p1 = mp.Process(target=job02, args=(q,))
  14.     p2 = mp.Process(target=job02, args=(q,))
  15.     p1.start()
  16.     p2.start()
  17.     p1.join()
  18.     p2.join()
  19.     res1 = q.get()
  20.     res2 = q.get()
  21.     print(res1)
  22.     print(res2)
  23.     print(res1 + res2)
  24. def job03(q):    res = 0    for i in range(1000000):        res += i + i ** 2 + i ** 3    # 结果加 queue    q.put(res)# 多核运算多进程def multicore03():    q = mp.Queue()    p1 = mp.Process(target=job03, args=(q,))    p2 = mp.Process(target=job03, args=(q,))    p1.start()    p2.start()    p1.join()    p2.join()    res1 = q.get()    res2 = q.get()    print('multicore03:', res1 + res2)# 单核运算多线程def multithread03():    # thread可放入process同样的queue中    q = mp.Queue()    t1 = td.Thread(target=job03, args=(q,))    t2 = td.Thread(target=job03, args=(q,))    t1.start()    t2.start()    t1.join()    t2.join()    res1 = q.get()    res2 = q.get()    print('multithread03:', res1 + res2)def normal03():    res = 0    for _ in range(2):        for i in range(1000000):            res += i + i ** 2 + i ** 3    print('normal03:', res)def time_result03():    st = time.time()    normal03()    st1 = time.time()    print('normal03 time:', st1 - st)    multithread03()    st2 = time.time()    print('multithread03 time:', st2 - st1)    multicore03()    print('multicore03 time:', time.time() - st2)def job04(x):
  25.     # Pool的函数有返回值
  26.     return x * x
  27. def multicore04():
  28.     # Pool的函数有返回值
  29.     pool = mp.Pool()
  30.     # 自分配 CPU 计算
  31.     res = pool.map(job04, range(10))
  32.     print(res)
  33. def multicore05():    pool = mp.Pool(processes=3)  # 界说CPU核数量为3    res = pool.map(job04, range(10))    print(res)def multicore06():    pool = mp.Pool()    # apply_async() 中只能传递一个值,它只会放入一个核举行运算,但是传入值时要注意是可迭代的,    # 所以在传入值后须要加逗号, 同时须要用get()方法获取返回值    res = pool.apply_async(job04, (2,))    # 用get得到结果    print(res.get())def multicore07():
  34.     pool = mp.Pool()
  35.     multi_res = [pool.apply_async(job04, (i,)) for i in range(10)]
  36.     # 用get获得结果
  37.     print([res.get() for res in multi_res])
  38. def job08(v, num):
  39.     for _ in range(5):
  40.         time.sleep(0.1)  # 暂停0.1秒,让输出效果更明显
  41.         v.value += num  # v.value获取共享变量值
  42.         print(v.value, end="\n")
  43. def multicore08():
  44.     v = mp.Value('i', 0)  # 定义共享变量
  45.     p1 = mp.Process(target=job08, args=(v, 1))
  46.     p2 = mp.Process(target=job08, args=(v, 3))  # 设定不同的number看如何抢夺内存
  47.     p1.start()
  48.     p2.start()
  49.     p1.join()
  50.     p2.join()
  51. def job09(v, num, l):    l.acquire()  # 锁住    for _ in range(5):        # print(v.value, num)        time.sleep(0.1)        v.value = v.value + num  # 获取共享内存        print(v.value)    l.release()  # 释放def multicore09():    l = mp.Lock()  # 界说一个进程锁    v = mp.Value('i', 0)  # 界说共享内存    p1 = mp.Process(target=job09, args=(v, 1, l))  # 须要将lock传入    p1.start()    p1.join()    p2 = mp.Process(target=job09, args=(v, 3, l))    p2.start()    p2.join()def multicore10():    l = mp.Lock()  # 界说一个进程锁    v = mp.Value('i', 0)  # 界说共享内存    p1 = mp.Process(target=job09, args=(v, 1, l))  # 须要将lock传入    p2 = mp.Process(target=job09, args=(v, 3, l))    p1.start()    p2.start()    p1.join()    p2.join()# Press the green button in the gutter to run the script.if __name__ == '__main__':    print_hi('什么是 Multiprocessing')    my_result_process02()    time_result03()    multicore04()    multicore05()    multicore06()    multicore07()    multicore08()    multicore09()    # multicore10()# See PyCharm help at https://www.jetbrains.com/help/pycharm/
复制代码
复制粘贴并覆盖到你的 main.py 中运行,运行结果如下。
  1. Hi, 什么是 Multiprocessing
  2. 你好 世界
  3. 249833583000
  4. 249833583000
  5. 499667166000
  6. normal03: 499999666667166666000000
  7. normal03 time: 0.7139420509338379
  8. multithread03: 499999666667166666000000
  9. multithread03 time: 0.6696178913116455
  10. multicore03: 499999666667166666000000
  11. multicore03 time: 0.3917398452758789
  12. [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
  13. [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
  14. 4
  15. [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
  16. 3
  17. 4
  18. 7
  19. 8
  20. 11
  21. 12
  22. 1515
  23. 16
  24. 19
  25. 1
  26. 2
  27. 3
  28. 4
  29. 5
  30. 8
  31. 11
  32. 14
  33. 17
  34. 20
复制代码
八 源码地址

代码地址:
国内看 Gitee 之 什么是 Multiprocessing.py
国外看 GitHub 之 什么是 Multiprocessing.py
引用 莫烦 Python

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

乌市泽哥

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表