概念
在Orleans中,Streaming是一组API和功能集,它提供了一种构建、发布和消费数据流的方式。
这些流可以是任何类型的数据,从简单的消息到复杂的变乱或数据记录。Streaming API允许你定义、发布和消费这些流,而无需关心底层的传输机制或数据存储。
每个流都有一个唯一的标识符,称为StreamId,用于区分差别的流。流可以是持久的,也可以是临时的,具体取决于所利用的流提供者(Stream Provider)。流提供者负责处理流的存储、传输和故障恢复。
作用
Streaming在Orleans中起到了至关重要的作用,主要体现在以下几个方面:
- 解耦:Streaming允许将数据的产生者和消费者解耦。生产者可以发布数据到流中,而消费者可以独立地订阅这些流并处理数据。这种解耦使得系统更加灵活和可扩展。
- 实时性:通过Streaming,你可以实时地处理和响应数据流。这对于需要实时分析、监控或响应的场景非常有用。
- 故障恢复:Orleans的Streaming机制具有强大的故障恢复本领。纵然在出现网络分区或节点故障的情况下,流提供者也能够确保数据的可靠性和一致性。
应用场景
- 实时日记分析:你可以将应用程序的日记消息发布到流中,并利用专门的消费者来分析这些日记。这允许你实时地监控和响应应用程序的举动。
- 变乱驱动架构:在变乱驱动架构中,你可以利用Streaming来发布变乱,并由多个消费者来处理这些变乱。这有助于构建松耦合、可扩展和响应式的系统。
- 分布式协作:Streaming也可以用于实现分布式系统中的协作和通信。例如,多个节点可以发布状态更新到流中,其他节点可以订阅这些流以获取最新的状态信息。
示例
安装nuget包- [/code]配置Streaming
- [code]siloHostBuilder.AddMemoryStreams("StreamProvider").AddMemoryGrainStorage("PubSubStore");
复制代码 定义一个Grain天生变乱- public interface ISender : IGrainWithStringKey
- {
- Task Send(Guid rid);
- }
- public class SenderGrain : Grain, ISender
- {
- public Task Send(Guid rid)
- {
- var streamProvider = this.GetStreamProvider("StreamProvider");
- var streamId = StreamId.Create("RANDOMDATA", rid);
- var stream = streamProvider.GetStream<int>(streamId);
- RegisterTimer(_ =>
- {
- return stream.OnNextAsync(Random.Shared.Next());
- }, null, TimeSpan.FromMilliseconds(1_000), TimeSpan.FromMilliseconds(1_000));
- return Task.CompletedTask;
- }
- }
复制代码 再定义一个Grain订阅变乱- public interface IRandomReceiver : IGrainWithGuidKey
- {
- Task Receive();
- }
- [ImplicitStreamSubscription("RANDOMDATA")]
- public class ReceiverGrain : Grain, IRandomReceiver
- {
- public override async Task OnActivateAsync(CancellationToken cancellationToken)
- {
- var streamProvider = this.GetStreamProvider("StreamProvider");
- var rid = this.GetPrimaryKey();
- var streamId = StreamId.Create("RANDOMDATA", rid);
- var stream = streamProvider.GetStream<int>(streamId);
- await stream.SubscribeAsync<int>(
- async (data, token) =>
- {
- Console.WriteLine(data);
- await Task.CompletedTask;
- });
- base.OnActivateAsync(cancellationToken);
- }
- public async Task Receive()
- {
-
- }
- }
复制代码 然后即可测试- var rid = Guid.NewGuid();
- var sender1 = client.GetGrain<ISender>("sender1");
- await sender1.Send(rid);
- var reciver1 = client.GetGrain<IRandomReceiver>(new Guid());
- await reciver1.Receive();
复制代码 流提供程序
提供程序可以通过在nuget种搜索Orleans.Streaming,也可以通过PersistentStreamProvider 与 IQueueAdapter 重写来自定义Provider
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |