ToB企服应用市场:ToB评测及商务社交产业平台

标题: 消息中间件Kafka(PHP版本) [打印本页]

作者: 鼠扑    时间: 2024-7-22 13:06
标题: 消息中间件Kafka(PHP版本)
        小编最近必要用到消息中间件,有必要要复习一下从前的东西,有必要的自取,强调一点,如果真的想相识透彻,肯定要动手脑袋会了不代表就会写了
        Kafka是由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的全部动作流数据。 这种动作(网页欣赏,搜索和其他用户的行动)是在当代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来办理。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的办理方案。Kafka的目标是通过Hadoop的并行加载机制来同一线上和离线的消息处理,也是为了通过集群来提供实时的消息
Kafka 的特性

高吞吐、低延迟:kakfa 最大的特点就是收发消息非常快,kafka 每秒可以处理几十万条消息,它的最低延迟只有几毫秒。
高伸缩性: 每个主题(topic) 包含多个分区(partition),主题中的分区可以分布在不同的主机(broker)中。
持久性、可靠性: Kafka 可以或许允许数据的持久化存储,消息被持久化到磁盘,并支持数据备份防止数据丢失,Kafka 底层的数据存储是基于 Zookeeper 存储的,Zookeeper 我们知道它的数据可以或许持久存储。
容错性: 允许集群中的节点失败,某个节点宕机,Kafka 集群可以或许正常工作
高并发: 支持数千个客户端同时读写
Kafka 的利用场景

活动跟踪:Kafka 可以用来跟踪用户举动,比如我们经常归去淘宝购物,你打开淘宝的那一刻,你的登陆信息,登陆次数都会作为消息传输到 Kafka ,当你欣赏购物的时间,你的欣赏信息,你的搜索指数,你的购物爱好都会作为一个个消息通报给 Kafka ,这样就可以生成报告,可以做智能推荐,购买喜好等。
通报消息:Kafka 别的一个根本用途是通报消息,应用程序向用户发送通知就是通过通报消息来实现的,这些应用组件可以生成消息,而不必要关心消息的格式,也不必要关心消息是如何发送的。
度量指标:Kafka也经常用来记载运营监控数据。包罗收集各种分布式应用的数据,生产各种操纵的会合反馈,比如报警和报告。
日志记载:Kafka 的根本概念泉源于提交日志,比如我们可以把数据库的更新发送到 Kafka 上,用来记载数据库的更新时间,通过kafka以同一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
流式处理:流式处理是有一个可以或许提供多种应用程序的范畴。
限流削峰:Kafka 多用于互联网范畴某一时候哀求特殊多的情况下,可以把哀求写入Kafka 中,避免直接哀求后端程序导致服务崩溃。
消息:
Kafka中的数据单位被称为消息,也被称为记载,可以把它看作数据库表中某一行的记载。
批次:为了进步效率,消息会分批次写入Kafka,批次就代指的是一组消息。
主题:
消息的种类称为主题(Topic),可以说一个主题代表了一类消息。相当于是对消息进行分类。主题就像是数据库中的表。
分区:
主题可以被分为若干个分区(partition),同一个主题中的分区可以不在一个呆板上,有可能会摆设在多个呆板上,
由此来实现 kafka 的伸缩性,单一主题中的分区有序,但是无法包管主题中全部的分区有序
生产者:
向主题发布消息的客户端应用程序称为生产者(Producer),生产者用于持续不停的向某个主题发送消息。
消费者:
订阅主题消息的客户端程序称为消费者(Consumer),消费者用于处理生产者产生的消息。
消费者群组:
生产者与消费者的关系就犹如餐厅中的厨师和顾客之间的关系一样,一个厨师对应多个顾客,也就是一个生产者对应多个消费者,
消费者群组(Consumer Group)指的就是由一个或多个消费者构成的群体。
偏移量:
偏移量(Consumer Offset)是一种元数据,它是一个不停递增的整数值,用来记载消费者发生重平衡时的位置,以便用来规复数据。
broker:
一个独立的 Kafka 服务器就被称为 broker,broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。
broker 集群:
broker 是集群 的构成部门,broker 集群由一个或多个 broker 构成,每个集群都有一个 broker 同时充当了集群控制器的脚色(自动从集群的活跃成员中选举出来)。
副本:Kafka 中消息的备份又叫做 副本(Replica),副本的数量是可以配置的,Kafka 定义了两类副本:向导者副本(Leader Replica) 和 跟随者副本(Follower Replica),前者对外提供服务,后者只是被动跟随。
重平衡:Rebalance。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的告急本领。
安装

