【C#】Kafka for .NET 入坑记录

打印 上一主题 下一主题

主题 866|帖子 866|积分 2598

【C#】Kafka for .NET 入坑记录

一、有关Kafka

Kafka 是一个分布式、多副本、基于Zookeeper的消息流处理平台。Kafka 最初由Linkedin开发,为了办理 data pipeline 问题。2011年募捐给 Apache基金会,后来成为Apache的顶级开源项目。
Kafka 的未来:及时数据流平台。(出自饶军)
1-1、基本概念



  • 消息:Kafka 中的数据单元,也被称为记录。
  • 批次:消息会分批次写入Kafka,批次就是指一组消息。
  • 主题:消息的种类,也可以说一个主题代表一类消息。
  • 分区:物理上最小的存储单元。一个主题可以分为多个分区,每个分区内消息是有序的。
  • 生产者:生产消息,发送到主题。
  • 消费者:从主题获取消息,处理消息。
  • 消费者群组:由一个或多个消费者组成的群体。在同一个消费组中,一个消息只能被一个消息者消息。
  • 偏移量:一个不断自增的整数值,分区中一条消息的唯一标示符。可体现消费者在分区的位置。
  • broker:一个独立的Kafka服务器。
  • broker集群:由一个或多个broker组成的集群。
  • 副本:消息的备份。Kafka定义了两类副本(领导者副本、追随者副本)。
  • 重平衡:其个消费者实例挂掉后,其他消息者实例自动重新分配的过程。
相干词汇:
  1. 消息 - message
  2. 分区 - partition
  3. 主题 - topic
  4. 生产者 - producer
  5. 消费者 - consumer
  6. 消费者群组 - consumer group
  7. 偏移量 - consumer offset
  8. 缓存代理 - broker
  9. 副本 - replica
  10. 领导者副本 - leader replica
  11. 追随者副本 - follower replica
  12. 重平衡 - rebalance
复制代码
1-2、Kafka发行版本:



  • Apache Kafak:也称社区版 Kafka。
  • Confluent Kafka:Confluent 公司提供的 Kafka。
    2014年,Kafka的3个首创人 Jay Kreps、Naha Narkhede 和 Jun Rao 离开 LinkedIn,创立了Confluent公司,专注于提供基于 Kafka 的企业流处理办理方案。Confluent Kafka 提供了一些 Apache Kafka 没有的高级特性,如跨数据中央备份、Schema 注册中央以及集群监控工具等。
  • Cloudera/Hortonworks Kafka:Cloudera 提供的 CDH Kafka 和 Hortonworks 提供的 HDP Kafka。集成了 Apache Kafka,通过便捷化的界面操作将 Kafka 的安装、运维、管理、监控全部统一在控制台中。
二、Technology Libraries

2-1、Confluent.Kafka

引用库
  1. <PackageReference Include="Confluent.Kafka" Version="2.8.0" />
复制代码
  1. const string topic = "test";
  2. const string host = "localhost:9092";
  3. #region 消费者
  4. var t1 = Task.Run(async () =>
  5. {
  6.          var cfg = new ConsumerConfig()
  7.          {
  8.                  GroupId = $"test-consumer-group-console",
  9.                  BootstrapServers = host,
  10.                  AutoOffsetReset = AutoOffsetReset.Earliest,
  11.                  EnableAutoCommit = true  // 自动提交
  12.          };
  13.          using var builder = new ConsumerBuilder<Ignore, string>(cfg).Build();
  14.          builder.Subscribe(topic);
  15.          var cts = new CancellationTokenSource();
  16.          Console.CancelKeyPress += (s, e) =>
  17.          {
  18.                  e.Cancel = true;
  19.                  cts.Cancel();
  20.          };
  21.          try
  22.          {
  23.                  while (true)
  24.                  {
  25.                          try
  26.                          {
  27.                                  var consumer = builder.Consume(cts.Token);
  28.                                  Console.WriteLine($"Message: {consumer.Message.Value} received from {consumer.TopicPartitionOffset}");
  29.                                  // builder.Commit(consumer); // 手动提交
  30.                          }
  31.                          catch (ConsumeException e)
  32.                          {
  33.                                  Console.WriteLine($"Error occured: {e.Error.Reason}");
  34.                          }
  35.                          await Task.Yield();
  36.                  }
  37.          }
  38.          catch (OperationCanceledException ex)
  39.          {
  40.                  builder.Close();
  41.                  Console.WriteLine($"ex: {ex}");
  42.          }
  43. });
  44. #endregion 消费者
  45. #region 生产者
  46. var t2 = Task.Run(async () =>
  47. {
  48.         var cfg = new ProducerConfig
  49.         {
  50.                 BootstrapServers = host,
  51.         };
  52.         var cts = new CancellationTokenSource();
  53.         Console.CancelKeyPress += (s, e) =>
  54.         {
  55.                 e.Cancel = true;
  56.                 cts.Cancel();
  57.         };
  58.         // 生产者
  59.         using var producer = new ProducerBuilder<Null, string>(cfg).Build();
  60.         while (true)
  61.         {
  62.                 if (cts.IsCancellationRequested) break;
  63.                 try
  64.                 {
  65.                         await producer.ProduceAsync(topic, new Message<Null, string>
  66.                         {
  67.                                 Value = JsonConvert.SerializeObject(new KafkaMessage
  68.                                 {
  69.                                         Ts = DateTimeOffset.Now,
  70.                                         Text = new Faker("en").Address.FullAddress()
  71.                                 })
  72.                         });
  73.                 }
  74.                 catch (ProduceException<Null, KafkaMessage> ex)
  75.                 {
  76.                         Console.WriteLine($"ex: {ex.Error.Reason}");
  77.                 }
  78.                 await Task.Delay(TimeSpan.FromMilliseconds(200));
  79.         }
  80. });
  81. #endregion 生产者
  82. Console.ReadLine();
  83. await Task.WhenAll(t1, t2);
复制代码
2-2、DotNetCore.CAP.Kafka

CAP 是一种处理分布式事件的办理方案,能够保证任务环境下变乱消息都不会丢失。可作 EventBus 来使用。
引用库
  1. <PackageReference Include="DotNetCore.CAP.InMemoryStorage" Version="8.3.2" />
  2. <PackageReference Include="DotNetCore.CAP.Kafka" Version="8.3.2" />
  3. <PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="9.0.0" />
  4. <PackageReference Include="Microsoft.Extensions.Logging.Console" Version="9.0.0" />
复制代码
  1. var services = new ServiceCollection();
  2. services.AddLogging(config =>
  3. {
  4.    config.SetMinimumLevel(LogLevel.Debug);
  5.    config.AddConsole();
  6. });
  7. services.AddCap(config =>
  8. {
  9.    config.UseInMemoryStorage();
  10.    config.UseKafka(opt =>
  11.    {
  12.            opt.Servers = "localhost:9092";
  13.           
  14.            // 自定义Headers, cap-msg-id 和 cap-msg-name 必选项
  15.            opt.CustomHeadersBuilder = (kr, sp) => new List<KeyValuePair<string, string>>
  16.            {
  17.                    new KeyValuePair<string, string>("cap-msg-id",Guid.NewGuid().ToString()),
  18.                    new KeyValuePair<string, string>("cap-msg-name","test"),
  19.                    new KeyValuePair<string, string>("kafka.offset",kr.Offset.ToString()),
  20.            };
  21.    });
  22.    config.FailedRetryInterval = 10; // 重试的间隔时间,默认是60s
  23.    config.DefaultGroupName = "test-group";
  24. });
  25. services.AddTransient<ICapSubscribe, SubscriberService>();
  26. var sp = services.BuildServiceProvider();
  27. var cts = new CancellationTokenSource();
  28. Console.CancelKeyPress += (s, e) =>
  29. {
  30.    e.Cancel = true;
  31.    cts.Cancel();
  32. };
  33. // 启动CAP
  34. var bootstrap = sp.GetRequiredService<IBootstrapper>();
  35. await bootstrap.BootstrapAsync(cts.Token);
  36. await Task.Delay(500);
  37. // 生产者
  38. ICapPublisher capBus = sp.GetRequiredService<ICapPublisher>();
  39. while (true)
  40. {
  41.    if (cts.IsCancellationRequested) break;
  42.    // 推送消息
  43.    capBus.Publish("test", new KafkaMessage
  44.    {
  45.            Ts = DateTimeOffset.Now,
  46.            Text = new Faker().Address.FullAddress()
  47.    });
  48.           
  49.    await Task.Delay(TimeSpan.FromMilliseconds(200));
  50. }
  51. await bootstrap.DisposeAsync();
  52. Console.ReadLine();
复制代码
消费者
  1. internal class SubscriberService : ICapSubscribe
  2. {
  3.    [CapSubscribe("test")]
  4.    public void ReceiveMessage(KafkaMessage msg, [FromCap] CapHeader header)
  5.    {
  6.            if (msg != null)
  7.            {
  8.                    Console.WriteLine($"Message: {msg.Ts} | {msg.Text}");
  9.            }
  10.    }
  11. }
