基于Go语言实现一个网络谈天室(连接Redis版)

打印 上一主题 下一主题

主题 1676|帖子 1676|积分 5028

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
基于Go语言和Redis的实时谈天室项目详解

项目概述

在这个项目中,我们实现了一个基于Go语言和Redis的实时谈天室系统。该系统允许用户通过客户端连接到服务器,进行实时谈天,而且支持以下功能:


  • 用户网名注册和验证
  • 消息广播和接收
  • 心跳检测和自动重连
  • 用户活跃度统计和排名
  • 消息存储到Redis
技术栈



  • Go语言:用于实现客户端和服务器逻辑
  • Redis:用于存储用户活跃度和谈天记录
  • TCP协议:用于客户端和服务器之间的通信
项目结构

项目由三个重要文件构成:

  • client.go:客户端逻辑
  • server.go:服务器逻辑
  • utils.go:工具函数(消息发送和读取)
详细实现

客户端实现(client.go)

1. 连接到服务器

客户端通过TCP协议连接到服务器:
  1. conn, err := net.Dial("tcp", "localhost:8080")
  2. if err != nil {
  3.         log.Fatal("连接服务器时出错:", err)
  4. }
  5. defer conn.Close()
复制代码
2. 输入网名

用户输入网名并发送到服务器:
  1. fmt.Print("请输入你的网名: ")
  2. nameInput, err := reader.ReadString('\n')
  3. name = strings.TrimSpace(nameInput)
  4. err = utils.SendMessage(conn, []byte(name))
复制代码
3. 消息发送和接收

用户输入消息后,通过utils.SendMessage发送到服务器:
  1. message, err := reader.ReadString('\n')
  2. err = utils.SendMessage(conn, []byte(message))
复制代码
服务器发送的消息通过handleServerMessages函数接收并打印:
  1. func handleServerMessages(conn *net.Conn) {
  2.         reader := bufio.NewReader(*conn)
  3.         for {
  4.                 message, err := utils.ReadMessage(reader)
  5.                 if err != nil {
  6.                         // 处理错误和重连逻辑
  7.                 }
  8.                 fmt.Println(string(message))
  9.         }
  10. }
复制代码
4. 心跳检测和自动重连

客户端会接收服务器的心跳检测消息(PING),并发送PONG响应:
  1. if string(message) == "PING" {
  2.         log.Printf("【心跳检测】接收到服务端 PING,发送 PONG 响应")
  3.         utils.SendMessage(*conn, []byte("PONG"))
  4. }
复制代码
如果连接断开,客户端会尝试重新连接:
  1. func reconnect(oldConn net.Conn) (net.Conn, bool) {
  2.         for i := 0; i < 3; i++ {
  3.                 newConn, err := net.Dial("tcp", "localhost:8080")
  4.                 if err == nil {
  5.                         oldConn.Close()
  6.                         return newConn, true
  7.                 }
  8.                 time.Sleep(time.Duration(2<<uint(i)) * time.Second)
  9.         }
  10.         return nil, false
  11. }
复制代码
服务器实现(server.go)

1. 初始化Redis

服务器使用Redis存储用户活跃度和谈天记录:
  1. redisClient := redis.NewClient(&redis.Options{
  2.         Addr:     "localhost:6379",
  3.         Password: "",
  4.         DB:       0,
  5. })
复制代码
2. 客户端管理

服务器维护一个ChatRoom结构体,管理所有在线客户端:
  1. type ChatRoom struct {
  2.         Clients map[*Client]bool
  3.         Join    chan *Client
  4.         Leave   chan *Client
  5.         Message chan []byte
  6.         Redis   *redis.Client
  7.         mu      sync.RWMutex
  8. }
复制代码
3. 消息广播

服务器接收客户端的消息,并广播到所有在线客户端:
  1. func (cr *ChatRoom) Run() {
  2.         for {
  3.                 select {
  4.                 case message := <-cr.Message:
  5.                         cr.mu.RLock()
  6.                         for client := range cr.Clients {
  7.                                 client.Send <- message
  8.                         }
  9.                         cr.mu.RUnlock()
  10.                         batchStoreToRedis(ctx, cr.Redis, message)
  11.                 }
  12.         }
  13. }
复制代码
4. 心跳检测

服务器定期向客户端发送PING消息,并等待PONG响应:
  1. ticker := time.NewTicker(10 * time.Second)
  2. heartbeatTimeout := time.NewTimer(15 * time.Second)
  3. for {
  4.         select {
  5.         case <-ticker.C:
  6.                 utils.SendMessage(client.Conn, []byte("PING"))
  7.                 heartbeatTimeout.Reset(15 * time.Second)
  8.         case <-heartbeatTimeout.C:
  9.                 cr.Leave <- client
  10.         }
  11. }
复制代码
5. Redis存储

服务器将消息批量存储到Redis,并更新用户活跃度:
  1. func batchStoreToRedis(ctx context.Context, redisClient *redis.Client, message []byte) {
  2.         queue := redisClient.LRange(ctx, "chat_messages_queue", 0, -1).Val()
  3.         queue = append(queue, string(message))
  4.         if len(queue) >= 10 {
  5.                 pipe := redisClient.Pipeline()
  6.                 for _, msg := range queue {
  7.                         pipe.RPush(ctx, "chat_messages", msg)
  8.                         pipe.ZIncrBy(ctx, "user_activity", 1, extractUsername(msg))
  9.                 }
  10.                 pipe.Exec(ctx)
  11.                 redisClient.Del(ctx, "chat_messages_queue")
  12.         } else {
  13.                 redisClient.RPush(ctx, "chat_messages_queue", string(message))
  14.         }
  15. }
复制代码
工具函数(utils.go)

工具函数提供了消息的发送和读取功能:
  1. func SendMessage(conn net.Conn, message []byte) error {
  2.         length := uint32(len(message))
  3.         err := binary.Write(conn, binary.BigEndian, length)
  4.         if err != nil {
  5.                 return err
  6.         }
  7.         _, err = conn.Write(message)
  8.         return err
  9. }
  10. func ReadMessage(reader *bufio.Reader) ([]byte, error) {
  11.         var length uint32
  12.         err := binary.Read(reader, binary.BigEndian, &length)
  13.         if err != nil {
  14.                 return nil, err
  15.         }
  16.         message := make([]byte, length)
  17.         _, err = io.ReadFull(reader, message)
  18.         return message, err
  19. }
复制代码
重点问题分析

问题 1:心跳检测机制与消息处理分支的逻辑冲突导致服务端无法接收客户端消息

1.1 原始代码结构的问题

在未修复的代码中,服务端的 HandleClient 函数使用了如下逻辑:
  1. for {
  2.     select {
  3.     case <-ticker.C:  // 心跳检测分支(每10秒触发一次)
  4.         sendPing()
  5.     default:          // 默认分支(非阻塞)
  6.         message := readMessage()
  7.         processMessage(message)
  8.     }
  9. }
复制代码
1.2 关键问题分析



  • default 分支的局限性
    default 分支仅在 所有其他 case 未停当时触发
    当 ticker.C 每隔 10 秒触发一次心跳检测时,select 会优先执行 case <-ticker.C,而 default 分支仅在心跳未触发时才会执行。
    这会导致:

    • 消息读取延伸:客户端发送的消息大概堆积在 bufio.Reader 的缓冲区中,但服务端因 default 分支未及时执行而无法读取。
    • 竞争条件:如果客户端在心跳触发时发送消息,服务端会优先处理心跳,消息大概被跳过。

1.3 详细场景模拟

假设客户端连续发送两条消息 消息A 和 消息B,时间线如下:
  1. 时间点 0ms: 服务端开始循环
  2. 时间点 5ms: 客户端发送消息A
  3. 时间点 10ms: ticker.C 触发心跳检测(发送PING)
  4. 时间点 15ms: 客户端发送消息B
复制代码


  • 服务端行为

    • 在 10ms 时,case <-ticker.C 触发,发送 PING。
    • default 分支在 10ms 后才有机会执行,但此时 bufio.Reader 中大概已有 消息A 和 消息B。
    • 由于 default 分支是非阻塞的,服务端大概只读取到部门消息,甚至因心跳频繁触发而完全跳过消息处理。

1.4 Go 的 select 调度机制



  • 随机选择原则
    当多个 case 同时停当时,select 会随机选择一个执行。
    但若某个 case(如 ticker.C)周期性触发,它会频繁占用执行机会,导致其他分支(如消息读取)被“饿死”。
  • default 分支的陷阱
    default 分支的设计初衷是制止阻塞,但它不得当需要连续监听的操作。
    在您的场景中,消息读取需要自动检查缓冲区,而 default 分支无法包管这一点。
1.5 修复方案的核心逻辑

修改后的代码通过以下方式办理问题:
  1. for {
  2.     select {
  3.     case <-ticker.C:           // 心跳检测
  4.         sendPing()
  5.     case <-msgTicker.C:        // 专用消息读取分支(每50ms触发一次)
  6.         message := readMessage()
  7.         processMessage(message)
  8.     }
  9. }
复制代码
1.6 为何有用?


  • 独立的定时器 (msgTicker)
    添加了一个专用的 msgTicker,每隔 50ms 触发一次消息读取。

    • 即使心跳检测占用 select 的执行机会,消息读取仍有独立的触发窗口。
    • 制止了心跳和消息处理的竞争。

  • 消除 default 分支的不可靠性
    用显式的 case <-msgTicker.C 替代 default,确保消息读取按固定频率执行。
1.7 总结:冲突的本质



  • 心跳机制干扰:周期性的心跳检测(ticker.C)占用了 select 的执行机会,导致 default 分支无法及时处理消息。
  • 修复思路:为消息读取分配独立的触发通道(msgTicker),与心跳检测解耦,确保两者互不阻塞。
1.8 类比表明

想象一个餐厅服务员(服务端)需要同时做两件事:

  • 定时检查厨房温度(心跳检测):每10分钟一次。
  • 接待顾客点餐(消息处理):需要随时响应。


  • 原始方案:服务员大部门时间站在厨房门口检查温度,只有偶然看一眼大堂(default 分支),导致顾客长时间无人接待。
  • 修复方案:服务员每隔5分钟自动巡视大堂一次(msgTicker),同时定期检查厨房,两者互不干扰。
问题 2:网名重复导致身份冲突

2.1 前因后果



  • 问题本质:多个用户使用相同网名加入谈天室,导致消息归属杂乱、活跃度统计错误。
  • 根本缘故原由:服务端未验证网名唯一性,直接担当客户端提交的名称,未检查是否已被占用。
  • 详细表现

    • 用户A和用户B使用相同网名“小明”加入后,服务端无法区分两者消息。
    • Redis中user_activity的活跃度分数会被错误累加到同一用户名下。

2.2 详细场景模拟

假设用户A和用户B同时尝试使用网名“小明”连接:
  1. 时间点 0ms: 用户A发送网名“小明” → 服务端接受并加入。
  2. 时间点 50ms: 用户B发送网名“小明” → 服务端未检查唯一性,直接加入。
  3. 时间点 100ms: 用户A发送消息“你好”,用户B发送消息“大家好”。
复制代码


  • 服务端行为

    • 用户A和用户B的消息均被标记为“小明: 消息内容”。
    • Redis中用户“小明”的活跃度分数错误累加为2(实际应为两个独立用户)。

2.3 Go 语言机制分析



  • 并发写入冲突:多个协程同时操作Redis集适时,若未加锁大概导致数据竞争。
  • 聚集操作的原子性:Redis的SADD和SISMEMBER命令是原子操作,但Go代码中需确保逻辑次序正确。
2.4 办理方案


  • Redis 聚集管理在线用户

    • 用户加入时,检查网名是否存在于聚集online_users:
      1. exists, err := cr.Redis.SIsMember(ctx, "online_users", clientName).Result()
      2. if exists {
      3.     utils.SendMessage(conn, []byte("ERROR: 网名已被占用"))
      4.     return
      5. }
      复制代码
    • 网名正当后添加到聚集:
      1. cr.Redis.SAdd(ctx, "online_users", clientName)
      复制代码

  • 退出时清理网名

    • 使用defer确保客户端断开时从聚会合移除:
      1. defer cr.Redis.SRem(ctx, "online_users", clientName)
      复制代码

2.5 总结类比



  • 问题类比:多人共用同一身份证号登记入住酒店,前台无法区分客人。
  • 修复思路:前台要求每位客人提供唯一身份证号,登记前查重,退房时注销。
问题 3:空消息和空网名导致无效数据

3.1 前因后果



  • 问题本质:用户输入空缺内容作为网名或消息,导致系统处理无效数据。
  • 根本缘故原由:客户端和服务端未对输入内容做非空验证。
  • 详细表现

    • 空网名:用户直接回车加入,服务端记录空名称,广播消息时无法标识来源。
    • 空消息:用户发送空格或回车,占用网络带宽和存储资源。

3.2 详细场景模拟

假设用户执行以下操作:
  1. 时间点 0ms: 用户输入空网名(直接回车) → 服务端允许加入。
  2. 时间点 100ms: 用户发送空消息(空格 + 回车) → 服务端广播“:  ”。
复制代码


  • 服务端行为

    • 网名为空的客户端加入,广播“系统广播: 加入了谈天室”。
    • 空消息被广播到所有客户端,消息内容偶然义。

3.3 Go 语言机制分析



  • 字符串处理:strings.TrimSpace可过滤首尾空缺字符,但需自动调用。
  • 输入流阻塞:客户端的ReadString大概读取到换行符,需显式检查内容是否为空。
3.4 办理方案


  • 客户端验证

    • 循环读取网名直到非空:
      1. for {
      2.     fmt.Print("请输入网名: ")
      3.     name = strings.TrimSpace(reader.ReadString())
      4.     if name != "" {
      5.         break
      6.     }
      7.     fmt.Println("网名不能为空!")
      8. }
      复制代码
    • 发送消息前检查内容:
      1. message = strings.TrimSpace(input)
      2. if message == "" {
      3.     fmt.Println("消息不能为空!")
      4.     continue
      5. }
      复制代码

  • 服务端二次过滤

    • 网名非空检查:
      1. if clientName == "" {
      2.     utils.SendMessage(conn, []byte("ERROR: 网名不能为空"))
      3.     return
      4. }
      复制代码
    • 消息内容过滤:
      1. msgContent := strings.TrimSpace(string(message))
      2. if msgContent == "" {
      3.     log.Printf("客户端 %s 发送了空消息,已忽略", client.Name)
      4.     continue
      5. }
      复制代码

3.5 总结类比



  • 问题类比:用户向邮箱发送空缺信件,邮局仍派送,浪费资源。
  • 修复思路:邮局拒绝投递无内容信件,并要求寄件人填写有用地点。
问题 4:心跳超时导致僵尸连接

4.1 前因后果



  • 问题本质:客户端非常退出后,服务端未检测到离线状态,维持无效连接。
  • 根本缘故原由:服务端仅依赖客户端显式退出信号,缺乏被动检测机制。
  • 详细表现

    • 客户端断网后,服务端连续等待消息,占用内存和连接资源。
    • Redis中online_users聚集保留无效网名,影响新用户注册。

4.2 详细场景模拟

假设客户端因网络故障断开:
  1. 时间点 0ms: 服务端发送PING。
  2. 时间点 10ms: 客户端未响应(已断开)。
  3. 时间点 25ms: 服务端仍认为客户端在线,未清理资源。
复制代码


  • 服务端行为

    • 客户端连接残留,占用Clients聚集和TCP端口。
    • 新用户无法使用相同网名注册。

4.3 Go 语言机制分析



  • 计时器管理:time.Timer需手动重置,制止误触发。
  • 协程走漏风险:未关闭的协程大概连续占用内存。
4.4 办理方案


  • 心跳检测与超机会制

    • 服务端每10秒发送PING:
      1. ticker := time.NewTicker(10 * time.Second)
      2. defer ticker.Stop()
      复制代码
    • 客户端响应PONG后重置超时计时器:
      1. case <-msgTicker.C:
      2.     if message == "PONG" {
      3.         heartbeatTimeout.Reset(15 * time.Second)
      4.     }
      复制代码

  • 超时逼迫断开

    • 若15秒未收到PONG,判断客户端离线:
      1. heartbeatTimeout := time.NewTimer(15 * time.Second)
      2. defer heartbeatTimeout.Stop()
      3. select {
      4. case <-heartbeatTimeout.C:
      5.     cr.Leave <- client
      6.     return
      7. }
      复制代码

4.5 总结类比



  • 问题类比:电话通话中对方突然静默,但未挂断,导致线路不停被占用。
  • 修复思路:运营商设定“无响应超时”,若一段时间无声音,自动挂断开释线路。
总结

通过上述步伐,项目实现了:

  • 身份唯一性:Redis聚集保障网名全局唯一。
  • 数据有用性:双重验证过滤空输入。
  • 连接康健性:心跳超机会制自动清理僵尸连接。
完整代码

