一文速通Python并行计算:09 Python多进程编程-进程之间的数据同步-基于互 ...

打印 上一主题 下一主题

主题 1877|帖子 1877|积分 5631

一文速通 Python 并行计算:09 Python 多进程编程-进程之间的数据同步-基于互斥锁、递归锁、信号量、条件变量、事故和屏蔽


摘要:

多进程同步机制包括互斥锁、递归锁、信号量、条件变量、事故和屏蔽等:互斥锁用于掩护共享资源,递归锁支持同一进程重复加锁,信号量可控制访问资源的数量,条件变量用于进程间等待特定条件,事故用于进程间通信和同步,屏蔽用于多个进程在特定点同步,确保协同运行。

关于我们更多介绍可以查看云文档:Freak 嵌入式工作室云文档,大概访问我们的 wiki:****https://github.com/leezisheng/Doc/wik
原文链接:

FreakStudio的博客
往期保举:

学嵌入式的你,还不碰面向对象??!
全网最适合入门的面向对象编程教程:00 面向对象计划方法导论
全网最适合入门的面向对象编程教程:01 面向对象编程的根本概念
全网最适合入门的面向对象编程教程:02 类和对象的 Python 实现-利用 Python 创建类
全网最适合入门的面向对象编程教程:03 类和对象的 Python 实现-为自定义类添加属性
全网最适合入门的面向对象编程教程:04 类和对象的Python实现-为自定义类添加方法
全网最适合入门的面向对象编程教程:05 类和对象的Python实现-PyCharm代码标签
全网最适合入门的面向对象编程教程:06 类和对象的Python实现-自定义类的数据封装
全网最适合入门的面向对象编程教程:07 类和对象的Python实现-类型注解
全网最适合入门的面向对象编程教程:08 类和对象的Python实现-@property装饰器
全网最适合入门的面向对象编程教程:09 类和对象的Python实现-类之间的关系
全网最适合入门的面向对象编程教程:10 类和对象的Python实现-类的继承和里氏更换原则
全网最适合入门的面向对象编程教程:11 类和对象的Python实现-子类调用父类方法
全网最适合入门的面向对象编程教程:12 类和对象的Python实现-Python利用logging模块输出程序运行日记
全网最适合入门的面向对象编程教程:13 类和对象的Python实现-可视化阅读代码神器Sourcetrail的安装利用
全网最适合入门的面向对象编程教程:全网最适合入门的面向对象编程教程:14 类和对象的Python实现-类的静态方法和类方法
全网最适合入门的面向对象编程教程:15 类和对象的 Python 实现-__slots__魔法方法
全网最适合入门的面向对象编程教程:16 类和对象的Python实现-多态、方法重写与开闭原则
全网最适合入门的面向对象编程教程:17 类和对象的Python实现-鸭子类型与“file-like object“
全网最适合入门的面向对象编程教程:18 类和对象的Python实现-多重继承与PyQtGraph串口数据绘制曲线图
全网最适合入门的面向对象编程教程:19 类和对象的 Python 实现-利用 PyCharm 主动生成文件注释和函数注释
全网最适合入门的面向对象编程教程:20 类和对象的Python实现-组合关系的实现与CSV文件生存
全网最适合入门的面向对象编程教程:21 类和对象的Python实现-多文件的组织:模块module和包package
全网最适合入门的面向对象编程教程:22 类和对象的Python实现-异常和语法错误
全网最适合入门的面向对象编程教程:23 类和对象的Python实现-抛出异常
全网最适合入门的面向对象编程教程:24 类和对象的Python实现-异常的捕获与处置惩罚
全网最适合入门的面向对象编程教程:25 类和对象的Python实现-Python判定输入数据类型
全网最适合入门的面向对象编程教程:26 类和对象的Python实现-上下文管理器和with语句
全网最适合入门的面向对象编程教程:27 类和对象的Python实现-Python中异常层级与自定义异常类的实现
全网最适合入门的面向对象编程教程:28 类和对象的Python实现-Python编程原则、哲学和规范大汇总
全网最适合入门的面向对象编程教程:29 类和对象的Python实现-断言与防御性编程和help函数的利用
全网最适合入门的面向对象编程教程:30 Python的内置数据类型-object根类
全网最适合入门的面向对象编程教程:31 Python的内置数据类型-对象Object和类型Type
全网最适合入门的面向对象编程教程:32 Python的内置数据类型-类Class和实例Instance
全网最适合入门的面向对象编程教程:33 Python的内置数据类型-对象Object和类型Type的关系
全网最适合入门的面向对象编程教程:34 Python的内置数据类型-Python常用复合数据类型:元组和定名元组
全网最适合入门的面向对象编程教程:35 Python的内置数据类型-文档字符串和__doc__属性
全网最适合入门的面向对象编程教程:36 Python的内置数据类型-字典
全网最适合入门的面向对象编程教程:37 Python常用复合数据类型-列表和列表推导式
全网最适合入门的面向对象编程教程:38 Python常用复合数据类型-利用列表实现堆栈、队列和双端队列
全网最适合入门的面向对象编程教程:39 Python常用复合数据类型-集合
全网最适合入门的面向对象编程教程:40 Python常用复合数据类型-罗列和enum模块的利用
全网最适合入门的面向对象编程教程:41 Python常用复合数据类型-队列(FIFO、LIFO、优先级队列、双端队列和环形队列)
全网最适合入门的面向对象编程教程:42 Python常用复合数据类型-collections容器数据类型
全网最适合入门的面向对象编程教程:43 Python常用复合数据类型-扩展内置数据类型
全网最适合入门的面向对象编程教程:44 Python内置函数与魔法方法-重写内置类型的魔法方法
全网最适合入门的面向对象编程教程:45 Python实现常见数据结构-链表、树、哈希表、图和堆
全网最适合入门的面向对象编程教程:46 Python函数方法与接口-函数与事故驱动框架
全网最适合入门的面向对象编程教程:47 Python函数方法与接口-回调函数Callback
全网最适合入门的面向对象编程教程:48 Python函数方法与接口-位置参数、默认参数、可变参数和关键字参数
全网最适合入门的面向对象编程教程:49 Python函数方法与接口-函数与方法的区别和lamda匿名函数
全网最适合入门的面向对象编程教程:50 Python函数方法与接口-接口和抽象基类
全网最适合入门的面向对象编程教程:51 Python函数方法与接口-利用Zope实现接口
全网最适合入门的面向对象编程教程:52 Python函数方法与接口-Protocol协议与接口
全网最适合入门的面向对象编程教程:53 Python字符串与序列化-字符串与字符编码
全网最适合入门的面向对象编程教程:54 Python字符串与序列化-字符串格式化与format方法
全网最适合入门的面向对象编程教程:55 Python字符串与序列化-字节序列类型和可变字节字符串
全网最适合入门的面向对象编程教程:56 Python字符串与序列化-正则表达式和re模块应用
全网最适合入门的面向对象编程教程:57 Python字符串与序列化-序列化与反序列化
全网最适合入门的面向对象编程教程:58 Python字符串与序列化-序列化Web对象的定义与实现
全网最适合入门的面向对象编程教程:59 Python并行与并发-并行与并发和线程与进程
一文速通Python并行计算:00 并行计算的根本概念
一文速通Python并行计算:01 Python多线程编程-根本概念、切换流程、GIL锁机制和生产者与消费者模型
一文速通Python并行计算:02 Python多线程编程-threading模块、线程的创建和查询与保卫线程
一文速通Python并行计算:03 Python多线程编程-多线程同步(上)—基于互斥锁、递归锁和信号量
一文速通Python并行计算:04 Python多线程编程-多线程同步(下)—基于条件变量、事故和屏蔽
一文速通Python并行计算:05 Python多线程编程-线程的定时运行
一文速通Python并行计算:06 Python多线程编程-基于队列进行通信
一文速通Python并行计算:07 Python多线程编程-线程池的利用和多线程的性能评估
更多出色内容可看:

给你的 Python 加加速:一文速通 Python 并行计算
一文搞懂 CM3 单片机调试原理
肝了半个月,嵌入式技能栈大汇总出炉
电子计算机类比赛的“武林秘籍”
一个MicroPython的开源项目集锦:awesome-micropython,包含各个方面的Micropython工具库
Avnet ZUBoard 1CG开辟板—深度学习新选择
工程师不要迷信开源代码,还要注重根本功
什么?配色个性化的电机驱动模块?!!
什么?XIAO主控新出三款扩展板!
手把手教你实现Arduino发布第三方库
万字长文手把手教你实现MicroPython/Python发布第三方库
一文速通电子计划大赛,电子人必看的获奖秘籍
一文速通光电计划大赛,电子人必看!
工科比赛“无脑”操作指南:知识学习硬件选购→代码调试→陈诉撰写的保姆级路线图
单场会议拍摄收费6000+?拍摄本领和步骤都在这里
0基础如何冲击大唐杯国奖?学姐的的备赛心得都在这里
文档获取:

可访问如下链接进行对文档下载:
https://github.com/leezisheng/Doc
该文档是一份关于 并行计算Python 并发编程 的学习指南,内容涵盖了并行计算的根本概念、Python 多线程编程、多进程编程以及协程编程的焦点知识点:

正文

进程之间数据除了共享内存、共享进程外不共享,但是共享同一套文件体系,所以访问同一个文件,或同一个打印终端,是没有问题的,而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,需要进程之间进行同步。
进程的同步原语包括:Lock、Event、Condition、Semaphore、Rlock、Barrier。相关的同步原语和线程的库很雷同。
如下代码是多个进程差别步共享同一打印终端,采取并发运行,服从高,但竞争同一打印终端,带来了打印错乱。
  1. from multiprocessing import Process
  2. import os,time
  3. def work():
  4.     print('%s is running' %os.getpid())
  5.     time.sleep(2)
  6.     print('%s is done' %os.getpid())
  7. if __name__ == '__main__':
  8.     for i in range(3):
  9.         p=Process(target=work)
  10.         p.start()
复制代码
以下为运行结果,可以看到并应该是三个进程轮流输出的两条语句变为乱序输出。

1.基于互斥锁的进程数据同步

如下 work 函数中利用 acquire() 和 release() ,来控制共享数据的读写权限:
  1. from multiprocessing import Process,Lock
  2. import os,time
  3. def work(lock):
  4.     lock.acquire()
  5.     print('%s is running' %os.getpid())
  6.     time.sleep(2)
  7.     print('%s is done' %os.getpid())
  8.     lock.release()
  9. if __name__ == '__main__':
  10.     lock=Lock()
  11.     for i in range(3):
  12.         p=Process(target=work,args=(lock,))
  13.         p.start()
复制代码

可以看到按序输出,加锁实现由并发变成了串行,牺牲了运行服从,但避免了竞争。

2.基于递归锁的进程数据同步

示例代码如下:
  1. from multiprocessing import Process,RLock
  2. import multiprocessing
  3. import time
  4. mutex = RLock()
  5. def test(mutex):
  6.     if mutex.acquire():
  7.         print("I am %s" % multiprocessing.current_process().name)
  8.         if mutex.acquire():
  9.             print("I am %s" % multiprocessing.current_process().name)
  10.             mutex.release()
  11.             time.sleep(1)
  12.         mutex.release()
  13. if __name__ == '__main__':
  14.     for i in range(0, 10):
  15.         p = Process(target=test,args=(mutex,))
  16.         p.start()
复制代码
输出如下,可以看到输出正常:


3.基于信号量的进程数据同步

以下为示例代码,从运行结果中我们可以看到两个 print 依次输出。
  1. from multiprocessing import Process,Semaphore
  2. import multiprocessing
  3. import time
  4. _# 创建一个信号量,信号量是一个内部数据_
  5. _# 用于标明当前的共享资源可以有多少并发读取_
  6. semaphore = Semaphore(1)
  7. def test(semaphore):
  8.     _# 测试控制该资源的信号量。_
  9.     if semaphore.acquire():
  10.         _# 若此信号量的值为正,则允许进行使用该资源。线程将信号量减 1。_
  11.         print("I am %s,pid:%s" % (multiprocessing.current_process().name,multiprocessing.current_process().pid))
  12.         print("%s am runing" % multiprocessing.current_process().name)
  13.         semaphore.release()
  14. if __name__ == '__main__':
  15.     for i in range(0, 10):
  16.         p = Process(target=test,args=(semaphore,))
  17.         p.start()
复制代码

4.基于条件变量的进程数据同步

这里,我们以生产者消费者模型为例,在多线程的基于条件变量的线程同步示例中,我们利用全局变量看成缓存,在本例中,我们同样如此。
有些同学会问,不是说进程中的数据是独立的吗?没错,在多进程编程中,差别的进程之间默认情况下是无法共享数据的。但 Python 提供了一些机制来实现多进程间的数据共享,其中之一是共享内存。共享内存允许多个进程共享一个存储地区,一个进程写入共享内存中的信息,其他进程可以方便的读取。在 Python 中可以利用 Value、Array 将数据存储在共享内存中,也可以利用 multiprocessing 模块中 sharedctypes 自定义共享内存的 ctypes 对象。

下例中,以 value 对象作为缓存,只要缓存不满,生产者一直向缓存生产;只要缓存不空,消费者一直从缓存取出(之后烧毁)。
当缓冲队列不为空的时候,生产者将通知消费者;当缓冲队列不满的时候,消费者将通知生产者。
  1. from multiprocessing import Process,Condition,Value
  2. import multiprocessing
  3. import time
  4. condition = Condition()
  5. products = Value('i', 0)
  6. _# 生产者进程_
  7. def Producer(condition,products):
  8.     while True:
  9.         if condition.acquire():
  10.         _# 消费者通过拿到锁来修改共享的资源_
  11.             if products.value < 10:
  12.             _# 如果产品数量小于 10,继续生成,并通过 notify 方法通知消费者_
  13.             _# 只要缓存不满,生产者一直向缓存生产;_
  14.                 products.value += 1;
  15.                 print("Producer(%s):deliver one, now products:%s" % (multiprocessing.current_process().name, products.value))
  16.                 _# 当缓冲队列不为空的时候,生产者将通知消费者_
  17.                 condition.notify()
  18.             else:
  19.             _# 如果已经满了,那么生产者进入等待状态,直到被唤醒_
  20.                 print("Producer(%s):already 10, stop deliver, now products:%s" %(multiprocessing.current_process().name, products.value))
  21.                 condition.wait()
  22.             _# 释放资源_
  23.             condition.release()
  24.             time.sleep(1)
  25. _# 消费者进程_
  26. def Consumer(condition,products):
  27.     while True:
  28.         if condition.acquire():
  29.             if products.value > 1:
  30.             _# 只要缓存不空,消费者一直从缓存取出(之后销毁)_
  31.                 products.value -= 1
  32.                 print("Consumer(%s):consume one, now products:%s" % (multiprocessing.current_process().name, products.value))
  33.                 _# 当缓冲队列不满的时候,消费者将通知生产者。_
  34.                 condition.notify()
  35.             else:
  36.             _# 缓存空,消费者线程等待_
  37.                 print("Consumer(%s):only 1, stop consume, products:%s" % (multiprocessing.current_process().name, products.value))
  38.                 condition.wait()
  39.             _# 释放资源_
  40.             condition.release()
  41.             time.sleep(1)
  42. if __name__ == '__main__':
  43.     ProducerProcess = Process(target=Producer,args=(condition,products))
  44.     ConsumerProcess = Process(target=Consumer, args=(condition, products))
  45.     ProducerProcess.start()
  46.     ConsumerProcess.start()
复制代码
如下为运行情况,可以看到正常运行。

5.基于事故的进程数据同步

示例代码和运行结果如下,与多线程中雷同。
  1. from multiprocessing import Process,Event
  2. import multiprocessing
  3. event = Event()
  4. def worker(event_obj, i):
  5.     print('{i}号进程等待事件信号'.format(i=i))
  6.     event_obj.wait()
  7.     print('{i}号进程收到事件信号'.format(i=i))
  8. if __name__ == '__main__':
  9.     for i in range(5):
  10.         p = Process(target=worker, args=(event, i))
  11.         p.start()
  12.     event.set()
复制代码

6.基于屏蔽的进程数据同步

下面的代码展示了如何利用 barrier() 函数来同步两个进程。我们有 4 个进程,进程 1 和进程 2 由 barrier 语句管理,进程 3 和进程 4 没有同步策略。
  1. import multiprocessing
  2. from multiprocessing import Barrier, Lock, Process
  3. import time
  4. from datetime import datetime
  5. def test_with_barrier(synchronizer, serializer):
  6.     name = multiprocessing.current_process().name
  7.     synchronizer.wait()
  8.     now = time.time()
  9.     time.sleep(1)
  10.     with serializer:
  11.         print("process %s ----> %s" % (name, datetime.fromtimestamp(now)))
  12. def test_without_barrier():
  13.     name = multiprocessing.current_process().name
  14.     now = time.time()
  15.     print("process %s ----> %s" % (name, datetime.fromtimestamp(now)))
  16. if __name__ == '__main__':
  17.     synchronizer = Barrier(2)
  18.     serializer = Lock()
  19.     Process(name='p1 - test_with_barrier', target=test_with_barrier, args=(synchronizer,serializer)).start()
  20.     Process(name='p2 - test_with_barrier', target=test_with_barrier, args=(synchronizer,serializer)).start()
  21.     Process(name='p3 - test_without_barrier', target=test_without_barrier).start()
  22.     Process(name='p4 - test_without_barrier', target=test_without_barrier).start()
复制代码
运行代码,将看到进程 1 和进程 2 在同一时间打印:

下面这幅图表示了 barrier 如何同步两个进程:


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

半亩花草

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表