消息队列Kafka简朴使用(可以直接上手)

打印 上一主题 下一主题

主题 553|帖子 553|积分 1659

1.消息中心件简介

消息中心件(Message Middleware)是一种在分布式系统中用于解耦差异服务或组件的软件,它通过异步消息传递的方式来实现服务之间的通信。消息中心件答应系统组件之间通过发送和吸收消息举行交互,而无需知道彼此的详细实现细节,从而提高了系统的可扩展性、灵活性和可靠性。
关键特性:

  • 异步通信:消息发送者不必要等待吸收者的即时响应,可以继续执行其他任务;
  • 解耦:消息中心件降低了服务之间的耦合度,使得各个服务可以独立开发和摆设;
  • 可靠传输:消息中心件通常提供消息长期化功能,确保消息不会因系统故障而丢失;
  • 负载均衡:消息中心件可以均衡差异服务之间的负载,避免单个服务的过载;
  • 消息排序:包管消息按照特定的次序举行处理;
  • 事务管理:支持事务性消息,确保消息处理的原子性、一致性、隔离性和长期性(ACID特性)。
常见使用场景:

  • 应用解耦:在差异的服务或应用之间传递消息,降低它们之间的直接依赖;
  • 事故驱动架构:在事故驱动的系统中,消息中心件作为事故总线,传递事故消息;
  • 分布式系统:在分布式情况中,消息中心件用于服务间的通信;
  • 大数据处理:在数据分析和处理系统中,消息中心件用于网络和分发大量数据;
  • 微服务架构:微服务之间通过消息中心件举行通信,实现服务的独立性和动态扩展。
常见消息中心件:

  • Apache Kafka:适用于高吞吐量、可扩展的分布式消息系统;
  • RabbitMQ:基于AMQP协议的开源消息代理软件,适用于复杂的消息路由;
  • ActiveMQ:也是一个基于JMS的开源消息代理,支持多种跨语言的通讯协议;
  • Amazon SQS:亚马逊提供的简朴队列服务,适用于云情况中的消息队列;
  • Apache Pulsar:是一个用于服务器到服务器的消息传递系统,具有高吞吐量、低耽误的特点。
消息模型:

  • 点对点(Point-to-Point):消息从一个发送者发送到一个吸收者;
  • 发布/订阅(Publish/Subscribe):消息从一个发送者发送到多个吸收者;
  • 请求/回复(Request/Reply):发送者发送消息并等待吸收者的响应
2.Kafka介绍

Kafka是一个由LinkedIn公司开发的分布式流处理平台,它具有高吞吐量、可扩展性强、可长期化、可容错等特点,被广泛用于构建实时的数据管道和流式应用程序。Kafka的消息模型主要基于发布/订阅(Publish/Subscribe)模式,但也包含了点对点(Point-to-Point)模型的某些特性。
发布/订阅模型:

  • 主题(Topics):在Kafka中,消息被发布到称为“主题”的种别中。主题雷同于消息队列,但它可以拥有多个订阅者;
  • 生产者(Producers):生产者是发布消息到Kafka主题的实体。生产者可以选择将消息发送到主题内的任何一个分区;
  • 消费者(Consumers):消费者是订阅主题并读取消息的实体。在Kafka中,消费者可以是属于消费者组的一部分;
  • 消费者组(Consumer Groups):消费者组是一组消费者的聚集,它们共同消费一个主题的消息,但每个消费者只能消费到消息的一个子集。Kafka确保每个分区只由消费者组中的一个消费者来消费,以避免重复消费;
  • 代理(Broker):Kafka集群中的服务器,存储数据并处理客户端的请求;
  • ZooKeeper:Kafka使用ZooKeeper来协调broker,并保持集群配置的一致性。
架构:

分区(Partitions):

  • 消息分区:一个主题可以有一个或多个分区,每个分区是一个有序且不可变的消息序列。分区内的每条消息都会被分配一个次序的、不可变的ID,称为偏移量(Offset)。
  • 分区副本:为了提高可用性,Kafka答应为每个分区创建多个副本,这些副本分布在差异的broker上。副本分为向导者副本(Leader)和跟随者副本(Follower),生产者和消费者只与向导者副本交互,跟随着副本只是被动跟随。
发布/订阅消息模型的特点:

  • 长期化:Kafka将消息长期化到磁盘,纵然在系统故障的情况下也不会丢失消息;
  • 高吞吐量:通太过区和并行处理,Kafka能够处理大量的消息;
  • 可扩展性:可以轻松地向Kafka集群添加更多的broker,无需停机,以增加处理能力;
  • 容错性:通过副本机制,Kafka能够在broker失败的情况下继续工作;
  • 有序性:在单个分区内部,消息是有序的,消费者按照次序读取消息。在分区内部,消息是有序的,在差异的分区之间,消息的次序是不包管的,假如必要全局有序,则只能使用单个分区。
3.Kafka安装

百度网盘链接:kafka_2.13-3.7.1.tgz
Windows安装

条件条件: Java情况(JDK 1.8或更高版本)

  • 下载Kafka: 从Kafka官网下载下载Windows版本的Kafka

  • 解压Kafka安装包: 将下载的Kafka压缩包解压到一个目次下,调解一下目次,把kafka_2.13-3.7.1改成kafka,其中bin文件夹里还有个windows文件夹,是Windows使用的脚本文件

  • 启动Zookeeper服务: Kafka自己包含了Zookeeper
    启动Zookeeper之前先配置一下,在config文件夹下zookeeper.properties文件,主要修改下面两项
    1. # 数据存储
    2. dataDir=E:/projects/Kafka/kafka/zookeeper-data
    3. # 日志文件
    4. dataLogDir=E:/projects/Kafka/kafka/zookeeper-logs
    复制代码
    命令行进入/bin/windows目次下,启动,没有报错停止就是启动成功了
    1. zookeeper-server-start.bat ../../config/zookeeper.properties
    复制代码
  • 启动Kafka服务
    启动Kafka之前也要配置一下,在config文件夹下server.properties文件
    1. # 集群模式下,每台Kafka服务器需要一个唯一的broker.id值
    2. broker.id=0
    3. # 日志文件
    4. log.dirs=E:/projects/Kafka/kafka/kafka-logs
    5. # zookeeper连接
    6. zookeeper.connect=localhost:2181
    7. # 端口(不用改)
    8. listeners=PLAINTEXT://:9092
    复制代码
    另起命令行窗口,进入/bin/windows目次下,然后运行以下命令,没有报错停止就是启动成功了
    1. kafka-server-start.bat ../../config/server.properties
    复制代码
Linux(CentOS7)安装

条件条件: Java情况(JDK 1.8或更高版本)

  • 下载Kafka: 从Kafka官网下载下载,右键复制链接

  • 通过wget下载
    1. wget https://downloads.apache.org/kafka/3.7.1/kafka_2.13-3.7.1.tgz
    复制代码
  • 解压Kafka安装包
    1. tar -xzf kafka_2.13-3.7.1.tgz -C /usr/local
    2. mv /usr/local/Kafka/kafka_2.13-3.7.1 /usr/local/kafka
    复制代码
  • 启动Zookeeper服务: Kafka自己包含了Zookeeper
    启动Zookeeper之前先配置一下,在config文件夹下zookeeper.properties文件,主要修改下面两项
    1. # vim编辑zookeeper配置文件
    2. vi /usr/local/kafka/config/zookeeper.properties
    3. # i进入编辑
    4. i
    5. # esc退出编辑
    6. esc
    7. # 保存退出
    8. :qw
    9. # 数据存储
    10. dataDir=/usr/local/kafka/zookeeper-data
    11. # 日志文件
    12. dataLogDir=/usr/local/kafka/zookeeper-logs
    复制代码
    进入/bin目次下,启动,没有报错停止就是启动成功了
    1. cd /usr/local/kafka/bin
    2. zookeeper-server-start.sh -daemon /usr/local/kafka/config/zookeeper.properties
    复制代码
  • 启动Kafka服务
    启动Kafka之前也要配置一下,在config文件夹下server.properties文件,
    1. # 集群模式下,每台Kafka服务器需要一个唯一的broker.id值
    2. broker.id=0
    3. # 日志文件
    4. log.dirs=/usr/local/kafka/kafka-logs
    5. # zookeeper连接
    6. zookeeper.connect=localhost:2181
    7. # 端口(不用改)
    8. listeners=PLAINTEXT://:9092
    复制代码
    另起命令行窗口,进入/bin目次下,然后运行以下命令,没有报错停止就是启动成功了
    1. cd /usr/local/kafka/bin
    2. kafka-server-start.sh -daemon /usr/local/kafka/config/zookeeper.properties
    复制代码