复制代码
2-3、MassTransit.Kafka

MassTransit 基于消息驱动的分布式应用框架,可作为分布式应用的消息总线,也可以用作单体应用的变乱总线。
引用库
  1. <PackageReference Include="MassTransit.Kafka" Version="8.3.4" />
  2. <PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="9.0.0" />
复制代码
  1. var services = new ServiceCollection();
  2. services.AddMassTransit(config =>
  3. {
  4.   const string topic = "test";
  5.   const string groupId = "test-consumer-group";
  6.   const string host = "localhost:9092";
  7.   config.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context));
  8.   config.AddRider(rider =>
  9.   {
  10.           // 注册生产者
  11.           rider.AddProducer<KafkaMessage>(topic);
  12.           // 注册消费者
  13.           rider.AddConsumer<KafkaMessageConsumer>();
  14.           // Kafka连接配置
  15.           rider.UsingKafka((context, k) =>
  16.           {
  17.                   k.Host(host);
  18.                   // 配置 topic
  19.                   k.TopicEndpoint<KafkaMessage>(topic, groupId, e =>
  20.                   {
  21.                           e.AutoOffsetReset = AutoOffsetReset.Earliest;
  22.                           e.ConfigureConsumer<KafkaMessageConsumer>(context);
  23.                           e.CreateIfMissing();
  24.                   });
  25.           });
  26.   });
  27. });
  28. var sp = services.BuildServiceProvider();
  29. var cts = new CancellationTokenSource();
  30. Console.CancelKeyPress += (s, e) =>
  31. {
  32.   e.Cancel = true;
  33.   cts.Cancel();
  34. };
  35. var bus = sp.GetRequiredService<IBusControl>();
  36. bus.Start(TimeSpan.FromSeconds(10.0));
  37. try
  38. {
  39.   // 生产者
  40.   var producer = sp.GetRequiredService<ITopicProducer<KafkaMessage>>();
  41.   while (true)
  42.   {
  43.           if (cts.IsCancellationRequested) break;
  44.           try
  45.           {
  46.                   // 推送消息
  47.                   string value = new Faker().Address.FullAddress();
  48.                   await producer.Produce(new KafkaMessage
  49.                   {
  50.                           Ts = DateTimeOffset.Now,
  51.                           Text = value
  52.                   });
  53.                   await Task.Delay(TimeSpan.FromMilliseconds(200));
  54.           }
  55.           catch (ProduceException<Ignore, KafkaMessage> pex)
  56.           {
  57.                   Console.WriteLine(pex.Error.Reason);
  58.           }
  59.   }
  60. }
  61. catch (OperationCanceledException cce)
  62. {
  63. }
  64. finally
  65. {
  66.   await bus.StopAsync();
  67. }
  68. Console.ReadLine();
复制代码
消费者
  1. internal class KafkaMessageConsumer : IConsumer<KafkaMessage>
  2. {
  3. public Task Consume(ConsumeContext<KafkaMessage> context)
  4. {
  5.         var ctx = (context.ReceiveContext as KafkaReceiveContext<Ignore, KafkaMessage>);
  6.         Console.WriteLine($"Message: {context.Message.Ts} | {context.Message.Text}, Offset: {ctx?.Offset}");
  7.         return Task.CompletedTask;
  8. }
  9. }
复制代码
2-4、Streamiz.Kafka.Net

Kafka Streams 流式计算
  1. // kafka连接配置
  2. var config = new StreamConfig<StringSerDes, StringSerDes>();
  3. config.ApplicationId = "test-app";
  4. config.BootstrapServers = "localhost:9092";
  5. StreamBuilder builder = new StreamBuilder();
  6. builder.Stream<string, string>("test") // 输入 topic (订阅 topic = test)
  7.         // 定义处理过程
  8.         .MapValues((value, context) =>
  9.         {
  10.                 return $"{value} ---- 1111 ---- {context.Offset}";
  11.         })
  12.         // 定义处理过程
  13.         .MapValues((value, context) =>
  14.         {
  15.                 return $"{value} ---- 2222 ---- {context.Offset}";
  16.         })
  17.         // 定义处理过程
  18.         // ...
  19.         // 定义处理过程
  20.         .To("test-output");  // 输出 topic (把处理结束发到 topic = test-output)
  21. Topology t = builder.Build();
  22. KafkaStream stream = new KafkaStream(t, config);
  23. await stream.StartAsync();
  24. Console.ReadLine();
  25. stream.Dispose();
复制代码
输入:a
输出:a ---- 1111 ---- 4910 ---- 2222 ---- 4910

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

风雨同行

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表