使用Apache Kafka路由消息
本示例向您展示了如何使用LangChain的尺度聊天功能,并通过Apache Kafka来回传递聊天消息。
目的是模拟一个架构,其中聊天前端和LLM作为必要通过内部网络相互通信的独立服务运行。
这是一种替代通过REST API哀求模型相应的典型模式(本文末端有更多信息,表明白为什么您可能想要如许做)。
1. 安装主要依赖项
依赖项包罗:
- Quix Streams库,用于以"andas-like"的方式管理与Apache Kafka(或Kafka-like工具,如Redpanda)的交互。
- LangChain库,用于管理与Llama-2的交互并存储对话状态。
- !pip install quixstreams==2.1.2a langchain==0.0.340 huggingface_hub==0.19.4 langchain-experimental==0.0.42 python-dotenv
复制代码 2. 构建并安装llama-cpp-python库(启用CUDA以使用Google Colab GPU)
llama-cpp-python库是一个Python包装器,围绕llama-cpp库,使您可以大概高效地仅使用CPU运行量化的LLM。
使用尺度的pip安装下令llama-cpp-python默认不支持GPU。如果在Google Colab中仅依赖CPU,天生可能会非常慢,以是下面的下令添加了一个额外的选项来构建并安装带有GPU支持的llama-cpp-python(确保您在Google Colab中选择了GPU支持的运行情况)。
- !CMAKE_ARGS="-DLLAMA_CUBLAS=on" FORCE_CMAKE=1 pip install llama-cpp-python
复制代码 3. 下载并设置Kafka和Zookeeper实例
从Apache网站下载Kafka二进制文件,并以守卫进程方式启动服务器。我们将使用Apache Kafka提供的默认配置来启动实例。
- !curl -sSOL https://dlcdn.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz
- !tar -xzf kafka_2.13-3.6.1.tgz
- !./kafka_2.13-3.6.1/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-3.6.1/config/zookeeper.properties
- !./kafka_2.13-3.6.1/bin/kafka-server-start.sh -daemon ./kafka_2.13-3.6.1/config/server.properties
- !echo "Waiting for 10 secs until kafka and zookeeper services are up and running"
- !sleep 10
复制代码 4. 检查Kafka守卫进程是否正在运行
显示正在运行的进程,并过滤Java进程(您应该看到两个——每个服务器一个)。
- !ps aux | grep -E '[j]ava'
复制代码 5. 导入所需的依赖项并初始化所需的变量
导入与Kafka交互的Quix Streams库,以及运行ConversationChain所需的LangChain组件。
- # 导入实用程序库
- import json
- import random
- import re
- import time
- import uuid
- from os import environ
- from pathlib import Path
- from random import choice, randint, random
- from dotenv import load_dotenv
- # 从Hugging Face hub直接下载模型的Hugging Face实用程序:
- from huggingface_hub import hf_hub_download
- from langchain.chains import ConversationChain
- # 导入Langchain模块以管理提示和对话链:
- from langchain.llms import LlamaCpp
- from langchain.memory import ConversationTokenBufferMemory
- from langchain.prompts import PromptTemplate, load_prompt
- from langchain_core.messages import SystemMessage
- from langchain_experimental.chat_models import Llama2Chat
- from quixstreams import Application, State, message_key
- # 导入Quix依赖项
- from quixstreams.kafka import Producer
- # 初始化全局变量。
- AGENT_ROLE = "AI"
- chat_id = ""
- # 将当前角色设置为角色常量,并为补充客户元数据初始化变量:
- role = AGENT_ROLE
复制代码 6. 下载"llama-2-7b-chat.Q4_K_M.gguf"模型
从Hugging Face下载量化的Llama-2 7B模型,我们将使用它作为当地LLM(而不是依赖于外部服务的REST API调用)。
- model_name = "llama-2-7b-chat.Q4_K_M.gguf"
- model_path = f"./state/{model_name}"
- if not Path(model_path).exists():
- print("The model path does not exist in state. Downloading model...")
- hf_hub_download("TheBloke/Llama-2-7b-Chat-GGUF", model_name, local_dir="state")
- else:
- print("Loading model from state...")
复制代码 7. 加载模型并初始化对话记忆
加载Llama 2,并使用ConversationTokenBufferMemory将对话缓冲区设置为300个token。这个值用于在仅CPU容器中运行Llama,以是如果在Google Colab中运行,您可以提高它。它防止了托管模型的容器内存不足。
在这里,我们覆盖了默认的体系角色,以便聊天呆板人具有《银河系环游指南》中Marvin The Paranoid Android的个性。
- # 使用适当的参数加载模型:
- llm = LlamaCpp(
- model_path=model_path,
- max_tokens=250,
- top_p=0.95,
- top_k=150,
- temperature=0.7,
- repeat_penalty=1.2,
- n_ctx=2048,
- streaming=False,
- n_gpu_layers=-1,
- )
- model = Llama2Chat(
- llm=llm,
- system_message=SystemMessage(
- content="您是一个非常无聊的机器人,具有《银河系漫游指南》中Marvin the Paranoid Android的个性。"
- ),
- )
- # 定义在每次交流中给模型的对话历史量(300个token,或者略多于300个单词)
- # 该函数自动修剪超出token范围的对话历史中最旧的消息。
- memory = ConversationTokenBufferMemory(
- llm=llm,
- max_token_limit=300,
- ai_prefix="AGENT",
- human_prefix="HUMAN",
- return_messages=True,
- )
- # 定义自定义提示
- prompt_template = PromptTemplate(
- input_variables=["history", "input"],
- template="""
- 以下文本是您和需要您智慧的谦逊人类之间聊天的历史。
- 请回复人类最近的消息。
- 当前对话:
- {history}
- HUMAN: {input}\
- :nANDROID:
- """,
- )
- chain = ConversationChain(llm=model, prompt=prompt_template, memory=memory)
- print("--------------------------------------------")
- print(f"Prompt={chain.prompt}")
- print("--------------------------------------------")
复制代码 8. 使用聊天呆板人初始化聊天对话
我们配置聊天呆板人通过向"chat" Kafka主题发送固定问候来初始化对话。当我们发送第一条消息时,"chat"主题会自动创建。
- def chat_init():
- chat_id = str(
- uuid.uuid4()
- ) # 为有效的消息键控给对话一个ID
- print("======================================")
- print(f"Generated CHAT_ID = {chat_id}")
- print("======================================")
- chat_init()
复制代码 9. 初始化复兴功能
这个函数界说了聊天呆板人应该如何复兴传入的消息。与之前的单元格差别,我们不是发送一个固定的消息,而是使用Llama-2天生一个复兴,并将该复兴发回"chat" Kafka主题。
- def reply(row: dict, state: State):
- print("-------------------------------")
- print("Received:")
- print(row)
- print("-------------------------------")
- print(f"Thinking about the reply to: {row['text']}...")
复制代码 10. 检查Kafka主题以获取新的人类消息,并让模型天生复兴
如果您第一次运行这个单元格,请运行它并期待在控制台输出中看到Marvin的问候(“Hello my name is Marvin…”)。在收到LLM的复兴后,手动停止这个单元格,并继续执行下一个单元格,在那里您将被提示输入您的复兴。
一旦您输入了您的消息,请回到这个单元格。您的复兴也发送到了同一个"chat"主题。Kafka消费者检查新消息,并过滤掉来自聊天呆板人本身的消息,只留下最新的人类消息。
一旦检测到新的人类消息,就会触发复兴功能。
在输出中收到LLM的复兴后,手动停止这个单元格。
- # 定义您的应用程序和设置
- app = Application(
- broker_address="127.0.0.1:9092",
- consumer_group="aichat",
- auto_offset_reset="earliest",
- consumer_extra_config={"allow.auto.create.topics": "true"},
- )
- # 定义一个带有JSON反序列化的输入主题
- input_topic = app.topic("chat", value_deserializer="json")
- # 定义一个带有JSON序列化的输出主题
- output_topic = app.topic("chat", value_serializer="json")
- # 基于输入主题的消息流初始化一个流数据帧:
- sdf = app.dataframe(topic=input_topic)
- # 过滤SDF,只包括角色与机器人当前角色不匹配的传入行
- sdf = sdf.update(
- lambda val: print(
- f"Received update: {val}\n\nSTOP THIS CELL MANUALLY TO HAVE THE LLM REPLY OR ENTER YOUR OWN FOLLOWUP RESPONSE"
- )
- )
- # 以便它不会回复自己的消息
- sdf = sdf[sdf["role"] != role]
- # 为过滤后的SDF中检测到的任何新消息(行)触发回复功能
- sdf = sdf.apply(reply, stateful=True)
- # 再次检查SDF并过滤掉任何空行
- sdf = sdf[sdf.apply(lambda row: row is not None)]
- # 更新时间戳列到当前时间的纳秒
- sdf["Timestamp"] = sdf["Timestamp"].apply(lambda row: time.time_ns())
- # 将处理过的SDF发布到由output_topic对象指定的Kafka主题。
- sdf = sdf.to_topic(output_topic)
- app.run(sdf)
复制代码 11. 输入人类消息
运行这个单元格以输入您想要发送给模型的消息。它使用另一个Kafka生产者将您的文本发送到"chat" Kafka主题,供模型获取(必要再次运行上一个单元格)。
- chat_input = input("Please enter your reply: ")
- myreply = chat_input
- msgvalue = {
- "uuid": chat_id, # 现在留空
- "role": "human",
- "text": myreply,
- "conversation_id": chat_id,
- "Timestamp": time.time_ns(),
- }
- with Producer(
- broker_address="127.0.0.1:9092",
- extra_config={"allow.auto.create.topics": "true"},
- ) as producer:
- value = msgvalue
- producer.produce(
- topic="chat",
- headers=[("uuid", str(uuid.uuid4()))], # 这里也允许使用字典
- key=chat_id, # 现在留空
- value=json.dumps(value), # 需要是一个字符串
- )
- print("Replied to chatbot with message:")
- print("--------------------------------------------")
- print(value)
- print("--------------------------------------------")
- print("\n\nRUN THE PREVIOUS CELL TO HAVE THE CHATBOT GENERATE A REPLY")
复制代码 为什么要通过Kafka路由聊天消息?
使用LangChain内置的对话管理功能直接与LLM交互更轻易。此外,您还可以使用REST API从外部托管的模型天生相应。那么为什么要费心使用Apache Kafka呢?
有几个缘故原由,例如:
- 集成:很多企业希望运行自己的LLM,以便他们可以将数据保留在内部。这必要将LLM支持的组件集成到可能已经使用某种消息总线进行解耦的现有架构中。
- 可扩展性:Apache Kafka旨在进行并行处理,因此很多团队更喜欢使用它来更有效地将工作分配给可用的工作者(在这种情况下,“工作者”是运行LLM的容器)。
- 恒久性:Kafka旨在答应服务在另一项服务遇到内存问题或下线时继续进行。这可以防止在多个体系相互通信的复杂分布式架构中发生数据丢失(LLM只是很多相互依赖的体系之一,还包罗向量数据库和传统数据库)。
有关事件流为何适用于Gen AI应用步调架构的更多配景信息,请参阅Kai Waehner的文章“Apache Kafka + Vector Database + LLM = Real-Time GenAI”。
本文介绍了一种使用Apache Kafka进行消息路由以模拟独立聊天前端和LLM服务之间通信的架构。通过安装依赖库、设置Kafka情况、初始化模型和对话记忆,构建了一个聊天呆板人。这种方法使用了Kafka的集成性、可扩展性和恒久性优势,适合必要将LLM集成到现有解耦架构中的企业使用。此外,它还提供了一种比直接REST API调用更结实和可靠的解决方案。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |