通过 Flink SQL 使用 Hive 表丰富流

锦通  金牌会员 | 2024-8-30 20:36:15 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 554|帖子 554|积分 1662

1. 先容

  流处理是通过在数据活动时对数据应用逻辑来创造贸易代价。很多时候,这涉及组合数据源以丰富数据流。Flink SQL 实行此操纵并将您应用于数据的任何函数的结果定向到接收器中。业务用例,例如欺诈检测、广告印象跟踪、医疗保健数据丰富、增加财务付出信息、GPS 设备数据丰富或个性化客户通信,都是使用蜂巢表来丰富数据流的很好的例子。 因此,Hive 表与 Flink SQL 有两种常见的用例:
  

  •          Lookup(查找)表用于丰富数据流
  •          用于写入 Flink 结果的接收器
  对于这些用例中的任何一个,还有两种方法可以使用 Hive 表。您可以使用 Hive catalog,也可以使用 Flink DDL 中使用的 Flink JDBC 连接器。让我们讨论一下它们是如何工作的,以及它们的优点和缺点是什么。
  2. 在 SQL Stream Builder 中注册 Hive Catalog

  SQL Stream Builder (SSB) 旨在为分析师提供无代码界面中 Flink 的强大功能。SSB 有一种注册Hive Catalog的简单方法:
  

  •          单击侧边栏上的“Data Provider”菜单
  •          单击下方框中的“Register Catalog”
  •          选择“Hive”作为Catalog类型
  •          给它起个名字
  •          声明你的默认数据库
  •          点击“验证”
  •          验证乐成后,点击“创建”
  完成上述步调后,您的 Hive 表将在您选择它作为活动Catalog后表现在表列表中。目前,通过Catalog概念,当直接从 HDFS 访问以进行读取或写入时,Flink 仅支持非事件性 Hive 表。
  3. 将 Flink DDL 与 JDBC 连接器联合使用

  使用 Flink JDBC 连接器,可以直接从控制台屏幕为任何 Hive 表创建 Flink 表,此中可以提供表的 Flink DDL 创建脚本。这将为 Hive DB 和表名指定一个 URL。无论其类型如何,都可以通过这种方式访问全部 Hive 表。JDBC DDL 语句甚至可以通过“模板”生成。点击“Templates”->“jdbc”,控制台会将代码粘贴到编辑器中。
  1. CREATE TABLE `ItemCategory_transactional_jdbc_2` (<br /><br /> `id` VARCHAR(2147483647),<br /> `category` VARCHAR(2147483647)<br />) WITH (<br /> ‘connector’ = ‘jdbc’,<br /> ‘lookup.cache.ttl’ = ‘10s’,<br /> ‘lookup.cache.max.rows’ = ‘10000’,<br /> ‘tablename’ = ‘item_category_transactional’,<br /> ‘url’ = ‘jdbc:hive2://<host>:<port>/default’<br />)<br /><br />Using a Hive table as a lookup table<br />
复制代码
Hive 表通常用作查找表以丰富 Flink 流。Flink 能够缓存在 Hive 表中找到的数据以提高性能。需要设置 FOR SYSTEM_TIME AS OF 子句来告诉 Flink 与时态表连接。有关详细信息,请查看相关的 Flink 文档。
  1. SELECT t.itemId, i.category<br />FROM TransactionsTable t<br />LEFT JOIN ItemCategory_transactional_jdbc FOR SYSTEM_TIME AS OF t.event_time i ON i.id = t.itemId<br />Hive Catalog tables<br />
复制代码
对于 Hive Catalog 表,可以使用Hive 表的属性“lookup.join.cache.ttl”(此值的默认值为一小时)设置缓存查找表的 TTL(生存时间),就像 Beeline 中的这样或Hue:
  优点: 不需要界说 DDL,一个简单的 Hive Catalog就可以了。 缺点:仅实用于非事件性表
  3.1. 使用 JDBC 连接器的 Flink DDL 表

  使用带有 JDBC 连接器的 Hive 表时,默认环境下没有缓存,这意味着Flink 会为每个需要丰富的条目连接 Hive!我们可以通过在 DDL 下令中指定两个属性lookup.cache.max-rows和lookup.cache.ttl来更改它。
  Flink 会先查找缓存,只有在缓存缺失时才向外部数据库发送请求,并用返回的行更新缓存。当缓存到达最大缓存行lookup.cache.max-rows或当行超过lookup.cache.ttl的最长时间时,缓存中最旧的行将逾期。缓存的行可能不是最新的。一些用户可能希望通过调解 lookup.cache.ttl 来更频繁地刷新数据,但这可能会增加发送到数据库的请求数。用户将不得不均衡缓存数据的吞吐量和希奇度。
  1. CREATE TABLE `ItemCategory_transactional_jdbc_2` (<br /><br /> `id` VARCHAR(2147483647),<br /> `category` VARCHAR(2147483647)<br />) WITH (<br /> ‘connector’ = `jdbc’,<br /> ‘lookup.cache.ttl’ = ‘10s’,<br /> ‘lookup.cache.max-rows’ = ‘10000’,<br /> ‘table-name’ = ‘item_category_transactional’,<br /> ‘url’ = ‘jdbc:hive2://<host>:<port>/default’<br /><br />)<br /><br />Pros: All Hive tables can be accessed this way, and the caching<br />
复制代码
请留意缓存参数——这是我们确保良好的JOIN性能与来自 Hive 的新数据均衡的方式,根据需要进行调解。
  4. 使用 Hive 表作为接收器

  将 Flink 作业的输出保存到 Hive 表中,可以让我们存储处理过的数据以满意各种需求。为此,可以使用INSERT INTO语句并将查询结果写入指定的 Hive 表。请留意,您可能必须使用 Hive ACID 表调解 JDBC 接收器作业的检查点超时持续时间。
  1. INSERT INTO ItemCategory_transactional_jdbc_2
  2. SELECT t.itemId, i.category<br />FROM TransactionsTable t<br />LEFT JOIN ItemCategory_transactional_jdbc FOR SYSTEM_TIME AS OF t.event_time i ON i.id = t.itemId<br />Hive Catalog tables<br />No DDL needs to be written. Only non-transactional tables are supported, thus it only works with append-only streams.
  3. Flink DDL tables with JDBC connector
  4. With this option upsert type data can be written into transactional tables. In order to be able to do that a primary key should be defined.
  5. CREATE TABLE `ItemCategory_transactional_jdbc_sink` (
  6.  `id` STRING,
  7.  `category` STRING,
  8.  PRIMARY KEY (`id`) NOT ENFORCED
  9. ) WITH (
  10.  ‘connector’ = ‘jdbc’,
  11.  ‘table-name’ = ‘item_category_transactional_sink’,
  12.  ‘url’ = ‘jdbc:hive2://<host>:<port>/default’
  13. )
复制代码
当这个作业实行时,Flink 将覆盖全部具有相同主键值的记录,如果它已经存在于表中。这也实用于更新插入流以及事件性 Hive 表。
  5. 结论

  我们已经先容了如何使用 SSB 通过 Hive 表丰富 Flink 中的数据流,以及如何使用 Hive 表作为 Flink 结果的接收器。这在涉及使用查找数据丰富数据流的许多业务用例中非常有用。我们深入探讨了使用 Hive 表的差异方法。我们还讨论了差异方法的优缺点以及各种与缓存相关的选项以提高性能。有了这些信息,您就可以决定哪种方法最适合您。
  如果您想切身体验 SQL Stream Builder,请务必立即下载社区版!
  原文作者:Jimit Patel, and Ferenc Csaky
  原文链接:https://blog.cloudera.com/enriching-streams-with-hive-tables-via-flink-sql/
  本文由 mdnice 多平台发布

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

锦通

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表