如何保证消息有序消耗
方案1:
生产者发送消息时指定雷同的key发往雷同的分区,发送的顺序是有序的,消息队列的数据接口是一个队列,先进先出原则,消耗者依次消耗,此时即可保证有序消耗;
方案2:
单线程有序 ,但服从极低,但貌似作用不大;
方案3:
生产者按雷同key发雷同分区稳定,消耗者接收消息对key举行hash取模(防止线程之间快慢不一导致无序),对于雷同的key放到同一个队列再去消耗;即高效又保证有序
方案3可共同策略模式再来一套组合拳(差别的key差别的处置惩罚策略)!
Cunsumer是被动推还是自动拉
全部消息服务消耗者获取消息都会有两种形式,一种是消耗者被动的接收broker推送过来的消息,另一种是消耗者自动到broker上去拿消息。
首先阐明Kafka这个消息队列是接纳的自动拉取消息的形式;接下来说一说push和pull的优缺点;
**push缺:**消息推送频率由broker控制,鄙俚服务处置惩罚消息的速度跟不上推送速度容易把鄙俚consumer搞瓦解了
push优: 消息体系都致力于让 consumer 以最大的速率最快速的消耗消息
**pull优:**消息的处置惩罚速度由consumer自己控制,可根据自己的消耗能力自己配置;不会出现consumer瓦解问题
pull缺: 如果 broker 没有可供消耗的消息,将导致 consumer 不绝在循环中轮询,直到新消息到达。为了制止这点,Kafka 有个参数可以让 consumer壅闭知道新消息到达(固然也可以壅闭知道消息的数量达到某个特定的量如许就可以批量发送)。
kafka 维护消耗状态跟踪的方法
1、当broker中的一条消息被consumer拉取后立即删除该消息;
优:淘汰broker的空间占用
缺:consumer拉取消息后消息消耗失败了,broker也删除了,导致消息未消耗
2、consumer拉取完消息,broker将消息置为已发送,consumer消耗乐成后关照broker更改消息状态为以消耗
优:解决消息丢失问题
缺:1、consumer消耗乐成,但是没有正常关照broker,会重复消耗
3、broker 必须维护每条消息的状态,并且每次都要先锁住消息然后更改状态然后开释锁;如果一条消息长时间没有收到消耗乐成的关照,就会一致被锁定,且消息量打起来还要维护状态,不太可
解决方案: Topic 被分成了多少分区,每个分区在同一时间只被一个 consumer 消耗 ;每个分区消耗的就是一个offset(整数),同时该方案还可以消耗历史数据。
Zookeeper VS Kafka
zookeeper作用于Kafka的分布式应用,也就是说对单节点的Kafka没啥大用
主要为kafka提供以下功能:
1、 用于提交偏移量 , 如果节点在任何环境下都失败了,它都可以从之条件交的偏移量中获取
2、 leader 检测
3、 分布式同步
4、 配置管理
5、 辨认新节点何时离开或毗连、集群、节点及时状态
如何判定kafka集群中的某一节点是否还存活
1、心跳机制:kafka的每一个节点都会在zookeeper中注册,如果zookeeper发现过了自己的心跳机制该节点还没有给到反馈,则认为其死亡
2、同步机制:如果某一结点为 follower ,如果不能正常的同步leader节点的数据则也可认为其死亡
拦截器&序列化器&分区器
生产者生产消息到broker需颠末拦截器 -> 序列化器 -> 分区器一系列的数据封装、转化、发送;
拦截器
作用:
1、可以用来在消息发送前做一些预处置惩罚工作,比如给消息添加前缀内容等;
2、可以用来在发送回调逻辑前做一些定制化的工作,比如统计乐成率等
使用:
- public interface ProducerInterceptor<K, V> extends Configurable {
- // 该方法会在消息发送之前被调用
- ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
- // 该方法会在消息成功提交或发送失败之后被调用
- void onAcknowledgement(RecordMetadata metadata, Exception exception);
- // 关闭interceptor,主要用于执行一些资源清理工作
- void close();
- }
复制代码 使用该拦截器仅需自界说拦截器类并实现该接口,在onSend()方法写相关业务代码;且在KafkaProducer配置好自界说拦截器参数(properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,自界说拦截器.class.getName());)
onAcknowledgement() 是KafkaProducer在消息被乐成应答或者发送失败时被调用,还要优于用户设定的Callback方法之前执行。(ps:该方法一般在producer的I/O线程中运行,所以要充足快,不然会影响消息发送服从)
close() 见名知意,不多表明啦
PS:
可界说多个拦截器;发送消息的时候这条消息就像过减速带一样一个拦截器一个拦截器的过,并且后一个拦截器入参为前一个拦截器处置惩罚后的数据;
拦截器执行顺序取决于生产者配置拦截器的顺序
序列化器
生产者序列化器将消息按指定的序列化格式序列化成字节码;生产者序列化器将消息按指定的序列化格式再序列化回来;
如果生产者和消耗者的序列化器用的不是一个;数据将会序列化不乐成;
分区器
- public interface Partitioner extends Configurable, Closeable {
- //计算分区号 参数分别是主题、键、序列化后的键、值、序列化后的值和集群的元数据信息
- int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
- //表示通知分区程序用来创建新的批次。
- void close();
- //表示通知分区程序用来创建新的批次
- default void onNewBatch(String topic, Cluster cluster, int prevPartition) {
- }
- }
复制代码 PS:此中重点关注的是key这个参数;如果key不为空,则默认的分区器会先对key举行hash,保证雷同的key进入到同一个分区;如果key为空,那么消息以轮询的方式发送到topic的各个可用的分区;
Kafka高吞吐量计划
分布式架构:Kafka接纳分布式架构,将数据分散保存在多个Broker上,每个节点都有自己的副本,可以实现数据的高可用和负载平衡,从而进步整个体系的吞吐量
零拷贝技术:淘汰了数据复制和传输次数;也就是说Kafka不必要将数据从内核缓冲区复制到用户空间,也不必要将数据从一个应用程序复制到另外一个应用程序,淘汰了数据开销
批量处置惩罚:Kafka能将多个消息批量处置惩罚,淘汰网络传输次数
磁盘顺序写入:Kafka将数据写入磁盘接纳顺序写入,比随机写入快的多
零拷贝&传统传输
传统传输:A首先将文件从磁盘读取到内核缓冲区,然后将数据从内核缓冲区复制到A的用户空间。接下来,A将数据从它的用户空间复制到B的用户空间,末了B将数据写入磁盘或者举行其他的操纵。
零拷贝:A将文件从磁盘读取到内核缓冲区后,直接将数据从内核缓冲区传输到B的用户空间,而不必要颠末A的用户空间。
PS:同时零拷贝相较于传传统数据传输也有一些缺点;
(兼容性较差:零拷贝必要操纵体系和硬件平台的支持,不兼容会导致无法使用)
(安全性较低:零拷贝的绕过用户空间,直接在内核缓冲区传输,可能出现数据被其他程序或软件篡改)
磁盘随机写入&顺序写入
磁盘的读写速度受到寻道时间、旋转延迟和数据传输速率三个因素的影响。此中,寻道时间是指磁头从磁盘的一个位置移动到另一个位置所必要的时间;旋转延迟是指等待磁盘旋转到正确的扇区的时间;数据传输速率是指磁盘的读写速度。
顺序写入:磁盘可以预先将必要写入的数据缓存到内存中,并将其按照顺序写入到磁盘的相邻扇区中。如许,磁盘的磁头只必要沿着一个方向移动,不必要举行频繁的寻道操纵,旋转延迟也可以被最小化。因此,顺序写入的速度会更快。
随机写入:写入的数据可能会分散在磁盘的差别区域,并且可能会在差别时间举行写入。如许,磁头必要频繁地移动到差别的位置举行寻道操纵,旋转延迟也会增长。这会导致磁盘的读写速度变慢,因此随机写入的速度会比顺序写入慢。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |