【进程池与线程池 协程】

打印 上一主题 下一主题

主题 664|帖子 664|积分 1992

本日内容概要 - 2024.5.31




  • 死锁与递归锁(了解)
  • 信号量(了解)
  • Event事件(了解)
  • 线程q(了解)
  • 进程池与线程池(掌握)
  • 协程(了解)
  • 协程实现TCP服务端的并发效果(了解)
本日内容具体


死锁与递归锁
当你知道锁的使用枪锁必须要释放锁,其实你在操作锁的时候也极其轻易产生死锁现象。
  1. from threading import Thread, Lock
  2. import time
  3. mutexA = Lock()
  4. mutexB = Lock()
  5. # 类被实例化多次,产生的对象肯定是不相等的
  6. # 如果要实现实例化多次类,产生的对象是相等的 只能是单利模式
  7. class MyThread(Thread):
  8.     def run(self):
  9.         self.func1()
  10.         self.func2()
  11.     def func1(self):
  12.         mutexA.acquire()
  13.         print('%s 抢到了锁A锁' % self.name) # 当前线程的名称
  14.         mutexB.acquire()
  15.         print('%s 抢到了锁B锁' % self.name)
  16.         mutexB.release()
  17.         mutexA.release()
  18.     def func2(self):
  19.         mutexB.acquire()
  20.         print('%s 抢到了锁B锁' % self.name)
  21.         time.sleep(2)
  22.         mutexA.acquire()
  23.         print('%s 抢到了锁A锁' % self.name)
  24.         mutexA.release()
  25.         mutexB.release()
  26. if __name__ == '__main__':
  27.     for i in range(10):
  28.         t = MyThread()
  29.         t.start()
复制代码
递归锁
  1. """
  2. 递归锁的特点
  3.         可以被连续的acquire和release
  4.         内部有一个计数器 每acquire一次计数加1 没release一次计数器减一
  5. """
复制代码
信号量
信号量在不同阶段大概对应不同的技能点
在并发编程中信号量指的就是锁!!
  1. """
  2. 如果我们将互斥锁比喻成一个厕所
  3. 那么信号量就是公共厕所
  4. """
  5. from threading import Thread, Semaphore
  6. import time
  7. import random
  8. sm = Semaphore(5)  # 数字写几就代表几个坑位
  9. def task(name):
  10.     sm.acquire()
  11.     print('%s正在蹲坑' % name)
  12.     time.sleep(random.randint(1, 3))
  13.     sm.release()
  14. if __name__ == '__main__':
  15.     for i in range(20):
  16.         t = Thread(target=task, args=('伞兵%s号' % i,))
  17.         t.start()
复制代码
Event事件
一些进程/线程必要期待另外一些进程/线程完毕之后才能运行,类似于发射信号一样
  1. from threading import Thread, Event
  2. import time
  3. event = Event()
  4. def light():
  5.     print('红灯开始了')
  6.     time.sleep(3)
  7.     print('红灯结束了')
  8.     event.set()  # 告诉等待红灯的人可以走了
  9. def car(name):
  10.     print('%s 车正在等红灯' % name)
  11.     event.wait()  # 等待别人给你发信号
  12.     print('%s 车油门飙走了' % name)
  13. if __name__ == '__main__':
  14.     t = Thread(target=light)
  15.     t.start()
  16.     for i in range(1, 11):
  17.         t = Thread(target=car, args=('%s' % i,))
  18.         t.start()
