Golang:实时消息交互系统

打印 上一主题 下一主题

主题 1050|帖子 1050|积分 3150

一、项目框架


1.1 Server端:

1.1.1Server:服务器
—OnlineMap:纪录都有哪些用户在线。(key:用户名,value:用户对象),当给一个用户发消息时,在OnlineMap中查询是否在线并找到用户对象找到对应连接。
—Message channel:server用来广播的channel,如果收到一个消息,先将消息写道该channel中,该channel再将消息转发给特定user的channel或者广播给全部user的channel,每个user就会将自己channel中收到的消息发给客户端。
1.1.2User:在线的用户
user这部门有两个goroutine,读写分离模型。
—一个goroutine负责从user channel中读消息,一旦有消息,就会立即将消息发送给客户端。
—handler go:永世壅闭等待客户端发消息,read。
1.2 Client端:

二、底子server构建

2.1 server.go

<1.创建server结构体(Ip、Port)
<2.提供接口创建、初始化server结构体对象
<3.写一个start启动服务器的server的类方法,listen、accept、go handler
<4.Handler方法,用于收到客户端的业务处理
  1. package main
  2. import "net"
  3. import "fmt"
  4. type Server struct {
  5.         Ip string
  6.         Port int
  7. }
  8. func (this *Server) Handler(conn net.Conn) {//创建成功与客户端建立好连接的套接字
  9.         //...当前连接的业务
  10.         fmt.Println("链接建立成功!")
  11. }
  12. //创建一个server的接口-用于创建server对象-仅仅是对外提供的接口
  13. func NewServer(ip string,port int) *Server {
  14.         server := &Server{ //把当前创建对象的地址传给返回值
  15.                 Ip : ip,
  16.                 Port : port,
  17.         }
  18.         return server
  19. }
  20. //启动服务器的方法-server的成员方法
  21. func (this *Server) Start() {
  22.         //socket listen;Listen接口返回listener对象和error
  23.         listener,err := net.Listen("tcp",fmt.Sprintf("%s:%d",this.Ip,this.Port))//"127.0.0.1:8888"
  24.         if err != nil {
  25.                 fmt.Println("Listener accept err:",err)
  26.                 return
  27.         }
  28.         //close listen socket
  29.         defer listener.Close()
  30.         //
  31.         for {
  32.                 //accept; 返回一个连接对象conn和err,这个conn可以中有读写等操作--跟客户端建立成功的套接字
  33.                 conn,err := listener.Accept()
  34.                 if err != nil {
  35.                         fmt.Println("Listener accept err:",err)
  36.                         continue
  37.                 }
  38.                 //do handler业务回调
  39.                 go this.Handler(conn)//让一个go去执行业务,主go成继续循环接收下一个连接
  40.         }
  41. }
复制代码
2.2 main.go

  1. package main //server.go和main.go都属于main包,不需要再Import
  2. func main() {
  3.         server := NewServer("127.0.0.1",8888)
  4.         server.Start()
  5. }
复制代码
二、用户上线及广播功能 

client客户端在server中怎样表现呢?封装一个user类。这个user类中包含一个conn与客户端简历链接,在线的user存到OnlineMap中,每个user对象还会绑定一个channel,用来接收Message channel中的消息,被user的goroutine壅闭监听,收到就会通过conn发送给client客户端。Message channel是接收客户端的消息,Message goroutine壅闭监听channel,一旦有消息,就遍历OnlineMap去通过user channel广播给在线用户。

2.1封装一个User类  

<1.User类中成员变量:name、addr、c(server)、conn(与客户端的连接)
<2.提供创建并初始化User对象的接口,并在初始化时启动goroutine对user channel监听。
<3.监听方法。从user channel中读数据,读出后将数据通过conn写入client。
  1. package main
  2. import "net"
  3. type User struct {
  4.         Name string
  5.         Addr string
  6.         C chan string
  7.         Conn net.Conn
  8. }
  9. //创建一个用户的API
  10. func NewUser(conn net.Conn) *User {
  11.         userAddr := conn.RemoteAddr().String() //从conn获取addr
  12.         user := &User{
  13.                 Name : userAddr,
  14.                 Addr : userAddr,
  15.                 C : make(chan string),
  16.                 Conn : conn,
  17.         }
  18.         //启动监听当前user channel消息的goroutine
  19.         go user.ListenMessage()
  20.         return user
  21. }
  22. //监听当前User channel的方法,一旦有消息,就直接发送给对端客户端
  23. func (this *User) ListenMessage() {
  24.         for {
  25.                 //一直从管道中读数据
  26.                 msg := <-this.C
  27.                 //向客户端写消息
  28.                 this.Conn.Write([]byte(msg + "\n"))
  29.         }
  30. }
复制代码
2.2对Server类的更改

<1.新增成员变量OnlineMap、mapLock(掩护onlinemap)、Message(转发的channel)
<2.server在listen时每accept一个conn就表明一个用户上线,每一个客户端连接都分配一个gorountine,将每一个客户端conn封装成user对象,加入到OnlineMap中,再向Message channel中写数据。
<3.在start服务器时新增gorountine去监听Message有没有收到用户创建连接的消息,收到后就将该消息广播给全部user的user channel。
  1. package main
  2. import "net"
  3. import "fmt"
  4. import "sync"
  5. type Server struct {
  6.         Ip string
  7.         Port int
  8.         //在线用户的列表
  9.         OnlineMap map[string]*User
  10.         mapLock sync.RWMutex
  11.         //消息广播的channel
  12.         Message chan string
  13. }
  14. //监听message广播消息channel的gorountine,一旦有消息就发送给全部在线的user
  15. func (this *Server) ListenMessage() {
  16.         for {
  17.                 msg := <-this.Message//server中start时仅有一个gorountine专门来监听message channel,而每一个用来处理监听user连接业务的gorountine像这一个message channel中写数据
  18.                 //将msg发送给全部在线的User
  19.                 this.mapLock.Lock()
  20.                 for _,cli := range this.OnlineMap {
  21.                         cli.C <- msg //广播给每一个user的channel
  22.                 }
  23.                 this.mapLock.Unlock()
  24.         }
  25. }
  26. func(this *Server) BroadCast(user *User,msg string){
  27.         sendMsg := "["+user.Addr+"]"+user.Name+":"+msg
  28.         //给server类的channel写数据
  29.         this.Message <- sendMsg//将打包好的消息给massage channel,由它进行广播。server端listen监听客户端,连接上后启用goroutine去处理这个conn,将这个conn封装成user,将这个user给到hash
  30. }
  31. //user的goroutine
  32. func (this *Server) Handler(conn net.Conn) {//创建成功与客户端建立好连接的套接字
  33.         //处理上线的用户
  34.         // fmt.Println("链接建立成功!")
  35.         user := NewUser(conn)
  36.         //用户上线,将用户加入到onlineMap中
  37.         this.mapLock.Lock()
  38.         this.OnlineMap[user.Name]=user
  39.         this.mapLock.Unlock()
  40.         //广播当前用户上线消息
  41.         this.BroadCast(user,"已上线")
  42.         //当前的handler不能结束-该goroutine就会死亡,子goroutine就会死亡
  43. }
  44. //创建一个server的接口-用于创建server对象-仅仅是对外提供的接口
  45. func NewServer(ip string,port int) *Server {
  46.         server := &Server{ //把当前创建对象的地址传给返回值
  47.                 Ip : ip,
  48.                 Port : port,
  49.                 OnlineMap : make(map[string]*User),
  50.                 Message : make(chan string),
  51.         }
  52.         return server
  53. }
  54. //启动服务器的方法-server的成员方法
  55. func (this *Server) Start() {
  56.         //socket listen;Listen接口返回listener对象和error
  57.         listener,err := net.Listen("tcp",fmt.Sprintf("%s:%d",this.Ip,this.Port))//"127.0.0.1:8888"
  58.         if err != nil {
  59.                 fmt.Println("Listener accept err:",err)
  60.                 return
  61.         }
  62.         //close listen socket
  63.         defer listener.Close()
  64.         //启动监听Message的gorountine
  65.         go this.ListenMessage()
  66.         //
  67.         for {
  68.                 //accept; 返回一个连接对象conn和err,这个conn可以中有读写等操作--跟客户端建立成功的套接字
  69.                 conn,err := listener.Accept()//此时就有用户上线了
  70.                 if err != nil {
  71.                         fmt.Println("Listener accept err:",err)
  72.                         continue
  73.                 }
  74.                 //do handler业务回调
  75.                 go this.Handler(conn)//让一个go去执行业务,主go成继续循环接收下一个连接
  76.         }
  77. }
复制代码
三、用户消息广播功能 

在接收每个client的conn的go程中又创建子go程去壅闭从conn中读数据,读不到就以为对方下线。
  1. //user的goroutine,从client->onlinemap->message channel的线
  2. func (this *Server) Handler(conn net.Conn) {//创建成功与客户端建立好连接的套接字
  3.         //处理上线的用户
  4.         // fmt.Println("链接建立成功!")
  5.         user := NewUser(conn)
  6.         //用户上线,将用户加入到onlineMap中
  7.         this.mapLock.Lock()
  8.         this.OnlineMap[user.Name]=user
  9.         this.mapLock.Unlock()
  10.         //广播当前用户上线消息
  11.         this.BroadCast(user,"已上线")
  12.         //接收客户端发送的消息,一直监听,从conn中读;
  13.         go func() {
  14.                 buf := make([]byte,4096)
  15.                 for {
  16.                         //Read(b []byte) (n int,err error) 读成功返回n为读到的字节数,读失败err
  17.                         n,err := conn.Read(buf)
  18.                         if n==0 {
  19.                                 this.BroadCast(user,"下线") //读不到消息就是下线了吗?依旧可...
  20.                                 return
  21.                         }
  22.                         if err != nil && err != io.EOF {//不为空且不是读到文件末尾
  23.                                 fmt.Println("Conn Read err:",err)
  24.                                 return
  25.                         }
  26.                         //提取用户的消息(去除'\n')
  27.                         msg := string(buf[:n-1])//从0到n-1
  28.                         //将得到的消息进行广播
  29.                         this.BroadCast(user,msg)
  30.                 }
  31.         }()
  32.         //当前的handler不能结束-该goroutine就会死亡,子goroutine就会死亡
  33.         select {}//让当前handler阻塞????
  34. }
复制代码
四、用户业务封装

<2.在User类中新增Server关联(组合),通过关联可以对server中的属性和方法操纵。 
<1.将用户上线、用户下线、用户消息处理在User类中封装。
  1. //用户上线的业务
  2. func (this *User) Online() {
  3.         //用户上线,将用户加入到onlineMap中
  4.         this.server.mapLock.Lock()
  5.         this.server.OnlineMap[this.Name]=this
  6.         this.server.mapLock.Unlock()
  7.         //广播当前用户上线消息
  8.         this.server.BroadCast(this,"已上线")
  9. }
  10. //用户下线的业务
  11. func (this *User) Offline() {
  12.         //用户下线,将用户从onlineMap中删除
  13.         this.server.mapLock.Lock()
  14.         delete(this.server.OnlineMap,this.Name)
  15.         this.server.mapLock.Unlock()
  16.         //广播当前用户上线消息
  17.         this.server.BroadCast(this,"下线")
  18. }
  19. //用户处理消息的业务,将该用户(客户端的)消息广播
  20. func (this *User) DoMessage(msg string) {
  21.         this.server.BroadCast(this,msg)
  22. }
复制代码
五、在线用户查询 

遍历OnlineMap表通过conn发送给客户端全部在线用户
  1. //给当前User对应的客户端发送消息
  2. func (this *User) SendMsg(msg string) {
  3.         this.Conn.Write([]byte(msg))
  4. }
  5. //用户处理消息的业务,将该用户(客户端的)消息广播
  6. func (this *User) DoMessage(msg string) {
  7.         if msg == "who" {
  8.                 //查询当前在线用户都有哪些
  9.                 this.server.mapLock.Lock()
  10.                 for _,user := range this.server.OnlineMap {
  11.                         OnlineMsg := "["+user.Addr+"]"+user.Name+":"+"在线...\n"
  12.                         this.SendMsg(OnlineMsg)
  13.                 }
  14.                 this.server.mapLock.Unlock()
  15.         } else {
  16.                 this.server.BroadCast(this,msg)
  17.         }
  18. }
复制代码
六、修改用户名 

新用户名不能存在,如果存在提示。不存在就开始在OnlineMap删除旧用户名,添加新用户名。
  1. else if len(msg) > 7 && msg[:7]=="rename|"{
  2.                 //消息格式:renanme|张三
  3.                 newName := strings.Split(msg,"|")[1]
  4.                 //判断name是否存在
  5.                 _, ok := this.server.OnlineMap[newName]
  6.                 if ok {
  7.                         this.SendMsg("当前用户名被使用\n")
  8.                 }else{
  9.                         this.server.mapLock.Lock()
  10.                         delete(this.server.OnlineMap,this.Name)
  11.                         this.server.OnlineMap[newName]=this
  12.                         this.server.mapLock.Unlock()
  13.                         this.Name = newName
  14.                         this.SendMsg("您已经更新用户名:"+this.Name +"\n")
  15.                 }
  16.         }
