岚图N次方KOC项目复盘总结---记录踩坑日记
转载请标明出处:https://blog.csdn.net/men_ma/article/details/106847165.本文出自 不怕报错 就怕不报错的小猿猿 的博客
项目复盘
生产定时使命推送消息关照重复
题目形貌:
在测试环境中,每隔五分钟会实行一次新建使命的消息推送,每个使命会发送一条消息关照。然而,在生产环境中,同一条使命对于同一用户却收到了两条重复的消息推送。
排查原因:
[*]并发实行:生产环境中运行着两个容器,导致定时使命的并发实行,从而触发同一使命的重复推送。
解决方案:
[*]引入分布式锁:给定时使命加Redis实现分布式锁,确保同一时间只有一个使命在实行。
[*]日记记录:增加了定时使命日记记录,便于后续题目排查和监控。
代码示例:
/**
* 每五分钟执行一次触发微信模板消息通知
*
* @throws JsonProcessingException
* @throws IOException
*/
@Scheduled(cron = "0 0/5 * * * ?")
public void tenantIdNewTaskSend() {
String key = "tenantIdNewTaskSend";
saveSchedulerLog(SnowflakeIdWorker.newID(), "tenantIdNewTaskSend", "每五分钟执行一次触发微信模板消息通知开始, 锁的key: " + key);
String jobInfo = "定时任务-每五分钟执行一次触发微信模板消息通知".concat("Key:").concat(key);
log.info("{}:开始", jobInfo);
if (!getLock(key, 3)) {
log.info("{}: 未获取到锁,跳出执行", jobInfo);
return;
}
saveSchedulerLog(SnowflakeIdWorker.newID(), "tenantIdNewTaskSend", "每五分钟执行一次触发微信模板消息通知结束, tenantIds: " + JSON.toJSONString(tenantList));
}
//保存到定时任务日志表
protected void saveSchedulerLog(Long id, String type, String remark) {
busSchedulerLogsMapper.insert(
new BusSchedulerLogs().setId(id).setType(type).setRemark(remark).setCreateTime(new Date())
);
}
调用access_token次数超额
题目形貌:
当推送人数较多时,每个用户在发送微信模板消息时,都会频仍调用“获取 Access Token”接口。这导致微信公众号的接口调用次数超出限制,进而引发 access_token无效的错误,导致消息推送失败。
解决方案:
将accessToken存入Redis缓存中,每1小时更新一次微信access_token,每次调用微信模板消息发送接口时,都从Redis中取access_token,从而避免频仍请求微信接口。
代码示例:
[*]获取 Token 方法:
[*]使用 Redis 缓存来存储 access_token,并在需要时优先从缓存中获取。
[*]假如缓存不存在或已逾期,则调用微信接口获取新的 access_token。
/**
* 获取微信 access_token,先尝试从 Redis 缓存获取,如果缓存不存在或已过期,则调用微信接口获取
* @return access_token
*/
public String getAccessToken(String appId,String appSecret) {
String accessToken = redisTemplate.opsForValue().get(ACCESS_TOKEN_KEY);
log.info("获取缓存中的accesstoken:{}",accessToken);
if (ToolEmptyUtil.isEmpty(accessToken)) {
weiXinUtil.getJsapiTicket(appId,appSecret);
return redisTemplate.opsForValue().get(ACCESS_TOKEN_KEY);
}
return accessToken;
}
/**
* 获取微信 accessToken
*
* @return
*/
private String getAccessToken(String tenantId,String appID,String appSecret){
String accessToken = "";
log.info("获取微信缓存中AccessToken");
accessToken = redisTemplate.opsForValue().get(ACCESS_TOKEN_KEY+tenantId);
log.info("获取微信缓存中AccessToken,AccessToken:[{}]", accessToken);
if (StringUtils.isNotEmpty(accessToken)) {
return accessToken;
}
HttpClientUtil httpClientUtil = new HttpClientUtil();
Map<String,String> params = new HashMap<>();
params.put("grant_type",grantType);
params.put("appid",appID);
params.put("secret",appSecret);
JSONObject jsonObject = null;
try {
log.info("获取getAccessToken地址[{}]", accessTokenUrl);
HttpResponse httpResponse = httpClientUtil.doGet(accessTokenUrl, null, params);
// 获取返回数据并转为jsonObject
HttpEntity entity = httpResponse.getEntity();
jsonObject = JSONObject.parseObject(EntityUtils.toString(entity));
} catch (Exception e) {
log.error("获取微信accessToken失败,WeChatUtil.getAccessToken");
throw new ServiceException("获取微信accessToken失败,WeChatUtil.getAccessToken" + e.getMessage());
}
accessToken = jsonObject.getString("access_token");
boolean empty = StringUtils.isEmpty(accessToken);
if (empty){
String errCode = jsonObject.getString("errcode");
String errMsg = jsonObject.getString("errmsg");
log.error("获取微信accessToken失败,WeChatUtil.getAccessToken,请求错误代码:" + errCode + ",错误信息:" + errMsg);
throw new ServiceException("获取微信accessToken失败,WeChatUtil.getAccessToken,请求错误代码:" + errCode + ",错误信息:" + errMsg);
}
redisTemplate.opsForValue().set(ACCESS_TOKEN_KEY+tenantId, accessToken, ACCESS_TOKEN_EXPIRE_SECONDS, TimeUnit.SECONDS);
return accessToken;
}
模板消息推送内容长度超额导致发送失败
题目形貌:
所有模板消息推送中的 thing.DATA 参数内容字数凌驾限制,导致消息推送失败。
解决方案:
参数种别参数说明参数值限制说明实用范围thing.DATA事物20个以内字符可汉字、数字、字母或符号组合供姓名关键词、地址关键词、机构/组织名关键词选择(如:单元名、银行名、医院名、科室、班级)、品名关键词选择(如:药品名、股票名、课程名、科目名、岗位名) 判断工单标题内容长度>16个字符就显示…
代码示例:
// 判断长度并截取(超过16个字符就截取后面的显示...)
this.keyword1=keyword1.length() > 16 ? keyword1.substring(0, 16) + "..." : keyword1;
定时使命锁的时间导致下一次使命跳过了没跑到
题目形貌:
由于定时使命的实行间隔为五分钟,而分布式锁的时间也设置为五分钟,导致在下一次定时使命触发时,前一个使命仍然持有锁,从而造成某些定时使命未能实行。这种环境偶尔会导致使命未被触发消息推送,影响体系的正常运行。
排查原因:
经过排查,发现定时使命的分布式锁时间不能大于或等于定时使命的触发时间间隔。若锁的持有时间过长,便会导致后续的使命被锁住,从而无法实行。
解决方案:
将定时使命的分布式锁时间修改为小于定时使命的触发时间,由分布式锁的五分钟时间设置为3分钟或更短,以确保每次使命都能顺遂实行完毕后,锁可以及时释放。
代码示例:
@Scheduled(cron = "0 0/5 * * * ?")
public void tenantIdNewTaskSend() {
String key = "tenantIdNewTaskSend";
saveSchedulerLog(SnowflakeIdWorker.newID(), "tenantIdNewTaskSend", "每五分钟执行一次触发微信模板消息通知开始, 锁的key: " + key);
String jobInfo = "定时任务-每五分钟执行一次触发微信模板消息通知".concat("Key:").concat(key);
log.info("{}:开始", jobInfo);
if (!getLock(key, 3)) {
log.info("{}: 未获取到锁,跳出执行", jobInfo);
return;
}
//业务代码 todo
saveSchedulerLog(SnowflakeIdWorker.newID(), "tenantIdNewTaskSend", "每五分钟执行一次触发微信模板消息通知结束, tenantIds: " + JSON.toJSONString(tenantList));
}
模板消息关照推送速率慢
题目形貌:
当全员推送消息关照时,十几分钟才推送了3000+条,推送的速率很慢
排查原因:
向微信公众号模板消息推送接口发送请求时,http平常连接太慢了,没用连接池,每发送一条消息都请求一次微信模板推送接口,8000人则请求8000次
解决方案:
使用线程池,并行推送消息发送,以进步服从。通过使用线程池,可以同时处理多个发送请求,减少总的实行时间。
新增HTTP请求的工具类(HttpClientPoolUtil),提供了连接池的管理和HTTP请求的实行,使用PoolingHttpClientConnectionManager 来管理 HTTP 连接池,允许多个请求共享相同的连接,减少创建和关闭连接的开销,进步性能。
代码示例:
// 开启100个线程池
ExecutorService executor = Executors.newFixedThreadPool(100);
// 需要推送的人数
int size = template.size();
log.info("总需要推送的人数:{}",size);
for (int i = 0; i < size; i++) {
// 定义常量
int finalI = i = i;
//并行发送
executor.submit(() -> {
try {
// 获取返回的消息
String returnMsg = sendMessage(tenantId,template.get(finalI), wxMpConfig);
log.info("执行线程:" + Thread.currentThread().getName() + ",返回信息" + returnMsg);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
executor.shutdown();
/**
* 发送模板消息
* @return
* @throws Exception
*/
public String sendMessage(String tenantId,Template template, WxMpConfig wxMpConfig) throws Exception{
// 获取accessToken
String token = this.getToken(tenantId,wxMpConfig.getAppId(), wxMpConfig.getAppSecret());
// 获取要发送的消息
String message = template.buildMessage(wxMpConfig);
// 执行发送
String responseContent = HttpClientPoolUtil.execute(this.templateUrl + token, message);
return responseContent;
}
消息推送日记记录不完整
题目形貌:
在消息推送的过程中,由于缺乏有效的日记记录,导致在排查消息关照未发送成功原因时面对困难。未能及时记录推送日记,使得在生产环境中追踪题目变得复杂和耗时,意识到日记记录的重要性。
解决方案:
新建微信消息推送日记表,在每次调用微信模板消息推送接口时,将微信返回的效果和相关信息记录到日记表中。这将有助于后续的错误排查和性能分析。
-- 微信推送日志表结构
CREATE TABLE `bus_wechat_message_send_log` (
`id` bigint NOT NULL AUTO_INCREMENT,
`open_id` varchar(255) DEFAULT NULL COMMENT '用户openid',
`message_title` varchar(255) DEFAULT NULL COMMENT '推送标题(工单)',
`template_id` varchar(255) DEFAULT NULL COMMENT '推送模板ID',
`message_data_json` varchar(255) DEFAULT NULL COMMENT '推送内容请求参数JSON',
`wechat_send_thread_name` varchar(32) DEFAULT NULL COMMENT '推送线程名称',
`wechat_send_result_json` varchar(255) DEFAULT NULL COMMENT '微信推送响应结果JSON',
`push_time` datetime DEFAULT NULL COMMENT '推送时间',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
`update_time` datetime DEFAULT NULL COMMENT '修改时间',
`tenant_id` varchar(64) DEFAULT NULL COMMENT '租户id',
`task_notice_id` bigint DEFAULT NULL COMMENT '任务/通知id',
`result_errcode` bigint DEFAULT NULL COMMENT '微信返回结果码',
PRIMARY KEY (`id`),
KEY `openid_createTime_index` (`open_id`,`create_time`)
) ENGINE=InnoDB AUTO_INCREMENT=147974 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='微信消息推送日志表';
代码示例:
public void beginMessageTask(List<? extends Template> template) throws Exception{
log.info("*************begin task**************");
//1.获取当前登陆用户租户ID
String tenantId = TenantContext.getTenantId();
BaseTenant baseTenant = baseTenantService.getOne(new LambdaQueryWrapper<BaseTenant>().eq(BaseTenant::getTenantId, tenantId));
WxMpConfig wxMpConfig = baseTenant.getWxMpConfig();
//保存日志信息
List<BusWechatMessageSendLog> wechatMessageSendLogList=new ArrayList<>();
// 开启100个线程池
ExecutorService executor = Executors.newFixedThreadPool(100);
// 需要推送的人数
int size = template.size();
log.info("总需要推送的人数:{}",size);
for (int i = 0; i < size; i++) {
// 定义常量
int finalI = i = i;
executor.submit(() -> {
try {
// 获取返回的消息
String returnMsg = sendMessage(tenantId,template.get(finalI), wxMpConfig);
log.info("执行线程:" + Thread.currentThread().getName() + ",返回信息" + returnMsg);
BusWechatMessageSendLog wechatMessageSendLog = new BusWechatMessageSendLog();
wechatMessageSendLog.setTenantId(tenantId);
wechatMessageSendLog.setWechatSendResultJson(returnMsg);
wechatMessageSendLog.setCreateTime(new Date());
wechatMessageSendLog.setOpenId(template.get(finalI).getOpenId());
wechatMessageSendLog.setTemplateId(template.get(finalI).getTemplateName());
wechatMessageSendLog.setWechatSendThreadName(Thread.currentThread().getName());
wechatMessageSendLog.setMessageTitle(template.get(finalI).getMessageTitle());
wechatMessageSendLog.setTaskNoticeId(template.get(finalI).getTaskNoticeId());
// 获取 errcode 的值
JSONObject jsonObject = JSON.parseObject(returnMsg);
Long errcode = jsonObject.getLongValue("errcode");
wechatMessageSendLog.setResultErrcode(errcode);
wechatMessageSendLogService.save(wechatMessageSendLog);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
executor.shutdown();
}
服务宕机/网络中断导致推送消息中断
题目形貌:
在推送某条使命消息给所有人群的过程中,部分用户成功吸收了工单提示关照,而部分用户却没有收到。
排查原因:
经过排查,发现该环境是由于定时使命在实行工单提示消息推送时,服务器宕机/网络中断/重启导致推送中断,未能完成对所有目的用户的消息推送关照。
解决方案:
流程:
[*]创建使命:
[*]保存使命时,异步获取目的发送人群,并将数据持久化到数据库。
[*]将原 templateList 写入数据表 wx_message,设置发送状态为“待发送”。
[*]留意:需思量发送人群的实时性题目,例如在使命发送消息时,有人解绑或新增用户。
[*]定时使命:
[*]每五分钟实行一次,从 wx_message 表中获取待发送使命人员。
[*]限定获取条数及使命发送时间,未到发送时间的使命不处理,以避免一次加载过多数据(例如,每1万条数据耗时加5分钟)。
[*]一旦获取到数据,立即将状态更新为“发送中”。
[*]处剃头送:
[*]每处理一个人时,确保在 finally 块中记录该人员的发送状态(成功或失败),并解锁。
定时使命与分布式锁的时间差:
[*]副本1在30分钟时获取到锁并开始实行使命,预计一连8分钟。
[*]33分钟时锁失效。
[*]副本2在35分钟时获取到锁并继续实行使命,以避免出现重复发送的环境。
[*]finally 在服务器挂掉了是否还能见效,需要新增时间判断
留意事项:
[*]避免漏发和多发:确保每个消息正确发送。
[*]避免发送错误租户:确保消息发送到正确的目的人群。
[*]避免多个定时使命同时实行:使用分布式锁来控制使命实行。
补充:
[*]新增一个接口,支持根据参数进行手动触发推送。
代码示例(todo):
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]