复制代码
线程q(了解)
  1. """
  2. 同一个进程多个线程数据是共享的
  3. 为什么同一个进程还要使用队列
  4. 因为队列是: 管道 + 锁
  5. 所以队列还是为了保证数据的安全
  6. """
  7. import queue
  8. # 1 队列q 先进先出
  9. # q = queue.Queue()
  10. # q.put(1)
  11. # q.get()
  12. # q.get_nowait()
  13. # q.full()
  14. # q.empty()
  15. # print(q.get())
  16. # 2 堆栈q 后进先出
  17. # q = queue.LifoQueue(3)
  18. # q.put(2)
  19. # q.put(3)
  20. # print(q.get()) # 3
  21. # 优先级q 你可以给放入队列中的数据设置进出优先级
  22. q = queue.PriorityQueue(4)
  23. q.put((10, '11'))
  24. q.put((100, '22'))
  25. q.put((0, '33'))
  26. q.put((-5, '44'))
  27. print(q.get())  # (-5, '44')
  28. # 第一个数字代表优先级 数字越小,优先级越高
复制代码
进程池与线程池(掌握)
先回顾一下吧,之前TCP服务端实现并发的效果,每来一个人就开设一个进程大概线程行止理。
  1. """
  2. 无论是开设进程还是开设线程,都需要消耗资源,只不过开设线程的消耗比开设进程的消耗稍微小一点而已。
  3. 我们不可能无限制的开设进程和线程,因为计算机的硬件资源跟不上。
  4. 我们的宗旨保证计算机硬件正常工作的情况下最大限度的利用它。
  5. """
  6. # 池的概念
  7. """
  8. 什么是池?
  9.         池是用来保障计算机硬件安全的情况下最大限度的利用计算机
  10.         它降低了程序的运行效果但是保证了计算机硬件的安全,从而让程序能够正常运行。
  11. """
复制代码
基本操作
  1. from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
  2. import time
  3. import os
  4. # pool = ThreadPoolExecutor(5) # 池子里面固定只有五个线程
  5. # 括号内可以传数字 不穿的话默认会开设当前计算机cpu个数 + 4 与 32 之间取最小值
  6. pool = ProcessPoolExecutor()
  7. # 括号内可以传数字 不传的话默认会开设当前计算机CPU个数进程
  8. """
  9. 池子造出来以后 里面会固定存在5个线程
  10. 这五个线程不会出现重复创建和销毁的过程
  11. """
  12. def task(n):
  13.     print(n,os.getpid())
  14.     time.sleep(2)
  15.     return n * n
  16. def callback(n):
  17.     print('call_back>>>:',n.result())
  18. """
  19. 提交任务
  20.     同步:提交任务之后原地等待返回结果,期间不做任何操作
  21.     异步:提交任务之后不原地等待返回结果,继续往下执行
  22.         返回结果如何获取???
  23.         异步提交任务的返回结果 应该通过回调机制来获取
  24.         回调机制
  25.             就相当于给每个异步任务绑定了一个定时炸弹
  26.             一旦该任务有结果立刻触发爆炸
  27. """
  28. if __name__ == '__main__':
  29.     # pool.submit(task, 1)  # 向池子中提交任务  异步提交
  30.     # print('主')
  31.     t_list = []
  32.     for i in range(20):
  33.         res = pool.submit(task, i).add_done_callback(callback)  # <Future at 0x286a8ade310 state=running>
  34.         # print(res.result())  # result()方法 同步提交
  35.         # t_list.append(res)
  36.     # 等待线程池中所有的任务执行完毕之后再继续往下执行
  37.     # pool.shutdown()  # 关闭线程池 等待线程池中所有任务运行完毕
  38.     # for t in t_list:
  39.     #     print('>>>:', t.result())
  40.     """
  41.     程序由并发变成了串行
  42.     res.result() 拿到的是异步提交任务的返回结果
  43.     """
复制代码
总结
  1. from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
  2. pool = ProcessPoolExecutor()
  3. res = pool.submit(task, i).add_done_callback(callback)
