Writing from Flink to a Redis Cluster: Rewriting the flink-connector-redis Package

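Background: when you use Flink you inevitably end up dealing with Redis, and I'm sure many of you have used flink-connector-redis for that. But when I wanted RedisSink to write to a Redis cluster, I found that it doesn't even support a password. Hence this note.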
 
Here's how it went. I was going to write data from Flink to Redis, so as usual I pulled in the flink-connector-redis package:
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.1.5</version>
</dependency>
```
Then I dashed off the following code:
```scala
package org.cube.flink

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
// InetSocketAddress and HashSet are used by the cluster config later on
import java.net.InetSocketAddress
import java.util.HashSet

/**
 * @Author : Lawrence
 * @Date : 2022/7/24 23:11
 * @Description : Write Flink results to a Redis cluster
 * @Version : 1.0.0
 * @Modification_Record:
 * Version  Date       Remark
 * v1.0.0   2022/7/24  First Create
 */
object RedisClusterSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // source
    import org.apache.flink.api.scala._
    val source = env.fromElements("1 hadoop", "2 spark", "3 flink", "4 hive", "5 redis", "6 hbase")
    // process: "1 hadoop" -> ("1", "hadoop")
    val tupleValue = source.map(_.split(" ")).map(x => (x(0), x(1)))
    // redis config (single-node connection pool)
    val builder = new FlinkJedisPoolConfig.Builder
    builder.setHost("cube01").setPort(7001).setPassword("123456")
    val redisConf: FlinkJedisPoolConfig = builder.build()
    // sink
    val redisSink = new RedisSink[(String, String)](redisConf, new MyRedisMapper())
    tupleValue.addSink(redisSink)
    env.execute("RedisClusterSink")
  }
}

// Turns each (key, value) tuple into a Redis SET command
class MyRedisMapper extends RedisMapper[(String, String)] {
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.SET)
  }
  override def getKeyFromData(t: (String, String)): String = t._1
  override def getValueFromData(t: (String, String)): String = t._2
}
```
Then I happily hit Run, and the console greeted me with a splash of bright red.
The last line read:
```
Caused by: redis.clients.jedis.exceptions.JedisMovedDataException: MOVED 9842 192.168.20.132:7003
```
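A quick aside on what that error actually says. In cluster mode, Redis splits the keyspace into 16384 hash slots, and a key's slot is the CRC16 of the key modulo 16384 (if the key contains a non-empty `{...}` hash tag, only the tag is hashed). `MOVED 9842 192.168.20.132:7003` means the key lives in slot 9842, which is owned by the node on port 7003, not the node on 7001 that I connected to. Below is a minimal stdlib-only Java sketch of that slot calculation, written from the Redis Cluster spec rather than copied from Jedis; the class name `RedisSlot` is made up for illustration.

```java
import java.nio.charset.StandardCharsets;

// Sketch of Redis Cluster key -> hash slot mapping (hypothetical helper, not a Jedis class)
final class RedisSlot {
    private RedisSlot() {}

    // CRC16-CCITT, XMODEM variant: polynomial 0x1021, initial value 0, no reflection, no final XOR
    static int crc16(byte[] bytes) {
        int crc = 0;
        for (byte b : bytes) {
            crc ^= (b & 0xFF) << 8;
            for (int i = 0; i < 8; i++) {
                if ((crc & 0x8000) != 0) {
                    crc = ((crc << 1) ^ 0x1021) & 0xFFFF;
                } else {
                    crc = (crc << 1) & 0xFFFF;
                }
            }
        }
        return crc;
    }

    // If the key has a non-empty {...} hash tag, only the part inside the braces is hashed
    static int slot(String key) {
        int start = key.indexOf('{');
        if (start >= 0) {
            int end = key.indexOf('}', start + 1);
            if (end > start + 1) {
                key = key.substring(start + 1, end);
            }
        }
        return crc16(key.getBytes(StandardCharsets.UTF_8)) % 16384;
    }

    public static void main(String[] args) {
        for (String key : new String[] {"1", "2", "3"}) {
            System.out.println(key + " -> slot " + slot(key));
        }
    }
}
```

Key `1`, the first record in the job, hashes to slot 9842, the same slot as in the MOVED error. A single-node JedisPool has no idea about slots, so it cannot follow the redirect.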
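Oh, right: my Redis runs in cluster mode, and a single-node connection pool can't follow the cluster's MOVED redirect.
That doesn't stump me.
All I need to do is swap FlinkJedisPoolConfig for FlinkJedisClusterConfig and everything will be fine.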
```scala
// redis config
val builder = new FlinkJedisClusterConfig.Builder
val inetSocketAddress = new InetSocketAddress("cube01", 7001)
val nodeSet = new HashSet[InetSocketAddress]()
nodeSet.add(inetSocketAddress)
builder.setNodes(nodeSet).setPassword("123456") // does not compile: this Builder has no setPassword
val redisConf: FlinkJedisClusterConfig = builder.build()
```
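But this class has no setPassword method; in fact, it doesn't even have a password field.
That doesn't stump me either.
Surely it will work if I just leave the password out for now?
Nope. The console gave me another splash of red,
and this time it said: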
```
Caused by: redis.clients.jedis.exceptions.JedisDataException: NOAUTH Authentication required.
```
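NOAUTH is the node telling me that this connection sent a command without authenticating first. Redis authentication happens per connection: the client has to send `AUTH <password>` on every connection it opens, and a cluster client opens connections to each node it talks to, so the cluster client itself has to know the password. To make that concrete, here is a stdlib-only sketch of what such commands look like on the wire in RESP (the Redis serialization protocol), where a command is an array of bulk strings. `RespEncoder` is a hypothetical helper for illustration, not a Jedis class.

```java
import java.nio.charset.StandardCharsets;

// Sketch: encode a Redis command as a RESP array of bulk strings (hypothetical helper)
final class RespEncoder {
    private RespEncoder() {}

    static String encode(String... parts) {
        StringBuilder sb = new StringBuilder();
        sb.append('*').append(parts.length).append("\r\n");     // array header: number of elements
        for (String p : parts) {
            int len = p.getBytes(StandardCharsets.UTF_8).length; // bulk string length is in bytes
            sb.append('$').append(len).append("\r\n").append(p).append("\r\n");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(encode("AUTH", "123456"));      // must come first on a password-protected node
        System.out.print(encode("SET", "1", "hadoop"));  // otherwise this gets NOAUTH
    }
}
```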
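Heh, that doesn't stump me.
My gut reaction was to look in the Maven repository for a newer flink-connector-redis package.
But when I searched, it turned out this is already the latest version.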

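That doesn't stump me either.
FlinkJedisPoolConfig can set a password, right?
FlinkJedisClusterConfig can talk to a cluster, right?
What if I merge the code of the two? Wouldn't that do it?
So I instinctively rewrote FlinkJedisClusterConfig as a new class, MyFlinkJedisClusterConfig, adding a password field, a getPassword() getter, and a setPassword() method on its Builder.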
```java
package org.cube.flink;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import org.apache.flink.util.Preconditions;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Protocol;

import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.Set;

/**
 * @Author : Lawrence
 * @Date : 2022/7/25 21:14
 * @Description : FlinkJedisClusterConfig with a password
 * @Version : 1.0.0
 * @Modification_Record:
 * Version  Date       Remark
 * v1.0.0   2022/7/25  First Create
 */
public class MyFlinkJedisClusterConfig extends FlinkJedisConfigBase {
    private static final long serialVersionUID = 1L;
    private final Set<InetSocketAddress> nodes;
    private final int maxRedirections;
    private final int soTimeout;
    private final String password;

    private MyFlinkJedisClusterConfig(Set<InetSocketAddress> nodes, int connectionTimeout, int soTimeout,
                                      int maxRedirections, int maxTotal, int maxIdle, int minIdle, String password) {
        super(connectionTimeout, maxTotal, maxIdle, minIdle);
        Preconditions.checkNotNull(nodes, "Node information should be presented");
        Preconditions.checkArgument(!nodes.isEmpty(), "Redis cluster hosts should not be empty");
        this.nodes = new HashSet<>(nodes);
        this.soTimeout = soTimeout;
        this.maxRedirections = maxRedirections;
        this.password = password;
    }

    // Converts the configured addresses into the HostAndPort set that JedisCluster expects
    public Set<HostAndPort> getNodes() {
        Set<HostAndPort> ret = new HashSet<>();
        for (InetSocketAddress node : this.nodes) {
            ret.add(new HostAndPort(node.getHostName(), node.getPort()));
        }
        return ret;
    }

    public int getMaxRedirections() { return this.maxRedirections; }

    public int getSoTimeout() { return this.soTimeout; }

    // protected is enough: MyRedisCommandsContainerBuilder is in the same package
    protected String getPassword() { return this.password; }

    // The password is not included, which keeps it out of logs
    @Override
    public String toString() {
        return "JedisClusterConfig{nodes=" + this.nodes + ", timeout=" + this.connectionTimeout
                + ", soTimeout=" + this.soTimeout + ", maxRedirections=" + this.maxRedirections
                + ", maxTotal=" + this.maxTotal + ", maxIdle=" + this.maxIdle + ", minIdle=" + this.minIdle + '}';
    }

    public static class Builder {
        private Set<InetSocketAddress> nodes;
        private int timeout = Protocol.DEFAULT_TIMEOUT;
        private int maxRedirections = 5;
        // added: socket (read) timeout
        private int soTimeout = Protocol.DEFAULT_TIMEOUT;
        private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
        private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
        private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
        // added: password
        private String password;

        public Builder() {
        }

        public Builder setNodes(Set<InetSocketAddress> nodes) {
            this.nodes = nodes;
            return this;
        }

        public Builder setTimeout(int timeout) {
            this.timeout = timeout;
            return this;
        }

        public Builder setSoTimeout(int soTimeout) {
            this.soTimeout = soTimeout;
            return this;
        }

        public Builder setMaxRedirections(int maxRedirections) {
            this.maxRedirections = maxRedirections;
            return this;
        }

        public Builder setMaxTotal(int maxTotal) {
            this.maxTotal = maxTotal;
            return this;
        }

        public Builder setMaxIdle(int maxIdle) {
            this.maxIdle = maxIdle;
            return this;
        }

        public Builder setMinIdle(int minIdle) {
            this.minIdle = minIdle;
            return this;
        }

        public Builder setPassword(String password) {
            this.password = password;
            return this;
        }

        public MyFlinkJedisClusterConfig build() {
            return new MyFlinkJedisClusterConfig(this.nodes, this.timeout, this.soTimeout,
                    this.maxRedirections, this.maxTotal, this.maxIdle, this.minIdle, this.password);
        }
    }
}
```
But then the red text reminded me:
```
Caused by: java.lang.IllegalArgumentException: Jedis configuration not found
```
It turns out that when a Flink job runs, it calls the open() method of RedisSink:
```java
public void open(Configuration parameters) throws Exception {
    this.redisCommandsContainer = RedisCommandsContainerBuilder.build(this.flinkJedisConfigBase);
}
```
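The RedisCommandsContainerBuilder.build method it calls accepts any FlinkJedisConfigBase, but internally it only recognizes the connector's own FlinkJedisPoolConfig, FlinkJedisClusterConfig and FlinkJedisSentinelConfig (via instanceof checks). Anything else, MyFlinkJedisClusterConfig included, falls through to the "Jedis configuration not found" exception: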
```java
public static RedisCommandsContainer build(FlinkJedisConfigBase flinkJedisConfigBase)
```
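This failure mode is easy to reproduce without Flink or Redis. The sketch below uses made-up stand-in classes (`ContainerBuilder`, `ConfigBase`, `PoolConfig`, `ClusterConfig`, `MyClusterConfig`) that only mimic the shape of the connector's instanceof dispatch: a new sibling subclass of the base type compiles fine, but the builder rejects it at runtime.

```java
// Hypothetical stand-ins that mimic the connector's type dispatch; these are not the real Flink classes
final class ContainerBuilder {
    private ContainerBuilder() {}

    // Same shape as build(FlinkJedisConfigBase): dispatch on the concrete type, reject unknown ones
    static String build(ConfigBase config) {
        if (config instanceof PoolConfig) {
            return "pool container";
        } else if (config instanceof ClusterConfig) {
            return "cluster container";
        } else {
            throw new IllegalArgumentException("Jedis configuration not found");
        }
    }

    public static void main(String[] args) {
        System.out.println(build(new ClusterConfig()));
        try {
            build(new MyClusterConfig()); // compiles, because it is a ConfigBase
        } catch (IllegalArgumentException e) {
            System.out.println("MyClusterConfig rejected: " + e.getMessage());
        }
    }
}

abstract class ConfigBase {}
final class PoolConfig extends ConfigBase {}
final class ClusterConfig extends ConfigBase {}
final class MyClusterConfig extends ConfigBase {} // a new sibling, like MyFlinkJedisClusterConfig
```

It also explains why the error only appears once the job starts: the RedisSink constructor just stores the config, and the type check happens later in open().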
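RedisSink keeps its fields private, so a subclass can't swap in a different container from open(). So these two classes have to be rewritten as well: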
MyRedisSink:
```java
package org.cube.flink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * @Author : Lawrence
 * @Date : 2022/7/25 23:52
 * @Description : Copy of RedisSink that builds its container with MyRedisCommandsContainerBuilder
 * @Version : 1.0.0
 * @Modification_Record :
 * Version  Date       Remark
 * v1.0.0   2022/7/25  First Create
 */
public class MyRedisSink<IN> extends RichSinkFunction<IN> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(MyRedisSink.class);
    private String additionalKey;
    private RedisMapper<IN> redisSinkMapper;
    private RedisCommand redisCommand;
    private FlinkJedisConfigBase flinkJedisConfigBase;
    private RedisCommandsContainer redisCommandsContainer;

    public MyRedisSink(FlinkJedisConfigBase flinkJedisConfigBase, RedisMapper<IN> redisSinkMapper) {
        Preconditions.checkNotNull(flinkJedisConfigBase, "Redis connection pool config should not be null");
        Preconditions.checkNotNull(redisSinkMapper, "Redis Mapper can not be null");
        Preconditions.checkNotNull(redisSinkMapper.getCommandDescription(), "Redis Mapper data type description can not be null");
        this.flinkJedisConfigBase = flinkJedisConfigBase;
        this.redisSinkMapper = redisSinkMapper;
        RedisCommandDescription redisCommandDescription = redisSinkMapper.getCommandDescription();
        this.redisCommand = redisCommandDescription.getCommand();
        this.additionalKey = redisCommandDescription.getAdditionalKey();
    }

    @Override
    public void invoke(IN input) throws Exception {
        String key = this.redisSinkMapper.getKeyFromData(input);
        String value = this.redisSinkMapper.getValueFromData(input);
        switch (this.redisCommand) {
            case RPUSH:
                this.redisCommandsContainer.rpush(key, value);
                break;
            case LPUSH:
                this.redisCommandsContainer.lpush(key, value);
                break;
            case SADD:
                this.redisCommandsContainer.sadd(key, value);
                break;
            case SET:
                this.redisCommandsContainer.set(key, value);
                break;
            case PFADD:
                this.redisCommandsContainer.pfadd(key, value);
                break;
            case PUBLISH:
                this.redisCommandsContainer.publish(key, value);
                break;
            case ZADD:
                this.redisCommandsContainer.zadd(this.additionalKey, value, key);
                break;
            case HSET:
                this.redisCommandsContainer.hset(this.additionalKey, key, value);
                break;
            default:
                throw new IllegalArgumentException("Cannot process such data type: " + this.redisCommand);
        }
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // The one real change from RedisSink: use the builder that knows MyFlinkJedisClusterConfig
        this.redisCommandsContainer = MyRedisCommandsContainerBuilder.build(this.flinkJedisConfigBase);
    }

    @Override
    public void close() throws IOException {
        if (this.redisCommandsContainer != null) {
            this.redisCommandsContainer.close();
        }
    }
}
```
MyRedisCommandsContainerBuilder:
```java
package org.cube.flink;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
import org.apache.flink.streaming.connectors.redis.common.container.RedisClusterContainer;
import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
import org.apache.flink.streaming.connectors.redis.common.container.RedisContainer;
import org.apache.flink.util.Preconditions;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisSentinelPool;

/**
 * @Author : Lawrence
 * @Date : 2022/7/25 21:30
 * @Description : RedisCommandsContainerBuilder with password support for clusters
 * @Version : 1.0.0
 * @Modification_Record :
 * Version  Date       Remark
 * v1.0.0   2022/7/25  First Create
 */
public class MyRedisCommandsContainerBuilder {
    public MyRedisCommandsContainerBuilder() {
    }

    // Same dispatch as RedisCommandsContainerBuilder, but recognizes MyFlinkJedisClusterConfig
    public static RedisCommandsContainer build(FlinkJedisConfigBase flinkJedisConfigBase) {
        if (flinkJedisConfigBase instanceof FlinkJedisPoolConfig) {
            FlinkJedisPoolConfig flinkJedisPoolConfig = (FlinkJedisPoolConfig) flinkJedisConfigBase;
            return build(flinkJedisPoolConfig);
        } else if (flinkJedisConfigBase instanceof MyFlinkJedisClusterConfig) {
            MyFlinkJedisClusterConfig flinkJedisClusterConfig = (MyFlinkJedisClusterConfig) flinkJedisConfigBase;
            return build(flinkJedisClusterConfig);
        } else if (flinkJedisConfigBase instanceof FlinkJedisSentinelConfig) {
            FlinkJedisSentinelConfig flinkJedisSentinelConfig = (FlinkJedisSentinelConfig) flinkJedisConfigBase;
            return build(flinkJedisSentinelConfig);
        } else {
            throw new IllegalArgumentException("Jedis configuration not found");
        }
    }

    public static RedisCommandsContainer build(FlinkJedisPoolConfig jedisPoolConfig) {
        Preconditions.checkNotNull(jedisPoolConfig, "Redis pool config should not be Null");
        GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
        genericObjectPoolConfig.setMaxIdle(jedisPoolConfig.getMaxIdle());
        genericObjectPoolConfig.setMaxTotal(jedisPoolConfig.getMaxTotal());
        genericObjectPoolConfig.setMinIdle(jedisPoolConfig.getMinIdle());
        JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, jedisPoolConfig.getHost(), jedisPoolConfig.getPort(),
                jedisPoolConfig.getConnectionTimeout(), jedisPoolConfig.getPassword(), jedisPoolConfig.getDatabase());
        return new RedisContainer(jedisPool);
    }

    public static RedisCommandsContainer build(MyFlinkJedisClusterConfig jedisClusterConfig) {
        Preconditions.checkNotNull(jedisClusterConfig, "Redis cluster config should not be Null");
        GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
        genericObjectPoolConfig.setMaxIdle(jedisClusterConfig.getMaxIdle());
        genericObjectPoolConfig.setMaxTotal(jedisClusterConfig.getMaxTotal());
        genericObjectPoolConfig.setMinIdle(jedisClusterConfig.getMinIdle());
        JedisCluster jedisCluster;
        if (null == jedisClusterConfig.getPassword()) {
            // No password configured: connect without AUTH
            jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(), jedisClusterConfig.getConnectionTimeout(),
                    jedisClusterConfig.getMaxRedirections(), genericObjectPoolConfig);
        } else {
            // Password-aware constructor; requires Jedis 2.9 or later
            jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(), jedisClusterConfig.getConnectionTimeout(),
                    jedisClusterConfig.getSoTimeout(), jedisClusterConfig.getMaxRedirections(),
                    jedisClusterConfig.getPassword(), genericObjectPoolConfig);
        }
        return new RedisClusterContainer(jedisCluster);
    }

    public static RedisCommandsContainer build(FlinkJedisSentinelConfig jedisSentinelConfig) {
        Preconditions.checkNotNull(jedisSentinelConfig, "Redis sentinel config should not be Null");
        GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
        genericObjectPoolConfig.setMaxIdle(jedisSentinelConfig.getMaxIdle());
        genericObjectPoolConfig.setMaxTotal(jedisSentinelConfig.getMaxTotal());
        genericObjectPoolConfig.setMinIdle(jedisSentinelConfig.getMinIdle());
        JedisSentinelPool jedisSentinelPool = new JedisSentinelPool(jedisSentinelConfig.getMasterName(),
                jedisSentinelConfig.getSentinels(), genericObjectPoolConfig, jedisSentinelConfig.getConnectionTimeout(),
                jedisSentinelConfig.getSoTimeout(), jedisSentinelConfig.getPassword(), jedisSentinelConfig.getDatabase());
        return new RedisContainer(jedisSentinelPool);
    }
}
```
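One detail in MyRedisSink.invoke() worth spelling out is where the mapper's key and value end up for each command, especially HSET and ZADD, which also use the additionalKey from RedisCommandDescription. Here is a stdlib-only sketch with a hypothetical `RecordingContainer` standing in for RedisCommandsContainer (it only records calls and never talks to Redis) and a trimmed-down `Cmd` enum; the argument order is copied from the switch in MyRedisSink.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of MyRedisSink.invoke()'s argument mapping, using hypothetical stand-in types
final class SinkDispatchSketch {
    private SinkDispatchSketch() {}

    // key/value come from the RedisMapper; additionalKey is the hash or sorted-set name
    static void invoke(RecordingContainer c, Cmd cmd, String additionalKey, String key, String value) {
        switch (cmd) {
            case SET:
                c.set(key, value);
                break;
            case HSET:
                c.hset(additionalKey, key, value);   // mapper key becomes the hash field
                break;
            case ZADD:
                c.zadd(additionalKey, value, key);   // mapper value is the score, key is the member
                break;
            default:
                throw new IllegalArgumentException("Cannot process such data type: " + cmd);
        }
    }

    public static void main(String[] args) {
        RecordingContainer c = new RecordingContainer();
        invoke(c, Cmd.SET, null, "1", "hadoop");
        invoke(c, Cmd.HSET, "bigdata", "1", "hadoop");
        invoke(c, Cmd.ZADD, "ranking", "hadoop", "1");
        c.calls.forEach(System.out::println);
    }
}

enum Cmd { SET, HSET, ZADD }

// Records each call as a Redis-like command line instead of executing it
final class RecordingContainer {
    final List<String> calls = new ArrayList<>();

    void set(String key, String value) { calls.add("SET " + key + " " + value); }

    void hset(String hash, String field, String value) { calls.add("HSET " + hash + " " + field + " " + value); }

    void zadd(String zset, String score, String member) { calls.add("ZADD " + zset + " " + score + " " + member); }
}
```

So with ZADD the mapper's value has to be a number, since it is used as the score.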
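However, while rewriting MyRedisCommandsContainerBuilder, you'll be surprised to find that JedisCluster doesn't accept a password either.
Don't reflexively go and rewrite JedisCluster too,
because this time it really is a version problem: JedisCluster only gained password-aware constructors in Jedis 2.9.
So this doesn't stump me either.
Just upgrade the redis.clients jedis dependency to 2.9 or later, and exclude the older Jedis that flink-connector-redis pulls in: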
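```xml
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.1.5</version>
    <exclusions>
        <!-- keep the connector from pulling in its own, older Jedis -->
        <exclusion>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```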
Alright, we're finally done.
 
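The code is written, but one question remains:
why hasn't such a simple need been met by an updated jar?
All I wanted was to write Flink data into a password-protected Redis cluster. Is that too much to ask?
It isn't, so why? Here is my guess.
First, think about what we actually write to Redis in stream processing.
Usually state information and intermediate results. Flink itself supports state, caching, and broadcast, which reduces the need for Redis.
Second, big data applications usually run on machines inside an internal network, and as everyone knows, the hosts in a big data cluster are set up with passwordless login to each other, so putting a password on Redis alone seems a little redundant.
Third, Redis's password mechanism is said to be rather weak, so for security people rely more on firewalls to restrict the port, and many Redis clusters skip the password for ease of management.
Fourth, human laziness: once you discover that RedisSink doesn't support passwords, the path of least resistance may be to simply drop the password.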
 
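Well, I've written what needed writing and thought what needed thinking, so it's about time to call it a day.
Good night, and see you in the next one.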

愛在花開的季節

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表