花瓣小跑 发表于 2024-8-5 00:16:53

HTTP SSE (Server-Sent Events)流式服务器推送技术

HTTP SSE (Server-Sent Events)流式服务器推送技术

SSE(Server-Sent Events)是一种基于HTTP协议的服务器推送技术,用于实现服务器向客户端实时发送变乱的功能。在SSE中,服务器可以发送不同类型的消息给客户端。
以下是SSE大概出现的消息类型:

变乱消息(Event Message):服务器发送的一般变乱消息,可以包含自定义的变乱类型和数据。
注释消息(Comment Message):服务器发送的注释消息,用于向客户端传递一些额外的信息,但不会触发任何变乱处理。
重连消息(Reconnect Message):服务器发送的重连消息,用于告知客户端重新连接服务器。
下面是一个完备的HTTP请求返回结果示例,展示了SSE的利用:


HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive

id: msgid
event: myEvent
data: {"message": "Hello, SSE!"}

id: msgid
event: myEvent
data: {"message": "Another event"}

: This is a comment

id: msgid
event: reconnect
retry: 5000

在这个示例中,服务器返回的HTTP相应头中指定了Content-Type为text/event-stream,表示这是一个SSE的相应。Cache-Control设置为no-cache,确保每次请求都能从服务器获取最新的数据。Connection设置为keep-alive,保持长连接。
接下来,服务器通过多个变乱消息和注释消息向客户端发送数据。每个消息以event字段指定变乱类型,以data字段包含消息的数据。在示例中,服务器发送了两个变乱消息,分别是myEvent类型的消息,包含不同的数据。还发送了一个注释消息,以冒号开头,用于传递额外的信息。
最后,服务器发送了一个重连消息,以event字段指定为reconnect类型,同时通过retry字段指定了重连的时间间隔为5000毫秒。
客户端可以通过监听SSE变乱,实时吸取服务器发送的消息,并进行相应的处理。
下面是一个利用C#解析SSE示例,展示了SSE的解析过程:



namespace System.Net
{
    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Text;

    /// <summary>
    /// 解析SSE 。
    /// </summary>
    public class EventStreamHandler
    {
      /// <summary>
      /// 新的换行符。
      /// </summary>
      private readonly string newLine;
      /// <summary>
      /// 消息体与消息体之间的空白行的数量。
      /// </summary>
      private const int emptySplitLineCount = 1;
      /// <summary>
      /// 最后1个消息体与流结束符的空白行的数量。
      /// </summary>
      private readonly static int emptySplitLineCountEnd = 1;

      private string idSource;
      private string eventSource;
      private string dataSource;
      // 是不是可能有未知项(消息体与消息体之间的未知项)。
      private bool possibleUnknownItem;
      /// <summary>
      /// 当前行。
      /// </summary>
      private string line;
      /// <summary>
      /// 是不是处理了当前行 <see cref="line"/>。
      /// </summary>
      private bool isHandledLine;
      /// <summary>
      /// 消息体和消息体中间的未知行。
      /// </summary>
      private readonly List<string> unknownLines = new List<string>(16);
      /// <summary>
      /// 消息体列表。
      /// </summary>
      private readonly List<EventStreamEventArgs> addedItems = new List<EventStreamEventArgs>(64);

      public EventStreamHandler(string newLine)
      {
            this.newLine = newLine;
      }

      /// <summary>
      /// 消息体列表。
      /// </summary>
      public List<EventStreamEventArgs> AddedItems
      {
            get { return addedItems; }
      }

      protected string DataSource { get { return dataSource; } }

      protected bool IsHandledLine { get { return isHandledLine; } }

      /// <summary>
      /// 测试代码。
      /// </summary>
      
      public static void Test()
      {
            var builder = new StringBuilder(64);
            builder.AppendLine("id: msgid");
            builder.AppendLine("event: myEvent");
            builder.AppendLine("data: Hello, SSE!");
            builder.AppendLine();
            builder.AppendLine("id: msgid");
            builder.AppendLine("event: myEvent");
            builder.AppendLine("data: Another event");
            builder.AppendLine();
            builder.AppendLine(": This is a comment");
            builder.AppendLine();
            builder.AppendLine("id: msgid");
            builder.AppendLine("event: reconnect");
            builder.AppendLine("retry: 5000");
            builder.AppendLine();
            builder.AppendLine("id: msgid end");
            builder.AppendLine("event: myEvent end");
            builder.AppendLine("data: Another event end");
            builder.AppendLine();
            var bytes = Encoding.UTF8.GetBytes(builder.ToString());
            var stream = new MemoryStream(bytes);
            var reader = new StreamReader(stream, Encoding.UTF8);
            var handler = new EventStreamHandler("\r\n");
            handler.HandleStreamReader(reader);
            var messageItems = handler.AddedItems;
            System.Diagnostics.Trace.WriteLine($"Message items count {messageItems.Count}");
      }

      /// <summary>
      /// 解析SSE 。
      /// </summary>
      /// <param name="streamReader"></param>
      public void HandleStreamReader(StreamReader streamReader)
      {
            while (!streamReader.EndOfStream)
            {
                line = streamReader.ReadLine();
                isHandledLine = false;
                if (!string.IsNullOrEmpty(line))
                {
                  if (line.StartsWith(EventStreamEventArgs.IdBegin))
                  {
                        HandleForIdBegin();
                  }
                  else if (line.StartsWith(EventStreamEventArgs.EventBegin))
                  {
                        HandleForEventBegin();
                  }
                  else if (line.StartsWith(EventStreamEventArgs.DataBegin))
                  {
                        HandleForDataBegin();
                  }
                  else if (line.StartsWith(EventStreamEventArgs.RetryBegin))
                  {
                        HandleForRetryBegin();
                  }
                }
                if (!isHandledLine)
                {
                  unknownLines.Add(line);
                }
            }

            HandleUnknownItemWhenEnd();
      }

      #region 解析各种消息。

      private void HandleForIdBegin()
      {
            // 当前行是不是用来传输 id: 信息的行。
            var isIdLine = HandleCheckIdLine();
            if (isIdLine)
            {
                idSource = line;
                isHandledLine = true;
            }
      }

      private bool HandleCheckIdLine()
      {
            // 当前行是不是用来传输 id: 信息的行。
            bool isIdLine = false;

            // 如果是第1次遇到 idSource 。移除前面遇到的未知行。
            if (addedItems.Count < 1)
            {
                ClearIfNotEmpty(unknownLines);
                isIdLine = true;
            }
            // 如果遇到了,上1个消息体结束的特征。
            else if (MatchEmptySplitLine(unknownLines, emptySplitLineCount))
            {
                HandleUnknownItemWhenMiddle();

                isIdLine = true;
            }

            return isIdLine;
      }

      private void HandleForEventBegin()
      {
            ClearIfNotEmpty(unknownLines);

            eventSource = line;
            isHandledLine = true;
      }

      private void HandleForDataBegin()
      {
            ClearIfNotEmpty(unknownLines);

            dataSource = line;
            isHandledLine = true;

            AddItem(addedItems, idSource, eventSource, dataSource);
            possibleUnknownItem = true;
      }

      private void HandleForRetryBegin()
      {
            ClearIfNotEmpty(unknownLines);

            isHandledLine = true;
            possibleUnknownItem = true;
      }

      private void HandleUnknownItemWhenMiddle()
      {
            if (possibleUnknownItem)
            {
                // 如果遇到消息体与消息体之间,插入了,额外的未知内容时。
                AddUnknownItem(addedItems, emptySplitLineCount, unknownLines);
                possibleUnknownItem = false;
                ClearIfNotEmpty(unknownLines);
            }
      }

      private void HandleUnknownItemWhenEnd()
      {
            if (possibleUnknownItem)
            {
                var emptySplitLineCountEndNew = (emptySplitLineCountEnd == 0) ? 0 :
                  Math.Min(emptySplitLineCountEnd, GetEndEmptyLineCount(unknownLines));
                AddUnknownItem(addedItems, emptySplitLineCountEndNew, unknownLines);
                possibleUnknownItem = false;
                ClearIfNotEmpty(unknownLines);
            }
      }

      #endregion

      #region 帮助函数。

      private static void ClearIfNotEmpty(List<string> unknownLines)
      {
            if (unknownLines.Count > 0)
            {
                unknownLines.Clear();
            }
      }

      /// <summary>
      /// 是不是匹配了,消息体与消息体之间的分割特征。
      /// </summary>
      /// <param name="lines"></param>
      /// <param name="emptySplitLineCount"></param>
      /// <returns></returns>
      private static bool MatchEmptySplitLine(List<string> lines, int emptySplitLineCount)
      {
            bool isMatch = lines.Count >= emptySplitLineCount;
            if (isMatch && emptySplitLineCount > 0)
            {
                int minIndex = lines.Count - emptySplitLineCount;
                for (int index = lines.Count - 1; index >= minIndex; index--)
                {
                  if (!string.IsNullOrEmpty(lines))
                  {
                        isMatch = false;
                        break;
                  }
                }
            }
            return isMatch;
      }

      /// <summary>
      /// 末尾连续的空白行数。
      /// </summary>
      /// <param name="lines"></param>
      /// <returns></returns>
      private static int GetEndEmptyLineCount(List<string> lines)
      {
            int count = 0;
            for (int index = lines.Count - 1; index >= 0 && string.IsNullOrEmpty(lines); index--)
            {
                count++;
            }
            return count;
      }

      private string GetUnknownValue(int splitLineCount, List<string> unknownLines)
      {
                string followingValue = string.Empty;
                if (unknownLines.Count > splitLineCount)
                {
                  var addLine = unknownLines.Count - splitLineCount;
                  StringBuilder builder = new StringBuilder(64);
                  for (int index = 0; index < addLine; index++)
                  {
                        builder.Append(newLine);
                        var unknownLine = unknownLines;
                        if (!string.IsNullOrEmpty(unknownLine))
                        {
                            builder.Append(unknownLine);
                        }
                  }
                  followingValue = builder.ToString();
                }
                return followingValue;
      }

      private void AddUnknownItem(List<EventStreamEventArgs> addedItems, string unknownValue)
      {
            var item = new EventStreamEventArgs(unknownValue);
            AddItem(addedItems, item);
      }

      private void AddUnknownItem(List<EventStreamEventArgs> addedItems, int splitLineCount, List<string> unknownLines)
      {
            var unknownValue = GetUnknownValue(splitLineCount, unknownLines);
            if (!string.IsNullOrEmpty(unknownValue))
            {
                AddUnknownItem(addedItems, unknownValue);
            }
      }

      private void AddItem(List<EventStreamEventArgs> addedItems, string idSource, string eventSource, string dataSource)
      {
            var item = new EventStreamEventArgs(idSource, eventSource, dataSource);
            AddItem(addedItems, item);
      }

      private void AddItem(List<EventStreamEventArgs> addedItems, EventStreamEventArgs item)
      {
            addedItems.Add(item);
      }

      #endregion
    }

    /// <summary>
    /// 一个消息体。
    /// </summary>
    public class EventStreamEventArgs : EventArgs
    {
      internal const string IdBegin = "id: ";
      internal const string EventBegin = "event: ";
      internal const string DataBegin = "data: ";
      /// <summary>
      /// 重连消息(Reconnect Message)
      /// 服务器发送的重连消息,用于告知客户端重新连接服务器。
      /// 示例:
      /// event: reconnect
      /// retry: 5000
      /// </summary>
      internal const string RetryBegin = "retry: ";
      
      private readonly string idSource;
      
      private readonly string eventSource;
      
      private readonly string dataSource;
      
      private string idValue;
      
      private string eventValue;
      
      private string dataValue;

      public EventStreamEventArgs(string idSource, string eventSource, string dataSource)
      {
            this.idSource = idSource;
            this.eventSource = eventSource;
            this.dataSource = dataSource;
      }

      public EventStreamEventArgs(string unknownValue)
      {
            this.UnknownValue = unknownValue;
      }

      public string IdValue
      {
            get
            {
                if (idValue == null) idValue = GetValue(idSource, IdBegin);
                return idValue;
            }
      }

      public string EventValue
      {
            get
            {
                if (eventValue == null) eventValue = GetValue(eventSource, EventBegin);
                return eventValue;
            }
      }

      public string DataValue
      {
            get
            {
                if (dataValue == null) dataValue = GetValue(dataSource, DataBegin);
                return dataValue;
            }
      }

      /// <summary>
      /// 上1个 <see cref="EventStreamEventArgs"/> 中的 <see cref="DataValue"/> 后面跟随的未知内容。
      /// </summary>
      public string UnknownValue
      {
            get;
            private set;
      }

      private string GetValue(string source, string begin)
      {
            string value;
            if (source != null && source.Length > begin.Length)
            {
                value = source.Substring(begin.Length, source.Length - begin.Length);
            }
            else
            {
                value = string.Empty;
            }
            return value;
      }
    }
}



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: HTTP SSE (Server-Sent Events)流式服务器推送技术