愛在花開的季節 发表于 2022-9-3 10:32:12

Flink写入Redis集群 重写flink-connector-redis包

起因:使用flink的时候难免和redis打交道,相信大家都使用过flink-connector-redis来处理,但是当我想要使用RedisSink写入集群时,发现居然不支持使用密码,于是有了这篇笔记。
 
事情的经过是这样的,我准备用Flink往Redis写入数据,我照常引入flink-connector-redis包
      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.1.5</version>
      </dependency>然后洋洋洒洒写下如下代码:
package org.cube.flink

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

import java.net.InetSocketAddress
import java.util.HashSet

/**
* @Author : Lawrence
* @Date : 2022/7/24 23:11
* @Description : Flink结果写入Redis集群
* @Version : 1.0.0
* @Modification_Record:
* VersionDate       Remark
* v1.0.0   2022/7/24First Create
*/
object RedisClusterSink {
def main(args: Array): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // source
    import org.apache.flink.api.scala._
    val source = env.fromElements("1 hadoop","2 spark","3 flink", "4 hive", "5 redis", "6 hbase")

    // process
    val tupleValue = source.map(_.split(" ")).map(x => (x(0), x(1)))

    // redis config
    val builder = new FlinkJedisPoolConfig.Builder
    builder.setHost("cube01").setPort(7001).setPassword("123456")
    val redisConf: FlinkJedisPoolConfig = builder.build()

    // sink
    val redisSink = new RedisSink[(String, String)](redisConf, new MyRedisMapper())

    tupleValue.addSink(redisSink)

    env.execute("RedisClusterSink")
}
}

class MyRedisMapper extends RedisMapper[(String, String)] {
override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.SET)
}

override def getKeyFromData(t: (String, String)): String = t._1

override def getValueFromData(t: (String, String)): String = t._2
}然后兴高采烈地点击了运行,控制台却给了我一抹中国红,
其中最后一条是这样说的:
Caused by: redis.clients.jedis.exceptions.JedisMovedDataException: MOVED 9842 192.168.20.132:7003哦哦,是因为我的Redis是集群模式,
这并难不倒我,
我只需要把FlinkJedisPoolConfig改成FlinkJedisClusterConfig就万事大吉了。
    // redis config
    val builder = new FlinkJedisClusterConfig.Builder
    val inetSocketAddress = new InetSocketAddress("cube01", 7001)
    val nodeSet = new HashSet()
    nodeSet.add(inetSocketAddress)
    builder.setNodes(nodeSet).setPassword("123456")
    val redisConf: FlinkJedisClusterConfig = builder.build()可是,这个类并没有setPassword方法,事实上它连"password"这个属性都没有。
这并难不倒我。
先不设密码总行了吧?
燃鹅并不行,控制台又给了我一抹中国红,
他是这样说的:
Caused by: redis.clients.jedis.exceptions.JedisDataException: NOAUTH Authentication required.呵呵,这可难不倒我,
我的本能反应是,应该到Maven仓库中找到新版的flink-connector-redis包。
燃鹅,当我搜索之后发现,这已经是最新版了。
https://img2022.cnblogs.com/blog/1139383/202207/1139383-20220728225928206-971465484.png
这也难不倒我。 
FlinkJedisPoolConfig不是可以设置密码吗?
FlinkJedisClusterConfig不是可以访问集群吗?
如果我把他们两个的代码整合一下呢?是不是就好了。
于是我本能地把"FlinkJedisClusterConfig"改写成了"MyFlinkJedisClusterConfig"类,增加了password属性和对应的get,set方法。
package org.cube.flink;

/**
* @Author : Lawrence
* @Date : 2022/7/25 21:14
* @Description : 包含了password的FlinkJedisClusterConfig
* @Version : 1.0.0
* @Modification_Record:
* VersionDate       Remark
* v1.0.0   2022/7/25First Create
*/
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import org.apache.flink.util.Preconditions;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Protocol;

import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

public class MyFlinkJedisClusterConfig extends FlinkJedisConfigBase {
    private static final long serialVersionUID = 1L;
    private final Set<InetSocketAddress> nodes;
    private final int maxRedirections;
    private int soTimeout;
    private String password;

    private MyFlinkJedisClusterConfig(Set<InetSocketAddress> nodes, int connectionTimeout, int soTimeout,
                                    int maxRedirections, int maxTotal, int maxIdle, int minIdle, String password) {
      super(connectionTimeout, maxTotal, maxIdle, minIdle);
      Preconditions.checkNotNull(nodes, "Node information should be presented");
      Preconditions.checkArgument(!nodes.isEmpty(), "Redis cluster hosts should not be empty");
      this.nodes = new HashSet(nodes);
      this.soTimeout = soTimeout;
      this.maxRedirections = maxRedirections;
      this.password = password;
    }

    public Set<HostAndPort> getNodes() {
      Set<HostAndPort> ret = new HashSet();
      Iterator var2 = this.nodes.iterator();

      while(var2.hasNext()) {
            InetSocketAddress node = (InetSocketAddress)var2.next();
            ret.add(new HostAndPort(node.getHostName(), node.getPort()));
      }

      return ret;
    }
    public int getMaxRedirections() {
      return this.maxRedirections;
    }
    public int getSoTimeout() { return this.soTimeout; }
    protected String getPassword() { return this.password; }

    public String toString() {
      return "JedisClusterConfig{nodes=" + this.nodes + ", timeout=" + this.connectionTimeout
                + ", maxRedirections=" + this.maxRedirections + ", maxTotal=" + this.maxTotal
                + ", maxIdle=" + this.maxIdle + ", minIdle=" + this.minIdle + '}';
    }

    public static class Builder {
      private Set<InetSocketAddress> nodes;
      private int timeout = Protocol.DEFAULT_TIMEOUT;
      private int maxRedirections = 5;
      //新增属性
      private int soTimeout = Protocol.DEFAULT_TIMEOUT;
      private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
      private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
      private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
      //增加的属性
      private String password;

      public Builder() {
      }

      public MyFlinkJedisClusterConfig.Builder setNodes(Set<InetSocketAddress> nodes) {
            this.nodes = nodes;
            return this;
      }

      public MyFlinkJedisClusterConfig.Builder setTimeout(int timeout) {
            this.timeout = timeout;
            return this;
      }

      public MyFlinkJedisClusterConfig.Builder setSoTimeout(int soTimeout) {
            this.soTimeout = soTimeout;
            return this;
      }

      public MyFlinkJedisClusterConfig.Builder setMaxRedirections(int maxRedirections) {
            this.maxRedirections = maxRedirections;
            return this;
      }

      public MyFlinkJedisClusterConfig.Builder setMaxTotal(int maxTotal) {
            this.maxTotal = maxTotal;
            return this;
      }

      public MyFlinkJedisClusterConfig.Builder setMaxIdle(int maxIdle) {
            this.maxIdle = maxIdle;
            return this;
      }

      public MyFlinkJedisClusterConfig.Builder setMinIdle(int minIdle) {
            this.minIdle = minIdle;
            return this;
      }

      public MyFlinkJedisClusterConfig.Builder setPassword(String password) {
            this.password = password;
            return this;
      }

      public MyFlinkJedisClusterConfig build() {
            return new MyFlinkJedisClusterConfig(this.nodes, this.timeout, this.soTimeout,
                  this.maxRedirections, this.maxTotal, this.maxIdle, this.minIdle, this.password);
      }
    }
}燃鹅,中国红却提醒我:
Caused by: java.lang.IllegalArgumentException: Jedis configuration not found原来,Flink任务执行的时候会调用RedisSink中的open()方法:
    public void open(Configuration parameters) throws Exception {
      this.redisCommandsContainer = RedisCommandsContainerBuilder.build(this.flinkJedisConfigBase);
    }而这个方法调用的"RedisCommandsContainerBuilder.build"方法,所使用的参数,依然是旧的FlinkJedisClusterConfig类:
    public static RedisCommandsContainer build(FlinkJedisConfigBase flinkJedisConfigBase)所以,还得改写这两个类:
MyRedisSink:
package org.cube.flink;

/**
* @Author : Lawrence
* @Date : 2022/7/25 23:52
* @Description :
* @Version : 1.0.0
* @Modification_Record :
* VersionDate       Remark
* v1.0.0   2022/7/25First Create
*/
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

public class MyRedisSink<IN> extends RichSinkFunction<IN> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(org.apache.flink.streaming.connectors.redis.RedisSink.class);
    private String additionalKey;
    private RedisMapper<IN> redisSinkMapper;
    private RedisCommand redisCommand;
    private FlinkJedisConfigBase flinkJedisConfigBase;
    private RedisCommandsContainer redisCommandsContainer;

    public MyRedisSink(FlinkJedisConfigBase flinkJedisConfigBase, RedisMapper<IN> redisSinkMapper) {
      Preconditions.checkNotNull(flinkJedisConfigBase, "Redis connection pool config should not be null");
      Preconditions.checkNotNull(redisSinkMapper, "Redis Mapper can not be null");
      Preconditions.checkNotNull(redisSinkMapper.getCommandDescription(), "Redis Mapper data type description can not be null");
      this.flinkJedisConfigBase = flinkJedisConfigBase;
      this.redisSinkMapper = redisSinkMapper;
      RedisCommandDescription redisCommandDescription = redisSinkMapper.getCommandDescription();
      this.redisCommand = redisCommandDescription.getCommand();
      this.additionalKey = redisCommandDescription.getAdditionalKey();
    }

    @Override
    public void invoke(IN input) throws Exception {
      String key = this.redisSinkMapper.getKeyFromData(input);
      String value = this.redisSinkMapper.getValueFromData(input);
      switch(this.redisCommand) {
            case RPUSH:
                this.redisCommandsContainer.rpush(key, value);
                break;
            case LPUSH:
                this.redisCommandsContainer.lpush(key, value);
                break;
            case SADD:
                this.redisCommandsContainer.sadd(key, value);
                break;
            case SET:
                this.redisCommandsContainer.set(key, value);
                break;
            case PFADD:
                this.redisCommandsContainer.pfadd(key, value);
                break;
            case PUBLISH:
                this.redisCommandsContainer.publish(key, value);
                break;
            case ZADD:
                this.redisCommandsContainer.zadd(this.additionalKey, value, key);
                break;
            case HSET:
                this.redisCommandsContainer.hset(this.additionalKey, key, value);
                break;
            default:
                throw new IllegalArgumentException("Cannot process such data type: " + this.redisCommand);
      }

    }

    @Override
    public void open(Configuration parameters) throws Exception {
      this.redisCommandsContainer = MyRedisCommandsContainerBuilder.build(this.flinkJedisConfigBase);
    }

    @Override
    public void close() throws IOException {
      if (this.redisCommandsContainer != null) {
            this.redisCommandsContainer.close();
      }

    }
}MyRedisCommandsContainerBuilder:
package org.cube.flink;

/**
* @Author : Lawrence
* @Date : 2022/7/25 21:30
* @Description : 包含了password的RedisCommandsContainerBuilder
* @Version : 1.0.0
* @Modification_Record :
* VersionDate       Remark
* v1.0.0   2022/7/25First Create
*/
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
import org.apache.flink.streaming.connectors.redis.common.container.RedisClusterContainer;
import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
import org.apache.flink.streaming.connectors.redis.common.container.RedisContainer;
import org.apache.flink.util.Preconditions;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisSentinelPool;

public class MyRedisCommandsContainerBuilder {
    public MyRedisCommandsContainerBuilder() {
    }

    public static RedisCommandsContainer build(FlinkJedisConfigBase flinkJedisConfigBase) {
      if (flinkJedisConfigBase instanceof FlinkJedisPoolConfig) {
            FlinkJedisPoolConfig flinkJedisPoolConfig = (FlinkJedisPoolConfig)flinkJedisConfigBase;
            return build(flinkJedisPoolConfig);
      } else if (flinkJedisConfigBase instanceof MyFlinkJedisClusterConfig) {
            MyFlinkJedisClusterConfig flinkJedisClusterConfig = (MyFlinkJedisClusterConfig)flinkJedisConfigBase;
            return build(flinkJedisClusterConfig);
      } else if (flinkJedisConfigBase instanceof FlinkJedisSentinelConfig) {
            FlinkJedisSentinelConfig flinkJedisSentinelConfig = (FlinkJedisSentinelConfig)flinkJedisConfigBase;
            return build(flinkJedisSentinelConfig);
      } else {
            throw new IllegalArgumentException("Jedis configuration not found");
      }
    }

