qidao123.com技术社区-IT企服评测·应用市场

标题: 利用 NV‑Ingest、Unstructured 和 Elasticsearch 处理非布局化数据 [打印本页]

作者: 星球的眼睛    时间: 6 天前
标题: 利用 NV‑Ingest、Unstructured 和 Elasticsearch 处理非布局化数据
作者:来自 Elastic Ajay Krishnan Gopalan

了解怎样利用 NV-Ingest、Unstructured Platform 和 Elasticsearch 为 RAG 应用构建可扩展的非布局化文档数据管道。

Elasticsearch 原生集成了行业领先的生成式 AI 工具和提供商。检察我们的网络研讨会,了解怎样超越 RAG 底子,或利用 Elastic 向量数据库构建可投入生产的应用。
为了为你的用例构建最佳搜索解决方案,现在就开始免费云试用,或在本地机器上试用 Elastic。

在这篇博客中,我们将讨论怎样利用 NV-Ingest、Unstructured Platform 和 Elasticsearch 实现一个可扩展的数据处理流水线。该流水线将来自数据源的非布局化数据转换为布局化、可搜索的内容,为下游的 AI 应用(如 RAG)做好准备。检索加强生成(RAG)是一种 AI 技术,它为大语言模型(LLMs)提供外部知识,以生成对用户查询的相应。这使得 LLM 的回答可以或许根据特定上下文举行定制,从而使答案更正确、更干系。
在开始之前,让我们先了解一下实现该流水线的关键组件,以及它们各自的作用。

流水线组件

NV-Ingest 是一组微服务,用于将非布局化文档转换为布局化内容和元数据。它可以大规模处理文档解析、视觉布局辨认和 OCR 处理。
Unstructured 是一个 ETL+ 平台,用于和谐整个非布局化数据处理流程:从从多个数据源中摄取非布局化数据、通过可配置的工作流引擎将原始非布局化文件转换为布局化数据、利用附加转换丰富数据,不停到将效果上传到向量存储、数据库和搜索引擎。它提供了可视化 UI、API 和可扩展的后端底子设施,在一个工作流中和谐文档解析、数据丰富和嵌入处理。
Elasticsearch 是业界领先的搜索和分析引擎,现在具备原生的向量搜索能力。它既可以作为传统的文本数据库,也可以作为向量数据库,支持像 k-NN 相似度搜索如许的功能,实现大规模语义搜索。
现在我们已经介绍了焦点组件,接下来让我们看看它们在典型工作流程中是怎样协同工作的,然后再深入了解详细实现。

利用 NV-Ingest - Unstructured - Elasticsearch 实现 RAG

虽然这里我们只提供关键要点,你可以在此处检察完备的 notebook。
本博客分为三个部门:
Unstructured 的工作流以 DAG(Directed Acyclic Graph - 有向无环图)的情势表现,节点称为毗连器,用于控制数据的摄取泉源以及处理效果的上传目标。这些节点在任何工作流中都是必需的。源毗连器配置原始数据从数据源的摄取,目标毗连器配置处理后数据上传到向量存储、搜索引擎或数据库。
在本博客中,我们将研究论文存储在 Amazon S3 中,并盼望将处理后的数据传送到 Elasticsearch 用于下游用途。这意味着,在构建数据处理工作流之前,我们必要通过 Unstructured API 创建一个 Amazon S3 的源毗连器和一个 Elasticsearch 的目标毗连器。

步骤 1:设置 S3 源毗连器

在创建源毗连器时,你必要为其指定一个唯一名称,明白其范例(例如 S3 或 Google Drive),并提供配置,通常包罗你要毗连的数据源的位置(例如 S3 bucket 的 URI 或 Google Drive 文件夹)以及身份验证信息。
  1. source_connector_response = unstructured_client.sources.create_source(
  2.     request=CreateSourceRequest(
  3.         create_source_connector=CreateSourceConnector(
  4.             name="demo_source1",
  5.             type=SourceConnectorType.S3,
  6.             config=S3SourceConnectorConfigInput(
  7.                 key=os.environ['S3_AWS_KEY'],
  8.                 secret=os.environ['S3_AWS_SECRET'],
  9.                 remote_url=os.environ["S3_REMOTE_URL"],
  10.                 recursive=False #True/False
  11.             )
  12.         )
  13.     )
  14. )
  15. pretty_print_model(source_connector_response.source_connector_information)
复制代码

步骤 2:设置 Elasticsearch 目标毗连器

接下来,我们来设置 Elasticsearch 目标毗连器。你利用的 Elasticsearch 索引必须具有与 Unstructured 为你生成的文档架构兼容的架构 —— 你可以在文档中找到全部详细信息。
  1. destination_connector_response = unstructured_client.destinations.create_destination(
  2.     request=CreateDestinationRequest(
  3.         create_destination_connector=CreateDestinationConnector(
  4.             name="demo_dest-3",
  5.             type=DestinationConnectorType.ELASTICSEARCH,
  6.             config=ElasticsearchConnectorConfigInput(
  7.                 hosts=[os.environ['es_host']],
  8.                 es_api_key=os.environ['es_api_key'],
  9.                 index_name="demo-index"
  10.             )
  11.         )
  12.     )
  13. )
复制代码

步骤 3:利用 Unstructured 创建工作流

一旦你拥有了源毗连器和目标毗连器,就可以创建一个新的数据处理工作流。我们将通过以下节点构建工作流 DAG:

  1. from unstructured_client.models.shared import (
  2.     WorkflowNode,
  3.     WorkflowNodeType,
  4.     WorkflowType,
  5.     Schedule
  6. )
  7. # Partition the content by using NV-Ingest
  8. parition_node = WorkflowNode(
  9.             name="Ingest",
  10.             subtype="nvingest",
  11.             type="partition",
  12.             settings={"nvingest_host":  userdata.get('NV-Ingest-host-address')},
  13.         )
  14. # Summarize each detected image.
  15. image_summarizer_node = WorkflowNode(
  16.     name="Image summarizer",
  17.     subtype="openai_image_description",
  18.     type=WorkflowNodeType.PROMPTER,
  19.     settings={}
  20. )
  21. # Summarize each detected table.
  22. table_summarizer_node = WorkflowNode(
  23.     name="Table summarizer",
  24.     subtype="anthropic_table_description",
  25.     type=WorkflowNodeType.PROMPTER,
  26.     settings={}
  27. )
  28. # Label each recognized named entity.
  29. named_entity_recognizer_node = WorkflowNode(
  30.     name="Named entity recognizer",
  31.     subtype="openai_ner",
  32.     type=WorkflowNodeType.PROMPTER,
  33.     settings={
  34.         "prompt_interface_overrides": None
  35.     }
  36. )
  37. # Chunk the partitioned content.
  38. chunk_node = WorkflowNode(
  39.     name="Chunker",
  40.     subtype="chunk_by_title",
  41.     type=WorkflowNodeType.CHUNK,
  42.     settings={
  43.         "unstructured_api_url": None,
  44.         "unstructured_api_key": None,
  45.         "multipage_sections": False,
  46.         "combine_text_under_n_chars": 0,
  47.         "include_orig_elements": True,
  48.         "max_characters": 1537,
  49.         "overlap": 160,
  50.         "overlap_all": False,
  51.         "contextual_chunking_strategy": None
  52.     }
  53. )
  54. # Generate vector embeddings.
  55. embed_node = WorkflowNode(
  56.     name="Embedder",
  57.     subtype="azure_openai",
  58.     type=WorkflowNodeType.EMBED,
  59.     settings={
  60.         "model_name": "text-embedding-3-large"
  61.     }
  62. )
  63. response = unstructured_client.workflows.create_workflow(
  64.     request={
  65.         "create_workflow": {
  66.             "name": f"s3-to-es-NV-Ingest-custom-workflow",
  67.             "source_id": source_connector_response.source_connector_information.id,
  68.             "destination_id": "a72838a4-bb72-4e93-972d-22dc0403ae9e",
  69.             "workflow_type": WorkflowType.CUSTOM,
  70.             "workflow_nodes": [
  71.                 parition_node,
  72.                 image_summarizer_node,
  73.                 table_summarizer_node,
  74.                 named_entity_recognizer_node,
  75.                 chunk_node,
  76.                 embed_node
  77.             ],
  78.         }
  79.     }
  80. )
  81. workflow_id = response.workflow_information.id
  82. pretty_print_model(response.workflow_information)
  83. job = unstructured_client.workflows.run_workflow(
  84.     request={
  85.         "workflow_id": workflow_id,
  86.     }
  87. )
  88. pretty_print_model(job.job_information)
复制代码
一旦这个工作流的任务完成,数据将被上传到 Elasticsearch,我们就可以继续构建一个底子的 RAG 应用程序。

步骤 4:RAG 设置

让我们继续设置一个简朴的检索器,它将毗连到数据,吸收用户查询,利用与原始数据嵌入雷同的模型对其举行嵌入,并盘算余弦相似度以检索前 3 个文档。
  1. from langchain_elasticsearch import ElasticsearchStore
  2. from langchain.embeddings import OpenAIEmbeddings
  3. import os
  4. embeddings = OpenAIEmbeddings(
  5.     model="text-embedding-3-large",
  6.     openai_api_key=os.environ['OPENAI_API_KEY']
  7. )
  8. vector_store = ElasticsearchStore(
  9.     es_url=os.environ['es_host'],
  10.     index_name="demo-index",
  11.     embedding=embeddings,
  12.     es_api_key=os.environ['es_api_key'],
  13.     query_field="text",
  14.     vector_query_field="embeddings",
  15.     distance_strategy="COSINE"
  16. )
  17. retriever = vector_store.as_retriever(
  18.     search_type="similarity",
  19.     search_kwargs={"k": 3}  # Number of results to return
  20. )
复制代码
然后,让我们设置一个工作流来吸收用户查询,从 Elasticsearch 中获取相似文档,并利用这些文档作为上下文来回答用户的问题。
  1. from openai import OpenAI
  2. client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
  3. def generate_answer(question: str, documents: str):
  4.     prompt = """
  5.     You are an assistant that can answer user questions given provided context.
  6.     Your answer should be thorough and technical.
  7.     If you don't know the answer, or no documents are provided, say 'I do not have enough context to answer the question.'
  8.     """
  9.     augmented_prompt = (
  10.         f"{prompt}"
  11.         f"User question: {question}\n\n"
  12.         f"{documents}"
  13.     )
  14.     response = client.chat.completions.create(
  15.         messages=[
  16.             {'role': 'system', 'content': 'You answer users questions.'},
  17.             {'role': 'user', 'content': augmented_prompt},
  18.         ],
  19.         model="gpt-4o-2024-11-20",
  20.         temperature=0,
  21.     )
  22.     return response.choices[0].message.content
  23. def format_docs(docs):
  24.     seen_texts = set()
  25.     useful_content = [doc.page_content for doc in docs]
  26.     return  "\nRetrieved documents:\n" + "".join(
  27.         [
  28.             f"\n\n===== Document {str(i)} =====\n" + doc
  29.             for i, doc in enumerate(useful_content)
  30.         ]
  31.     )
  32. def rag(query):
  33.   docs = retriever.invoke(query)
  34.   documents = format_docs(docs)
  35.   answer = generate_answer(query, documents)
  36.   return documents, answer
复制代码
将全部内容组合在一起,我们得到:
  1. query = "How did the response lengths change with training?"
  2. docs, answer = rag(query)
  3. print(answer)
复制代码
和一个相应:
  1. Based on the provided context, the response lengths during training for the DeepSeek-R1-Zero model showed a clear trend of increasing as the number of training steps progressed. This is evident from the graphs described in Document 0 and Document 1, which both depict the "average length per response" on the y-axis and training steps on the x-axis.
  2. ### Key Observations:
  3. 1. **Increasing Trend**: The average response length consistently increased as training steps advanced. This suggests that the model naturally learned to allocate more "thinking time" (i.e., generate longer responses) as it improved its reasoning capabilities during the reinforcement learning (RL) process.
  4. 2. **Variability**: Both graphs include a shaded area around the average response length, indicating some variability in response lengths during training. However, the overall trend remained upward.
  5. 3. **Quantitative Range**: The y-axis for response length ranged from 0 to 12,000 tokens, and the graphs show a steady increase in the average response length over the course of training, though specific numerical values at different steps are not provided in the descriptions.
  6. ### Implications:
  7. The increase in response length aligns with the model's goal of solving reasoning tasks more effectively. Longer responses likely reflect the model's ability to provide more detailed and comprehensive reasoning, which is critical for tasks requiring complex problem-solving.
  8. In summary, the response lengths increased during training, indicating that the model adapted to allocate more resources (in terms of response length) to improve its reasoning performance.
复制代码
Elasticsearch 提供了多种加强搜索的战略,包罗混合搜索,这是近似语义搜索和基于关键字的搜索的结合。
这种方法可以进步作为上下文利用的 RAG 架构中的 top 文档的干系性。要启用此功能,您必要按照以下方式修改 vector_store 初始化:
  1. from langchain_elasticsearch import DenseVectorStrategy
  2. vector_store = ElasticsearchStore(
  3.     es_url=os.environ['es_host'],
  4.     index_name="demo-index",
  5.     embedding=embeddings,
  6.     es_api_key=os.environ['es_api_key'],
  7.     query_field="text",
  8.     vector_query_field="embeddings",
  9.     strategy=DenseVectorStrategy(hybrid=True) // <-- here the change
  10. )
复制代码

结论

良好的 RAG 从准备充分的数据开始,而 Unstructured 简化了这一关键的第一步。通过 NV-Ingest 启用文档分区、对非布局化数据举行元数据丰富并高效地将其摄取到 Elasticsearch,它确保了您的 RAG 管道建立在坚固的底子上,为全部下游任务释放其全部潜力。

原文:Unstructured data processing with NV‑Ingest, Unstructured, and Elasticsearch - Elasticsearch Labs

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 qidao123.com技术社区-IT企服评测·应用市场 (https://dis.qidao123.com/) Powered by Discuz! X3.4