记一次Flink遇到性能瓶颈

  金牌会员 | 2023-4-15 22:08:34 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 896|帖子 896|积分 2688

前言

这周的主要时间花在Flink上面,做了一个简单的从文本文件中读取数据,然后存入数据库的例子,能够正常的实现功能,但是遇到个问题,我有四台机器,自己搭建了一个standalone的集群,不论我把并行度设置多少,跑起来的耗时都非常接近,实在是百思不得其解。机器多似乎并不能帮助它。 把过程记录在此,看后面随着学习的深入能不能解答出这个问题。

尝试过的修复方法

集群搭建

出现这个问题后,我从集群的角度来进行了些修改,
1,机器是2核的,slots被设置成了6,那我就有点怀疑是这个设置问题,因为其实只有2核,设置的多了,反而存在抢占资源,导致运行达不到效果,改成2后效果一样,没有改进。这个参数在
taskmanager.numberOfTaskSlots: 2
2,调整内存, taskmanager 从2G调整为4G, 效果也没有变化。
taskmanager.memory.process.size: 4000m
这里说下这个内存,我们设置的是总的Memory,也就是这个Total Process Memory。

剔除掉些比较固定的Memory,剩下的大头就是这个Task Heap 和 Managed Memory。
所以我们调整大小后,它两个也就相应的增加了。  我查了下这两个,可以理解为堆内存和堆外内存,
一个是存放我们程序的对象,会被垃圾回收器回收;一个是堆外内存,比如RockDB 和 缓存 sort,hash 等的中间结果。
程序方面修改

最开始的时候我把保存数据库操作写在MapFunction里面,后来改到SinkFunction里面。
SinkFunction里面保存数据库的方法也进行了反复修改,从开始使用Spring的JdbcTemplate,换成后来直接使用最原始JDBC。 而且还踩了一个坑,开始的时候用的注入的JdbcTemplate, 本地运行没有问题,到了集群上面,发到别的机器的时候,注入的东西就是空的了。
换成原始的JDBC速度能提升不少, 我猜想这里的原因是jdbctemplate做了些多余的事情, JDBC打开一次,后面Invoke的时候就直接存了,效率要高些,所以速度上提升不少。
这里把部分代码贴出来, 在Open的时候就预加载好PreparedStatement,  Invoke的时候直接传参数,调用就可以了。
  1. public class SinkToMySQL2 extends RichSinkFunction<MarketPrice> {
  2.     private PreparedStatement updatePS;
  3.     private PreparedStatement insertPS;
  4.     private Connection connection;
  5.     @Override
  6.     public void open(Configuration parameters) throws Exception {
  7.         super.open(parameters);
  8.         HikariDataSource dataSource = new HikariDataSource();
  9.         connection = getConnection(dataSource);
  10.         if(connection != null)
  11.         {
  12.             String updateSQL = " update MarketPrice set open_price=?,high_price=?,low_price=?,close_price=? where performance_id = ? and price_as_of_date = ?";
  13.             updatePS = this.connection.prepareStatement(updateSQL);
  14.             String insertSQL = " insert into MarketPrice(performance_id,price_as_of_date,open_price,high_price,low_price,close_price) values (?,?,?,?,?,?)";
  15.             insertPS = this.connection.prepareStatement(insertSQL);
  16.         }
  17.     }
  18.     @Override
  19.     public void close() throws Exception {
  20.         super.close();
  21.         if (updatePS != null) {
  22.             updatePS.close();
  23.         }
  24.         if (insertPS != null) {
  25.             insertPS.close();
  26.         }
  27.         //关闭连接和释放资源
  28.         if (connection != null) {
  29.             connection.close();
  30.         }
  31.     }
  32.     /**
  33.      * 每条数据的插入都要调用一次 invoke() 方法
  34.      *
  35.      * @param marketPrice
  36.      * @param context
  37.      * @throws Exception
  38.      */
  39.     @Override
  40.     public void invoke(MarketPrice marketPrice, Context context) throws Exception {
  41.         log.info("start save for {}", marketPrice.getPerformanceId().toString() );
  42.         updatePS.setDouble(1,marketPrice.getOpenPrice());
  43.         updatePS.setDouble(2,marketPrice.getHighPrice());
  44.         updatePS.setDouble(3,marketPrice.getLowPrice());
  45.         updatePS.setDouble(4,marketPrice.getClosePrice());
  46.         updatePS.setString(5, marketPrice.getPerformanceId().toString());
  47.         updatePS.setInt(6, marketPrice.getPriceAsOfDate());
  48.         int result = updatePS.executeUpdate();
  49.         log.info("finish update for {} result {}", marketPrice.getPerformanceId().toString(), result);
  50.         if(result == 0)
  51.         {
  52.             String insertSQL = " insert into MarketPrice(performance_id,price_as_of_date,open_price,high_price,low_price,close_price) values (?,?,?,?,?,?)";
  53.             insertPS = this.connection.prepareStatement(insertSQL);
  54.             insertPS.setString(1, marketPrice.getPerformanceId().toString());
  55.             insertPS.setInt(2, marketPrice.getPriceAsOfDate());
  56.             insertPS.setDouble(3,marketPrice.getOpenPrice());
  57.             insertPS.setDouble(4,marketPrice.getHighPrice());
  58.             insertPS.setDouble(5,marketPrice.getLowPrice());
  59.             insertPS.setDouble(6,marketPrice.getClosePrice());
  60.             result = insertPS.executeUpdate();
  61.             log.info("finish save for {} result {}", marketPrice.getPerformanceId().toString(), result);
  62.         }
  63.     }
  64. }
复制代码
总结

从多个方面去改进,结果发现还是一样的,就是使用一台机器和使用三台机器,时间上一样的,再怀疑我只能怀疑是某台机器有问题,然后运行的时候,由最慢的机器决定了速度。 我在使用MapFunction的时候有观察到,有的时候,某台机器已经处理上千条,而有的只处理了几十条,到最后完成的时候,大家处理的数量又是很接近的。这样能够解释为什么机器多了,速度却是一样的。但是我没有办法找出哪台机器来。 我自己的本地运行,并行数设置的多,速度上面是有提升的,到了集群就碰到这样的现象,后面看能不能解决它, 先记录在此。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表