Kafka一文入门

打印 上一主题 下一主题

主题 906|帖子 906|积分 2718

Kafka

消息队列的目标是为了什么?

消息队列真正的目标是通讯,屏蔽底层复杂的通讯协议,界说了一套应用层的更加简单的“生产者消费者”协议,从而达到了异步、解耦;

消息队列中间件之前的区别:

RabbitMQ:功能非常强
RocketMQ :性能堪比Kafka,功能也很多
Kafka:快
zeroMQ 、activeMQ等等
broker:消息的中转站,生产者把消息给broker就完成了任务,broker把消息主动推送给消费者(或者消费者主动轮询)


为什么kafka须要zookeeper?

Todo:Kafka学习条记(二):Zookeeper 在 Kafka 中的作用_kafka中zk的作用-CSDN博客
Kafka基本概念

Broker
消息中间件处理节点,一个Kafka节点就是一个broker
Topic
kafka根据消息的topic进行消息归类,发布到kafka的每一条消息都要有一个topic
Producer
消息生产者,向Broker发送消息的客户端
Consumer
消息消费者,消费Broker消息的客户端



消息的偏移量以顺序消费原理

消费者消费信息的两种方式:1.当前消息的偏移量+1开始消息 2.重新开始消费(--from-beginning)
kafka中的消息存储在日志文件中,日志中存储了消费偏移量以及主题中的消息


生产者将消息发送给broker,broker会将消息存储到日志文件中
  1. /usr/local/kafka/data/kafka-logs/主题-分区/00000000. log
复制代码
消息的生存是有序的,通过offset偏移量来生存消息的顺序;
消费者消费消息是也是根据消息的偏移量来获取当前须要的消息的位置
单播消息

如果多个消费者在同一个消费者组中,那么只有一个消费者可以接收到订阅的topic的一个分区的消息;换言之:每个分区的消息在一个消费者组中只被一个消费者消费。
  1. ./kafka-console-consumer.sh --bootstrap-server 192.168.88.130:9092--consumer-property group.id=testGroup --topic test
复制代码
多播消息

一个主题的消息可以被多个消费者组同时订阅和消费。多个消费者组订阅同一个主题时,主题的每个分区都会被该组中的一个消费者消费
  1. ./kafka-console-consumer.sh --bootstrap-server 192.168.88.130:9092 --consumer-property group.id=testGroup1 --topic test
  2. ./kafka-console-consumer.sh --bootstrap-server 192.168.88.130:9092 --Consumer-property group.id=testGroup2 --topic test
复制代码
消费者组信息查看

  1. ./kafka-consumer-groups.sh --bootstrap-server 192.168.88.130:9092 --list
复制代码
  1. ./kafka-consumer-groups.sh --bootstrap-server 192.168.88.130:9092 --describe --group GroupName
复制代码
可以查看到
group
topic
partion
分区
Current_Offset
消费者组当前消费topic的偏移量
Log_end_Offset
topic组中的末了一条消息偏移量
LAG
(积压消息量)未消费数量
Consumer-id
host
主题与分区

topic
概念:在kafka中是一个逻辑概念,用来分别消息种类,不同topic的消息,会被订阅该topic的消费者消费
标题:topic中的消息非常多,由于消息是存在日志文件中的,占用空间比力大,为相识决这个文件过大的标题,kafka提出了分区
分区
概念:一个主题的消息量是非常大的,因此可以通过分区设置,分布式存储消息;


优点: 1.分区存储解决了topic日志文件过大的标题
2.可以并行读、并行写topic,提高了消息读写的吞吐量
  1. ./kafka-topics.sh --create --zookeeper 192.168.88.130:2181 --partitions 2 --topic test1
复制代码
分区下可以根据稀疏索引快速界说到消息的具体位置
日志下各个文件存储的内容(重点)

test1-0:存储主题test1,分区0的消息,文件夹内里包括了稀疏索引00000.index文件,和000000.log消息文件
_consumer_offset-49:kafka内部自己创建了_consumer_offset主题包含50个分区。这个主题用来存放消费者消费某个主题的偏移量;消费者会定期将自己分区的偏移量offset提交给kafka内部的_consumer_offset主题中key是consumergroup+topic+分区号,value就是当前分区的offset,kafka会定期清除_consumer_offset中的消息,只保留最新的那条数据。
为什么要有50个_consumer_offset分区:为了抗高并发,通过consumegroupId的hash值在取模操作,得到消费者组应该提交的分区。
集群