下载地址

librdkafka获取地址:https://github.com/edenhill/librdkafka
kafka获取地址:https://github.com/arnaud-lb/php-rdkafka
安转java情况

下载地址:https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html
下载解压完成之后,设置系统变量:path(路径为:安装目录/bin)
设置情况变量:JAVA_HOME(路径为:安装目录\bin)
查看是否安装成功:java -version

Zookeeper安装

下载地址:https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.6.4/apache-zookeeper-3.6.4-bin.tar.gz
设置情况变量:path(路径为:安装目录\bin)
新建data文件夹,新建logs文件夹
config文件夹:zoo_sample.cfg  新复制一个:zoo.cfg
编辑zoo.cfg文件:
新增(配置路径【肯定要配置\\,要不然不识别】:安装路径\\zookeeper\\apache-zookeeper-3.6.4-bin\\):
dataDir= 安装路径\zookeeper\apache-zookeeper-3.6.4-bin\data
dataLogDir=安装路径\zookeeper\apache-zookeeper-3.6.4-bin\log
audit.enable=truezookeeper/conf/zoo.cfg 参数详解
tickTime=2000:
        这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间隔断,也就是每个 tickTime 时间就会发送一个心跳,单位是毫秒
        
initLimit=10:
        这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间隔断数。当已经超过 5个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 5*2000=10 秒,10秒内要启动集群并出现leader和floower。
syncLimit=5:
        这个配置项标识 Leader 与Follower 之间发送消息,哀求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是5*2000=10秒,超出时间以为是死机。
dataDir:
        快照日志的存储路径
dataLogDir:
        事物日志的存储路径,如果不配置这个那么事物日志会默认存储到dataDir制定的目录,这样会严肃影响zk的性能,当zk吞吐量较大的时间,产生的事物日志、快照日志太多
clientPort=12181:
        这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问哀求。修改他的端口改大点
启动zkServer

启动目录:\bin\zkServer.sh
启动命令:zkServer.sh start
查找看到:binding to port .0.0.0.0/0.0.0.2181能表示成功

安装Scala

下载地址:https://downloads.lightbend.com/scala/2.11.12/scala-2.11.12.msi
安装:一键安装(不绝next,直到完成)
配置情况变量(这个必要配置):安装目录/bin
判断是否安装完成:scala -version

Kafka安装

下载地址:https://kafka.apache.org/downloads
Kafka安装目录下新建目录logs
编辑config\server.properties文件
log.dirs=安装目录\\logs(注意双斜线,如果是cmd命令出现命令行太长,那就把Kafka安装安装在磁盘的最表面,D盘的最外层)
新增参数:listeners=PLAINTEXT://localhost:9092

启动

肯定要先启动zookeeper(命令:zkServer)
然后启动kafka(命令(cmd进入Kafka安装目录):.\bin\windows\kafka-server-start.bat .\config\server.properties)
查找看到:from now on will use node localhost:9092
能表示成功(如果启动不了,删除logs文件夹下的文件)

操纵:

创建topics(主题):

kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

查看主题:


kafka-topics.bat --bootstrap-server localhost:9092 --list
生产者:

cmd进入:安装目录\bin\windows
打开生产者:kafka-console-producer.bat --broker-list localhost:9092 --topic test
消费者:

cmd进入:安装目录\bin\windows
打开消费者:kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
测试:

生产者发送消息,消费者订阅收到消息

注意:
1.一共打开四个窗口,1.zookeeper 2.kafka  3.生产者  4.消费者  (注意,这四个窗口不能关闭,要不绝开着)
2.肯定是生产者生产消息,消费者才会收到消息(注意,生产者和消费者的topics肯定要是一样的,要不然收不到消息)


方法

getOutQlen方法

利用方法:$producer->getOutQLen();
作用:
1.用于获取生产者(Producer)内部队列中等待发送到Kafka broker的消息数量。
2.getOutQLen() 方法允许你查询这个内部队列中当前待发送的消息数量。通常用于监控和调试目标,帮助相识生产者的发送速率和队列积压情况
3.getOutQLen() 返回的是近似值,它可能在调用之间发生变化
输出数据:
int(0)

poll方法


利用方法:
while ($producer->getOutQLen() > 0) {
    $producer->poll(1);
}
作用:
用于从Kafka集群中拉取消息,当消费者调用poll()方法时,如果在规定的时间内没有收到任何消息,它会立刻返回,并且没有任何消息被拉取到(轮询一次就相当于拉取肯定时间段broker中可消费的数据)

 
flush方法


利用方法:$producer->flush(10000);
作用:将生产者内部缓冲区中的消息强制发送到Kafka broker的过程。

consumerstart方法


利用方法:$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
作用:在Kafka中,consumerstart方法是用于启动消费者线程并开始从Kafka集群中拉取消息的方法。

Consume方法


利用方法:$topic->consume(0, 120*10000);
作用:是指Kafka消费者从Kafka集群中读取消息的过程。首先必要从集群中先拉取数据

Purge方法


利用方法:$producer->purge(RD_KAFKA_PURGE_F_QUEUE);
作用:是指清除已完成或已过期的哀求,以释放缓存资源。

initTransactions方法


作用:用于初始化一个变乱。Kafka从0.11.0.0版本开始支持变乱性生产者API,允许生产者将多个消息组合成一个变乱,确保这些消息被原子性地写入Kafka。这意味着要么全部消息都成功写入,要么都不写入,包管了消息的一致性

beginTransaction方法


作用:用于开始一个新的生产者变乱。Kafka从0.11.0.0版本开始支持变乱性生产者API,它允许你将多个消息组合成一个变乱,确保这些消息被原子性地写入Kafka。这意味着要么全部消息都成功写入,要么都不写入,这包管了消息的一致性

commitTransaction方法


作用:它用于提交一个变乱。当你利用 Kafka 的变乱性生产者 API 时,你可以将一系列的消息发送操纵组合成一个原子性的变乱。这意味着这些操纵要么全部成功,要么全部失败,从而确保数据的一致性和顺序性

abortTransaction方法


作用:中止变乱,当发送消息或提交变乱过程中发生错误时利用

getMetadata方法

利用方法:$producer->getMetadata(false, $topic, 10*1000);
作用:
用于获取Kafka集群的元数据
获取数据包罗:1.主题(topics)2.分区(partitions)3.副本(replicas)4.ISR(In-Sync Replicas)等信息。
通常,客户端库(如PHP的php-kafka)会在初始化时或必要时自动实行此操纵,以便相识集群的状态和可用主题。

代码:


  1. class Kafka extends CI_Controller {
  2.     //定义变量(分区)
  3.     private $borker_list = "";
  4.     //定义变量(配置)
  5.     private $conf = "";
  6.     //定义变量(主题)
  7.     private $topics = "";
  8.     //定义变量(分组)
  9.     private $topics_group = "";
  10.     //构造
  11.     public function __construct(){
  12.         parent::__construct();
  13.         //初始化数据
  14.         $this->borker_list = "localhost:9092";
  15.         $this->topics = "test";
  16.         $this->topics_group = "test-group";
  17.     }
  18.     //消息生产者
  19.     public function producter(){
  20.         //初始化
  21.         $conf=  new RdKafka\Conf();
  22.         //设置分区
  23.         $conf->set('metadata.broker.list', $this->borker_list);
  24.         //初始化生产者
  25.         $producer = new RdKafka\Producer($conf);
  26.         //设置主题
  27.         $topic = $producer->newTopic($this->topics);
  28.         //产生信息
  29.         for ($i = 0; $i < 10; $i++) {
  30.             $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
  31.             $producer->poll(0);
  32.         }
  33.         //消息刷新
  34.         for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
  35.             $result = $producer->flush(10000);
  36.             if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
  37.                 break;
  38.             }
  39.         }
  40.         //刷新结果
  41.         if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
  42.             throw new \RuntimeException('Was unable to flush, messages might be lost!');
  43.         }
  44.     }
  45.     //消息订阅者
  46.     public function consumer(){
  47.         set_time_limit(0);
  48.         //初始化
  49.         $conf = new RdKafka\Conf();
  50.         //设置分区
  51.         $conf->set('metadata.broker.list', $this->borker_list);
  52.         $conf->set('group.id',$this->topics_group);
  53.         //初始化消费者
  54.         $rk = new RdKafka\Consumer($conf);
  55.         //主题配置
  56.         $topicConf = new RdKafka\TopicConf();
  57.         $topicConf->set('auto.commit.interval.ms', 100);
  58.         $topicConf->set('offset.store.method', 'file');
  59.         $topicConf->set('offset.store.path', sys_get_temp_dir());
  60.         $topicConf->set('auto.offset.reset', 'smallest');
  61.         $topic = $rk->newTopic($this->topics, $topicConf);
  62.         $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
  63.         while (true) {
  64.             $message = $topic->consume(0, 120*10000);
  65.             switch ($message->err) {
  66.                 case RD_KAFKA_RESP_ERR_NO_ERROR:
  67.                     //没有错误打印信息
  68.                     var_dump($message);
  69.                     break;
  70.                 case RD_KAFKA_RESP_ERR__PARTITION_EOF:
  71.                     echo "等待接收信息\n";
  72.                     break;
  73.                 case RD_KAFKA_RESP_ERR__TIMED_OUT:
  74.                     echo "超时\n";
  75.                     break;
  76.                 default:
  77.                     throw new \Exception($message->errstr(), $message->err);
  78.                     break;
  79.             }
  80.         }
  81.     }
  82.     //获取元数据(包括主题(topics)、分区(partitions)、副本(replicas)和ISR(In-Sync Replicas)等信息)
  83.     public function gettest(){
  84.         //初始化
  85.         $conf=  new RdKafka\Conf();
  86.         //设置分区
  87.         $conf->set('metadata.broker.list', $this->borker_list);
  88.         //初始化生产者
  89.         $producer = new RdKafka\Producer($conf);
  90.         $topic = $producer->newTopic($this->topics);
  91. //        $result = $producer->getMetadata(false, $topic, 10*1000);
  92.         $result = $producer->getOutQLen();
  93.         var_dump($result);die;
  94.     }
  95.     //获取元数据
  96.     public function metadata(){
  97.         $conf = new RdKafka\Conf();
  98.         $conf->setDrMsgCb(function ($kafka, $message) {
  99.             file_put_contents("./xx.log", var_export($message, true), FILE_APPEND);
  100.         });
  101.         $conf->setErrorCb(function ($kafka, $err, $reason) {
  102.             printf("Kafka error: %s (reason: %s)\n", rd_kafka_err2str($err), $reason);
  103.         });
  104.         $conf->set('group.id', 'myConsumerGroup');
  105.         $rk = new RdKafka\Consumer($conf);
  106.         $rk->addBrokers("127.0.0.1");
  107.         $allInfo = $rk->getMetadata(true, NULL, 60e3);
  108.         $topics = $allInfo->getTopics();
  109.         //循环输出
  110.         foreach ($topics as $topic) {
  111.             $topicName = $topic->getTopic();
  112.             if ($topicName == "__consumer_offsets") {
  113.                 continue ;
  114.             }
  115.             $partitions = $topic->getPartitions();
  116.             foreach ($partitions as $partition) {
  117.                 $topPartition = new RdKafka\TopicPartition($topicName, $partition->getId());
  118.                 echo  "当前的话题:" . ($topPartition->getTopic()) . " - " . $partition->getId() . " - ";
  119.                 echo  "offset:" . ($topPartition->getOffset()) . PHP_EOL;
  120.             }
  121.         }
  122.     }
  123. }
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4