在亚马逊云科技AWS上使用ElasticSearch和RAG搭建个性化推荐系统 ...

打印 上一主题 下一主题

主题 675|帖子 675|积分 2025

简介:

小李哥将继续每天先容一个基于亚马逊云科技AWS云盘算平台的环球前沿AI技能解决方案,帮助大家快速相识国际上最热门的云盘算平台亚马逊云科技AWS AI最佳实践,并应用到自己的一样平常工作里。
本次先容用当下热门的RAG和大语言模型,结合亚马逊云科技热门AI模型管理服务Amazon SageMaker,手把手开发一个用户的个性化推荐系统。本文将通过使用Bloom-7B向量化模型将用户数据文档天生向量,并生存到向量数据库ElasticSearch中,末了使用RAG和摆设在SageMaker上的Falcon-7B大语言模型在ElasticSearch中举行语义搜索天生用户个性化建议。我将领导大家编写一步一步的细节代码和展示实际的天生式AI实操项目,0底子学会AI核心技能。本架构设计全部采用了云原生Serverless架构,提供可扩展和安全的AI应用解决方案。本方案架构图如下:


方案实操背景知识

Amazon SageMaker

Amazon SageMaker 是一款亚马逊云科技AWS上的机器学习模型托管服务,旨在帮助开发者和数据科学家快速构建、训练和摆设机器学习模型。SageMaker 提供了丰富的工具和功能,如内置的 Jupyter Notebook、自动模型调优和托管训练环境,使整个机器学习工作流程更加简化和高效。
特别值得一提的是 SageMaker 的 JumpStart 功能。JumpStart 提供了一系列预训练模型和机器学习解决方案,用户可以通过简单的界面快速摆设这些模型,大大缩短了从实验到生产的时间。这对于那些必要快速验证模型结果或不具备大量机器学习知识的开发者来说尤为有用。
Amazon OpenSearch Service

Amazon OpenSearch Service 是一个亚马逊云科技托管的搜索和分析引擎,基于开源的 Elasticsearch 和 Kibana 构建。OpenSearch Service 提供了强大的数据索引和搜索能力,适用于日志分析、实时应用监控和复杂数据分析等场景。
别的,OpenSearch Service 还支持向量化数据库功能。通过将嵌入的向量数据存储在 OpenSearch Service 中,开发者可以实现高效的相似性搜索和推荐系统。向量化数据库能够处理惩罚高维度的数据查询,特别适合必要处理惩罚天然语言处理惩罚(NLP)和图像识别等任务的应用。

本方案包括的内容:

使用 Amazon SageMaker JumpStart 快速摆设向量化模型

在 SageMaker Studio 中使用 Jupyter notebook 处理惩罚原始数据

在 Amazon OpenSearch Service 中创建索引并存储向量化数据

使用 OpenSearch Service 探索数据

多场景测试使用检索增强天生(RAG)构建的个性化推荐应用


项目搭建具体步调:

1. 首先我们进入亚马逊云科技控制台,点击OpenSearch服务

2. 创建一个OpenSeach实例,点击左侧的domain检察创建好的实例

3.  检察OpenSearch实例登录界面的URL

4. 接下来我们进入SageMaker服务,点击Studio ,并打开Studio

5. 接下来我们进入JumpStart功能,点击HuggingFace,在这里我们可以直接在服务器上摆设HuggingFace上的开源大模型

 6. 我们点击模型“Bloom 7B1 Embedding FP16”向量模型

 7. 点击Deploy摆设向量模型

8. 设置参数后摆设
 9. 模型摆设时间约为5分钟,这时我们点击Studio Classic进入Jupyter Notebook

 10. 点击Open打开Notebook

11. 我们打开一个新的Notebook,开始编写我们的RAG个性化推荐系统服务,首先我们安装并导入须要模块
  1. %%capture
  2. ## Code Cell 1 ##
  3. !pip install opensearch-py-ml accelerate tqdm --quiet
  4. !pip install sagemaker --upgrade --quiet
  5. ## Code Cell 3 ##
  6. import boto3
  7. import os
  8. import time
  9. import json
  10. import pandas as pd
  11. from tqdm import tqdm
  12. import sagemaker
  13. from opensearchpy import OpenSearch, RequestsHttpConnection
  14. from sagemaker import get_execution_role
复制代码
12. 接下来我们从文件中导入我们推荐系统的源数据文件
  1. ## Code Cell 4 ##
  2. # Read in the Tab delimited data file and print the shape of the resulting data frame.
  3. pd.options.mode.chained_assignment = None
  4. df = pd.read_csv('booksummaries.txt',sep='\t')
  5. print(df.shape)
  6. # Add columns headers to the data frame to be able to analyze.
  7. df.columns = ['WikiPediaId','FreeBaseId','title','author','pub_date','genres','plot_summary']
  8. # Display entries with null data in any column (NaN).
  9. df[df.isnull().any(axis=1)]
复制代码
13.  接下来我们运行下面的代码对数据举行处理惩罚,删除不必要的列、字符,并扬弃确实数据的行
  1. ## Code Cell 5 ##
  2. # Let's drop any rows that contain null values in any column. In a real production application you would want to replace NaN values with correct data.
  3. df_1 = df.dropna()
  4. # clean-up Freebase markup and other unwanted characters in the Genres columm.
  5. df_1.genres.replace(to_replace='"\/m\/.{4,7}"\:', value='', regex=True,inplace=True)  # remove Freebase markup
  6. df_1.genres.replace(to_replace='\\\\u00e0\sclef', value='', regex=True,inplace=True)    # remove utf-8 special characters
  7. df_1.genres.replace(to_replace='\{\s"|"\}', value='', regex=True,inplace=True)          # Remove {" or "}
  8. df_1.genres.replace(to_replace='"', value='', regex=True,inplace=True)                  # Remove double quotes
  9. # Only use the first value as the genre
  10. df_2 = df_1['genres'].str.split(',', expand=True, n=1)
  11. df_2.rename( columns={0:'genre'}, inplace=True )
  12. df_2.drop(columns=[1],inplace=True)
  13. df_3 = pd.concat([df_1, df_2], axis=1)
  14. # Trim the size of the plot summary to 500 characters.
  15. df_3['plot_summary'] = df_3['plot_summary'].apply(lambda x: ' '.join(x[:500].split(' ')[:-1]) if len(x) > 500 else x)
  16. df_3['book_summary'] = df_3[['title','author','genre','plot_summary']].agg(' '.join, axis=1)
  17. # Sort by the author column, and to keep the lab within a reasonable time-frame drop the last 5000 rows of data.
  18. # Drop other columns not needed.
  19. df_3.sort_values(by=['author'],inplace = True)
  20. df_3.drop(df_3.tail(5000).index,inplace = True)
  21. df_3.drop(columns=['genres','WikiPediaId','FreeBaseId','plot_summary'],inplace=True)
  22. # Create a dictionary of the remaining data to be used for further processing.
  23. wm_list = df_3.to_dict('records')
  24. # Let's look at the data now that it has been cleansed and trimmed.
  25. df_3
复制代码
 14. 接下来我们通过授权,建立和OpenSearch集群的毗连
  1. ## Update the below <StackName> placeholder with the value from your Lab you copied in an earlier step.
  2. cloudformation_stack_name = 'LabStack-be9ce4b8-cc71-4dda-bfee-6024c60bb194-udbRjk1euELCeKsbKMeE2y-0'
  3. region = 'us-east-1'
  4. cfn = boto3.client('cloudformation')
  5. def get_cfn_outputs(stackname):
  6.     outputs = {}
  7.     for output in cfn.describe_stacks(StackName=stackname)['Stacks'][0]['Outputs']:
  8.         outputs[output['OutputKey']] = output['OutputValue']
  9.     return outputs
  10. outputs = get_cfn_outputs(cloudformation_stack_name)
  11. aos_host = outputs['OSDomainEndpoint']
  12. outputs
  13. ## To authenticate to the OpenSearch domain we need to retrieve username and password stored in Secrets Manager.
  14. secrets = boto3.client('secretsmanager')
  15. os_domain_secret = secrets.list_secrets(
  16.     Filters=[
  17.         {
  18.             'Key':'name',
  19.             'Values': ['DomainMasterUser']
  20.         }
  21.     ]
  22. )['SecretList'][0]['Name']
  23. aos_credentials = json.loads(secrets.get_secret_value(SecretId=os_domain_secret)['SecretString'])
  24. auth = (aos_credentials['username'], aos_credentials['password'])
  25. print(auth)
  26. ## The below client will be used in a later step below.
  27. aos_client = OpenSearch(
  28.     hosts = [{'host': aos_host, 'port': 443}],
  29.     http_auth = auth,
  30.     use_ssl = True,
  31.     verify_certs = True,
  32.     connection_class = RequestsHttpConnection
  33. )
复制代码
15. 下面我们定义调用SageMaker将文字转化为向量的函数"query_endpoint_with_json_payload"
  1. ## Code Cell 8 ##
  2. embedding_endpoint_name = 'jumpstart-dft-hf-textembedding-bloo-20240804-035651'
  3. def query_endpoint_with_json_payload(encoded_json, endpoint_name, content_type="application/json"):
  4.     client = boto3.client("runtime.sagemaker")
  5.    
  6.     response = client.invoke_endpoint(
  7.         EndpointName=endpoint_name, ContentType=content_type, Body=encoded_json
  8.     )
  9.     response_json = json.loads(response['Body'].read().decode("utf-8"))
  10.     embeddings = response_json["embedding"]
  11.     if len(embeddings) == 1:
  12.         return [embeddings[0]]
  13.     return embeddings
复制代码
 16. 接下来我们定义OpenSearch中的索引,用于更精准的搜索文档里的内容,这里我们定义了生存字段的类型,以及索引算法k-NN
  1. ## Code Cell 11 ##
  2. knn_index = {
  3.     "settings": {
  4.         "index.knn": True,
  5.         "index.knn.space_type": "cosinesimil",
  6.         "analysis": {
  7.           "analyzer": {
  8.             "default": {
  9.               "type": "standard",
  10.               "stopwords": "_english_"
  11.             }
  12.           }
  13.         }
  14.     },
  15.     "mappings": {
  16.         "properties": {
  17.             "booksummary_vector": {
  18.                 "type": "knn_vector",
  19.                 "dimension": 4096,
  20.                 "store": True
  21.             },
  22.             "book_summary": {
  23.                 "type": "text",
  24.                 "store": True
  25.             },
  26.             "author": {
  27.                 "type": "text",
  28.                 "store": True
  29.             },
  30.             "title": {
  31.                 "type": "text",
  32.                 "store": True
  33.             },
  34.             "pub_date": {
  35.                 "type": "text",
  36.                 "store": True
  37.             },
  38.             "genre": {
  39.                 "type": "text",
  40.                 "store": True
  41.             },
  42.         }
  43.     }
  44. }
复制代码
 17. 我们可以用如下代码,验证和获取我们之前定义的索引
  1. ## Code Cell 13 ##
  2. aos_client.indices.get(index=index_name)
复制代码
18. 接下来我们把之前处理惩罚过的文档内的数据导入到OpenSearch索引当中
  1. ## Code Cell 14 ##
  2. def os_import(record, aos_client, index_name):
  3.     book_summary = record["book_summary"]
  4.     search_vector = embed_phrase(book_summary)
  5.     aos_client.index(index=index_name,
  6.              body={"booksummary_vector": search_vector[0],
  7.                    "book_summary": record["book_summary"],
  8.                    "author":record["author"],
  9.                    "genre":record["genre"],
  10.                    "pub_date":record["pub_date"],
  11.                    "title":record["title"]
  12.                   }
  13.             )
  14. print("Loading records...")
  15. for record in tqdm(wm_list):
  16.     os_import(record, aos_client, index_name)
  17. print("Records loaded.")
复制代码
19. 打开我们之前在第3步获取的OpenSearch URL,并输入用户名和密码
20. 进入OpenSearch界面中的Stack Management,点击创建索引类型
 21. 为索引类型起一个名字“book_knowledge_base”

 22. 点击创建

 23. 在OpenSearch中点击Discover即可看到我们导入的文档数据

24. 接下来我们定义在OpenSearch中举行近似语义搜索的函数“retrieve_opensearch_with_semantic_search”
  1. ## Code Cell 16 ##
  2. def retrieve_opensearch_with_semantic_search(phrase, n=2):
  3.    
  4.     search_vector = embed_phrase(phrase)[0]
  5.     osquery={
  6.         "_source": {
  7.             "exclude": [ "booksummary_vector" ]
  8.         },
  9.         
  10.       "size": n,
  11.       "query": {
  12.         "knn": {
  13.           "booksummary_vector": {
  14.             "vector":search_vector,
  15.             "k":n
  16.           }
  17.         }
  18.       }
  19.     }
  20.     res = aos_client.search(index=index_name,
  21.                            body=osquery,
  22.                            stored_fields=["title","author","pub_date", "genre", "book_summary"],
  23.                            explain = True)
  24.     top_result = res['hits']['hits'][1]
  25.    
  26.     result = {
  27.         "title":top_result['_source']['title'],
  28.         "author":top_result['_source']['author'],
  29.         "pub_date":top_result['_source']['pub_date'],
  30.         "genre":top_result['_source']['genre'],
  31.         "book_summary":top_result['_source']['book_summary'],
  32.     }
  33.    
  34.     return result
复制代码
25. 接下来我们运行该函数举行测试,搜索我们输入的内容在文档数据源中举行语义搜索,找到推荐的书籍。我们的问题是:“一本由埃德加·赖斯·巴勒斯(Edgar Rice Burroughs)撰写的、属于科幻类型且涉及人猿泰山的书。”
  1. ## Code Cell 17 ##
  2. example_request = retrieve_opensearch_with_semantic_search(question_on_book)
  3. print(question_on_book)
  4. print(example_request)
复制代码
26.  接下来我们定义函数,使用RAG在OpenSearch中举行语义搜索找到相关结果,在用大模型天生终极复兴
  1. ## Code Cell 21 ##
  2. def generate_prompt_to_llm(original_question_on_book):
  3.     retrieved_documents = retrieve_opensearch_with_semantic_search(original_question_on_book)
  4.     print("retrieved relevant book per your query is : \n" + str(retrieved_documents))
  5.     print("------------")
  6.     one_shot_description_example = "{'book_summary': 'Tarzan tracks down a man who has been mistaken for him. The man is under the delusion that he is Tarzan, and he is living in a lost city inhabited by people descended from early Portuguese explorers. The plot devices of a lost city and a Tarzan double or impostor had been used by Burroughs in some previous Tarzan novels.', 'author': 'Edgar Rice Burroughs', 'title': 'Tarzan and the Madman', 'genre': 'Science fiction', 'pub_date': '1964'}"
  7.     one_shot_response_example = "It's a real page turning story about Tarzan and how a madman has been impersonating him. The author is Edgar Rice Burroughs and it's a science fiction book with adventure and lots of fun. It was published in the year 1964."
  8.     prompt = (
  9.         f" Make a book recommendation that is similar to the {original_question_on_book} The recommendation must include the title of the book, the author and genre: \n"
  10.         f"Data: {one_shot_description_example} \n Recommendation: {one_shot_response_example} \n"
  11.         f"Data: {retrieved_documents} \n Recommendation:"
  12.     )
  13.     return prompt
复制代码
27. 末了我们定义函数设计大模型输入和输出的格式,得到终极的推荐结果。
  1. ## Code Cell 22 ##
  2. def generate_llm_input(data, **kwargs):
  3.     default_kwargs = {
  4.         "num_beams": 5,
  5.         "no_repeat_ngram_size": 3,
  6.         "do_sample": True,
  7.         "max_new_tokens": 100,
  8.         "temperature": 0.9,
  9.         "watermark": True,
  10.         "top_k": 200,
  11.         "max_length": 200,
  12.         "early_stopping": True
  13.     }
  14.    
  15.     default_kwargs = {**default_kwargs, **kwargs}
  16.    
  17.     input_data = {
  18.         "inputs": data,
  19.         "parameters": default_kwargs
  20.     }
  21.    
  22.     return input_data
  23. def query_llm_with_rag(description, **kwargs):
  24.     prompt = generate_prompt_to_llm(description)
  25.     query_payload = generate_llm_input(prompt, **kwargs)
  26.     response = query_llm_endpoint_with_json_payload(json.dumps(query_payload).encode("utf-8"), endpoint_name=llm_endpoint_name)
  27.     return response
  28. ## Code Cell 23 ##
  29. recommendation = query_llm_with_rag(question_on_book)
  30. print(question_on_book)
  31. print(recommendation)
复制代码
以上就是在亚马逊云科技上使用RAG和摆设在SageMaker上大语言模型构建个性化推荐服务系统的步调。欢迎大家关注小李哥,未来获取更多国际前沿的天生式AI开发方案!


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

怀念夏天

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表