3.actor模型的根本搭建(c#代码)

打印 上一主题 下一主题

主题 837|帖子 837|积分 2521

本文看了博客C# 实现 Actor并发模型 (案例版)_51CTO博客_actor并发模型,这里作为条记用,该博客内容写的很具体,这里根本上没有改动。
起首,本文的目次如下:

每个cs文件的代码都有具体的注释,具体代码如下:
1. IActor.cs代码:

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Text;
  5. using System.Threading.Tasks;
  6. namespace ConsoleApp3.core
  7. {
  8.     /// <summary>
  9.     /// 无锁并行编程模型(暂时用来处理串行任务,任务串行执行)
  10.     /// </summary>
  11.     public interface IActor
  12.     {
  13.         bool AddMsg(object message); //增加消息
  14.         Task Start(); // 启动服务
  15.         bool Stop(int WaitingTimeout); // 停止服务运行,等待毫秒数
  16.     }
  17. }
复制代码
2. Actor.cs代码

  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. using System.Text;
  6. using System.Threading.Tasks;
  7. namespace ConsoleApp3.core
  8. {
  9.     public abstract class Actor : IDisposable, IActor
  10.     {
  11.         public string Name { get; set; }//名称
  12.         public bool Active { get; private set; } // 是否启用
  13.         public bool LongRunning { get; set; } = true; // 是否长时间运行。长时间运行任务使用独立线程,默认true
  14.         public BlockingCollection<object> MailBox { get; set; } // 处理的消息邮箱
  15.         private Task _task; // 内置任务
  16.         public Actor(string name)
  17.         {
  18.             Name = name;
  19.             MailBox = new BlockingCollection<object>();
  20.         }
  21.         // 开始任务
  22.         public virtual Task Start()
  23.         {
  24.             if (Active) return _task;
  25.             Active = true;
  26.             // 启动异步
  27.             if (_task == null)
  28.             {
  29.                 lock(this)
  30.                 {
  31.                     if (_task == null)
  32.                     {
  33.                         // Task.Factory.StartNew: 线程的使用
  34.                         // 如果LongRunning为真,则返回 TaskCreationOptions.LongRunning,否则返回TaskCreationOptions.None
  35.                         _task = Task.Factory.StartNew(DoActorWork, LongRunning ? TaskCreationOptions.LongRunning : TaskCreationOptions.None);
  36.                     }
  37.                 }
  38.             }
  39.             return _task;
  40.         }
  41.         // 停止任务
  42.         public virtual bool Stop(int WaitingTimeout = 100)
  43.         {
  44.             //BlockingCollection?.CompleteAdding()它的作用是通知集合不再接受新的元素添加。调用此方法后,尝试向集合中添加元素的操作将会抛出异常,并且当集合为空时,
  45.             //尝试从集合中移除元素的操作不会等待,而是立即返回。
  46.             MailBox?.CompleteAdding();
  47.             Active = false;
  48.             if (WaitingTimeout == 0 || _task == null) return true;
  49.             return _task.Wait(WaitingTimeout);
  50.         }
  51.         // 给邮件添加消息
  52.         public virtual bool AddMsg(object message)
  53.         {
  54.             // 自动开始
  55.             if (!Active) { Start(); }
  56.             if (!Active) { return false; }
  57.             MailBox.Add(message);
  58.             return true;
  59.         }
  60.         // 循环消息
  61.         private void DoActorWork()
  62.         {
  63.             //BlockingCollection.IsCompleted:指集合是否已经完成添加并且是否已经消耗了所有数据。换句话说,它返回 true 的条件是集合已经调用了 CompleteAdding() 并且集合为空。
  64.             //因此,IsCompleted 包含了集合为空的判断
  65.             while(!MailBox.IsCompleted)
  66.             {
  67.                 try
  68.                 {
  69.                     //BlockingCollection.Take():用于从集合中移除并返回一个元素。如果集合为空,调用线程会被阻塞,直到集合中有元素可用。
  70.                     var ctx = MailBox.Take();
  71.                     var task = ProcessAsync(ctx);
  72.                     if (task != null)
  73.                     {
  74.                         task.Wait();
  75.                     }
  76.                 }
  77.                 catch(InvalidOperationException) { }
  78.                 catch (Exception ex)
  79.                 {
  80.                     Console.WriteLine($"DoActorWork Error : {ex.Message}");
  81.                 }
  82.             }
  83.             Active = false;
  84.         }
  85.         // 处理消息
  86.         // abstract:是定义了一个抽象类
  87.         public abstract Task ProcessAsync(object msg);
  88.         public void Dispose()
  89.         {
  90.             try {Stop(100); }
  91.             catch (Exception) { }
  92.             // BlockingCollection?.TryTake(out _):尝试从MailBox中移除一个元素。如果集合为空,该方法会立即返回false;如果集合不为空,则尝试移除一个元素并返回true。
  93.             // 如果操作成功,还可以通过out参数获取移除的元素。
  94.             while (MailBox?.TryTake(out _) == true) { }
  95.             MailBox = null;
  96.         }
  97.     }
  98. }
复制代码
3.WriteActor.cs代码

  1. using ConsoleApp3.core;
  2. using System;
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. using System.Text;
  6. using System.Threading.Tasks;
  7. namespace ConsoleApp3
  8. {
  9.     public class WriteActor: Actor
  10.     {
  11.         public WriteActor() : base(nameof(WriteActor)) { }
  12.          处理信息
  13.         public override Task ProcessAsync(object msg)
  14.         {
  15.             try { Console.WriteLine($"输出 {this.Name} :{msg}");}
  16.             catch(Exception e) { Console.WriteLine($"业务处理异常:{e.Message}"); }
  17.             return Task.CompletedTask;
  18.         }
  19.     }
  20. }
复制代码
4.AccumulationActor.cs代码

  1. using ConsoleApp3.core;
  2. using System;
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. using System.Text;
  6. using System.Threading.Tasks;
  7. namespace ConsoleApp3
  8. {
  9.     public class AccumulationActor : Actor
  10.     {
  11.         private int Count = 0;
  12.         private IActor actor;
  13.         public AccumulationActor(IActor actor) : base(nameof(AccumulationActor))
  14.         {
  15.             Count = 0;
  16.             this.actor = actor;
  17.             //Console.WriteLine(nameof(AccumulationActor)); // 输出基类名称
  18.         }
  19.         // 处理信息
  20.         public override Task ProcessAsync(object msg)
  21.         {
  22.             try
  23.             {
  24.                 var msgNumber = (int)(msg);
  25.                 Count += msgNumber;
  26.                 Console.WriteLine($"处理{this.Name} :{msg} ,累积总数:{Count}");
  27.                 if (Count >= 100)
  28.                 {
  29.                     this.actor.AddMsg(Count);
  30.                     Count = 0;
  31.                 }
  32.             }
  33.             catch(Exception e) { Console.WriteLine($"业务处理异常:{e.Message}"); }
  34.             return Task.CompletedTask;
  35.         }
  36.     }
  37. }
复制代码
5.program.cs代码

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Text;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. namespace ConsoleApp3
  8. {
  9.     class Program
  10.     {
  11.         static void Main(string[] args)
  12.         {
  13.             Console.Title = "Actor Demo of wang";
  14.             //实现一个加法逻辑
  15.             //a累加到100,就发送消息到 b里,让b 输出
  16.             var write = new WriteActor();
  17.             var User = new AccumulationActor(write);
  18.             for (int i = 0; i < 20; i++) { User.AddMsg(i * 30); }
  19.             Thread.Sleep(2000);
  20.             write.Stop();
  21.             User.Stop();
  22.             //释放资源
  23.             Console.WriteLine("示例完毕!");
  24.             Console.ReadLine();
  25.         }
  26.     }
  27. }
复制代码





免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

来自云龙湖轮廓分明的月亮

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表