创建主题时间除了可以指定分区,还可以指定备份
副本:副本是为主题中的分区设置多个备份,多个副本在kafka集群中的多个分区中,只有一个是leader,其他都是follower。
生产者 消费者处理消息都是在leader副本上,其他副本起的作用就是做数据备份
  1. ./kafka-topics.sh --create --zookeeper 192.168.88.130:2181 --replication-factor 3 --partitions 2 --topic my-replicated-topic
复制代码




leader:生产者 消费者处理消息都是在leader副本上,leader把数据同步到其他副本。leader挂了,经过主从选举,从多个follower中选取新的leader
follower:接收leader同步过来的数据
isr:一个集合,用来存储可以同步和已经同步的节点,当leader挂了,会从isr中选举出新的leader;当某个节点性能太差,副本内容同步差的太多,就会从isr中剔除
关于集群消费-消费者组与分区(重点)



一个分区只能被一个消费者组的此中一个消费者消费,从而保证主题分区内消费消息的顺序性和一致性
一个消费者组可以订阅多个主题,一个主题也可以被多个消费者组订阅;
一个主题可以分别为多个物理分区,即 一个主题下的消息被存放在多个分区中,由于一个分区在一个消费者组中的只能被一个消费者消费,以是消费者组的数量一样平常不超过订阅的主题的分区数,消费者组内的每个消费者都会被分配一部分主题分区进行消费。
引:怎样保证不在同一个分区的消息的顺序消费

生产者

发送消息的同步与异步

同步发送:生产者乡broker发送消息后阻塞等待接收broker的ack,收到ack后继续发送消息
  1. void sendSyncMessage() throws ExecutionException, InterruptedException {
  2.         try {
  3.             //不指定分区的化,默认为(key.hashCode)% partitionNum
  4.             ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "hello", "hello world");
  5.             RecordMetadata metadata = (RecordMetadata) producer.send(record).get();
  6.             System.out.println("Topic:" + metadata.topic() + ", Partition:" + metadata.partition() + ", Offset:" + metadata.offset());
  7.         } catch (Exception e) {
  8.             e.printStackTrace();
  9.             //发送失败的处理
  10.         }
  11.     }
  12. 这个get方法就相当于是同步阻塞,阻塞获取发送消息的结果
复制代码
异步发送:生产者向broker发送消息后,无需等待ack,接着发送下一条消息
  1.         private static final CountDownLatch countDownLatch = new CountDownLatch(5);
  2.         @Test
  3.     void sendAsyncMessage() throws InterruptedException {
  4.         ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "hello", "hello world");
  5.         for (int i = 0; i < 5; i++) {
  6.             producer.send(record, (metadata, exception) -> {
  7.                 if (exception == null) {
  8.                     if (metadata != null)
  9.                         System.out.println("Topic:" + metadata.topic() + ", Partition:" + metadata.partition() + ", Offset:" + metadata.offset());
  10.                 } else {
  11.                     exception.printStackTrace();
  12.                 }
  13.                 countDownLatch.countDown();
  14.             });
  15.         }
  16.         countDownLatch.await();
  17.     }
复制代码
ack参数

Kafka producer有三种ack机制 初始化producer时在config中进行配置;
  1. Properties prop = new Properties();
  2. prop.put(ProducerConfig.ACK_CONFIG = '2')
复制代码
ack = 0:意味着producer不等待broker同步完成的确认,继续发送下一条(批)信息
提供了最低的延伸。但是最弱的恒久性,当服务器发生故障时,就很大概发生数据丢失。比方leader已经殒命,producer不知情,还会继续发送消息broker接收不到数据就会数据丢失。
ack = 1(默认):意味着producer要等待leader成功收到数据并得到确认,才发送下一条message。此选项提供了较好的恒久性较低的延伸性。
Partition的Leader殒命,follwer尚未复制,数据就会丢失。
ack = -1:意味着producer得到x-1台follwer确认,才发送下一条数据
(有个默认的配置min.insync.replicas = x(默认为1,laeder接收就返回),x应该大于等于2)
发送失败会重试

消息重试机制能够保证消息发送的可靠性,但是大概会造成消息的重复发送,好比在网络抖动的情况下,因此在接收端broker要做好幂等性处理
  1. Properties prop = new Properties();
  2. props.put(ProducerConfig.RETRIES_CONFIG,3);
  3. props.put(ProducerConfig.RETRY_BACKOFF_NS_CONFIG,300);
复制代码
消息发送的缓冲区

kafka默认会创建一个32MB的缓冲区,用来存放要发送的消息,生产者会有一个本地线程从缓冲区中获取16KB的消息发送到broker,如果线程拉不到16KB的数据,间隔10ms也会发送数据到broker
  1.         prop.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
复制代码
  1.     prop.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
复制代码
  1.     prop.put(ProducerConfig.LINGER_MS_CONFIG, 10);
复制代码
消费者

java客户端创建消费者

  1.             Properties prop = new Properties();
  2.         prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.88.130:9092,192.168.88.130:9093,192.168.88.130:9094");
  3.         prop.put(ConsumerConfig.GROUP_ID_CONFIG, "groupTest");
  4.         prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  5.         prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  6.         new KafkaConsumer<>(prop);
复制代码
消费者的主动提交和手动提交


主动提交和手动提交,提交是向Consumer_offset-n中消费者消费分区内信息的偏移量->key是consumergroupID+topic+分区号,value就是当前分区的offset
主动提交:消费者从broker中poll到消息之后,马上提交偏移量
  1.     //是否开启自动提交,默认为true
  2.     prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
  3.     prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
复制代码
缺点:会丢消息,消费者poll下消息后,没有进行消费就挂了
手动提交:消费者poll下消息之后,对消息进行消费,消费完之后再提交偏移量
  1.         prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
复制代码
同步手动提交
  1. @Test
  2. void SyncCommit() throws InterruptedException {
  3. consumer.subscribe(Collections.singleton(TOPIC_NAME));
  4. while (true) {
  5.     ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
  6.     for (ConsumerRecord<String, String> record : records) {
  7.         System.out.println("partition: " + record.partition() +
  8.                            " offset: " + record.offset() +
  9.                            " value: " + record.value());
  10.     }
  11.     //手动提交偏移量,同步提交,当consumer提交完成,收到broker的ack才能停止阻塞
  12.     if (records.count() > 0) {
  13.         consumer.commitSync();
  14.         System.out.println("同步提交完成");
  15.     }
  16. }
  17. }
复制代码

异步手动提交:
  1. @Test
  2. void AsyncCommit() throws InterruptedException {
  3. consumer.subscribe(Collections.singleton(TOPIC_NAME));
  4. while (true) {
  5.     ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
  6.     for (ConsumerRecord<String, String> record : records) {
  7.         System.out.println("partition: " + record.partition() +
  8.                            " offset: " + record.offset() +
  9.                            " value: " + record.value());
  10.     }
  11.     //手动提交偏移量,异步提交,当consumer提交完成,不用阻塞
  12.     if (records.count() > 0) {
  13.         consumer.commitAsync(new OffsetCommitCallback() {
  14.             @Override
  15.             public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
  16.                 if (e != null) {
  17.                     System.err.println("commit offset failed: " + map);
  18.                     System.err.println("commit offset exception:" + e.getStackTrace());
  19.                 }else{
  20.                     System.out.println("异步手动提交");
  21.                 }
  22.             }
  23.         });
  24.     }
  25. }
  26. }
复制代码
消费者poll消息的过程

配置:
  1.     prop.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"500");
复制代码
poll消息达到500条之后开始处理消息;
  1.         /*
  2.         * poll() 是长轮询拉消息
  3.         * */
  4.         ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
  5.         for (ConsumerRecord<String, String> record : records) {
  6.             System.out.println("partition: " + record.partition() +
  7.                     " offset: " + record.offset() +
  8.                     " value: " + record.value());
  9.         }
复制代码
Duration.okMillis(1000),poll是一个阻塞调用,意味着它会等待直到有消息数据量达到目标量或者超时发生
如果拉去消息的条数没有达到配置的拉去条数,就会阻塞等待获取消息,当拉取时间达到这个参数时间的时间,即使没有拉取到目标条数的信息,消费者也会开始消费消息
如果拉取消息的数量达到配置数,就不用等待这个参数时间直接开始处理

如果两次poll的间隔超过30s,集群会认为该消费者的消费本领过弱,该消费者被踢出消费组,触发rebalance机制重新分配该消费者所消费的分区,rebalance机制会造成性能开销。可以通过设置这个参数,让一次poll的消息条数少一点
  1.         //一次拉取消息的最大数量 (根据消息者消费能力进行设置)
  2.         prop.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,500);
  3.         //两次poll之间的时间间隔超过了30s,kafka就会认为该消费者消费能力差,会将该消费者从消费者组剔除,kafka就会重新分配该消费者所消费的分区
  4.         //(根据消息者消费能力进行设置)
  5.         prop.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
复制代码

消费者的康健检查,心跳检测机制

消费者向broker发送心跳的间隔时间,kafka如果超过10s没有收到心跳,kafka就会认为该消费者挂掉了,会将该消费者从消费者组剔除,然后reBalance来重新分配分区的消费者
  1.         //消费者向broker发送心跳的间隔时间
  2.         prop.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
  3.         //kafka如果超过10s没有收到心跳,kafka就会认为该消费者挂掉了,会将该消费者从消费者组剔除
  4.         //kafka就会reBalance机制重新分配该消费者所消费的分区
  5.         //相当于是消费者的心跳超时时间
  6.         prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
复制代码

消费者消费操作

消费者通过 assign 方法显式地指派一组分区进行消费,而 seek 方法用于将指定分区的偏移量移动到特定的位置。
  1. consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
复制代码
  1. consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
  2. consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
复制代码
  1. consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
  2. consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);
复制代码
指定时间进行消费,会先获取主题的所有分区,然后遍历每个分区,根据时间获取分区内偏移量,然后poll消息
  1.         List<PartitionInfo> partitions = consumer.partitionsFor(TOPIC_NAME);
  2.         long timestamp = new Date().getTime() - 1000 * 60 * 60 * 24;
  3.         HashMap<TopicPartition, Long> map = new HashMap<>();
  4.         for (PartitionInfo partitionInfo : partitions) {
  5.             map.put(new TopicPartition(TOPIC_NAME,partitionInfo.partition()), timestamp);
  6.         }
  7.         Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);
  8.         for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {
  9.             TopicPartition key = entry.getKey();
  10.             OffsetAndTimestamp value = entry.getValue();
  11.             if (key == null || value == null) continue;
  12.             //根据时间获取offset
  13.             Long offset = value.offset();
  14.             System.out.println("partition-" + key.partition() + "loffset-" + offset);
  15.             if (value != null) {
  16.                 //根据起始偏移量进行获取消息
  17.                 consumer.assign(Arrays.asList(key));
  18.                 consumer.seek(key, offset);
  19.             }
  20.             while (true) {
  21.                 /*
  22.                  * poll() 是长轮询拉消息
  23.                  * */
  24.                 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
  25.                 for (ConsumerRecord<String, String> record : records) {
  26.                     System.out.println("partition: " + record.partition() +
  27.                             " offset: " + record.offset() +
  28.                             " value: " + record.value());
  29.                 }
  30.                 //手动提交偏移量,同步提交,当consumer提交完成,收到broker的ack才能停止阻塞
  31.                 if (records.count() > 0) {
  32.                     consumer.commitSync();
  33.                     System.out.println("同步提交完成");
  34.                 }
  35.             }
  36.         }
复制代码
新消费组对主题进行消费的消费模式

新消费者组的消费模式
earliest:重新开始消费,之后只消费最新消息
latest:从最新的offset开始消费(默认)
  1. prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
  2. consumer = new KafkaConsumer<String, String>(prop);
复制代码
Springboot整合kafka

  1. <dependency>
  2.   <groupId>org.springframework.kafka</groupId>
  3.   <artifactId>spring-kafka</artifactId>
  4.   <version>2.4.3.RELEASE</version>
  5. </dependency>