复制代码
七、超时强踢功能

 如果用户长时间不保持活跃,就断开它的连接。如果Read到用户数据就重置定时器,如果超过十秒没有收到用户消息,就将连接断开。
  1. //当前的handler不能结束-该goroutine就会死亡,子goroutine就会死亡
  2.         for {
  3.                 select { //select会阻塞监控管道channel的消息,如果超时会满足case,触发,select不会阻塞
  4.                         case <-isLive:
  5.                                 //当前用户活跃应重置定时器--不做处理,只为了激活select顺序执行time.After重置定时器
  6.                         case <-time.After(time.Second*10): //time.After 返回一个单向通道(<-chan time.Time),该通道在指定的时间间隔后发送当前时间值‌
  7.                         //如果超时,将当前的user强制关闭
  8.                         user.SendMsg("你被超时踢出了")
  9.                         close(user.C)//关闭channel
  10.                         conn.Close()//关闭连接
  11.                         return//退出当前hanler
  12.                 }
  13.         }
复制代码
八、用户私聊功能 

<1.目前只实现了将用户的消息进行广播的功能。客户端先通过who查询在线用户,再发送给服务器具有格式的消息指明私聊消息发送给哪位在线用户;
<2.从msg中拿到对方userName,在OnlineMap找到它的user对象,用它的user对象调用sendMsg函数通过conn给其发送消息。
<3.私聊消息的本质是在OnlineMap中保存的多个用户的连接中找到目的用户的长连接conn向它write。从client A的conn读到的消息进行处理后在Map找到它要发送的client B的conn去write。
  1. else if len(msg) > 4 && msg[:3] == "to" {
  2.                 //消息格式:to|张三|消息内容
  3.                 //<1.获取对方的用户名
  4.                 remoteName := strings.Split(msg,"|")[1]
  5.                 if remoteName == "" {
  6.                         this.SendMsg("消息格式不正确,请使用"to|张三|你好啊"格式,\n")
  7.                         return
  8.                 }
  9.                 //<2.根据用户名得到对方的user对象
  10.                 remoteUser,ok := this.server.OnlineMap[remoteName]
  11.                 if !ok {
  12.                         this.SendMsg("该用户名不存在\n")
  13.                         return
  14.                 }
  15.                 //<3.获取消息内容,通过对方的User对象将消息发送过去
  16.                 content := strings.Split(msg,"|")[2]
  17.                 if content == "" {
  18.                         this.SendMsg("无消息内容,请重发\n")
  19.                         return
  20.                 }
  21.                 remoteUser.SendMsg(this.Name + "对您说:"+content) //从msg中拿到对方userName,在OnlineMap找到它的user对象,用它的user对象调用sendMsg函数通过conn给其发送消息。
  22.         }
复制代码
九、客户端基本构建 

  1. package main
  2. import "net"
  3. import "fmt"
  4. type Client struct {
  5.         ServerIp string //服务器Ip
  6.         ServerPort int
  7.         Name string //客户端名称
  8.         conn net.Conn //连接句柄
  9. }
  10. func NewClient(serverIp string,serverPort int) *Client {
  11.         //创建客户端对象
  12.         client := &Client{
  13.                 ServerIp: serverIp,
  14.                 ServerPort: serverPort,
  15.         }
  16.         //连接server,客户端去连接服务器func Dial(network,address string) (Conn,error)
  17.         conn, err := net.Dial("tcp",fmt.Sprintf("%s:%d",serverIp,serverPort))
  18.         if err != nil {
  19.                 fmt.Println("net.Dial error:",err)
  20.                 return nil
  21.         }
  22.         client.conn = conn
  23.         //返回对象
  24.         return client
  25. }
  26. func main() {
  27.         client := NewClient("127.0.0.1",8888)
  28.         if client == nil{
  29.                 fmt.Println(">>>>>连接服务器失败>>>>>")
  30.                 return
  31.         }
  32.         fmt.Println(">>>>>连接服务器成功>>>>>")
  33.         //启动客户端的业务
  34.         select {}
  35. }
复制代码
十、解析命令行 

  1. var serverIp string
  2. var serverPort int
  3. func init() { //将两个形参绑定到flag包中
  4.         flag.StringVar(&serverIp,"ip","127.0.0.1","设置服务器IP地址(默认是127.0.0.1)")
  5.         flag.IntVar(&serverPort,"port",8888,"设置服务器Port地址(默认是8888)")
  6. }
  7. func main() {
  8.         //命令行解析
  9.         flag.Parse()
  10. }
