缠丝猫 发表于 2024-9-11 22:22:53

Flask——基于python完整实现客户端和服务器后端流式哀求及响应

看了很多相关博客,但是都没有本地客户端和服务器后端的完整代码示例,有的也只说了怎样流式获取后端结果,基本没有讲两头怎样同时实现流式输入输出,特此整理总结,给各人交流学习和使用!
本地客户端



[*]requests.post得到流式复兴的紧张参数:

[*]stream:需要设置为True;
[*]response.iter_content:使用该函数接收返回的流式数据。

import requests
import time
import json

def generate_stream_data():
    # 假设这是要发送的文本列表
    is_end = False
    lines = ["Hello", "world", "this", "is", "a", "stream", "of", "text"]
    for line in lines:
      print(line)
      if lines.index(line) == len(lines) - 1:
            is_end = True
      yield json.dumps({'line': line, 'is_end': is_end}) + '\n'
      time.sleep(0.5)
      # 模拟数据处理时间

def get_stream_response(response):
    # 流式接收response
    rec_data_list = []
    temp_data = ''
    for chunk in response.iter_content(chunk_size=1):
      temp_data += chunk.decode('utf-8')
      if temp_data.endswith('\n'):
            temp_json = json.loads(temp_data)
            rec_data_list.append(temp_json)
            print(temp_data)
            temp_data = ''
            if temp_json['is_end']:
                break
    print(rec_data_list)
    print("----------------------------")
    print(temp_data)
    return rec_data_list

def stream_upload(url):
   
    # 流式接收response
    response = requests.post(url, data=generate_stream_data(), stream=True)
   
    final_response = get_stream_response(response)
   
    return final_response

url = 'http://127.0.0.1:5000/stream'
response = stream_upload(url)
Flask服务器后端



[*]flask.request流式获取数据::

[*]使用request.stream.read读取数据,而不是get_data()等一次性函数。

from flask import Flask, Response, request
import time
import json
import requests

app = Flask(__name__)

def process_stream_data(stream_data):
    # 假设这是要发送的数据
    print("开始生成新的数据流")
    is_end = False
    print(stream_data)
    for idx, line in enumerate(stream_data):
      if idx == len(stream_data)-1:
            is_end = True
      print(line)
      yield json.dumps(line)+"\n"
      time.sleep(0.5)
      # 模拟数据处理时间

def get_stream_request(chunk_size=1):
    req_data_list = []
    temp_data = ''
    while True:
      chunk = request.stream.read(chunk_size)
      temp_data += chunk.decode('utf-8')
      if temp_data.endswith('\n'):
            temp_json = json.loads(temp_data)
            req_data_list.append(temp_json)
            print(temp_data)
            temp_data = ''
            if temp_json['is_end']:
                return req_data_list

@app.route('/stream', methods=['POST'])
def stream_text():
   
    data = get_stream_request()

    print("----------------------------")
   
    return Response(process_stream_data(data))

if __name__ == "__main__":
    app.run(host='0.0.0.0', port=5000, debug=True)
客户端/服务器端流式接收[打字机]结果

https://i-blog.csdnimg.cn/blog_migrate/0088c2776b124719c5aad9102642fea3.gif

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: Flask——基于python完整实现客户端和服务器后端流式哀求及响应