复制代码
  1. spring:
  2.   kafka:
  3.     bootstrap-servers: 192.168.88.130:9092,192.168.88.130:9092,192.168.88.1303:9092
  4.     producer:
  5.       # 发生错误后,消息重发的次数。
  6.       retries: 3
  7.       #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
  8.       batch-size: 16384
  9.       # 设置生产者内存缓冲区的大小。
  10.       buffer-memory: 33554432
  11.       # 键的序列化方式
  12.       key-serializer: org.apache.kafka.common.serialization.StringSerializer
  13.       # 值的序列化方式
  14.       value-serializer: org.apache.kafka.common.serialization.StringSerializer
  15.       # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
  16.       # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
  17.       # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
  18.       acks: 1
  19.     consumer:
  20.       # 设置kafka的消费者组
  21.       group-id: groupTest
  22.       # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
  23.       auto-commit-interval: 1S
  24.       # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
  25.       # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
  26.       # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
  27.       auto-offset-reset: earliest
  28.       # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
  29.       enable-auto-commit: false
  30.       # 键的反序列化方式
  31.       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  32.       # 值的反序列化方式
  33.       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  34.     listener:
  35.       #当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
  36.       #RECORD
  37.       #当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
  38.       #BATCH
  39.       #当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
  40.       #TIME
  41.       #当每一批poll()的数据被消费者监听器〈(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
  42.       #COUNT
  43.       #TIME | COUNT有一个条件满足时提交
  44.       #COUNT_TINE
  45.       #当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,手动调用Acknowledgment .acknowledge()后提交
  46.       #NANUAL
  47.       #手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种
  48.       #manual_immediate
  49.       # 在侦听器容器中运行的线程数。
  50.       concurrency: 5
  51.       #listner负责ack,每调用一次,就立即commit
  52.       ack-mode: manual_immediate
  53.       # 如果Broker上不存在至少一个配置的主题(topic),则容器是否无法启动,
  54.       # 该设置项结合Broker设置项allow.auto.create.topics=true,如果为false,则会自动创建不存在的topic
  55.       missing-topics-fatal: false
复制代码
消费者消费操作:
  1. @KafkaListener(groupId = "groupTest",topicPartitions = {
  2.     @TopicPartition(topic = "topic1",partitions = {"0","1"}),
  3.     @TopicPartition(topic = "topic2",
  4.                     partitions = {"0"},
  5.                     partitionOffsets = @PartitionOffset(partition = "1",initialOffset = "100"))
  6. },concurrency = "2")// concurrency 消费者线程数
  7. public void listenTopicPartition(ConsumerRecord<String,String> record,Acknowledgment ack){
  8.     System.out.println("topic:"+record.topic()+" partition:"+record.partition()+" offset:"+record.offset()+" value:"+record.value());
  9.     ack.acknowledge();
  10. }
复制代码
kafka集群中的controller、Rebalance、HW

Controller

Controller:kafka集群在启动的时间,每个节点都会在zookeeper中创建一个暂时序号节点,节点序号最小的broker将会成为集群中的controller;controller负责管理整个集群的所有分区和副本的状态:
1.当某个分区的leader副本broker挂掉后,controller会选举新的副本作为leader;
ISR:可以和leader同步以及已经同步的节点的集合,存放顺序是根据同步的偏移量跟leader越靠近,排的越靠前,controller从isr中选出靠前的节点作为leader(同步量最高的节点会被作为leader)
2.检测到某个分区的isr厘革之后(集群中有broker增加或减少节点),会关照其他broker更新其元数据信息
3.当集群有新的分区增加或减少,controller会关照其他节点
rebalance

条件:消费者没有指定分区消费,如果消费者指定了分区消费,那其他broker挂掉之后,就不会讲分区rebalance给已经制定了分区的节点
触发时机:


  • 当消费者两次消费间隔超过30s,会被kafka认为消费者消费本领弱,会将这个消费者从消费者组剔除,然后rebalance之前的分区给其他节点负责
  • 当消费者下线,之前负责的分区也会rebalance给其他消费者
在触发rebalance之前,分区分配给消费者的策略是:




    • range:根据公式盘算得到每个消费消费哪几个分区︰第一个消费者是分区总数/消费者数量+1,之后的消费者是分区总数/消费者数量
    • 轮询:大家轮着来
    • sticky:粘合策略,如果须要rebalance,会在之前已分配的基础上调解,不会改变之前的分配情况。如果这个策略没有开,那么就要进行全部的重新分配。发起开启。

HW&LOE原理

HW是已完成同步的位置。消息在写入broker时,且每个broker完成这条消息的同步后, hw才会厘革。在这之前消费者是消费不到这条消息的。在同步完成之后,HW更新之后,消费者才能消费到这条消息,这样的目标是防止消息的丢失。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

干翻全岛蛙蛙

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表