如何通过 Apache Airflow 将数据导入 Elasticsearch

打印 上一主题 下一主题

主题 820|帖子 820|积分 2460

作者:来自 Elastic Andre Luiz

相识如何通过 Apache Airflow 将数据导入 Elasticsearch。

Apache Airflow

Apache Airflow 是一个旨在创建、安排(schedule)和监控工作流的平台。它用于编排 ETL(Extract-Transform-Load) 流程、数据管道和其他复杂工作流,提供灵活性和可扩展性。它的可视化界面和实时监控功能使管道管理更易于访问和高效,让你可以跟踪执行的进度和结果。以下是它的四个紧张支柱:


  • 动态:管道以 Python 界说,允许动态灵活地生成工作流。
  • 可扩展:Airflow 可以与各种情况集成,可以创建自界说运算符,并可以根据必要执行特定代码。
  • 优雅:管道以干净明确的方式编写。
  • 可扩展:其模块化架构使用消息队列来编排任意数目的工作器。
在实践中,Airflow 可用于以了局景:


  • 数据导入:编排将数据每日提取到 Elasticsearch 等数据库中。
  • 日志监控:管理日志文件的网络和处置惩罚,然后在 Elasticsearch 中举行分析以识别错误或异常。
  • 多种数据源集成:将来自不同体系(API、数据库、文件)的信息合并到 Elasticsearch 中的单个层中,简化搜刮和报告。

DAG:Directed Acyclic Graphs - 有向无环图

在 Airflow 中,工作流由 DAG(有向无环图)表示。DAG 是一种界说任务执行顺序的结构。DAG 的紧张特性是:


  • 由独立任务组成:每个任务代表一个工作单元,旨在独立执行。
  • 排序:任务的执行顺序在 DAG 中明确界说。
  • 可重用性:DAG 旨在重复执行,促进流程主动化。

Airflow 的紧张组件

Airflow 生态体系由多个组件组成,它们共同协作以调和任务:



  • 调度程序 - scheduler:负责调度 DAG 并发送任务以供工作人员执行。
  • 执行器 - Exectutor:管理任务的执行,将其委托给工作人员。
  • Web 服务器 - Webserver:提供与 DAG 和任务交互的图形界面。
  • Dags 文件夹 - Dags folder:我们存储用 Python 编写的 DAG 的文件夹。
  • 元数据 - Metadata:作为工具存储库的数据库,由调度程序和执行器用于存储执行状态。

Apache Airflow 和 Elasticsearch

我们将演示如何使用 Apache Airflow 和 Elasticsearch 来调和任务并在 Elasticsearch 中索引结果。此演示的目标是创建一个任务管道来更新 Elasticsearch 索引中的记录。此索引包罗电影数据库,用户可以在此中举行评分和分配评级。想象一个天天有数百个评级的场景,有必要保持评级记录更新。为此,将开发一个 DAG,它将天天执行,负责检索新的合并评级并更新索引中的记录。
在 DAG 流程中,我们将有一个获取评级的任务,然后是一个验证结果的任务。如果数据不存在,DAG 将被定向到失败任务。否则,数据将在 Elasticsearch 中编入索引。目标是通过一种带有负责盘算分数的机制的方法检索评级,以更新索引中电影的评级字段。

使用 Apache Airflow 和 Elasticsearch 以及 Docker

要创建容器化情况,我们将使用 Apache Airflow 和 Docker。按照 “在 Docker 中运行 Airflow” 指南中的阐明现实设置 Airflow。
至于 Elasticsearch,我将使用 Elastic Cloud 上的集群,但如果你愿意,也可以使用 Docker 设置 Elasticsearch。已经创建了一个包罗电影目录的索引,此中电影数据已编入索引。这些电影的 “rating” 字段将被更新。

创建 DAG

通过 Docker 安装后,将创建一个文件夹结构,此中包括 dags 文件夹,我们必须将 DAG 文件放在该文件夹中,以便 Airflow 识别它们。
在此之前,我们必要确保安装了必要的依赖项。以下是此项目标依赖项:
  1. pip install apache-airflow apache-airflow-providers-elasticsearch
复制代码
我们将创建文件 update_ratings_movies.py 并开始编写任务代码。
现在,让我们导入必要的库:
  1. from airflow import DAG
  2. from airflow.operators.python import PythonOperator, BranchPythonOperator
  3. from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook
复制代码
我们将使用 ElasticsearchPythonHook,这是一个通过抽象连接和使用外部 API 来简化 Airflow 和 Elasticsearch 集群之间集成的组件。
接下来,我们界说 DAG,并指定其紧张参数:


  • dag_id:DAG 的名称。
  • start_date:DAG 的启动时间。
  • schedule:界说周期(在我们的例子中是每日)。
  • doc_md:将导入并显示在 Airflow 界面中的文档。

界说任务

现在,让我们界说 DAG 的任务。第一个任务将负责检索电影评级数据。我们将使用 PythonOperator,并将 task_id 设置为“get_movie_ratings”。python_callable 参数将调用负责获取 ratings 的函数。
  1. get_ratings_operator = PythonOperator(
  2.    task_id='get_movie_ratings',
  3.    python_callable=get_movie_ratings_task
  4. )
复制代码
接下来,我们必要验证结果是否有效。为此,我们将使用带有 BranchPythonOperator 的条件。task_id 将为 “validate_result”,python_callable 将调用验证函数。op_args 参数将用于将上一个任务 “get_movie_ratings” 的结果通报给验证函数。
  1. validate_result = BranchPythonOperator(
  2.    task_id='validate_result',
  3.    python_callable=validate_result,
  4.    op_args=["{
  5.   
  6.   { task_instance.xcom_pull(task_ids='get_movie_ratings') }}"]
  7. )
复制代码
如果验证乐成,我们将从 “get_movie_ratings” 任务中获取数据并将其索引到 Elasticsearch 中。为此,我们将创建一个新任务 “index_movie_ratings”,它将使用 PythonOperator。op_args 参数将 “get_movie_ratings” 任务的结果通报给索引函数。
  1. index_ratings_operator = PythonOperator(
  2.    task_id='index_movie_ratings',
  3.    python_callable=index_movie_ratings_task,
  4.    op_args=["{
  5.   
  6.   { task_instance.xcom_pull(task_ids='get_movie_ratings') }}"]
  7. )
复制代码
如果验证表明失败,DAG 将继承执行失败通知任务。在此示例中,我们只是打印一条消息,但在现实场景中,我们可以设置警报来通知失败。
  1. failed_get_rating_operator = PythonOperator(
  2.    task_id='failed_get_rating_operator',
  3.    python_callable=lambda: print('Ratings were False, skipping indexing.')
  4. )
复制代码
最后,我们界说任务依赖关系,确保它们以精确的顺序执行:
  1. get_ratings_operator >> validate_result >> [index_ratings_operator, failed_get_rating_operator]
复制代码
以下是我们 DAG 的完整代码:
  1. """DAG update Rating Movies"""import astimport randomfrom airflow import DAGfrom datetime import datetimefrom airflow.operators.python import PythonOperator, BranchPythonOperatorfrom airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHookdef index_movie_ratings_task(movies):   es_hook = ElasticsearchPythonHook(hosts=None,                                     es_conn_args={                                         "cloud_id": "cloud_id"                                         "api_key": "api-key"                                     })   es_client = es_hook.get_conn   actions = []   for movie in ast.literal_eval(movies):       actions.append(           {               "update": {                   "_id": movie["id"],                   "_index": "movies"               }           }       )       actions.append(           {               "doc": {                   "rating": movie["rating"]               },               "doc_as_upsert": True           }       )   result = es_client.bulk(operations=actions)   print(f"Ingestion completed.")   print(result)   return Truedef get_movie_ratings_task():   movies = [       {"id": i, "rating": round(random.uniform(1, 10), 1)}       for i in range(1, 100)   ]   return moviesdef validate_result(result):   if not result:       return 'failed_get_rating_operator'   else:       return 'index_movie_ratings'with DAG(       dag_id="update_ratings_movies_2024",       start_date=datetime(2024, 12, 29),       schedule="@daily",       doc_md=__doc__,):   get_ratings_operator = PythonOperator(       task_id='get_movie_ratings',       python_callable=get_movie_ratings_task   )   validate_result = BranchPythonOperator(       task_id='validate_result',       python_callable=validate_result,       op_args=["{    { task_instance.xcom_pull(task_ids='get_movie_ratings') }}"],       provide_context=True   )   index_ratings_operator = PythonOperator(       task_id='index_movie_ratings',       python_callable=index_movie_ratings_task,       op_args=["{    { task_instance.xcom_pull(task_ids='get_movie_ratings') }}"]   )   failed_get_rating_operator = PythonOperator(       task_id='failed_get_rating_operator',       python_callable=lambda: print('Ratings were False, skipping indexing.')   )get_ratings_operator >> validate_result >> [index_ratings_operator, failed_get_rating_operator]
复制代码

可视化 DAG 执行

在 Apache Airflow 界面中,我们可以可视化 DAG 的执行。只需转到 “DAG” 选项卡并找到你创建的 DAG 即可。

下面,我们可以直观地看到任务的执行情况及其各自的状态。通过选择特定日期的执行,我们可以访问每个任务的日志。请注意,在 index_movie_ratings 任务中,我们可以在索引中看到索引结果,而且它已乐成完成。

在其他选项卡中,可以访问有关任务和 DAG 的其他信息,以协助分析和解决潜在题目。

结论

在本文中,我们演示了如何将 Apache Airflow 与 Elasticsearch 集成以创建数据提取解决方案。我们展示了如何设置 DAG、界说负责检索、验证和索引电影数据的任务,以及如何在 Airflow 界面中监控和可视化这些任务的执行。
这种方法可以轻松适应不同范例的数据和工作流,使 Airflow 成为在各种场景中编排数据管道的有用工具。

参考资料:

Apache AirFlow


  • https://airflow.apache.org/
使用 Docker 安装 Apache Airflow


  • https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html
Elasticsearch Python Hook


  • https://airflow.apache.org/docs/apache-airflow-providers-elasticsearch/stable/hooks/elasticsearch_python_hook.html
Python 运算符


  • https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html

想要得到 Elastic 认证?相识下一期 Elasticsearch 工程师培训何时开始!
Elasticsearch 包罗很多新功能,可帮助你为你的用例构建最佳搜刮解决方案。深入相识我们的示例笔记本以相识更多信息,开始免费云试用,或立刻在吗的本地呆板上试用 Elastic。

原文:How to ingest data to Elasticsearch through Apache Airflow - Elasticsearch Labs

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

美丽的神话

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表