多并发发短信处置惩罚(头条项目-07)
1 pipeline操纵 Redis数据库Redis 的 C/S 架构:
[*]基于客户端-服务端模子以及哀求/相应协议的 TCP服务。
[*]客户端向服务端发送⼀个查询哀求,并监听Socket返回。
[*]通常是以 壅闭模式,等候服务端相应。
[*]服务端处置惩罚下令,并将结果返回给客户端。
存在的题目:
[*]如果Redis服务端 同时处置惩罚多个哀求,加上⽹络耽误,那么服务端利⽤率不⾼,服从低沉。
办理的办法:
[*]管道pipeline
1.1 pipeline的先容
管道pipeline
[*]可以⼀次性发送多条下令并在执⾏完后⼀次性将结果返回。
[*]pipeline通过 镌汰客户端与Redis的通讯次数 来实现低沉来回延时时间。
实现的原理
[*]实现的原理是 队列。
[*]Client可以将三个下令 放到⼀个tcp报⽂⼀起发送。
[*]Server则可以 将三条下令的处置惩罚结果放到⼀个tcp报⽂ 返回。
[*]队列是先辈先出,如许就包管数据的次序性。
1.2 pipeline操纵Redis数据库
1.2.1 实现步调
1. 创建Redis管道
2. 将Redis哀求添加到队列
3. 执⾏哀求
1.2.2 代码实现
# 创建Redis管道
pl = redis_conn.pipeline()
# 将Redis请求添加到队列
pl.setex('sms_%s' % phone, 60, smscode)
pl.setex('is_send_%s' % phone, 60, 1)
# 执⾏请求
pl.execute()
2 生产者消耗者筹划模式
存在的题目:
https://dis.qidao123.com/imgproxy/aHR0cHM6Ly9pLWJsb2cuY3NkbmltZy5jbi9kaXJlY3QvMzlhYjk0NGMwMDljNDhjZmI4MjUzYzk1MzhkNjg2YjEucG5n
性能优化:
https://dis.qidao123.com/imgproxy/aHR0cHM6Ly9pLWJsb2cuY3NkbmltZy5jbi9kaXJlY3QvMmI2ZjJkZjQ0ZWQwNDI2MWE4OWU5MjRmMTY0NGE2OTEucG5n
思索:怎样 将发送短信从主业务中解耦出来。
https://dis.qidao123.com/imgproxy/aHR0cHM6Ly9pLWJsb2cuY3NkbmltZy5jbi9kaXJlY3QvNWMwNmFiOTc3MmFlNDBkNTkwZTQwZGVkNTM5MmM3M2EucG5n
⽣产者消耗者筹划模式先容
[*]为了将发送短信从主业务中解耦出来,我们 引⼊⽣产者消耗者筹划模式。
[*]它是最常⽤的解耦⽅式之⼀,探求中心⼈(broker)搭桥,包管两个业务没有直接关联。
总结:
[*]⽣产者⽣成消息,缓存到消息队列 中,消耗者读取消息队列中的消息并执⾏。
[*]由芒果头条⽣成发送短信消息,缓存到消息队列中,消耗者读取消息队列中的发送短信消息并执⾏。
3 RabbitMQ先容和使用
3.1 RabbitMQ先容
[*]开源AMQP实现,Erlang 语⾔编写,⽀持多种客户端
[*]分布式、⾼可⽤、长期化、可靠、安全
[*]⽀持多种协议:AMQP、STOMP、MQTT、HTTP
[*]适⽤于多体系之间 的业务解耦的消息中心件
https://dis.qidao123.com/imgproxy/aHR0cHM6Ly9pLWJsb2cuY3NkbmltZy5jbi9kaXJlY3QvM2E1MGVhMzY1ZmExNDYyNTk1NjEyZDg0ZTI2ZjliZTUucG5n
3.2 消息队列 选择发起
3.2.1 Kafka
Kafka紧张特点是基于Pull的模式来处置惩罚消息消耗,寻求⾼吞吐量,⼀开始的⽬的就 是⽤于⽇志网络和传输,得当产⽣⼤量数据的互联⽹服务的数据网络业务。
⼤型公司发起可以选⽤,如果有⽇志收罗功能,肯定是⾸选kafka 了。
3.2.2 RocketMQ
天⽣为⾦融互联⽹范畴⽽⽣,对于可靠性要求很⾼的场景,尤其是电商⾥⾯的订单 扣款,以及业务削峰,在⼤量买卖业务涌⼊时,后端大概⽆法实时处置惩罚的情况。
RoketMQ在稳固性上大概更值得信任,这些业务场景在阿⾥双11已经履历了多次磨练,如果你的业务有上述并发场景,发起可以选择RocketMQ。
3.2.3 RabbitMQ
RabbitMQ: 团结erlang语⾔自己的并发上风,性能较好,社区活泼度也⽐较⾼,但是 倒霉于做⼆次开辟和维护。不外,RabbitMQ的社区⼗分活泼,可以办理开辟过程 中碰到的bug。
如果你的数据量没有那么⼤,⼩公司优先选择功能⽐较完备的RabbitMQ。
3.3 安装RabbitMQ(ubuntu 18.04)
安装⽅式1(保举):
[*]安装Erlang 参考安装Erlang版本
[*]安装RabbitMQ 参考官⽹安装步调
[*]rabbitmq-server 安装包下载链接
安装⽅式2:
# 1. 安装erlang
#由于rabbitmq需要erlang语⾔的⽀持,在安装rabbitmq之前需要安装erlang
sudo apt-get install erlang-nox
# 2. 安装Rabbitmq
#更新源
sudo apt-get update
#安装
sudo apt-get install rabbitmq-server 服务器操纵:
# 重启服务器
$ sudo systemctl restart rabbitmq-server
# 启动服务器
$ sudo systemctl start rabbitmq-server
# 关闭服务器
$ sudo systemctl stop rabbitmq-server
# 查看服务器状态
sudo service rabbitmq-server status
# 查看rabbitmq 基本信息
sudo rabbitmqctl status 3.4 添加admin,并赋予administrator权限
# 添加admin⽤户,密码设置为admin。
sudo rabbitmqctl add_user admin admin
# 赋予权限
sudo rabbitmqctl set_user_tags admin administrator
# 赋予virtual host中所有资源的配置、写、读权限以便管理其中的资源
sudo rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'
# 查看⽤户列表
sudo rabbitmqctl list_users
# 删除⽤户
$ sudo rabbitmqctl delete_user admin 3.5 启动服务器测试
# 安装了Rabbitmq后,默认也安装了该管理⼯具,执⾏命令即可启动
sudo rabbitmq-plugins enable rabbitmq_management(先定位到rabbitmq安装⽬录)
# 浏览器访问
http://localhost:15672/ 3.6 Python访问RabbitMQ
[*]RabbitMQ 提供默认的administrator账户
[*]⽤户名和暗码:guest、guest
[*]协议:amqp
[*]所在:localhost
[*]端⼝:15672
[*]查察队列中的消息:sudo rabbitctl list_queues
# Python3虚拟环境下,安装pika
$ pip install pika
# ⽣产者代码:producer.py
import pika
# 链接到RabbitMQ服务器
credentials = pika.PlainCredentials('guest', 'guest')
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost', 5672, '/', credentials))
# 创建频道
channel = connection.channel()
# 声明消息队列
channel.queue_declare(queue='zhuozi')
# routing_key是队列名 body是要插⼊的内容
channel.basic_publish(exchange='',
routing_key='zhuozi', body=b'Hello RabbitMQ!')
print("开始向 'zhuozi' 队列中发布消息 '汉堡做好啦!'")
# 关闭链接
connection.close()
# 消费者代码:consumer.py
import pika
# 链接到rabbitmq服务器
credentials = pika.PlainCredentials('guest', 'guest')
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost', 5672, '/', credentials))
# 创建频道,声明消息队列
channel = connection.channel()
# 和⽣产者声明同⼀个队列,如果⼀⽅挂掉,不会丢失数据
channel.queue_declare(queue='zhuozi')
# 定义接受消息的回调函数
def callback(channel, method, properties, body):
print(body)
# 告诉RabbitMQ使⽤callback来接收信息
channel.basic_consume(on_message_callback=callback, queue='zhuozi',
auto_ack=True)
# 开始接收信息
channel.start_consuming() 3.7 RabbitMQ设置长途访问
直接使⽤新建的管理员⽤户访问即可长途访问。
4 Celery 先容和使用
存在题目:
[*]消耗者取到消息之后,必要 异步处置惩罚。
[*]任务大概出现⾼并发的情况,必要多任务的⽅式执⾏。
[*]耗时任务很多种,每种耗时任务编写的⽣产者和消耗者代码有重复。
[*]取到的消息什么时间执⾏,以什么样的⽅式执⾏。
结论:
[*]现实开辟中,我们可以 借助成熟的⼯具Celery 来完成。
[*]有了Celery,我们在使⽤⽣产者消耗者模式时,只必要关注任务自己,极⼤的 简化了步调员的开辟流程。
4.1 Celery先容
Celery先容:
[*]⼀个 简单、机动且可靠、处置惩罚⼤量消息的分布式体系,可以在⼀台大概多台呆板上运⾏。
[*]单个 Celery历程每分钟可处置惩罚数以百万计 的任务。
[*]通过消息进⾏通讯,使⽤消息队列(broker)在客户端和消耗者之间进⾏和谐。
安装Celery:
$ pip install -U Celery Celery官⽅⽂档
4.2 创建Celery实例并加载设置
4.2.1 界说Celery包
# mgproject/mgproject/celery_tasks
在项⽬包⽬录下创建celery_tasks(python package) 4.2.2 创建Celery实例
在celery_tasks包⽬录下 创建main.py⽂件
# celery_tasks/main.py
# celery启动⽂件
from celery import Celery
# 创建celery实例
celery_app = Celery('mangguo') 4.2.3 加载Celery设置
在celery_tasks包⽬录下 创建config.py⽂件
# celery_tasks/config.py
# 指定消息队列的位置
broker_url= 'amqp://guest:guest@localhost:5672'
# 修改celery_tasks/main.py
# celery启动⽂件
from celery import Celery
# 创建celery实例
celery_app = Celery('mangguo')
# 加载celery配置
celery_app.config_from_object('celery_tasks.config') 4.3 界说发送短信托务
在celery_tasks包⽬录下创建 sms(python包)/tasks.py
4.3.1 注册任务
celery_tasks.main.py
from celery import Celery
# 创建celery实例
celery_app = Celery('mangguo')
# 加载celery配置
celery_app.config_from_object('celery_tasks.config')
# ⾃动注册celery任务
celery_app.autodiscover_tasks(['celery_tasks.sms']) 4.3.2 界说任务
celery_tasks.sms.tasks.py
import os
import sys
# 添加导包路径
B_DIR = os.path.dirname(os.path.dirname(os.path.dirname(
os.path.abspath(__file__))))
sys.path.insert(1, B_DIR)
sys.path.insert(0, os.path.join(B_DIR, 'utils'))
import logging
from celery_tasks.main import celery_app
# 为celery使⽤django配置⽂件进⾏设置
if not os.getenv('DJANGO_SETTINGS_MODULE'):
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings.dev')
from huyi_sms.sms3 import send_sms_code
logger = logging.getLogger('django')
@celery_app.task(name='huyi_send_sms_code')
def huyi_send_sms_code(phone, smscode_str):
"""
发送短信异步任务
:param phone: ⼿机号
:param smscode: 短信验证码
:return: 成功 code=2 或 失败 smsid=0
"""
try:
# 调⽤外部接⼝执⾏发送短信任务
ret = send_sms_code(smscode_str, phone)
except Exception as e:
logger.error(e)
if ret.get('code') != 2:
logger.error(e)
return ret.get('code', None) 4.4 启动Celery服务
$ cd ~/Desktop/projects/mangguo/mgproject
$ celery -A celery_tasks.main worker -l info
[*]-A指对应的应⽤步调, 其参数是项⽬中 Celery实例的位置。
[*]worker指这⾥要启动的worker。
[*]-l指⽇志品级,⽐如info品级。
4.5 调⽤发送短信托务
# verifications/views.py
from mgproject.celery_tasks.sms.tasks import huyi_send_sms_code
# Celery异步发送短信验证码
ret = huyi_send_sms_code.delay(phone, smscode_str)
# 8. 根据外部接⼝返回值响应前端结果
if ret:# 执⾏⼀个任务就返回⼀个taskid 689e889c-a607-49f3-9777-248a8dcce310
return JsonResponse({'code': '200', 'errormsg': 'OK'})
return JsonResponse({'code': '5001', 'errormsg': '发送短信验证码错误'}) 4.6 增补celery worker的⼯作模式
[*]默认是 历程池⽅式,历程数以当前呆板的CPU核数为参考,每个CPU开四个进 程。
[*]怎样⾃⼰指定历程数:celery -A proj worker --concurrency=4
[*]怎样改变历程池⽅式为协程⽅式:
[*]celery -A proj worker --concurrency=1000 -P eventlet -c 1000
# 安装eventlet模块
$ pip install eventlet
# 启⽤ Eventlet 池
$ celery -A celery_tasks.main worker -l info -P eventlet -c 1000 https://dis.qidao123.com/imgproxy/aHR0cHM6Ly9pLWJsb2cuY3NkbmltZy5jbi9kaXJlY3QvYjQ3N2RlOTRhYTYxNDY5OThhN2VkNWNlNTYyMjAzMDMucG5n
https://dis.qidao123.com/imgproxy/aHR0cHM6Ly9pLWJsb2cuY3NkbmltZy5jbi9kaXJlY3QvOWExYzc2ZDRhZjk3NDUyZjlkNTA3YzY2NTUzMjA0Y2YucG5n
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]