Kafka服务器的简朴部署以及消息的生产、消费、监控

打印 上一主题 下一主题

主题 836|帖子 836|积分 2508

目次
1. 在服务器上安装Kafka
1.1 直接安装
1.2 使用镜像方式配置到服务器
1. 准备Kafka镜像
(1) 长途拉取Kafka镜像
(2) 在本地下载镜像并上传至服务器启动

2. 创建配置目次
1. 3 编写Docker Compose文件
1. 4 启动Kafka服务
(2) 测试Kafka服务
2. 在项目中进行调用
2.1 消息的生产:
(1)首先安装 Confluent.Kafka 库(NuGet 包)
(2)向Kafka所在服务器生产消息:
2.2 消息的消费:
3. 自定义Kafka监控


1. 在服务器上安装Kafka

在服务器上安装Kafka有2种方式:
1.1 直接安装

本次安装没有涉及到服务器直接安装,可参考Kafka (快速)安装部署_kafka安装-CSDN博客
1.2 使用镜像方式配置到服务器

1. 准备Kafka镜像


(1) 长途拉取Kafka镜像

官方Kafka镜像可以从Confluent的Docker镜像库获取。在长途服务器上执行以下命令:

  1. docker pull confluentinc/cp-kafka:latest
  2. docker pull confluentinc/cp-zookeeper:latest
复制代码
如果加速器无效,也可以通过离线下载和上传的方式办理。
(2) 在本地下载镜像并上传至服务器启动

1. 在网络畅通的本地呆板上拉取Kafka镜像和zookeeper镜像:
  1. docker pull confluentinc/cp-zookeeper:latest
复制代码

  1. docker pull confluentinc/cp-kafka:latest
复制代码
保存镜像到文件:
  1. docker save confluentinc/cp-zookeeper:latest -o cp-zookeeper.tar
复制代码

  1. docker save confluentinc/cp-kafka:latest -o cp-kafka.tar
复制代码

2. 上传到服务器
通过scp或其他工具将镜像文件上传到服务器:
  1. scp cp-kafka.tar user@server_ip:/path/to/destination
复制代码

3. 在服务器上加载镜像
在目的服务器上加载镜像:
  1. docker load -i /path/to/destination/cp-zookeeper.tar
复制代码

  1. docker load -i /path/to/destination/cp-kafka.tar
复制代码


2. 创建配置目次

创建一个目次用于存放Kafka的Docker Compose配置文件:
  1. mkdir kafka-docker cd kafka-docker
复制代码

1. 3 编写Docker Compose文件

创建docker-compose.yml文件,文件内容:
  1. version: '3.7'
  2. services:
  3.   zookeeper:
  4.     image: confluentinc/cp-zookeeper:latest
  5.     container_name: zookeeper
  6.     environment:
  7.       ZOOKEEPER_CLIENT_PORT: 2181
  8.       ZOOKEEPER_TICK_TIME: 2000
  9.     ports:
  10.       - "2181:2181"
  11.   kafka:
  12.     image: confluentinc/cp-kafka:latest
  13.     container_name: kafka
  14.     ports:
  15.       - "9092:9092"
  16.     environment:
  17.       KAFKA_BROKER_ID: 1
  18.       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  19.       KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://<server_ip>:9092
  20.       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  21.     depends_on:
  22.       - zookeeper
复制代码
将<server_ip>替换为服务器的实际IP地址。
1. 4 启动Kafka服务

在kafka-docker目次下运行:
  1. docker-compose up -d
复制代码
(1) 验证服务状态
查看运行中的容器:
  1. docker ps
复制代码
确保zookeeper和kafka容器都已启动。
(2) 测试Kafka服务

进入Kafka容器:
  1. docker exec -it kafka bash
复制代码
在容器内创建一个主题(如果是容器方式部署,一样平常kafka-topics.sh命令会无效,因为如果你使用的是 Kafka 镜像(例如通过 Docker 运行 Kafka),那么 Kafka 的命令行工具(如 kafka-topics.sh)通常不会直接暴露在主机操作系统中。但是可以在 Docker 容器中执行命令在 Docker 环境中使用 Kafka 的命令行工具来查看 Topic 和其他操作,这里不做赘述):
  1. kafka-topics --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --repl
复制代码
至此,服务器上的Kafka服务就可以通过镜像容器的方式向外提供了,此时外部可以通过IP和Kafka的端标语向Kafka生产消息和消费消息。
2. 在项目中进行调用

2.1 消息的生产:

以C#语言为例:
(1)首先安装 Confluent.Kafka 库(NuGet 包

在命令行中,使用以下命令安装:
  1. dotnet add package Confluent.Kafka
复制代码
或者也可以通过 Visual Studio 的 NuGet 包管理器来安装。
(2)向Kafka所在服务器生产消息:

  1. [Description("Kafka测试")]
  2. [HttpPost]
  3. public void ScanDataUseKafka()
  4. {
  5.     // Kafka 服务器的地址(替换为您的服务器 IP 或主机名)
  6.     string bootstrapServers = "IP:port";
  7.     string topic = "test-topic";
  8.     // 配置 Kafka Producer
  9.     var config = new ProducerConfig
  10.     {
  11.         BootstrapServers = bootstrapServers
  12.     };
  13.     // 创建 Producer
  14.     using (var producer = new ProducerBuilder<Null, string>(config).Build())
  15.     {
  16.         try
  17.         {
  18.             string message = "Hello Kafka !";
  19.             // 发送消息
  20.             var result = producer.ProduceAsync(topic, new Message<Null, string> { Value = message }).GetAwaiter().GetResult();
  21.             Console.WriteLine($"消息发送成功: {result.TopicPartitionOffset}");
  22.             /*}*/
  23.         }
  24.         catch (ProduceException<Null, string> ex)
  25.         {
  26.             Console.WriteLine($"消息发送失败: {ex.Error.Reason}");
  27.         }
  28.     }
  29. }
复制代码
2.2 消息的消费:

  1. [Description("Kafka消费测试")]
  2. [HttpPost]
  3. public void KafkaConsumer()
  4. {
  5.     // Kafka 配置
  6.     var config = new ConsumerConfig
  7.     {
  8.         BootstrapServers = "IP:port",  // Kafka Broker 地址
  9.         GroupId = "test-consumer-group",      // 消费者组 ID
  10.         AutoOffsetReset = AutoOffsetReset.Earliest // 设置消息消费偏移量的起始点,Earliest 从最早消息开始消费,Latest 从最新的消息开始消费
  11.     };
  12.     // 创建 Kafka 消费者实例
  13.     using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
  14.     {
  15.         // 订阅指定的 Topic
  16.         consumer.Subscribe("test-topic");  // 这里替换为你需要消费的 Topic 名称
  17.         CancellationTokenSource cts = new CancellationTokenSource();
  18.         Console.CancelKeyPress += (_, e) =>
  19.         {
  20.             e.Cancel = true;
  21.             cts.Cancel(); // 当按下 Ctrl+C 时停止消费
  22.         };
  23.         try
  24.         {
  25.             while (!cts.Token.IsCancellationRequested)
  26.             {
  27.                 try
  28.                 {
  29.                     // 拉取消息
  30.                     var consumeResult = consumer.Consume(cts.Token);
  31.                     // 输出消息内容
  32.                     Console.WriteLine($"Received message: {consumeResult.Message.Value} from topic {consumeResult.Topic} partition {consumeResult.Partition} offset {consumeResult.Offset}");
  33.                 }
  34.                 catch (ConsumeException e)
  35.                 {
  36.                     Console.WriteLine($"Error occurred: {e.Error.Reason}");
  37.                 }
  38.             }
  39.         }
  40.         catch (OperationCanceledException)
  41.         {
  42.             // 捕获取消操作异常
  43.             Console.WriteLine("Consuming has been canceled.");
  44.         }
  45.         finally
  46.         {
  47.             // 确保消费者关闭
  48.             consumer.Close();
  49.         }
  50.     }
  51. }
复制代码
3. 自定义Kafka监控


目前已经有Kafka监控工具Kafka Manager、Kafka Tools等等,但也可以根据项目的使用要求和业务特征对Kafka数据的生产和消费进行监控,再复杂一点还可以自己开发独立的Kafka监控网页。
以下是通过C#代码监控KafkaTopic、节点、堆积量、偏移量等的简朴应用:
  1. [Description("Kafka消费监视")]
  2. [HttpPost]
  3. public void KafkaConsumeMonitorss()
  4. {
  5.     // Kafka 服务器的地址
  6.     string bootstrapServers = "IP:port";
  7.     // 配置 AdminClient(用于查询元数据)
  8.     var config = new AdminClientConfig
  9.     {
  10.         BootstrapServers = bootstrapServers
  11.     };
  12.     // 创建 AdminClient 实例
  13.     using (var adminClient = new AdminClientBuilder(config).Build())
  14.     {
  15.         try
  16.         {
  17.             // 获取 Kafka 集群的元数据(所有主题和分区信息)
  18.             var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(10));
  19.             Console.WriteLine($"Kafka 集群包含 {metadata.Topics.Count} 个主题:");
  20.             // 遍历所有主题
  21.             foreach (var topicMetadata in metadata.Topics)
  22.             {
  23.                 string topic = topicMetadata.Topic;
  24.                 Console.WriteLine($"\n主题: {topic}");
  25.                 // 遍历主题的所有分区
  26.                 foreach (var partitionMetadata in topicMetadata.Partitions)
  27.                 {
  28.                     var partition = partitionMetadata.PartitionId;
  29.                     // 获取分区的最新偏移量(log end offset)
  30.                     using (var consumer = new ConsumerBuilder<Ignore, string>(new ConsumerConfig
  31.                     {
  32.                         BootstrapServers = bootstrapServers,
  33.                         GroupId = "test-consumer-group", // 消费者组名称
  34.                         AutoOffsetReset = AutoOffsetReset.Earliest
  35.                     }).Build())
  36.                     {
  37.                         var highwaterMark = consumer.QueryWatermarkOffsets(new TopicPartition(topic, partition), TimeSpan.FromSeconds(5));
  38.                         var logEndOffset = highwaterMark.High;
  39.                         // 获取消费者的当前偏移量(consumer offset)
  40.                         var consumerOffsets = consumer.Committed(new List<TopicPartition> { new TopicPartition(topic, partition) }, TimeSpan.FromSeconds(5));
  41.                         var consumerOffset = consumerOffsets[0]?.Offset ?? Offset.Unset.Value;
  42.                         // 如果没有消费过,设置 consumerOffset 为 0
  43.                         if (consumerOffset < 0)
  44.                         {
  45.                             consumerOffset = 0;
  46.                         }
  47.                         // 计算堆积数量
  48.                         var backlog = logEndOffset - consumerOffset;
  49.                         Console.WriteLine($"  分区 {partition}:");
  50.                         Console.WriteLine($"    Log End Offset: {logEndOffset}");
  51.                         Console.WriteLine($"    Consumer Offset: {consumerOffset}");
  52.                         Console.WriteLine($"    Message Backlog: {backlog}");
  53.                     }
  54.                 }
  55.             }
  56.         }
  57.         catch (Exception ex)
  58.         {
  59.             Console.WriteLine($"发生错误: {ex.Message}");
  60.         }
  61.     }
  62. }
复制代码

如果需要监控Kafka各过程中的更多的参数和指标,可以使用与上面的相似的或相近的类或者方法进行拓展来获取更多的参数和指标。此外对于C#而言,C#中的Confluent.Kafka库(Nuget包)也会持续更新,后面会有更多的特性支持,使C#对接Kafka的监控更加细化和美满。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

美食家大橙子

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表