复制代码
十一、添加客户端菜单

  1. func (client *Client) menu() bool {
  2.         var flag int
  3.         fmt.Println("************************************")
  4.         fmt.Println("1.公聊模式")
  5.         fmt.Println("2.私聊模式")
  6.         fmt.Println("3.更新用户名")
  7.         fmt.Println("0.退出")
  8.         fmt.Println("************************************")
  9.         fmt.Scanln(&flag)
  10.         if flag >= 0 && flag <= 3{
  11.                 client.flag = flag
  12.                 return true
  13.         }else {
  14.                 fmt.Println(">>>>请输入合法范围内的数字<<<<")
  15.                 return false
  16.         }
  17. }
  18. func (client *Client) Run() {
  19.         for client.flag != 0{
  20.                 for client.menu() != true {//阻塞式等待用户输入正确的flag或者退出
  21.                 }
  22.                 //根据不同的模式处理不同的业务
  23.                 switch client.flag {
  24.                 case 1:
  25.                         //公聊模式
  26.                         fmt.Println("公聊模式")
  27.                         break
  28.                 case 2:
  29.                         //私聊模式
  30.                         fmt.Println("私聊模式")
  31.                         break
  32.                 case 3:
  33.                         //更新用户名
  34.                         fmt.Println("更新用户名")
  35.                         break
  36.                 }
  37.         }
  38. }
复制代码
十二、公聊、私聊、更新用户名接口

12.1更新用户名接口实现

  1. func (client *Client) UpdateName() bool {
  2.         fmt.Println(">>>>>请输入用户名<<<<<")
  3.         fmt.Scanln(&client.Name)
  4.         sendMsg := "rename|" + client.Name +"\n"
  5.         _,err := client.conn.Write([]byte(sendMsg)) //从命令行中读消息并发送给服务器
  6.         if err != nil {
  7.                 fmt.Println("conn.Write err:",err)
  8.                 return false
  9.         }
  10.         return true
  11. }
复制代码
12.2公聊模式接口实现

  1. func (client *Client) PublicChat() {
  2.         //提示用户输入消息
  3.         var chatMsg string
  4.         fmt.Println(">>>>>请输入聊天内容,exit退出")
  5.         fmt.Scanln(&chatMsg)
  6.        
  7.         for chatMsg != "exit"{
  8.                 //发给服务器
  9.                 if len(chatMsg) != 0{
  10.                         sendMsg := chatMsg + "\n"//协议中有\n
  11.                         _,err := client.conn.Write([]byte(sendMsg)) //发送给服务器
  12.                         if err != nil {
  13.                                 fmt.Println("conn Write err:",err)
  14.                                 break
  15.                         }
  16.                 }
  17.                 //只要用户不退出,就一直在公聊模式中
  18.                 chatMsg = ""
  19.                 fmt.Println(">>>>>请输入聊天内容,exit退出")
  20.                 fmt.Scanln(&chatMsg)
  21.         }
  22. }
复制代码
12.3私聊模式接口实现 

  1. func (client *Client) PrivateChat() {
  2.         var remoteName string
  3.         var chatMsg string
  4.         client.SelectUsers()
  5.         fmt.Println(">>>>>请输入聊天对象[用户名],exit退出:")
  6.         fmt.Scanln(&remoteName)
  7.         for remoteName != "exit"{
  8.                 fmt.Println(">>>>>请输入消息内容,exit退出")
  9.                 fmt.Scanln(&chatMsg)
  10.                 for chatMsg != "exit" {
  11.                         //消息不为空则发送
  12.                         if len(chatMsg) != 0{
  13.                                 sendMsg := "to|" + remoteName + "|" + chatMsg + "\n\n"
  14.                                 _,err := client.conn.Write([]byte(sendMsg))
  15.                                 if err != nil {
  16.                                         fmt.Println("conn Write err:",err)
  17.                                         break
  18.                                 }
  19.                         }
  20.                         chatMsg = ""
  21.                         fmt.Println(">>>>>请输入消息内容,exit退出")
  22.                         fmt.Scanln(&chatMsg)
  23.                 }
  24.                 remoteName = ""
  25.                 client.SelectUsers()
  26.                 fmt.Println(">>>>>请输入聊天对象[用户名],exit退出:")
  27.                 fmt.Scanln(&remoteName)
  28.         }
  29. }
复制代码



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

尚未崩坏

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表