本日内容概要 - 2024.5.31
- 死锁与递归锁(了解)
- 信号量(了解)
- Event事件(了解)
- 线程q(了解)
- 进程池与线程池(掌握)
- 协程(了解)
- 协程实现TCP服务端的并发效果(了解)
本日内容具体
死锁与递归锁
当你知道锁的使用枪锁必须要释放锁,其实你在操作锁的时候也极其轻易产生死锁现象。
- from threading import Thread, Lock
- import time
- mutexA = Lock()
- mutexB = Lock()
- # 类被实例化多次,产生的对象肯定是不相等的
- # 如果要实现实例化多次类,产生的对象是相等的 只能是单利模式
- class MyThread(Thread):
- def run(self):
- self.func1()
- self.func2()
- def func1(self):
- mutexA.acquire()
- print('%s 抢到了锁A锁' % self.name) # 当前线程的名称
- mutexB.acquire()
- print('%s 抢到了锁B锁' % self.name)
- mutexB.release()
- mutexA.release()
- def func2(self):
- mutexB.acquire()
- print('%s 抢到了锁B锁' % self.name)
- time.sleep(2)
- mutexA.acquire()
- print('%s 抢到了锁A锁' % self.name)
- mutexA.release()
- mutexB.release()
- if __name__ == '__main__':
- for i in range(10):
- t = MyThread()
- t.start()
复制代码 递归锁
- """
- 递归锁的特点
- 可以被连续的acquire和release
- 内部有一个计数器 每acquire一次计数加1 没release一次计数器减一
- """
复制代码 信号量
信号量在不同阶段大概对应不同的技能点
在并发编程中信号量指的就是锁!!
- """
- 如果我们将互斥锁比喻成一个厕所
- 那么信号量就是公共厕所
- """
- from threading import Thread, Semaphore
- import time
- import random
- sm = Semaphore(5) # 数字写几就代表几个坑位
- def task(name):
- sm.acquire()
- print('%s正在蹲坑' % name)
- time.sleep(random.randint(1, 3))
- sm.release()
- if __name__ == '__main__':
- for i in range(20):
- t = Thread(target=task, args=('伞兵%s号' % i,))
- t.start()
复制代码 Event事件
一些进程/线程必要期待另外一些进程/线程完毕之后才能运行,类似于发射信号一样
- from threading import Thread, Event
- import time
- event = Event()
- def light():
- print('红灯开始了')
- time.sleep(3)
- print('红灯结束了')
- event.set() # 告诉等待红灯的人可以走了
- def car(name):
- print('%s 车正在等红灯' % name)
- event.wait() # 等待别人给你发信号
- print('%s 车油门飙走了' % name)
- if __name__ == '__main__':
- t = Thread(target=light)
- t.start()
- for i in range(1, 11):
- t = Thread(target=car, args=('%s' % i,))
- t.start()
复制代码 线程q(了解)
- """
- 同一个进程多个线程数据是共享的
- 为什么同一个进程还要使用队列
- 因为队列是: 管道 + 锁
- 所以队列还是为了保证数据的安全
- """
- import queue
- # 1 队列q 先进先出
- # q = queue.Queue()
- # q.put(1)
- # q.get()
- # q.get_nowait()
- # q.full()
- # q.empty()
- # print(q.get())
- # 2 堆栈q 后进先出
- # q = queue.LifoQueue(3)
- # q.put(2)
- # q.put(3)
- # print(q.get()) # 3
- # 优先级q 你可以给放入队列中的数据设置进出优先级
- q = queue.PriorityQueue(4)
- q.put((10, '11'))
- q.put((100, '22'))
- q.put((0, '33'))
- q.put((-5, '44'))
- print(q.get()) # (-5, '44')
- # 第一个数字代表优先级 数字越小,优先级越高
复制代码 进程池与线程池(掌握)
先回顾一下吧,之前TCP服务端实现并发的效果,每来一个人就开设一个进程大概线程行止理。
- """
- 无论是开设进程还是开设线程,都需要消耗资源,只不过开设线程的消耗比开设进程的消耗稍微小一点而已。
- 我们不可能无限制的开设进程和线程,因为计算机的硬件资源跟不上。
- 我们的宗旨保证计算机硬件正常工作的情况下最大限度的利用它。
- """
- # 池的概念
- """
- 什么是池?
- 池是用来保障计算机硬件安全的情况下最大限度的利用计算机
- 它降低了程序的运行效果但是保证了计算机硬件的安全,从而让程序能够正常运行。
- """
复制代码 基本操作
- from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
- import time
- import os
- # pool = ThreadPoolExecutor(5) # 池子里面固定只有五个线程
- # 括号内可以传数字 不穿的话默认会开设当前计算机cpu个数 + 4 与 32 之间取最小值
- pool = ProcessPoolExecutor()
- # 括号内可以传数字 不传的话默认会开设当前计算机CPU个数进程
- """
- 池子造出来以后 里面会固定存在5个线程
- 这五个线程不会出现重复创建和销毁的过程
- """
- def task(n):
- print(n,os.getpid())
- time.sleep(2)
- return n * n
- def callback(n):
- print('call_back>>>:',n.result())
- """
- 提交任务
- 同步:提交任务之后原地等待返回结果,期间不做任何操作
- 异步:提交任务之后不原地等待返回结果,继续往下执行
- 返回结果如何获取???
- 异步提交任务的返回结果 应该通过回调机制来获取
- 回调机制
- 就相当于给每个异步任务绑定了一个定时炸弹
- 一旦该任务有结果立刻触发爆炸
- """
- if __name__ == '__main__':
- # pool.submit(task, 1) # 向池子中提交任务 异步提交
- # print('主')
- t_list = []
- for i in range(20):
- res = pool.submit(task, i).add_done_callback(callback) # <Future at 0x286a8ade310 state=running>
- # print(res.result()) # result()方法 同步提交
- # t_list.append(res)
- # 等待线程池中所有的任务执行完毕之后再继续往下执行
- # pool.shutdown() # 关闭线程池 等待线程池中所有任务运行完毕
- # for t in t_list:
- # print('>>>:', t.result())
- """
- 程序由并发变成了串行
- res.result() 拿到的是异步提交任务的返回结果
- """
复制代码 总结
- from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
- pool = ProcessPoolExecutor()
- res = pool.submit(task, i).add_done_callback(callback)
复制代码 协程
- """
- 进程:资源单位
- 线程:执行单位
- 协程:这个概念完全是程序员自己想出来的 根本不存在
- 单线程下实现并发
- 程序员自己在代码层面上检查我们所有遇到的IO操作
- 一旦遇到IO了 我们在代码级别完成切换
- 这样给CPU的感觉是你这个程序一直在运行 没有IO
- 从而提升程序的运行效率
-
- 多道技术
- 切换+保存状态
- CPU两种切换
- 1.程序遇到IO
- 2.程序长时间占用
-
- TCP服务器
- accept
- recv
-
- 代码如何做到
- 切换+保存状态
- 切换
- 切换不一定是提升效率 也有可能是降低效率
- IO切 提升
- 没有IO切 降低
-
- 保存状态
- 保存上一次执行的状态 下一次接着上一次的操作继续往后执行
- yield
- """
复制代码 严重切换是否就肯定提升效率
- import time
- # 串行执行计算机密集型的任务 0.17453885078430176
- # def func1():
- # for i in range(1000000):
- # i + 1
- #
- #
- # def func2():
- # for i in range(1000000):
- # i + 1
- # start_time = time.time()
- # func1()
- # func2()
- # print(time.time() - start_time)
- # 切换 + yield
- import time
- def func1():
- while True:
- 100000 + 1
- yield
- def func2():
- g = func1()
- for i in range(100000):
- i + 1
- next(g)
- start_time = time.time()
- # func1()
- func2()
- print(time.time() - start_time) # 0.012012958526611328
复制代码 gevent模块
- from gevent import monkey;monkey.patch_all()
- # monkey.patch_all()
- import time
- from gevent import spawn
- """
- gevent模块本身无法检查常见的一些IO操作
- 在使用时需额外的导入一句话,可支持简写
- """
- def heng():
- print('哼')
- time.sleep(2)
- print('哼')
- def ha():
- print('哈')
- time.sleep(3)
- print('哈')
- def heihei():
- print('hahaha')
- time.sleep(5)
- print('hahahha')
- start_time = time.time()
- g1 = spawn(heng)
- g2 = spawn(ha)
- g3 = spawn(heihei)
- g1.join() # 等待被检测的任务执行完毕 再往后继续执行 3.0827553272247314
- g2.join()
- g3.join()
- # heng()
- # ha()
- print(time.time() - start_time) # 5.043513774871826
复制代码 协程实现TCP服务端的并发
- from gevent import monkey;monkey.patch_all()
- from gevent import spawn
- import socket
- def connection(conn):
- while True:
- try:
- data = conn.recv(1024)
- if len(data) == 0: break
- conn.send(data.upper())
- except ConnectionError as e:
- print(e)
- break
- conn.close()
- def server(ip, port):
- server = socket.socket()
- server.bind((ip, port))
- server.listen(5)
- while True:
- conn, addr = server.accept()
- spawn(connection, conn)
- if __name__ == '__main__':
- g = spawn(server, '127.0.0.1', 8080)
- g.join()
-
-
-
- # 客服端
- import socket
- from threading import Thread, current_thread
- def x_client():
- client = socket.socket()
- client.connect(('127.0.0.1', 8080))
- n = 0
- while True:
- msg = '%s say hello %s' % (current_thread().name, n)
- n += 1
- client.send(msg.encode('utf-8'))
- data = client.recv(1024)
- print(data.decode('utf-8'))
- if __name__ == '__main__':
- for i in range(500):
- t = Thread(target=x_client)
- t.start()
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |