rust学习十六.2、并发-使用消息传递进行线程间通讯

打印 上一主题 下一主题

主题 870|帖子 870|积分 2610

通过信道是rust的解决线程之间通信的2个工具之一,另外1个是是共享内存状态。
rust推出这个,明显地是因为受到go之类的影响。
在册本中,作者提到go编程文档中的内容:
不要通过共享内存来通讯;而是通过通讯来共享内存(Do not communicate by sharing memory; instead, share memory by communicating)
 
因为,我这里就按部就班复述下书籍上关于这个篇章的内容。
 
一、概述

* 1.mpsc 是 multiply producer single consumer 的缩写,即多生产者单消费者模式.意味着*   rust的重要消息模式不允许一个生产者多个消费者。但rust也没有说不支持一个生产者多个消费者模式* 2.(tx, rx) = mpsc::channel() 会返回一个元组,分别表示发送和接收者* 3. rx有两个紧张方法,分别是recv() 和 try_recv().前者会阻塞,后者不会。* 4. 信道和所有权问题. tx.send方法会默认夺取消息的所有权,移动这个值归接收者所有.所以发送者必要拥有消息的所有权。*    这也同时意味着,如果没有特别措施,那么发送之后,被发送的消息不能再访问了。*    那么,我们是否甘心clone()一下,然后再发送呢?是的。*    rust的所有权体系,导致rust会把消息视为一个实物,而不是一个拷贝,意味着发送就是“脱手”、“出手”* 5. 发送者可以关闭信道,这样接收者就会收到一个错误。* 6. 发送者可以克隆,这样多个线程都可以发送消息。发送者虽然克隆了,但是接收者照旧同一个.  7. 这种方式是否可能存在这样的问题:后发先至。 这个问题暂时没有明确答案,因为暂时没有找到答案。据说概率很小,但不是么有?二、简要示例

2.1例子一

这个例子把原书的简朴地做了一些修改,以便更有乐趣些!
和原书稍微不同的是,接收者会根据消息来源把收到的内容分别生存到v0,v1的向量中,最后打印各自拼接的消息
大体模拟了碟战中,收到不同消息来源的情景!
  1. use std::sync::mpsc;
  2. use std::thread;
  3. use std::time::Duration;
  4. #[derive(Debug)]
  5. #[allow(dead_code)]
  6. struct Message {
  7.     sender: String, // 发送者标识
  8.     content: String, // 消息内容
  9.     order: u8, // 消息序号
  10. }
  11. fn main() {
  12.     // --snip--
  13.     let (tx, rx) = mpsc::channel();   //channel() 返回一个元组,分别表示发送和接收者
  14.     let tx1 = tx.clone();
  15.     thread::spawn(move || {           // 移动tx1所有权到新线程中。为什么需要move?规定就是这样
  16.         let vals = vec![
  17.             Message{sender: String::from("x1"),content: String::from("风"),order:1},
  18.             Message{sender: String::from("x1"),content: String::from("紧"),order:2},
  19.             Message{sender: String::from("x1"),content: String::from("扯"),order:3},
  20.             Message{sender: String::from("x1"),content: String::from("呼"),order:4}
  21.         ];
  22.         for val in vals {
  23.             tx1.send(val).unwrap();
  24.             println!("tx1 is sending....");
  25.             thread::sleep(Duration::from_millis(200));
  26.         }
  27.     });
  28.     thread::spawn(move || {
  29.         let vals = vec![
  30.             Message{sender: String::from("x0"),content: String::from("乘"),order:1},
  31.             Message{sender: String::from("x0"),content: String::from("胜"),order:2},
  32.             Message{sender: String::from("x0"),content: String::from("追"),order:3},
  33.             Message{sender: String::from("x0"),content: String::from("击"),order:4}
  34.         ];
  35.         for val in vals {
  36.             tx.send(val).unwrap();
  37.             println!("tx0 is sending....");
  38.             thread::sleep(Duration::from_millis(100));
  39.         }
  40.     });
  41.     //把收到的消息分别放入到两个Vec中,以便于观察顺序。
  42.     let mut v0 = Vec::new();
  43.     let mut v1 = Vec::new();
  44.     let mut qty=0;
  45.     //利用rx的迭代器接收消息. 无需显示调用rec(),或者try_recv(),迭代器会自动调用。
  46.     for received in rx {
  47.         if received.sender == "x0" {
  48.             v0.push(received);
  49.         }
  50.         else{
  51.             v1.push(received);
  52.         }
  53.         qty += 1;
  54.         if qty==5 { // 收到5条消息就退出循环。看看会不会有什么问题
  55.             break;
  56.         }
  57.     }
  58.     let msg0 = get_msg(v0);
  59.     let msg1 = get_msg(v1);
  60.     println!("从t0收到的消息是: {}", msg0);
  61.     println!("从t1收到的消息是: {}", msg1);
  62.     if msg0!=msg1 {
  63.         println!("那一份消息是真的?应该信任谁?还是说都是真的?\n 如果都是真的,为什么会这样?");
  64.     }
  65. }
  66. fn get_msg(msg:Vec<Message>) -> String   {
  67.     let mut result = String::new();
  68.     for m in msg {
  69.         result.push_str(&m.content);
  70.     }
  71.     return result;
  72. }
复制代码
当主线程中,设置为接收5条消息:

当主线程中,设置为接收20条(根本不可能):

 
2.2 例子2_try_recv()

很遗憾,第一个例子么有try_recv(),所以这里接纳这个试试。
根据描述try_recv()不阻塞主线程,所以可以干一些其它的事情!!!这是我感兴趣的地方,所以把例子稍微修改了下!
  1. //利用try_recv()接收消息,并打印出来。
  2.     loop {
  3.         match rx.try_recv() {
  4.             Ok(val) => {
  5.                 if val.sender == "x0" {
  6.                     v0.push(val);
  7.                 }
  8.                 else{
  9.                     v1.push(val);
  10.                 }
  11.                 qty+=1;
  12.             }
  13.             Err(_) => {
  14.                 if qty==8 {
  15.                     break;
  16.                 }
  17.                 thread::sleep(Duration::from_millis(200));
  18.             }
  19.         }
  20.         println!("...快点啊!");
  21.     }
复制代码
其余部分同示例1,详细略。
效果如下:

用这个loop结构,有个问题:如何知道消息已经接收完毕了? 示例代码只能写死。 但实际应该不能这样,太不友好!
从这个方面来说,不如用迭代友好! 反之,try_recv()让计算机可以干一点别的事情...,充分使用资源,如果有的话!
2.3、例子3 -不接收
  1. use std::sync::mpsc;
  2. use std::thread;
  3. use std::time::Duration;
  4. #[derive(Debug)]
  5. #[allow(dead_code)]
  6. struct Message {
  7.     sender: String, // 发送者标识
  8.     content: String, // 消息内容
  9.     order: u8, // 消息序号
  10. }
  11. fn main() {
  12.     // --snip--
  13.     let (tx, rx) = mpsc::channel();   //channel() 返回一个元组,分别表示发送和接收者
  14.     let tx1 = tx.clone();
  15.     let handle1=thread::spawn(move || {           // 移动tx1所有权到新线程中。为什么需要move?规定就是这样
  16.         let vals = vec![
  17.             Message{sender: String::from("x1"),content: String::from("风"),order:1},
  18.             Message{sender: String::from("x1"),content: String::from("紧"),order:2},
  19.             Message{sender: String::from("x1"),content: String::from("扯"),order:3},
  20.             Message{sender: String::from("x1"),content: String::from("呼"),order:4}
  21.         ];
  22.         for val in vals {
  23.             tx1.send(val).unwrap();
  24.             println!("tx1 is sending....");
  25.             thread::sleep(Duration::from_millis(200));
  26.         }
  27.     });
  28.     let handle0=thread::spawn(move || {
  29.         let vals = vec![
  30.             Message{sender: String::from("x0"),content: String::from("乘"),order:1},
  31.             Message{sender: String::from("x0"),content: String::from("胜"),order:2},
  32.             Message{sender: String::from("x0"),content: String::from("追"),order:3},
  33.             Message{sender: String::from("x0"),content: String::from("击"),order:4}
  34.         ];
  35.         for val in vals {
  36.             tx.send(val).unwrap();
  37.             println!("tx0 is sending....");
  38.             thread::sleep(Duration::from_millis(100));
  39.         }
  40.     });
  41.     handle0.join().unwrap();
  42.     handle1.join().unwrap();
  43. }
复制代码

 
三、疑问

1.后发先到问题
这个暂时没有结论! 待后续补充!!
2.如何制止发送大概接收
接收者退出即可,即不执行recv,大概try_recv即可!
3.如果没有接收动作,会发送得出去吗?
可以,因为没有报告异常!!!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

怀念夏天

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表