复制代码
协程

  1. """
  2. 进程:资源单位
  3. 线程:执行单位
  4. 协程:这个概念完全是程序员自己想出来的 根本不存在
  5.         单线程下实现并发
  6.         程序员自己在代码层面上检查我们所有遇到的IO操作
  7.         一旦遇到IO了 我们在代码级别完成切换
  8.         这样给CPU的感觉是你这个程序一直在运行 没有IO
  9.         从而提升程序的运行效率
  10.        
  11. 多道技术
  12.         切换+保存状态
  13.         CPU两种切换
  14.                 1.程序遇到IO
  15.                 2.程序长时间占用
  16.                
  17. TCP服务器
  18.         accept
  19.         recv
  20.        
  21. 代码如何做到
  22.         切换+保存状态
  23. 切换
  24.         切换不一定是提升效率 也有可能是降低效率
  25.         IO切    提升
  26.         没有IO切 降低
  27.        
  28. 保存状态
  29.         保存上一次执行的状态 下一次接着上一次的操作继续往后执行
  30.         yield
  31. """
复制代码
严重切换是否就肯定提升效率
  1. import time
  2. # 串行执行计算机密集型的任务 0.17453885078430176
  3. # def func1():
  4. #     for i in range(1000000):
  5. #         i + 1
  6. #
  7. #
  8. # def func2():
  9. #     for i in range(1000000):
  10. #         i + 1
  11. # start_time = time.time()
  12. # func1()
  13. # func2()
  14. # print(time.time() - start_time)
  15. # 切换 + yield
  16. import time
  17. def func1():
  18.     while True:
  19.         100000 + 1
  20.         yield
  21. def func2():
  22.     g = func1()
  23.     for i in range(100000):
  24.         i + 1
  25.         next(g)
  26. start_time = time.time()
  27. # func1()
  28. func2()
  29. print(time.time() - start_time) # 0.012012958526611328
复制代码
gevent模块
  1. from gevent import monkey;monkey.patch_all()
  2. # monkey.patch_all()
  3. import time
  4. from gevent import spawn
  5. """
  6. gevent模块本身无法检查常见的一些IO操作
  7. 在使用时需额外的导入一句话,可支持简写
  8. """
  9. def heng():
  10.     print('哼')
  11.     time.sleep(2)
  12.     print('哼')
  13. def ha():
  14.     print('哈')
  15.     time.sleep(3)
  16.     print('哈')
  17. def heihei():
  18.     print('hahaha')
  19.     time.sleep(5)
  20.     print('hahahha')
  21. start_time = time.time()
  22. g1 = spawn(heng)
  23. g2 = spawn(ha)
  24. g3 = spawn(heihei)
  25. g1.join() # 等待被检测的任务执行完毕 再往后继续执行 3.0827553272247314
  26. g2.join()
  27. g3.join()
  28. # heng()
  29. # ha()
  30. print(time.time() - start_time)  # 5.043513774871826
复制代码
协程实现TCP服务端的并发
  1. from gevent import monkey;monkey.patch_all()
  2. from gevent import spawn
  3. import socket
  4. def connection(conn):
  5.     while True:
  6.         try:
  7.             data = conn.recv(1024)
  8.             if len(data) == 0: break
  9.             conn.send(data.upper())
  10.         except ConnectionError as e:
  11.             print(e)
  12.             break
  13.     conn.close()
  14. def server(ip, port):
  15.     server = socket.socket()
  16.     server.bind((ip, port))
  17.     server.listen(5)
  18.     while True:
  19.         conn, addr = server.accept()
  20.         spawn(connection, conn)
  21. if __name__ == '__main__':
  22.     g = spawn(server, '127.0.0.1', 8080)
  23.     g.join()
  24.    
  25.    
  26.    
  27. # 客服端
  28. import socket
  29. from threading import Thread, current_thread
  30. def x_client():
  31.     client = socket.socket()
  32.     client.connect(('127.0.0.1', 8080))
  33.     n = 0
  34.     while True:
  35.         msg = '%s say hello %s' % (current_thread().name, n)
  36.         n += 1
  37.         client.send(msg.encode('utf-8'))
  38.         data = client.recv(1024)
  39.         print(data.decode('utf-8'))
  40. if __name__ == '__main__':
  41.     for i in range(500):
  42.         t = Thread(target=x_client)
  43.         t.start()
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

干翻全岛蛙蛙

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表