    public static RedisCommandsContainer build(FlinkJedisPoolConfig jedisPoolConfig) {
      Preconditions.checkNotNull(jedisPoolConfig, "Redis pool config should not be Null");
      GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
      genericObjectPoolConfig.setMaxIdle(jedisPoolConfig.getMaxIdle());
      genericObjectPoolConfig.setMaxTotal(jedisPoolConfig.getMaxTotal());
      genericObjectPoolConfig.setMinIdle(jedisPoolConfig.getMinIdle());
      JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, jedisPoolConfig.getHost(), jedisPoolConfig.getPort()
                , jedisPoolConfig.getConnectionTimeout(), jedisPoolConfig.getPassword(), jedisPoolConfig.getDatabase());
      return new RedisContainer(jedisPool);
    }

    public static RedisCommandsContainer build(MyFlinkJedisClusterConfig jedisClusterConfig) {
      Preconditions.checkNotNull(jedisClusterConfig, "Redis cluster config should not be Null");
      GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
      genericObjectPoolConfig.setMaxIdle(jedisClusterConfig.getMaxIdle());
      genericObjectPoolConfig.setMaxTotal(jedisClusterConfig.getMaxTotal());
      genericObjectPoolConfig.setMinIdle(jedisClusterConfig.getMinIdle());
      JedisCluster jedisCluster;
      if (null == jedisClusterConfig.getPassword()) {
            jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(), jedisClusterConfig.getConnectionTimeout(),
                  jedisClusterConfig.getMaxRedirections(), genericObjectPoolConfig);
      } else
            {
            jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(), jedisClusterConfig.getConnectionTimeout()
                  , jedisClusterConfig.getSoTimeout(), jedisClusterConfig.getMaxRedirections()
                  , jedisClusterConfig.getPassword(), genericObjectPoolConfig);
      }
      return new RedisClusterContainer(jedisCluster);
    }

    public static RedisCommandsContainer build(FlinkJedisSentinelConfig jedisSentinelConfig) {
      Preconditions.checkNotNull(jedisSentinelConfig, "Redis sentinel config should not be Null");
      GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
      genericObjectPoolConfig.setMaxIdle(jedisSentinelConfig.getMaxIdle());
      genericObjectPoolConfig.setMaxTotal(jedisSentinelConfig.getMaxTotal());
      genericObjectPoolConfig.setMinIdle(jedisSentinelConfig.getMinIdle());
      JedisSentinelPool jedisSentinelPool = new JedisSentinelPool(jedisSentinelConfig.getMasterName()
                , jedisSentinelConfig.getSentinels(), genericObjectPoolConfig, jedisSentinelConfig.getConnectionTimeout()
                , jedisSentinelConfig.getSoTimeout(), jedisSentinelConfig.getPassword(), jedisSentinelConfig.getDatabase());
      return new RedisContainer(jedisSentinelPool);
    }
}燃鹅,在重写"MyRedisCommandsContainerBuilder"类时,你会惊奇地发现,jedisCluster 也不支持密码。
你可千万别惯性思维去重新jedisCluster ,
因为这回可真的是版本问题了。
所以这依然难不倒我,
只需要把redis.clients包升级到2.9以上版本即可:
                  redis.clients            jedis            2.9.1                <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.1.5</version>
      </dependency>                                        redis.clients                  jedis                                    好了,到这里咱们终于大功告成了。
 
代码写完了,但是咱们却留下一个疑惑,
为什么这么简单的需求却没有jar包更新呢?
我只是想把Flink数据写到带密码的Redis集群里,这过分吗?
这并不过分,那这又是为啥呢?
我想可能是这样的:
首先,先想一个问题,在流计算中我们往Redis写的是什么数据?
通常是一些状态信息,中间结果。而Flink本身支持状态、缓存和广播机制,导致对Redis的需求下降了。
其次,大数据应用实际运行的环境通常是提交到内网的机器上进行的,大家知道大数据集群之间的主机是需要设置免验证登录的,单单给Redis设密码显得有一点点多余。
其三,Redis的密码机制据说是很弱鸡的,出于安全考虑,更多地是通过防火墙来限制端口,所以很多Redis集群处于管理方便并没有设置密码的。
 其四,出于人类懒惰的本性,发现RedisSink不支持密码后,最省事的方式,或许是放弃使用密码。
 
好了该写的写了,该想的也想了,差不多可以愉快地结束这一天了。
那么晚安了,咱们下期再肝。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: Flink写入Redis集群 重写flink-connector-redis包