client.go(客户端)

  1. package main
  2. import (
  3.         "bufio"
  4.         "fmt"
  5.         "log"
  6.         "net"
  7.         "os"
  8.         "strings"
  9.         "time"
  10.         "Learn/kaohe/redis2/utils"
  11. )
  12. // handleServerMessages 负责处理从服务器接收到的消息
  13. func handleServerMessages(conn *net.Conn) {
  14.         reader := bufio.NewReader(*conn) // 创建一个读取器,用于从连接中读取消息
  15.         for {
  16.                 message, err := utils.ReadMessage(reader) // 从服务器读取消息
  17.                 if err != nil {
  18.                         log.Println("接收服务器消息时出错:", err)
  19.                         // 如果连接断开,尝试重新连接
  20.                         newConn, ok := reconnect(*conn)
  21.                         if !ok {
  22.                                 return // 如果重新连接失败,退出处理
  23.                         }
  24.                         *conn = newConn                 // 更新连接
  25.                         reader = bufio.NewReader(*conn) // 重新创建读取器
  26.                         continue
  27.                 }
  28.                 log.Printf("接收到消息: %s", string(message))
  29.                 fmt.Println(string(message)) // 打印消息到控制台
  30.                 // 如果收到的是 PING 消息,发送 PONG 响应
  31.                 if string(message) == "PING" {
  32.                         log.Printf("【心跳检测】接收到服务端 PING,发送 PONG 响应")
  33.                         err := utils.SendMessage(*conn, []byte("PONG"))
  34.                         if err != nil {
  35.                                 log.Println("发送心跳响应时出错:", err)
  36.                         }
  37.                 }
  38.         }
  39. }
  40. // reconnect 尝试重新连接到服务器
  41. func reconnect(oldConn net.Conn) (net.Conn, bool) {
  42.         for i := 0; i < 3; i++ { // 最多尝试 3 次
  43.                 log.Printf("尝试第 %d 次重新连接服务器...", i+1)
  44.                 newConn, err := net.Dial("tcp", "localhost:8080") // 尝试连接服务器
  45.                 if err == nil {
  46.                         log.Println("重新连接服务器成功")
  47.                         oldConn.Close()      // 关闭旧连接
  48.                         return newConn, true // 返回新连接
  49.                 }
  50.                 time.Sleep(time.Duration(2<<uint(i)) * time.Second) // 指数退避等待
  51.         }
  52.         log.Println("多次尝试重新连接服务器失败,退出程序")
  53.         return nil, false // 重新连接失败
  54. }
  55. func main() {
  56.         conn, err := net.Dial("tcp", "localhost:8080") // 连接到服务器
  57.         if err != nil {
  58.                 log.Fatal("连接服务器时出错:", err)
  59.         }
  60.         defer conn.Close() // 程序结束时关闭连接
  61.         go handleServerMessages(&conn) // 启动一个 goroutine 处理服务器消息
  62.         reader := bufio.NewReader(os.Stdin) // 创建一个读取器,用于读取用户输入
  63.         // 循环读取网名直到输入有效
  64.         var name string
  65.         for {
  66.                 fmt.Print("请输入你的网名: ")
  67.                 nameInput, err := reader.ReadString('\n') // 读取用户输入的网名
  68.                 if err != nil {
  69.                         log.Fatal("读取网名时出错:", err)
  70.                 }
  71.                 name = strings.TrimSpace(nameInput) // 去除多余空格
  72.                 if name == "" {
  73.                         fmt.Println("网名不能为空,请重新输入!")
  74.                         continue
  75.                 }
  76.                 break
  77.         }
  78.         err = utils.SendMessage(conn, []byte(name)) // 向服务器发送网名
  79.         if err != nil {
  80.                 log.Fatal("发送网名时出错:", err)
  81.         }
  82.         fmt.Println("你已成功加入聊天室,可以开始聊天了!")
  83.         for {
  84.                 message, err := reader.ReadString('\n') // 读取用户输入的消息
  85.                 if err != nil {
  86.                         log.Println("读取用户输入时出错:", err)
  87.                         break
  88.                 }
  89.                 message = strings.TrimSpace(message) // 去除多余空格
  90.                 if message == "" {
  91.                         fmt.Println("消息不能为空,请重新输入!")
  92.                         continue
  93.                 }
  94.                 if message == "/quit" { // 如果输入是 "/quit",退出聊天室
  95.                         fmt.Println("你已退出聊天室")
  96.                         utils.SendMessage(conn, []byte("/quit"))
  97.                         break
  98.                 }
  99.                 err = utils.SendMessage(conn, []byte(message)) // 向服务器发送消息
  100.                 log.Printf("DEBUG: 客户端尝试发送消息: %s, 错误: %v", message, err)
  101.                 if err != nil {
  102.                         log.Println("发送消息到服务器时出错:", err)
  103.                         // 如果发送失败,尝试重新连接
  104.                         newConn, ok := reconnect(conn)
  105.                         if !ok {
  106.                                 return
  107.                         }
  108.                         conn = newConn
  109.                         err = utils.SendMessage(conn, []byte(name)) // 重新发送网名
  110.                         if err != nil {
  111.                                 log.Fatal("重新连接后发送网名时出错:", err)
  112.                         }
  113.                 }
  114.         }
  115. }
复制代码
server.go(服务端)

  1. package mainimport (        "bufio"        "context"        "fmt"        "io"        "log"        "net"        "strings"        "sync"        "time"        "Learn/kaohe/redis2/utils"        "github.com/redis/go-redis/v9")// Client 表示一个客户端连接type Client struct {        Name string      // 客户端的网名        Conn net.Conn    // 客户端的连接        Send chan []byte // 用于向客户端发送消息的通道}// ChatRoom 表示谈天室,管理所有客户端连接type ChatRoom struct {        Clients map[*Client]bool // 当前在线的客户端        Join    chan *Client     // 新客户端加入的通道        Leave   chan *Client     // 客户端离开的通道        Message chan []byte      // 广播消息的通道        Redis   *redis.Client    // Redis 客户端        mu      sync.RWMutex     // 用于并发控制的互斥锁}// NewChatRoom 创建一个新的谈天室func NewChatRoom(redisClient *redis.Client) *ChatRoom {        return &ChatRoom{                Clients: make(map[*Client]bool), // 初始化客户端列表                Join:    make(chan *Client),     // 初始化加入通道                Leave:   make(chan *Client),     // 初始化离开通道                Message: make(chan []byte, 100), // 初始化消息通道,缓冲大小为 100                Redis:   redisClient,            // 初始化 Redis 客户端        }}// Run 谈天室的主循环,处理客户端的加入、离开和消息广播func (cr *ChatRoom) Run() {        for {                select {                case client := <-cr.Join: // 处理新客户端加入                        cr.mu.Lock()                        cr.Clients[client] = true // 将客户端添加到列表                        cr.mu.Unlock()                        message := fmt.Sprintf("系统广播: %s 加入了谈天室", client.Name)                        cr.Message <- []byte(message) // 广播加入消息                case client := <-cr.Leave: // 处理客户端离开                        cr.mu.Lock()                        if _, ok := cr.Clients[client]; ok {                                delete(cr.Clients, client) // 从列表中删除客户端                                close(client.Send)         // 关闭客户端的发送通道                        }                        cr.mu.Unlock()                        if client.Name != "" {                                message := fmt.Sprintf("系统广播: %s 离开了谈天室", client.Name)                                cr.Message <- []byte(message) // 广播离开消息                        }                case message := <-cr.Message: // 处理广播消息                        cr.mu.RLock()                        log.Printf("准备广播消息: %s", string(message))                        for client := range cr.Clients { // 向所有客户端发送消息                                select {                                case client.Send <- message: // 发送消息                                        log.Printf("已发送消息到客户端 %s", client.Name)                                default:                                        log.Printf("客户端 %s 的发送通道已满", client.Name)                                }                        }                        cr.mu.RUnlock()                        // 将消息存储到 Redis                        ctx := context.Background()                        batchStoreToRedis(ctx, cr.Redis, message)                }        }}// batchStoreToRedis 将消息批量存储到 Redisfunc batchStoreToRedis(ctx context.Context, redisClient *redis.Client, message []byte) {        const batchSize = 10 // 批量存储的大小        // 从 Redis 获取当前队列中的消息        queue := redisClient.LRange(ctx, "chat_messages_queue", 0, -1).Val()        queue = append(queue, string(message)) // 将新消息添加到队列        if len(queue) >= batchSize { // 如果队列到达批量大小                pipe := redisClient.Pipeline() // 创建 Redis 管道                for _, msg := range queue {                        // 将消息存储到 chat_messages 列表                        pipe.RPush(ctx, "chat_messages", msg)                        // 更新用户活跃度                        pipe.ZIncrBy(ctx, "user_activity", 1, extractUsername(msg))                }                _, _ = pipe.Exec(ctx)                       // 执行管道                redisClient.Del(ctx, "chat_messages_queue") // 删除队列        } else {                // 如果队列未到达批量大小,将消息存储到队列                redisClient.RPush(ctx, "chat_messages_queue", string(message))        }}// extractUsername 从消息中提取用户名func extractUsername(message string) string {        parts := strings.SplitN(message, ": ", 2)        if len(parts) > 0 {                return parts[0]        }        return ""}// HandleClient 处理单个客户端的连接func (cr *ChatRoom) HandleClient(conn net.Conn) {        var clientName string        defer func() {                conn.Close()                                                 // 关闭连接                cr.Leave <- &Client{Name: clientName, Conn: conn, Send: nil} // 关照谈天室客户端离开                log.Printf("客户端连接已关闭: %s", conn.RemoteAddr())        }()        reader := bufio.NewReader(conn)             // 创建读取器        nameBytes, err := utils.ReadMessage(reader) // 读取客户端的网名        if err != nil {                log.Println("读取网名时出错:", err)                return        }        clientName = strings.TrimSpace(string(nameBytes))        // 检查网名是否为空        if clientName == "" {                log.Println("客户端尝试使用空网名连接")                utils.SendMessage(conn, []byte("ERROR: 网名不能为空"))                return        }        // 检查网名是否已存在        ctx := context.Background()        exists, err := cr.Redis.SIsMember(ctx, "online_users", clientName).Result()        if err != nil {                log.Printf("检查网名 %s 时 Redis 出错: %v", clientName, err)                utils.SendMessage(conn, []byte("ERROR: 服务器内部错误"))                return        }        if exists {                log.Printf("网名 %s 已存在,拒绝连接", clientName)                utils.SendMessage(conn, []byte("ERROR: 网名已被占用,请更换其他名称"))                return        }        // 添加网名到在线用户聚集        if _, err := cr.Redis.SAdd(ctx, "online_users", clientName)
  2. .Result(); err != nil {                log.Printf("存储网名 %s 到 Redis 失败: %v", clientName, err)                utils.SendMessage(conn, []byte("ERROR: 服务器内部错误"))                return        }        defer cr.Redis.SRem(ctx, "online_users", clientName)
  3.         client := &Client{                Name: clientName,                Conn: conn,                Send: make(chan []byte, 10), // 初始化发送通道        }        cr.Join <- client // 关照谈天室客户端加入        // 启动一个 goroutine 处理发送消息        go func() {                for message := range client.Send {                        err := utils.SendMessage(client.Conn, message)                        if err != nil {                                log.Println("发送消息给客户端时出错:", err)                                cr.Leave <- client // 关照谈天室客户端离开                                break                        }                }        }()        // 定时发送心跳检测        ticker := time.NewTicker(10 * time.Second)        // 定时读取消息        msgTicker := time.NewTicker(50 * time.Millisecond)        // 心跳超时检测        heartbeatTimeout := time.NewTimer(15 * time.Second)        defer func() {                ticker.Stop()                msgTicker.Stop()                heartbeatTimeout.Stop()        }()        for {                select {                case <-ticker.C: // 心跳检测                        log.Printf("【心跳检测】向客户端 %s 发送 PING", client.Name)                        if err := utils.SendMessage(client.Conn, []byte("PING")); err != nil {                                log.Printf("【心跳非常】客户端 %s 心跳发送失败: %v", client.Name, err)                                cr.Leave <- client                                return                        }                        heartbeatTimeout.Reset(15 * time.Second)                case <-heartbeatTimeout.C:                        log.Printf("【心跳超时】客户端 %s 未响应 PONG,逼迫断开", client.Name)                        cr.Leave <- client                        return                case <-msgTicker.C: // 读取消息                        message, err := utils.ReadMessage(reader)                        if err != nil {                                if err == io.EOF {                                        log.Printf("客户端 %s 自动断开连接", client.Name)                                } else {                                        log.Printf("读取消息时出错: %v", err)                                }                                cr.Leave <- client                                return                        }                        if string(message) == "PONG" { // 处理心跳响应                                log.Printf("【心跳响应】客户端 %s 返回 PONG", client.Name)                                heartbeatTimeout.Reset(15 * time.Second)                                continue                        }                        msgContent := strings.TrimSpace(string(message))                        if msgContent == "" {                                log.Printf("客户端 %s 发送了空消息,已忽略", client.Name)                                continue                        }                        log.Printf("接收到来自 %s 的消息: %s", client.Name, msgContent)                        // 构造完整消息并广播                        fullMessage := fmt.Sprintf("%s: %s", client.Name, msgContent)                        cr.Message <- []byte(fullMessage)                }        }}// printActivityRankings 打印用户活跃度排名func printActivityRankings(redisClient *redis.Client) {        ctx := context.Background()        topUsers, err := redisClient.ZRevRangeWithScores(ctx, "user_activity", 0, 9).Result()        if err != nil {                log.Println("获取活跃度排名时出错:", err)                return        }        fmt.Println("活跃度排名:")        for i, user := range topUsers {                fmt.Printf("%d. %s: %.0f\n", i+1, user.Member.(string), user.Score)        }}// PrintConnectedClients 打印当前在线客户端func (cr *ChatRoom) PrintConnectedClients() {        cr.mu.RLock()        defer cr.mu.RUnlock()        log.Println("【活跃客户端】当前在线客户端列表:")        for client := range cr.Clients {                log.Printf(" - %s (IP: %s)", client.Name, client.Conn.RemoteAddr())        }}func main() {        // 初始化 Redis 客户端        redisClient := redis.NewClient(&redis.Options{                Addr:     "localhost:6379",                Password: "",                DB:       0,        })        ctx := context.Background()        _, err := redisClient.Ping(ctx).Result()        if err != nil {                log.Fatal("连接 Redis 时出错:", err)        }        // 监听端口        listener, err := net.Listen("tcp", ":8080")        if err != nil {                log.Fatal("监听端口时出错:", err)        }        defer listener.Close()        fmt.Println("谈天室服务器已启动,等待客户端连接...")        // 创建谈天室        chatRoom := NewChatRoom(redisClient)        go chatRoom.Run() // 启动谈天室        // 定期打印活跃度排名和在线客户端        go func() {                ticker := time.NewTicker(30 * time.Second)                defer ticker.Stop()                for range ticker.C {                        printActivityRankings(redisClient)                        chatRoom.PrintConnectedClients()                }        }()        // 担当客户端连接        for {                conn, err := listener.Accept()                if err != nil {                        log.Println("担当客户端连接时出错:", err)                        continue                }                go chatRoom.HandleClient(conn) // 处理客户端连接        }}
复制代码
utils.go(工具函数,发送和接收消息)

  1. package utils
  2. import (
  3.         "bufio"
  4.         "encoding/binary"
  5.         "fmt"
  6.         "io"
  7.         "log"
  8.         "net"
  9. )
  10. const maxMessageLength = 1 << 20 // 1MB,最大消息长度限制
  11. // SendMessage 向连接发送消息
  12. func SendMessage(conn net.Conn, message []byte) error {
  13.         length := uint32(len(message)) // 消息长度
  14.         if length > maxMessageLength {
  15.                 log.Println("消息长度超出限制:", length)
  16.                 return fmt.Errorf("message too long")
  17.         }
  18.         // 写入消息长度
  19.         err := binary.Write(conn, binary.BigEndian, length)
  20.         if err != nil {
  21.                 log.Printf("写入消息长度时出错: %v", err)
  22.                 return err
  23.         }
  24.         // 写入消息内容
  25.         _, err = conn.Write(message)
  26.         if err != nil {
  27.                 log.Printf("写入消息内容时出错: %v", err)
  28.         }
  29.         return err
  30. }
  31. // ReadMessage 从连接读取消息
  32. func ReadMessage(reader *bufio.Reader) ([]byte, error) {
  33.         var length uint32 // 消息长度
  34.         err := binary.Read(reader, binary.BigEndian, &length)
  35.         if err != nil {
  36.                 log.Printf("读取消息长度时出错: %v", err)
  37.                 return nil, err
  38.         }
  39.         if length > maxMessageLength {
  40.                 log.Println("消息长度超出限制:", length)
  41.                 return nil, fmt.Errorf("message too long")
  42.         }
  43.         // 读取消息内容
  44.         message := make([]byte, length)
  45.         _, err = io.ReadFull(reader, message)
  46.         if err != nil {
  47.                 log.Printf("读取消息内容时出错: %v", err)
  48.                 return nil, err
  49.         }
  50.         return message, nil
  51. }
复制代码
如果对项目有任何问题或发起,接待在评论区留言!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

万有斥力

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表