ToB企服应用市场:ToB评测及商务社交产业平台

标题: Redis+Hbase+RocketMQ 实际使用问题案例分享 [打印本页]

作者: 卖不甜枣    时间: 2023-2-3 14:40
标题: Redis+Hbase+RocketMQ 实际使用问题案例分享
需求

简单画个流程图就是:

分析及确定方案

Redis
Hbase
RocketMQ
实现

配置
  1. #server configuration
  2. server.port=8896
  3. #log config
  4. logging.file.path=./logs
  5. #redis-standalone
  6. redis.standalone.host=
  7. redis.standalone.port=6379
  8. redis.standalone.password=
  9. redis.standalone.enable=true
  10. #redis-cluster
  11. redis.cluster.nodes=
  12. redis.cluster.password=
  13. redis.cluster.timeout=30000
  14. redis.cluster.enable=false
  15. # Zookeeper 集群地址,逗号分隔
  16. hbase.zookeeper.quorum=
  17. # Zookeeper 端口
  18. hbase.zookeeper.property.clientPort=2181
  19. # 消息目的rocketmq地址
  20. rocketmq.server.host=
  21. # 发送消息间隔时间,防止发送过快mq受不了
  22. rocketmq.send.interval.millisec=10
  23. # 每次从redis读取数据量限制。
  24. data.access.redisDataSize=100
  25. # 失败数据重试次数,超过的直接丢弃
  26. data.access.retryNum=10
  27. # 需要接入的表,需要发送到rocketmq的topic和在redis中的key的映射。xxx.xxx.xxx[topic]=redisKey
  28. data.access.topicKeyMap[weibo_hbase]=data:sync:notice:suanzi:weibo:back
  29. data.access.topicKeyMap[wechat_hbase]=data:sync:notice:suanzi:wechat:back
复制代码
部分代码
获取配置,其余的直接@Value("${}"):
  1. @Setter
  2. @Getter
  3. @Configuration
  4. @ConfigurationProperties(prefix = "data.access")
  5. public class AccessRedisMqConfig {
  6.     /**
  7.      * key:topic; value:redis的key
  8.      */
  9.     private Map<String, String> topicKeyMap = new HashMap<>();
  10.     /**
  11.      * 一次从redis中读取数据量限制
  12.      */
  13.     private long redisDataSize = 50;
  14.     /**
  15.      * 失败数据重试次数
  16.      */
  17.     private int retryNum = 10;
  18. }
复制代码
开启接入:
  1. @Component
  2. public class AdapterRunner implements ApplicationRunner {
  3.     @Resource
  4.     private DataAccessService dataAccessService;
  5.     @Override
  6.     public void run(ApplicationArguments args) {
  7.         System.out.println("项目已启动,开始接入数据到RocketMQ……");
  8.         dataAccessService.accessData2Mq();
  9.     }
  10. }
复制代码
其他代码其实也在分析里了。
踩坑

  1. org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: invokeAsync call timeout
  2.         at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeAsync(NettyRemotingClient.java:525)
  3.         at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageAsync(MQClientAPIImpl.java:523)
  4.         at org.apache.rocketmq.client.impl.MQClientAPIImpl.onExceptionImpl(MQClientAPIImpl.java:610)
  5.         at org.apache.rocketmq.client.impl.MQClientAPIImpl.access$100(MQClientAPIImpl.java:167)
  6.         at org.apache.rocketmq.client.impl.MQClientAPIImpl$1.operationComplete(MQClientAPIImpl.java:572)
  7.         at org.apache.rocketmq.remoting.netty.ResponseFuture.executeInvokeCallback(ResponseFuture.java:54)
  8.         at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$2.run(NettyRemotingAbstract.java:319)
  9.         at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
  10.         at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
  11.         at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
  12.         at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  13.         at java.base/java.lang.Thread.run(Thread.java:834)
复制代码
上面分析也说了,注意发送速度,有多少资源就接入多快。还有注意相关三个端口是否开放。
总结

程序很简单,主要涉及方案的是,获取redis的list数据时,是考虑效率,及加入重试策略,保证数据不丢失等。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4