Docker安装

条件条件: Java情况(JDK 1.8或更高版本),确保安装了Docker

  • 拉取ZooKeeper镜像(Kafka依赖ZooKeeper):
    1. docker pull wurstmeister/zookeeper
    复制代码
  • 拉取Kafka镜像: 使用以下命令拉取最新的Kafka镜像:
    1. docker pull wurstmeister/kafka
    复制代码
  • 启动ZooKeeper容器: 容器启动起来就成功了
    1. docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
    复制代码
  • 启动Kafka容器:容器启动起来就成功了
    1. docker run -d --name kafka -p 9092:9092 --link zookeeper:zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 --env KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 wurstmeister/kafka
    复制代码

    • --name kafka: 设置容器的名字为“kafka”。
    • -p 9092:9092: 将容器的9092端口映射到宿主机的9092端口。
    • --link zookeeper:zookeeper: 毗连到名为“zookeeper”的另一个Docker容器,并且在当前的容器中可以通过zookeeper这个别名来访问它。
    • --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181: 设置情况变量,指定ZooKeeper的毗连字符串。
    • --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092: 设置情况变量,指定Kafka的advertised listeners。
    • --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092: 设置情况变量,指定Kafka的listeners。
    • --env KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1: 设置情况变量,指定offsets topic的副本因子。
    • wurstmeister/kafka: 使用的Docker镜像名字。

4.可视化工具kafka-eagle的安装

百度网盘链接:kafka-eagle-bin-3.0.1.tar.gz
Windows安装

大部分借鉴大佬的安装,做个记录方便以后查看,原文地址
条件条件


  • Kafka已经安装并运行
  • Java情况(JDK 1.8或更高版本)
  • MySQL数据库(用于存储Kafka-Eagle的元数据)

  • 下载Kafka-Eagle
    Kafka-Eagle官网
    Github地址
  • 解压Kafka-Eagle安装包: 将下载的Kafka-Eagle压缩包解压到一个目次下,可以修改一下目次


  • 配置两个情况变量: JDK(JAVA_HOME)和Kafka-Eagle(KE_HOME),以Kafka-Eagle为例:


  • 创建数据库(MySQL)ke,启动项目会自动创建表
  • 为了能监控Kafka数据,Kafka必要开启JMX,对外袒露更多数据,方便某些监控之类的插件来使用,修改\kafka\bin\windows目次下的kafka-server-start.bat脚本,然后重启Kafka
    1. # 在35行下面另起一行添加如下
    2. set JMX_PORT=9999
    复制代码
  • 配置Kafka-Eagle: 编辑E:\projects\Kafka\kafka-eagle\efak-web-3.0.1\conf\system-config.properties文件,配置Kafka和数据库信息:
    1. # 配置Zookeeper(5行附近)
    2. efak.zk.cluster.alias=cluster1
    3. cluster1.zk.list=localhost:2181
    4. #cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181
    5. # 配置JXM地址(59行附近)
    6. cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi
    7. # 配置MySQL数据库(124行附近)
    8. efak.driver=com.mysql.cj.jdbc.Driver
    9. efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
    10. efak.username=root
    11. efak.password=123456
    12. # 配置sqlite数据库(116行附近),和MySQL二选一
    13. efak.driver=org.sqlite.JDBC
    14. efak.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
    15. efak.username=root
    16. efak.password=www.kafka-eagle.org
    复制代码
  • 进入bin目次,运行以下命令
    1. ke.bat start
    复制代码
  • 访问Kafka-Eagle Web界面: 打开浏览器,访问http://localhost:8048/,使用默认用户名admin和密码123456登录。

在安装过程中出现三个问题

  • Tomcat日志输出乱码
    办理:在efak-web-3.0.1\kms\conf目次下修改logging.properties配置文件,将Tomcat日志输出编码改为GBK
    1. # 大概在51行,将UTF-8改为GBK
    2. java.util.logging.ConsoleHandler.encoding = GBK
    复制代码
  • MySQL时区非常
    办理:用root用户设置时区
    1. set global time_zone='+8:00';
    复制代码
  • 表缺失,下载建表sql脚本,百度网盘链接:kafka-eagle-createTable.sql
Linux(CentOS7)安装

条件条件


  • Kafka已经安装并运行
  • Java情况(JDK 1.8或更高版本)
  • MySQL数据库(用于存储Kafka-Eagle的元数据)

  • 下载Kafka-Eagle: 找到对应下载地址,右键复制链接,wget下载
    Kafka-Eagle官网
    Github地址
    1. wget https://github.com/smartloli/kafka-eagle-bin/archive/v3.0.1.tar.gz
    复制代码
  • 解压Kafka-Eagle安装包
    1. # 解压、移动目录等自己看着改,简洁一些就行
    2. tar -zxvf kafka-eagle-bin-3.0.1.tar.gz -d /usr/local
    3. mv /usr/local/kafka-eagle-bin-3.0.1 /usr/local/kafka-eagle
    4. cd /usr/local/kafka-eagle
    5. tar -zxvf efak-web-3.0.1-bin.tar.gz
    复制代码
  • 设置情况变量,在/etc/profile文件里
    1. # 使用vim编辑文件
    2. vi /etc/profile
    复制代码
    设置情况
    1. # 设置值,根据自己文件位置设置
    2. export JAVA_HOME=/usr/local/jdk/java-1.8.0-openjdk-1.8.0.161-3.b14.el6_9.x86_64
    3. export JRE_HOME=${JAVA_HOME}/jre
    4. export KE_HOME=/usr/local/kafka-eagle/efak-web-3.0.1-bin
    5. export CLASSPATH=$CLASSPATH:.:${JAVA_HOME}/lib:${JAVA_HOME}/jre/lib
    6. export PATH=${KE_HOME}/bin:${JAVA_HOME}/bin:${JAVA_HOME}/jre/bin:$PATH
    复制代码
    使其收效
    1. # 退出,执行使其生效
    2. source /etc/profile
    复制代码
  • 创建数据库(MySQL)ke,启动项目会自动创建表
  • 配置Kafka-Eagle: 编辑\usr\local\kafka-eagle\efak-web-3.0.1\conf\system-config.properties文件,配置Kafka和数据库信息:
    1. # 配置Zookeeper(5行附近)
    2. efak.zk.cluster.alias=cluster1
    3. cluster1.zk.list=localhost:2181
    4. #cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181
    5. # 配置JXM地址(59行附近)
    6. cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi
    7. # 配置MySQL数据库(124行附近)
    8. efak.driver=com.mysql.cj.jdbc.Driver
    9. efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
    10. efak.username=root
    11. efak.password=123456
    12. # 配置sqlite数据库(116行附近),和MySQL二选一
    13. # efak.driver=org.sqlite.JDBC
    14. # efak.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
    15. # efak.username=root
    16. # efak.password=www.kafka-eagle.org
    复制代码
  • 运行以下命令
    1. cd /usr/local/kafka-eagle/efak-web-3.0.1
    2. ./bin/ke.sh start
    复制代码
  • 访问Kafka-Eagle Web界面: 打开浏览器,访问http://xxxip:8048/,使用默认用户名admin和密码123456登录。
Docker安装


  • 拉取Kafka-Eagle镜像: 使用以下命令拉取Kafka-Eagle镜像:
    1. docker pull soulstone/kafka-eagle
    复制代码
  • 启动Kafka-Eagle容器
    1. docker run -d --name kafka-eagle -p 8048:8048 \
    2. -e ZK_HOSTS="zookeeper:2181" \
    3. -e KAFKA_EAGLE_URL="jdbc:mysql://localhost:3306/ke" \
    4. -e KAFKA_EAGLE_USER="root" \
    5. -e KAFKA_EAGLE_PASSWORD="123456" soulstone/kafka-eagle
    复制代码
    请确保将MySQL的URL、用户名和密码更换为你自己的数据库信息。
  • 访问Kafka-Eagle Web界面: 打开浏览器,访问http://localhost:8048/,使用默认用户名admin和密码123456登录。
5.Kafka消息队列的使用

SpringBoot整合Demo


  • 创建两个Maven项目: 一个是生产者KafkaProducerDemo,用于发布消息,另一个是消费者KafkaConsumerDemo,用于接受消息,两项目目次布局如下:


  • 引入依赖: 两个项目都在pom.xml中添加Spring Kafka的依赖,整个pom.xml如下
    1. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    2.     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    3.     <modelVersion>4.0.0</modelVersion>
    4.     <groupId>com.thkl</groupId>
    5.     <artifactId>KafkaDemo</artifactId>
    6.     <packaging>jar</packaging>
    7.     <version>1.0-SNAPSHOT</version>
    8.     <parent>
    9.     <groupId>org.springframework.boot</groupId>
    10.     <artifactId>spring-boot-starter-parent</artifactId>
    11.     <version>2.3.5.RELEASE</version>
    12.     </parent>
    13.     <dependencies>
    14.     <dependency>
    15.       <groupId>org.springframework.boot</groupId>
    16.       <artifactId>spring-boot-starter-web</artifactId>
    17.       <version>2.3.5.RELEASE</version>
    18.     </dependency>
    19.     <dependency>
    20.       <groupId>org.springframework.kafka</groupId>
    21.       <artifactId>spring-kafka</artifactId>
    22.       <version>2.8.3</version>
    23.     </dependency>
    24.     <dependency>
    25.       <groupId>org.projectlombok</groupId>
    26.       <artifactId>lombok</artifactId>
    27.       <version>1.18.10</version>
    28.     </dependency>
    29.     </dependencies>
    30.     <build>
    31.     </build>
    32. </project>
    复制代码
  • 配置Kafka: 两个项目都在application.yml中配置Kafka的属性,留意端口号别一样
    1. server:
    2.   # 端口号
    3.   port: 8080
    4. spring:
    5.   kafka:
    6.     # Kafka服务器的地址和端口
    7.     bootstrap-servers: localhost:9092
    8.     consumer:
    9.       # 消费者组的ID
    10.       group-id: thkl-group
    复制代码
  • 在KafkaProducerDemo中创建Kafka生产者:
    1. @Service
    2. // 可以通过lombok的@AllArgsConstructor注解自动用构造函数注入Beans
    3. @AllArgsConstructor
    4. public class KafkaProducerService {
    5. private KafkaTemplate kafkaTemplate;
    6. public void sendMessage(String message) {
    7.      kafkaTemplate.send("thkl-topic", message);
    8. }
    9. }
    复制代码
    1. @RestController
    2. @AllArgsConstructor
    3. public class KafkaProducerController {
    4.     private KafkaProducerService kafkaProducerService;
    5.     // 发送消息接口
    6.     @GetMapping("/send")
    7.     public String sendMessage(String message) {
    8.         kafkaProducerService.sendMessage(message);
    9.         return "Message sent successfully";
    10.     }
    11. }
    复制代码
  • 在KafkaConsumerDemo中创建Kafka消费者:
    1. @Service
    2. public class KafkaConsumerService {
    3.     // 使用@KafkaListener注解来创建Kafka消费者
    4.     @KafkaListener(topics = "thkl-topic", groupId = "thkl-group")
    5.     public void receiveMessage(String message) {
    6.         System.out.println("Received message: " + message);
    7.     }
    8. }
    复制代码
  • 启动类: 两项目都添加
    1. @SpringBootApplication
    2. public class KafkaApplication {
    3.     public static void main(String[] args) {
    4.         SpringApplication.run(KafkaApplication.class, args);
    5.     }
    6. }
    复制代码
  • 启动两个项目,端口号不可以一样,否则启动不起来
    在KafkaProducerDemo发布消息

    就可以在KafkaConsumerDemo吸收到消息了

KafkaTemplate 主要API

KafkaTemplate是Spring Framework提供的一个用于简化Kafka消息发送的抽象类。它提供了一系列方法来发送消息到Kafka主题,包括同步发送、异步发送、发送消息和对象等。以下是一些主要的KafkaTemplateAPI方法:

  • 同步发送

    • send(String topic, Object message): 发送一个对象到指定主题。
    • send(String topic, K key, V value): 发送一个键值对到指定主题。

  • 异步发送

    • send(String topic, Object message, long timeout, TimeUnit unit): 异步发送一个对象到指定主题,并等待响应。
    • send(String topic, K key, V value, long timeout, TimeUnit unit): 异步发送一个键值对到指定主题,并等待响应。

  • 发送消息和对象

    • send(String topic, Object message, Map<String, Object> headers): 发送一个对象到指定主题,并附带消息头。
    • send(String topic, K key, V value, Map<String, Object> headers): 发送一个键值对到指定主题,并附带消息头。

  • 发送消息

    • send(String topic, Object message, String key): 发送一个对象到指定主题,并指定消息的键。
    • send(String topic, K key, V value): 发送一个键值对到指定主题。

  • 发送对象

    • send(String topic, Object message): 发送一个对象到指定主题。

  • 发送消息和对象(可选键)

    • send(String topic, Object message, String key, Map<String, Object> headers): 发送一个对象到指定主题,并指定消息的键和消息头。
    • send(String topic, K key, V value, Map<String, Object> headers): 发送一个键值对到指定主题,并附带消息头。

  • 发送消息(可选键)

    • send(String topic, Object message, String key): 发送一个对象到指定主题,并指定消息的键。
    • send(String topic, K key, V value): 发送一个键值对到指定主题。

  • 发送消息(可选键和消息头)

    • send(String topic, Object message, String key, Map<String, Object> headers): 发送一个对象到指定主题,并指定消息的键和消息头。
    • send(String topic, K key, V value, Map<String, Object> headers): 发送一个键值对到指定主题,并附带消息头。

  • 发送消息(消息头)

    • send(String topic, Object message, Map<String, Object> headers): 发送一个对象到指定主题,并附带消息头。
    • send(String topic, K key, V value, Map<String, Object> headers): 发送一个键值对到指定主题,并附带消息头。

  • 发送消息(键)

    • send(String topic, Object message, String key): 发送一个对象到指定主题,并指定消息的键。
    • send(String topic, K key, V value): 发送一个键值对到指定主题。

  • 发送消息(键和消息头)

    • send(String topic, Object message, String key, Map<String, Object> headers): 发送一个对象到指定主题,并指定消息的键和消息头。
    • send(String topic, K key, V value, Map<String, Object> headers): 发送一个键值对到指定主题,并附带消息头。

注: 这些方法的详细实现大概会根据Kafka客户端版本和Spring Kafka版本有所差异。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

尚未崩坏

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表