Airflow:HttpSensor实现API驱动数据流程

打印 上一主题 下一主题

主题 858|帖子 858|积分 2574

数据管道工作流通常依赖于api来访问、获取和处理来自外部体系的数据。为了处理这些场景,Apache Airflow提供了HttpSensor,这是一个内置的Sensor,用于监视HTTP请求的状态,并在满意指定条件时触发后续任务。在这篇博文中,我们将深入探究HttpSensor,涵盖它的特性、用例、实现、自界说和最佳实践。
  先容HttpSensor

界说:HttpSensor是 Airflow 中的一个传感器(Sensor)。传感器在 Airflow 中用于等候某些条件满意后再继续执行后续任务。HttpSensor专门用于检查 HTTP 端点是否返回预期的状态码,以此来判断某个 HTTP 服务是否可用或者某个网页是否可以正常访问等。

工作原理:它会发送 HTTP 请求到指定的端点(URL),然后检查响应的状态码。如果状态码符合预期(默认是 200),则传感器任务完成,允许工作流继续执行后续任务;如果状态码不符合预期或者请求出现错误(如毗连超时、网络题目等),则传感器会按照肯定的策略(如重试策略)不停地重新检查,直到满意条件或者到达重试上限。
应用场景:
HttpSensor 典型应用场景重要有以下几类:


  • 在数据管道方面,用于 ETL 流程中,在开始阶段检查数据来源(如 Web 服务、API)是否可用,确保数据能顺利提取,还用于数据堆栈更新时监控外部数据源。
  • 在微服务架构里,用于服务依赖监控,检查一个微服务所依赖的其他微服务是否正常运行,同时在容器化环境的服务发现中,确定新部署或扩展后的微服务是否可访问。
  • 对于网页和 Web 应用,一是进行网站可用性监测,定期检查网站重要页面状态码,实时发现故障;二是用于内容更新验证,检查内容更新后页面是否可访问、内容是否更新成功,以保障 Web 应用正常运行。
HttpSensor示例



  • 环境预备
    假设已经安装并配置好 Airflow。如果没有,请先安装 Airflow 并初始化数据库(比方,利用airflow initdb下令)。
  • 界说 DAG(有向无环图)和 HttpSensor 任务
    以下是简单的 Airflow DAG 示例,此中包罗一个HttpSensor任务,用于检查一个示例网站(这里以https://www.example.com为例)是否可以正常访问。
  1. from airflow import DAG
  2. from airflow.operators.http_operator import SimpleHttpOperator
  3. from airflow.sensors.http_sensor import HttpSensor
  4. from datetime import datetime, timedelta
  5. # 设置默认参数
  6. default_args = {
  7.     'owner': 'airflow',
  8.    'start_date': datetime(2023, 1, 1),
  9.    'retries': 3,
  10.    'retry_delay': timedelta(minutes=5)
  11. }
  12. # 创建DAG对象
  13. dag = DAG(
  14.     'http_sensor_example',
  15.     default_args=default_args,
  16.     schedule_interval='@daily'
  17. )
  18. # 创建HttpSensor任务
  19. check_website = HttpSensor(
  20.     task_id='check_website',
  21.     http_conn_id='http_default',
  22.     endpoint='https://www.example.com',
  23.     poke_interval=60,  # 每60秒检查一次
  24.     dag=dag
  25. )
  26. # 创建一个简单的后续任务,用于在网站可访问后执行
  27. def print_message():
  28.     print("网站可以正常访问!")
  29.    
  30. send_message = SimpleHttpOperator(
  31.     task_id='send_message',
  32.     http_conn_id='http_default',
  33.     endpoint='/',
  34.     method='GET',
  35.     python_callable=print_message,
  36.     dag=dag
  37. )
  38. # 设置任务依赖关系
  39. check_website >> send_message
复制代码
代码表明


  • 导入模块
    首先导入必要的 Airflow 模块,包罗DAG用于界说工作流,SimpleHttpOperator用于发送简单的 HTTP 请求操作(这里用于后续的简单演示),HttpSensor用于检查 HTTP 端点的传感器,以及日期时间干系的模块用于设置工作流的开始日期等参数。
  • 设置默认参数
    界说default_args字典,此中包罗工作流的全部者(owner)、开始日期(start_date)、重试次数(retries)和重试延迟(retry_delay)等参数。这些参数将被应用到 DAG 中的全部任务。
  • 创建 DAG 对象
    利用DAG类创建一个名为http_sensor_example的 DAG 对象,指定了之前界说的默认参数和调度间隔(schedule_interval),这里设置为每天执行一次(@daily)。
  • 创建 HttpSensor 任务
    创建HttpSensor任务,命名为check_website。http_conn_id通常用于指定 Airflow 中预界说的 HTTP 毗连配置(这里利用http_default,可以根据实际情况修改和配置),endpoint是要检查的 HTTP 端点的 URL。poke_interval表现检查的间隔时间,单元是秒。
  • 创建后续任务
    界说一个简单的函数print_message,用于在网站可以正常访问后打印一条消息。然后利用SimpleHttpOperator创建一个名为send_message的任务,这个任务在check_website任务成功后执行,它会发送一个简单的 GET 请求到网站根目次(/),而且在请求时调用print_message函数。
  • 设置任务依赖关系
    通过check_website >> send_message设置任务的依赖关系,确保check_website任务成功完成后才会执行send_message任务。
运行和监控 DAG



  • 启动 Airflow 服务
    启动 Airflow 的 Web 服务器(airflow webserver)和调度器(airflow scheduler),以便可以在 Web 界面中查看和管理 DAG。
  • 在 Web 界面中操作
    打开 Airflow 的 Web 界面(通常是http://localhost:8080),在 DAG 列表中找到http_sensor_example,可以手动触发 DAG 执行,或者等候按照调度间隔自动执行。在执行过程中,可以在任务实例页面查看HttpSensor任务的状态,观察它是否成功检查到网站可以正常访问,以及后续任务是否准确执行。
请注意,在实际利用中,大概必要根据具体的网络环境、要检查的 HTTP 端点的特性以及业务需求等因素,调整HttpSensor的参数(如poke_interval、retries、retry_delay等)和其他干系任务的设置。同时,要确保 Airflow 的配置准确,而且具有访问目标 HTTP 端点的网络权限。
自界说HttpSensor 行为

HttpSensor提供了几个参数,你可以用它们来定制它的行为:


  • http_conn_id: HTTP服务器的毗连ID,引用Airflow界面中创建的毗连。
  • endpoint:发送请求的API端点(路径)。
  • method:用于请求的HTTP方法(比方,‘GET’、‘POST’、‘PUT’等)。
  • headers:要包罗在HTTP请求中的header字典。
  • data:在POST和PUT等方法的请求体中发送的数据。
  • response_check:一个Python可调用函数(比方lambda函数),它担当HTTP响应作为参数并返回bool值
  • mode:传感器的工作模式。默认情况下,它利用“poke”模式,定期检查所需的条件。
  • timeout:传感器在失败前等候所需条件满意的最大时间(以秒为单元)。缺省情况下,没有超时。
  • poke_interval:检查所需条件之间的时间间隔(以秒为单元)。默认值是60秒。

最佳实践



  • 利用形貌性的task_id:确保为HttpSensors利用清楚且有意义的task_id,以提高dag的可读性和可维护性。
  • 设置得当的超时:为HttpSensor设置合理的超时,以避免让任务无穷期地等候API可用或完成处理。这有助于防止资源耗尽,并确保如果在预期的时间范围内没有满意所需的条件,管道可以正常失败。
  • 调整间隔:根据具体用例自界说poke_interval。如果API的响应时间不确定,你大概盼望利用更长的间隔来避免过多的轮询。相反,如果盼望API快速响应,则较短的间隔大概更符合。
  • 处理API身份验证:如果你的API必要身份验证,请确保在HTTP毗连设置中设置得当的身份验证方法(比方,基自己份验证,令牌身份验证等)。
  • 利用response_check可调用对象:始终界说一个response_check可调用对象,它正确地反映了HttpSensor所需的条件。这允许传感器在继续执行下一个任务之前确定API的响应是否满意要求。
总结

Apache Airflow HttpSensor是功能强大的通用工具,用于监控数据管道中外部api的状态。通过了解它的各种用例和参数,你可以创建高效的工作流,可以在继续之前等候特定的API条件得到满意。在继续利用Apache Airflow 时,请记住利用HttpSensor的强大功能来有效地监视和管理dag中api驱动的依赖项。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

反转基因福娃

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表