【C#】Kafka for .NET 入坑记录
一、有关Kafka
Kafka 是一个分布式、多副本、基于Zookeeper的消息流处理平台。Kafka 最初由Linkedin开发,为了办理 data pipeline 问题。2011年募捐给 Apache基金会,后来成为Apache的顶级开源项目。
Kafka 的未来:及时数据流平台。(出自饶军)
1-1、基本概念
- 消息:Kafka 中的数据单元,也被称为记录。
- 批次:消息会分批次写入Kafka,批次就是指一组消息。
- 主题:消息的种类,也可以说一个主题代表一类消息。
- 分区:物理上最小的存储单元。一个主题可以分为多个分区,每个分区内消息是有序的。
- 生产者:生产消息,发送到主题。
- 消费者:从主题获取消息,处理消息。
- 消费者群组:由一个或多个消费者组成的群体。在同一个消费组中,一个消息只能被一个消息者消息。
- 偏移量:一个不断自增的整数值,分区中一条消息的唯一标示符。可体现消费者在分区的位置。
- broker:一个独立的Kafka服务器。
- broker集群:由一个或多个broker组成的集群。
- 副本:消息的备份。Kafka定义了两类副本(领导者副本、追随者副本)。
- 重平衡:其个消费者实例挂掉后,其他消息者实例自动重新分配的过程。
相干词汇:
- 消息 - message
- 分区 - partition
- 主题 - topic
- 生产者 - producer
- 消费者 - consumer
- 消费者群组 - consumer group
- 偏移量 - consumer offset
- 缓存代理 - broker
- 副本 - replica
- 领导者副本 - leader replica
- 追随者副本 - follower replica
- 重平衡 - rebalance
复制代码 1-2、Kafka发行版本:
- Apache Kafak:也称社区版 Kafka。
- Confluent Kafka:Confluent 公司提供的 Kafka。
2014年,Kafka的3个首创人 Jay Kreps、Naha Narkhede 和 Jun Rao 离开 LinkedIn,创立了Confluent公司,专注于提供基于 Kafka 的企业流处理办理方案。Confluent Kafka 提供了一些 Apache Kafka 没有的高级特性,如跨数据中央备份、Schema 注册中央以及集群监控工具等。
- Cloudera/Hortonworks Kafka:Cloudera 提供的 CDH Kafka 和 Hortonworks 提供的 HDP Kafka。集成了 Apache Kafka,通过便捷化的界面操作将 Kafka 的安装、运维、管理、监控全部统一在控制台中。
二、Technology Libraries
2-1、Confluent.Kafka
引用库
- <PackageReference Include="Confluent.Kafka" Version="2.8.0" />
复制代码- const string topic = "test";
- const string host = "localhost:9092";
- #region 消费者
- var t1 = Task.Run(async () =>
- {
- var cfg = new ConsumerConfig()
- {
- GroupId = $"test-consumer-group-console",
- BootstrapServers = host,
- AutoOffsetReset = AutoOffsetReset.Earliest,
- EnableAutoCommit = true // 自动提交
- };
- using var builder = new ConsumerBuilder<Ignore, string>(cfg).Build();
- builder.Subscribe(topic);
- var cts = new CancellationTokenSource();
- Console.CancelKeyPress += (s, e) =>
- {
- e.Cancel = true;
- cts.Cancel();
- };
- try
- {
- while (true)
- {
- try
- {
- var consumer = builder.Consume(cts.Token);
- Console.WriteLine($"Message: {consumer.Message.Value} received from {consumer.TopicPartitionOffset}");
- // builder.Commit(consumer); // 手动提交
- }
- catch (ConsumeException e)
- {
- Console.WriteLine($"Error occured: {e.Error.Reason}");
- }
- await Task.Yield();
- }
- }
- catch (OperationCanceledException ex)
- {
- builder.Close();
- Console.WriteLine($"ex: {ex}");
- }
- });
- #endregion 消费者
- #region 生产者
- var t2 = Task.Run(async () =>
- {
- var cfg = new ProducerConfig
- {
- BootstrapServers = host,
- };
- var cts = new CancellationTokenSource();
- Console.CancelKeyPress += (s, e) =>
- {
- e.Cancel = true;
- cts.Cancel();
- };
- // 生产者
- using var producer = new ProducerBuilder<Null, string>(cfg).Build();
- while (true)
- {
- if (cts.IsCancellationRequested) break;
- try
- {
- await producer.ProduceAsync(topic, new Message<Null, string>
- {
- Value = JsonConvert.SerializeObject(new KafkaMessage
- {
- Ts = DateTimeOffset.Now,
- Text = new Faker("en").Address.FullAddress()
- })
- });
- }
- catch (ProduceException<Null, KafkaMessage> ex)
- {
- Console.WriteLine($"ex: {ex.Error.Reason}");
- }
- await Task.Delay(TimeSpan.FromMilliseconds(200));
- }
- });
- #endregion 生产者
- Console.ReadLine();
- await Task.WhenAll(t1, t2);
复制代码 2-2、DotNetCore.CAP.Kafka
CAP 是一种处理分布式事件的办理方案,能够保证任务环境下变乱消息都不会丢失。可作 EventBus 来使用。
引用库
- <PackageReference Include="DotNetCore.CAP.InMemoryStorage" Version="8.3.2" />
- <PackageReference Include="DotNetCore.CAP.Kafka" Version="8.3.2" />
- <PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="9.0.0" />
- <PackageReference Include="Microsoft.Extensions.Logging.Console" Version="9.0.0" />
复制代码- var services = new ServiceCollection();
- services.AddLogging(config =>
- {
- config.SetMinimumLevel(LogLevel.Debug);
- config.AddConsole();
- });
- services.AddCap(config =>
- {
- config.UseInMemoryStorage();
- config.UseKafka(opt =>
- {
- opt.Servers = "localhost:9092";
-
- // 自定义Headers, cap-msg-id 和 cap-msg-name 必选项
- opt.CustomHeadersBuilder = (kr, sp) => new List<KeyValuePair<string, string>>
- {
- new KeyValuePair<string, string>("cap-msg-id",Guid.NewGuid().ToString()),
- new KeyValuePair<string, string>("cap-msg-name","test"),
- new KeyValuePair<string, string>("kafka.offset",kr.Offset.ToString()),
- };
- });
- config.FailedRetryInterval = 10; // 重试的间隔时间,默认是60s
- config.DefaultGroupName = "test-group";
- });
- services.AddTransient<ICapSubscribe, SubscriberService>();
- var sp = services.BuildServiceProvider();
- var cts = new CancellationTokenSource();
- Console.CancelKeyPress += (s, e) =>
- {
- e.Cancel = true;
- cts.Cancel();
- };
- // 启动CAP
- var bootstrap = sp.GetRequiredService<IBootstrapper>();
- await bootstrap.BootstrapAsync(cts.Token);
- await Task.Delay(500);
- // 生产者
- ICapPublisher capBus = sp.GetRequiredService<ICapPublisher>();
- while (true)
- {
- if (cts.IsCancellationRequested) break;
- // 推送消息
- capBus.Publish("test", new KafkaMessage
- {
- Ts = DateTimeOffset.Now,
- Text = new Faker().Address.FullAddress()
- });
-
- await Task.Delay(TimeSpan.FromMilliseconds(200));
- }
- await bootstrap.DisposeAsync();
- Console.ReadLine();
复制代码 消费者
- internal class SubscriberService : ICapSubscribe
- {
- [CapSubscribe("test")]
- public void ReceiveMessage(KafkaMessage msg, [FromCap] CapHeader header)
- {
- if (msg != null)
- {
- Console.WriteLine($"Message: {msg.Ts} | {msg.Text}");
- }
- }
- }
复制代码 2-3、MassTransit.Kafka
MassTransit 基于消息驱动的分布式应用框架,可作为分布式应用的消息总线,也可以用作单体应用的变乱总线。
引用库
- <PackageReference Include="MassTransit.Kafka" Version="8.3.4" />
- <PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="9.0.0" />
复制代码- var services = new ServiceCollection();
- services.AddMassTransit(config =>
- {
- const string topic = "test";
- const string groupId = "test-consumer-group";
- const string host = "localhost:9092";
- config.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context));
- config.AddRider(rider =>
- {
- // 注册生产者
- rider.AddProducer<KafkaMessage>(topic);
- // 注册消费者
- rider.AddConsumer<KafkaMessageConsumer>();
- // Kafka连接配置
- rider.UsingKafka((context, k) =>
- {
- k.Host(host);
- // 配置 topic
- k.TopicEndpoint<KafkaMessage>(topic, groupId, e =>
- {
- e.AutoOffsetReset = AutoOffsetReset.Earliest;
- e.ConfigureConsumer<KafkaMessageConsumer>(context);
- e.CreateIfMissing();
- });
- });
- });
- });
- var sp = services.BuildServiceProvider();
- var cts = new CancellationTokenSource();
- Console.CancelKeyPress += (s, e) =>
- {
- e.Cancel = true;
- cts.Cancel();
- };
- var bus = sp.GetRequiredService<IBusControl>();
- bus.Start(TimeSpan.FromSeconds(10.0));
- try
- {
- // 生产者
- var producer = sp.GetRequiredService<ITopicProducer<KafkaMessage>>();
- while (true)
- {
- if (cts.IsCancellationRequested) break;
- try
- {
- // 推送消息
- string value = new Faker().Address.FullAddress();
- await producer.Produce(new KafkaMessage
- {
- Ts = DateTimeOffset.Now,
- Text = value
- });
- await Task.Delay(TimeSpan.FromMilliseconds(200));
- }
- catch (ProduceException<Ignore, KafkaMessage> pex)
- {
- Console.WriteLine(pex.Error.Reason);
- }
- }
- }
- catch (OperationCanceledException cce)
- {
- }
- finally
- {
- await bus.StopAsync();
- }
- Console.ReadLine();
复制代码 消费者
- internal class KafkaMessageConsumer : IConsumer<KafkaMessage>
- {
- public Task Consume(ConsumeContext<KafkaMessage> context)
- {
- var ctx = (context.ReceiveContext as KafkaReceiveContext<Ignore, KafkaMessage>);
- Console.WriteLine($"Message: {context.Message.Ts} | {context.Message.Text}, Offset: {ctx?.Offset}");
- return Task.CompletedTask;
- }
- }
复制代码 2-4、Streamiz.Kafka.Net
Kafka Streams 流式计算
- // kafka连接配置
- var config = new StreamConfig<StringSerDes, StringSerDes>();
- config.ApplicationId = "test-app";
- config.BootstrapServers = "localhost:9092";
- StreamBuilder builder = new StreamBuilder();
- builder.Stream<string, string>("test") // 输入 topic (订阅 topic = test)
- // 定义处理过程
- .MapValues((value, context) =>
- {
- return $"{value} ---- 1111 ---- {context.Offset}";
- })
- // 定义处理过程
- .MapValues((value, context) =>
- {
- return $"{value} ---- 2222 ---- {context.Offset}";
- })
- // 定义处理过程
- // ...
- // 定义处理过程
- .To("test-output"); // 输出 topic (把处理结束发到 topic = test-output)
- Topology t = builder.Build();
- KafkaStream stream = new KafkaStream(t, config);
- await stream.StartAsync();
- Console.ReadLine();
- stream.Dispose();
复制代码 输入:a
输出:a ---- 1111 ---- 4910 ---- 2222 ---- 4910
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |