看了很多相关博客,但是都没有本地客户端和服务器后端的完整代码示例,有的也只说了怎样流式获取后端结果,基本没有讲两头怎样同时实现流式输入输出,特此整理总结,给各人交流学习和使用!
本地客户端
- requests.post得到流式复兴的紧张参数:
- stream:需要设置为True;
- response.iter_content:使用该函数接收返回的流式数据。
- import requests
- import time
- import json
- def generate_stream_data():
- # 假设这是要发送的文本列表
- is_end = False
- lines = ["Hello", "world", "this", "is", "a", "stream", "of", "text"]
- for line in lines:
- print(line)
- if lines.index(line) == len(lines) - 1:
- is_end = True
- yield json.dumps({'line': line, 'is_end': is_end}) + '\n'
- time.sleep(0.5)
- # 模拟数据处理时间
- def get_stream_response(response):
- # 流式接收response
- rec_data_list = []
- temp_data = ''
- for chunk in response.iter_content(chunk_size=1):
- temp_data += chunk.decode('utf-8')
- if temp_data.endswith('\n'):
- temp_json = json.loads(temp_data)
- rec_data_list.append(temp_json)
- print(temp_data)
- temp_data = ''
- if temp_json['is_end']:
- break
- print(rec_data_list)
- print("----------------------------")
- print(temp_data)
- return rec_data_list
- def stream_upload(url):
-
- # 流式接收response
- response = requests.post(url, data=generate_stream_data(), stream=True)
-
- final_response = get_stream_response(response)
-
- return final_response
- url = 'http://127.0.0.1:5000/stream'
- response = stream_upload(url)
复制代码 Flask服务器后端
- flask.request流式获取数据::
- 使用request.stream.read读取数据,而不是get_data()等一次性函数。
- from flask import Flask, Response, request
- import time
- import json
- import requests
- app = Flask(__name__)
- def process_stream_data(stream_data):
- # 假设这是要发送的数据
- print("开始生成新的数据流")
- is_end = False
- print(stream_data)
- for idx, line in enumerate(stream_data):
- if idx == len(stream_data)-1:
- is_end = True
- print(line)
- yield json.dumps(line)+"\n"
- time.sleep(0.5)
- # 模拟数据处理时间
- def get_stream_request(chunk_size=1):
- req_data_list = []
- temp_data = ''
- while True:
- chunk = request.stream.read(chunk_size)
- temp_data += chunk.decode('utf-8')
- if temp_data.endswith('\n'):
- temp_json = json.loads(temp_data)
- req_data_list.append(temp_json)
- print(temp_data)
- temp_data = ''
- if temp_json['is_end']:
- return req_data_list
- @app.route('/stream', methods=['POST'])
- def stream_text():
-
- data = get_stream_request()
- print("----------------------------")
-
- return Response(process_stream_data(data))
- if __name__ == "__main__":
- app.run(host='0.0.0.0', port=5000, debug=True)
复制代码 客户端/服务器端流式接收[打字机]结果
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |