如何通过 Apache Airflow 将数据导入 Elasticsearch
作者:来自 Elastic Andre Luizhttps://i-blog.csdnimg.cn/direct/c25b8c41e5c545bda1e9a5bb44c0dc5e.webp
相识如何通过 Apache Airflow 将数据导入 Elasticsearch。
Apache Airflow
Apache Airflow 是一个旨在创建、安排(schedule)和监控工作流的平台。它用于编排 ETL(Extract-Transform-Load) 流程、数据管道和其他复杂工作流,提供灵活性和可扩展性。它的可视化界面和实时监控功能使管道管理更易于访问和高效,让你可以跟踪执行的进度和结果。以下是它的四个紧张支柱:
[*]动态:管道以 Python 界说,允许动态灵活地生成工作流。
[*]可扩展:Airflow 可以与各种情况集成,可以创建自界说运算符,并可以根据必要执行特定代码。
[*]优雅:管道以干净明确的方式编写。
[*]可扩展:其模块化架构使用消息队列来编排任意数目的工作器。
在实践中,Airflow 可用于以了局景:
[*]数据导入:编排将数据每日提取到 Elasticsearch 等数据库中。
[*]日志监控:管理日志文件的网络和处置惩罚,然后在 Elasticsearch 中举行分析以识别错误或异常。
[*]多种数据源集成:将来自不同体系(API、数据库、文件)的信息合并到 Elasticsearch 中的单个层中,简化搜刮和报告。
DAG:Directed Acyclic Graphs - 有向无环图
在 Airflow 中,工作流由 DAG(有向无环图)表示。DAG 是一种界说任务执行顺序的结构。DAG 的紧张特性是:
[*]由独立任务组成:每个任务代表一个工作单元,旨在独立执行。
[*]排序:任务的执行顺序在 DAG 中明确界说。
[*]可重用性:DAG 旨在重复执行,促进流程主动化。
Airflow 的紧张组件
Airflow 生态体系由多个组件组成,它们共同协作以调和任务:
https://i-blog.csdnimg.cn/direct/34f68600d2504319925508131ee9e74a.webp
[*]调度程序 - scheduler:负责调度 DAG 并发送任务以供工作人员执行。
[*]执行器 - Exectutor:管理任务的执行,将其委托给工作人员。
[*]Web 服务器 - Webserver:提供与 DAG 和任务交互的图形界面。
[*]Dags 文件夹 - Dags folder:我们存储用 Python 编写的 DAG 的文件夹。
[*]元数据 - Metadata:作为工具存储库的数据库,由调度程序和执行器用于存储执行状态。
Apache Airflow 和 Elasticsearch
我们将演示如何使用 Apache Airflow 和 Elasticsearch 来调和任务并在 Elasticsearch 中索引结果。此演示的目标是创建一个任务管道来更新 Elasticsearch 索引中的记录。此索引包罗电影数据库,用户可以在此中举行评分和分配评级。想象一个天天有数百个评级的场景,有必要保持评级记录更新。为此,将开发一个 DAG,它将天天执行,负责检索新的合并评级并更新索引中的记录。
在 DAG 流程中,我们将有一个获取评级的任务,然后是一个验证结果的任务。如果数据不存在,DAG 将被定向到失败任务。否则,数据将在 Elasticsearch 中编入索引。目标是通过一种带有负责盘算分数的机制的方法检索评级,以更新索引中电影的评级字段。
使用 Apache Airflow 和 Elasticsearch 以及 Docker
要创建容器化情况,我们将使用 Apache Airflow 和 Docker。按照 “在 Docker 中运行 Airflow” 指南中的阐明现实设置 Airflow。
至于 Elasticsearch,我将使用 Elastic Cloud 上的集群,但如果你愿意,也可以使用 Docker 设置 Elasticsearch。已经创建了一个包罗电影目录的索引,此中电影数据已编入索引。这些电影的 “rating” 字段将被更新。
创建 DAG
通过 Docker 安装后,将创建一个文件夹结构,此中包括 dags 文件夹,我们必须将 DAG 文件放在该文件夹中,以便 Airflow 识别它们。
在此之前,我们必要确保安装了必要的依赖项。以下是此项目标依赖项:
pip install apache-airflow apache-airflow-providers-elasticsearch 我们将创建文件 update_ratings_movies.py 并开始编写任务代码。
现在,让我们导入必要的库:
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook 我们将使用 ElasticsearchPythonHook,这是一个通过抽象连接和使用外部 API 来简化 Airflow 和 Elasticsearch 集群之间集成的组件。
接下来,我们界说 DAG,并指定其紧张参数:
[*]dag_id:DAG 的名称。
[*]start_date:DAG 的启动时间。
[*]schedule:界说周期(在我们的例子中是每日)。
[*]doc_md:将导入并显示在 Airflow 界面中的文档。
界说任务
现在,让我们界说 DAG 的任务。第一个任务将负责检索电影评级数据。我们将使用 PythonOperator,并将 task_id 设置为“get_movie_ratings”。python_callable 参数将调用负责获取 ratings 的函数。
get_ratings_operator = PythonOperator(
task_id='get_movie_ratings',
python_callable=get_movie_ratings_task
) 接下来,我们必要验证结果是否有效。为此,我们将使用带有 BranchPythonOperator 的条件。task_id 将为 “validate_result”,python_callable 将调用验证函数。op_args 参数将用于将上一个任务 “get_movie_ratings” 的结果通报给验证函数。
validate_result = BranchPythonOperator(
task_id='validate_result',
python_callable=validate_result,
op_args=["{
{ task_instance.xcom_pull(task_ids='get_movie_ratings') }}"]
) 如果验证乐成,我们将从 “get_movie_ratings” 任务中获取数据并将其索引到 Elasticsearch 中。为此,我们将创建一个新任务 “index_movie_ratings”,它将使用 PythonOperator。op_args 参数将 “get_movie_ratings” 任务的结果通报给索引函数。
index_ratings_operator = PythonOperator(
task_id='index_movie_ratings',
python_callable=index_movie_ratings_task,
op_args=["{
{ task_instance.xcom_pull(task_ids='get_movie_ratings') }}"]
) 如果验证表明失败,DAG 将继承执行失败通知任务。在此示例中,我们只是打印一条消息,但在现实场景中,我们可以设置警报来通知失败。
failed_get_rating_operator = PythonOperator(
task_id='failed_get_rating_operator',
python_callable=lambda: print('Ratings were False, skipping indexing.')
) 最后,我们界说任务依赖关系,确保它们以精确的顺序执行:
get_ratings_operator >> validate_result >> 以下是我们 DAG 的完整代码:
"""DAG update Rating Movies"""import astimport randomfrom airflow import DAGfrom datetime import datetimefrom airflow.operators.python import PythonOperator, BranchPythonOperatorfrom airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHookdef index_movie_ratings_task(movies): es_hook = ElasticsearchPythonHook(hosts=None, es_conn_args={ "cloud_id": "cloud_id" "api_key": "api-key" }) es_client = es_hook.get_conn actions = [] for movie in ast.literal_eval(movies): actions.append( { "update": { "_id": movie["id"], "_index": "movies" } } ) actions.append( { "doc": { "rating": movie["rating"] }, "doc_as_upsert": True } ) result = es_client.bulk(operations=actions) print(f"Ingestion completed.") print(result) return Truedef get_movie_ratings_task(): movies = [ {"id": i, "rating": round(random.uniform(1, 10), 1)} for i in range(1, 100) ] return moviesdef validate_result(result): if not result: return 'failed_get_rating_operator' else: return 'index_movie_ratings'with DAG( dag_id="update_ratings_movies_2024", start_date=datetime(2024, 12, 29), schedule="@daily", doc_md=__doc__,): get_ratings_operator = PythonOperator( task_id='get_movie_ratings', python_callable=get_movie_ratings_task ) validate_result = BranchPythonOperator( task_id='validate_result', python_callable=validate_result, op_args=["{ { task_instance.xcom_pull(task_ids='get_movie_ratings') }}"], provide_context=True ) index_ratings_operator = PythonOperator( task_id='index_movie_ratings', python_callable=index_movie_ratings_task, op_args=["{ { task_instance.xcom_pull(task_ids='get_movie_ratings') }}"] ) failed_get_rating_operator = PythonOperator( task_id='failed_get_rating_operator', python_callable=lambda: print('Ratings were False, skipping indexing.') )get_ratings_operator >> validate_result >>
可视化 DAG 执行
在 Apache Airflow 界面中,我们可以可视化 DAG 的执行。只需转到 “DAG” 选项卡并找到你创建的 DAG 即可。
https://i-blog.csdnimg.cn/direct/30cbed395ee040329068cbcf16e0d830.webp
下面,我们可以直观地看到任务的执行情况及其各自的状态。通过选择特定日期的执行,我们可以访问每个任务的日志。请注意,在 index_movie_ratings 任务中,我们可以在索引中看到索引结果,而且它已乐成完成。
https://i-blog.csdnimg.cn/direct/bc89859028ab49e4a55e6b23ce6b6713.webp
在其他选项卡中,可以访问有关任务和 DAG 的其他信息,以协助分析和解决潜在题目。
结论
在本文中,我们演示了如何将 Apache Airflow 与 Elasticsearch 集成以创建数据提取解决方案。我们展示了如何设置 DAG、界说负责检索、验证和索引电影数据的任务,以及如何在 Airflow 界面中监控和可视化这些任务的执行。
这种方法可以轻松适应不同范例的数据和工作流,使 Airflow 成为在各种场景中编排数据管道的有用工具。
参考资料:
Apache AirFlow
[*]https://airflow.apache.org/
使用 Docker 安装 Apache Airflow
[*]https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html
Elasticsearch Python Hook
[*]https://airflow.apache.org/docs/apache-airflow-providers-elasticsearch/stable/hooks/elasticsearch_python_hook.html
Python 运算符
[*]https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html
想要得到 Elastic 认证?相识下一期 Elasticsearch 工程师培训何时开始!
Elasticsearch 包罗很多新功能,可帮助你为你的用例构建最佳搜刮解决方案。深入相识我们的示例笔记本以相识更多信息,开始免费云试用,或立刻在吗的本地呆板上试用 Elastic。
原文:How to ingest data to Elasticsearch through Apache Airflow - Elasticsearch Labs
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]