ToB企服应用市场:ToB评测及商务社交产业平台

标题: .NET分布式Orleans - 7 - Streaming [打印本页]

作者: 忿忿的泥巴坨    时间: 2024-5-15 19:36
标题: .NET分布式Orleans - 7 - Streaming
概念

在Orleans中,Streaming是一组API和功能集,它提供了一种构建、发布和消费数据流的方式。
这些流可以是任何类型的数据,从简单的消息到复杂的变乱或数据记录。Streaming API允许你定义、发布和消费这些流,而无需关心底层的传输机制或数据存储。
每个流都有一个唯一的标识符,称为StreamId,用于区分差别的流。流可以是持久的,也可以是临时的,具体取决于所利用的流提供者(Stream Provider)。流提供者负责处理流的存储、传输和故障恢复。
作用

Streaming在Orleans中起到了至关重要的作用,主要体现在以下几个方面:
应用场景

示例

安装nuget包
  1. [/code]配置Streaming
  2. [code]siloHostBuilder.AddMemoryStreams("StreamProvider").AddMemoryGrainStorage("PubSubStore");
复制代码
定义一个Grain天生变乱
  1. public interface ISender : IGrainWithStringKey
  2. {
  3.     Task Send(Guid rid);
  4. }
  5. public class SenderGrain : Grain, ISender
  6. {
  7.     public Task Send(Guid rid)
  8.     {
  9.         var streamProvider = this.GetStreamProvider("StreamProvider");
  10.         var streamId = StreamId.Create("RANDOMDATA", rid);
  11.         var stream = streamProvider.GetStream<int>(streamId);
  12.         RegisterTimer(_ =>
  13.         {
  14.             return stream.OnNextAsync(Random.Shared.Next());
  15.         }, null, TimeSpan.FromMilliseconds(1_000), TimeSpan.FromMilliseconds(1_000));
  16.         return Task.CompletedTask;
  17.     }
  18. }
复制代码
再定义一个Grain订阅变乱
  1. public interface IRandomReceiver : IGrainWithGuidKey
  2. {
  3.     Task Receive();
  4. }
  5. [ImplicitStreamSubscription("RANDOMDATA")]
  6. public class ReceiverGrain : Grain, IRandomReceiver
  7. {
  8.     public override async Task OnActivateAsync(CancellationToken cancellationToken)
  9.     {
  10.         var streamProvider = this.GetStreamProvider("StreamProvider");
  11.         var rid = this.GetPrimaryKey();
  12.         var streamId = StreamId.Create("RANDOMDATA", rid);
  13.         var stream = streamProvider.GetStream<int>(streamId);
  14.         await stream.SubscribeAsync<int>(
  15.             async (data, token) =>
  16.             {
  17.                 Console.WriteLine(data);
  18.                 await Task.CompletedTask;
  19.             });
  20.         base.OnActivateAsync(cancellationToken);
  21.     }
  22.     public async Task Receive()
  23.     {
  24.             
  25.     }
  26. }
复制代码
然后即可测试
  1. var rid = Guid.NewGuid();
  2. var sender1 = client.GetGrain<ISender>("sender1");
  3. await sender1.Send(rid);
  4. var reciver1 = client.GetGrain<IRandomReceiver>(new Guid());
  5. await reciver1.Receive();
复制代码
流提供程序

提供程序可以通过在nuget种搜索Orleans.Streaming,也可以通过PersistentStreamProvider 与 IQueueAdapter 重写来自定义Provider

 

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4