麻了,代码改成多线程,服务直接挂了

打印 上一主题 下一主题

主题 958|帖子 958|积分 2874

媒介

许多时候,我们为了提拔接口的性能,会把之前单线程同步执行的代码,改成多线程异步执行。
比如:查询用户信息接口,需要返回用户根本信息、积分信息、成长值信息,而用户、积分和成长值,需要调用不同的接口获取数据。
如果查询用户信息接口,同步调用三个接口获取数据,会非常耗时。
这就非常有必要把三个接口调用,改成异步调用,末了汇总结果。
再比如:注册用户接口,该接口重要包罗:写用户表,分配权限,配置用户导航页,发通知消息等功能。
该用户注册接口包罗的业务逻辑比较多,如果在接口中同步执行这些代码,该接口响应时间会非常慢。
这时就需要把业务逻辑梳理一下,划分:核心逻辑和非核心逻辑。这个例子中的核心逻辑是:写用户表和分配权限,非核心逻辑是:配置用户导航页和发通知消息。
显然核心逻辑必须在接口中同步执行,而非核心逻辑可以多线程异步执行。
等等。
需要利用多线程的业务场景太多了,利用多线程异步执行的好处不言而喻。
但我要说的是,如果多线程没有利用好,它也会给我们带来许多意想不到的问题,不信今后继承看。
本日跟大家一起聊聊,代码改成多线程调用之后,带来的9大问题。
1.获取不到返回值

如果你通过直接继承Thread类,大概实现Runnable接口的方式去创建线程。
那么,恭喜你,你将没法获取该线程方法的返回值。
利用线程的场景有两种:

  • 不需要关注线程方法的返回值。
  • 需要关注线程方法的返回值。
大部分业务场景是不需要关注线程方法返回值的,但如果我们有些业务需要关注线程方法的返回值该怎么处置处罚呢?
查询用户信息接口,需要返回用户根本信息、积分信息、成长值信息,而用户、积分和成长值,需要调用不同的接口获取数据。
如下图所示:

在Java8之前可以通过实现Callable接口,获取线程返回结果。
Java8以后通过CompleteFuture类实现该功能。我们这里以CompleteFuture为例:
  1. public UserInfo getUserInfo(Long id) throws InterruptedException, ExecutionException {
  2.     final UserInfo userInfo = new UserInfo();
  3.     CompletableFuture userFuture = CompletableFuture.supplyAsync(() -> {
  4.         getRemoteUserAndFill(id, userInfo);
  5.         return Boolean.TRUE;
  6.     }, executor);
  7.     CompletableFuture bonusFuture = CompletableFuture.supplyAsync(() -> {
  8.         getRemoteBonusAndFill(id, userInfo);
  9.         return Boolean.TRUE;
  10.     }, executor);
  11.     CompletableFuture growthFuture = CompletableFuture.supplyAsync(() -> {
  12.         getRemoteGrowthAndFill(id, userInfo);
  13.         return Boolean.TRUE;
  14.     }, executor);
  15.     CompletableFuture.allOf(userFuture, bonusFuture, growthFuture).join();
  16.     userFuture.get();
  17.     bonusFuture.get();
  18.     growthFuture.get();
  19.     return userInfo;
  20. }
复制代码
  温馨提示一下,这两种方式别忘了利用线程池。示例中我用到了executor,表示自定义的线程池,为了防止高并发场景下,出现线程过多的问题。
  此外,Fork/join框架也提供了执行任务并返回结果的能力。
2.数据丢失

我们还是以注册用户接口为例,该接口重要包罗:写用户表,分配权限,配置用户导航页,发通知消息等功能。
此中:写用户表和分配权限功能,需要在一个事务中同步执行。而剩余的配置用户导航页和发通知消息功能,利用多线程异步执行。
表面上看起来没问题。
但如果前面的写用户表和分配权限功能成功了,用户注册接口就直接返回成功了。
但如果后面异步执行的配置用户导航页,或发通知消息功能失败了,怎么办?
如下图所示:

该接口前面明显已经提示用户成功了,但结果后面又有一部分功能在多线程异步执行中失败了。
这时该如何处置处罚呢?
没错,你可以做失败重试。
但如果重试了肯定的次数,还是没有成功,这条哀求数据该如何处置处罚呢?如果不做任何处置处罚,该数据是不是就丢掉了?
为了防止数据丢失,可以用如下方案:

  • 利用mq异步处置处罚。在分配权限之后,发送一条mq消息,到mq服务器,然后在mq的消耗者中利用多线程,去配置用户导航页和发通知消息。如果mq消耗者中处置处罚失败了,可以自己重试。
  • 利用job异步处置处罚。在分配权限之后,往任务表中写一条数据。然后有个job定时扫描该表,然后配置用户导航页和发通知消息。如果job处置处罚某条数据失败了,可以在表中记载一个重试次数,然后不停重试。但该方案有个缺点,就是实时性可能不太高。
3.次序问题

如果你利用了多线程,就必须担当一个非常现实的问题,即次序问题。
如果之前代码的执行次序是:a,b,c,改成多线程执行之后,代码的执行次序可能变成了:a,c,b。(这个跟cpu调理算法有关)
例如:
  1. public static void main(String[] args) {
  2.     Thread thread1 = new Thread(() -> System.out.println("a"));
  3.     Thread thread2 = new Thread(() -> System.out.println("b"));
  4.     Thread thread3 = new Thread(() -> System.out.println("c"));
  5.     thread1.start();
  6.     thread2.start();
  7.     thread3.start();
  8. }
复制代码
执行结果:
  1. a
  2. c
  3. b
复制代码
那么,来自灵魂的一问:如何包管线程的次序呢?
即线程启动的次序是:a,b,c,执行的次序也是:a,b,c。
如下图所示:

3.1 join

Thread类的join方法它会让主线程等待子线程运行竣事后,才气继承运行。
列如:
  1. public static void main(String[] args) throws InterruptedException {
  2.     Thread thread1 = new Thread(() -> System.out.println("a"));
  3.     Thread thread2 = new Thread(() -> System.out.println("b"));
  4.     Thread thread3 = new Thread(() -> System.out.println("c"));
  5.     thread1.start();
  6.     thread1.join();
  7.     thread2.start();
  8.     thread2.join();
  9.     thread3.start();
  10. }
复制代码
执行结果永久都是:
  1. a
  2. b
  3. c
复制代码
3.2 newSingleThreadExecutor

我们可以利用JDK自带的Excutors类的newSingleThreadExecutor方法,创建一个单线程的线程池。
例如:
  1.  public static void main(String[] args)  {
  2.     ExecutorService executorService = Executors.newSingleThreadExecutor();
  3.     Thread thread1 = new Thread(() -> System.out.println("a"));
  4.     Thread thread2 = new Thread(() -> System.out.println("b"));
  5.     Thread thread3 = new Thread(() -> System.out.println("c"));
  6.     executorService.submit(thread1);
  7.     executorService.submit(thread2);
  8.     executorService.submit(thread3);
  9.     executorService.shutdown();
  10. }
复制代码
执行结果永久都是:
  1. a
  2. b
  3. c
复制代码
利用Excutors类的newSingleThreadExecutor方法创建的单线程的线程池,利用了LinkedBlockingQueue作为队列,而此队列按 FIFO(先进先出)排序元素。
添加到队列的次序是a,b,c,则执行的次序也是a,b,c。
3.3 CountDownLatch

CountDownLatch是一个同步工具类,它允许一个或多个线程不停等待,直到其他线程执行完后再执行。
例如:
  1. public class ThreadTest {
  2.     public static void main(String[] args) throws InterruptedException {
  3.         CountDownLatch latch1 = new CountDownLatch(0);
  4.         CountDownLatch latch2 = new CountDownLatch(1);
  5.         CountDownLatch latch3 = new CountDownLatch(1);
  6.         Thread thread1 = new Thread(new TestRunnable(latch1, latch2, "a"));
  7.         Thread thread2 = new Thread(new TestRunnable(latch2, latch3, "b"));
  8.         Thread thread3 = new Thread(new TestRunnable(latch3, latch3, "c"));
  9.         thread1.start();
  10.         thread2.start();
  11.         thread3.start();
  12.     }
  13. }
  14. class TestRunnable implements Runnable {
  15.     private CountDownLatch latch1;
  16.     private CountDownLatch latch2;
  17.     private String message;
  18.     TestRunnable(CountDownLatch latch1, CountDownLatch latch2, String message) {
  19.         this.latch1 = latch1;
  20.         this.latch2 = latch2;
  21.         this.message = message;
  22.     }
  23.     @Override
  24.     public void run() {
  25.         try {
  26.             latch1.await();
  27.             System.out.println(message);
  28.         } catch (InterruptedException e) {
  29.             e.printStackTrace();
  30.         }
  31.         latch2.countDown();
  32.     }
  33. }
复制代码
执行结果永久都是:
  1. a
  2. b
  3. c
复制代码
此外,利用CompletableFuture的thenRun方法,也能多线程的执行次序,在这里就不一一先容了。
4.线程安全问题

既然利用了线程,陪同而来的还会有线程安全问题。
如果现在有这样一个需求:用多线程执行查询方法,然后把执行结果添加到一个list集合中。
代码如下:
  1. List<User> list = Lists.newArrayList();
  2.  dataList.stream()
  3.      .map(data -> CompletableFuture
  4.           .supplyAsync(() -> query(list, data), asyncExecutor)
  5.          ));
  6. CompletableFuture.allOf(futureArray).join();
复制代码
利用CompletableFuture异步多线程执行query方法:
  1. public void query(List<User> list, UserEntity condition) {
  2.    User user = queryByCondition(condition);
  3.    if(Objects.isNull(user)) {
  4.       return;
  5.    }
  6.    list.add(user);
  7.    UserExtend userExtend = queryByOther(condition);
  8.    if(Objects.nonNull(userExtend)) {
  9.       user.setExtend(userExtend.getInfo());
  10.    }
  11. }
复制代码
在query方法中,将获取的查询结果添加到list集合中。
结果list会出现线程安全问题,偶然候会少数据,固然也不肯定是必现的。
这是由于ArrayList是非线程安全的,没有利用synchronized等关键字修饰。
如何办理这个问题呢?
答:利用CopyOnWriteArrayList集合,代替平凡的ArrayList集合,CopyOnWriteArrayList是一个线程安全的机会。
只需一行小小的改动即可:
  1. List<User> list Lists.newCopyOnWriteArrayList();
复制代码
  温馨的提示一下,这里创建集合的方式,用了google的collect包。
  5.ThreadLocal获取数据异常

我们都知道JDK为相识决线程安全问题,提供了一种用空间换时间的新思路:ThreadLocal。
它的核心思想是:共享变量在每个线程都有一个副本,每个线程操作的都是自己的副本,对另外的线程没有影响。
例如:
  1. @Service
  2. public class ThreadLocalService {
  3.     private static final ThreadLocal<Integer> threadLocal = new ThreadLocal<>();
  4.     public void add() {
  5.         threadLocal.set(1);
  6.         doSamething();
  7.         Integer integer = threadLocal.get();
  8.     }
  9. }
复制代码
ThreadLocal在平凡中线程中,简直能够获取正确的数据。
但在真实的业务场景中,一般很少用单独的线程,绝大多数,都是用的线程池。
那么,在线程池中如何获取ThreadLocal对象生成的数据呢?
如果直接利用平凡ThreadLocal,显然是获取不到正确数据的。
我们先试试InheritableThreadLocal,具体代码如下:
  1. private static void fun1() {
  2.     InheritableThreadLocal<Integer> threadLocal = new InheritableThreadLocal<>();
  3.     threadLocal.set(6);
  4.     System.out.println("父线程获取数据:" + threadLocal.get());
  5.     ExecutorService executorService = Executors.newSingleThreadExecutor();
  6.     threadLocal.set(6);
  7.     executorService.submit(() -> {
  8.         System.out.println("第一次从线程池中获取数据:" + threadLocal.get());
  9.     });
  10.     threadLocal.set(7);
  11.     executorService.submit(() -> {
  12.         System.out.println("第二次从线程池中获取数据:" + threadLocal.get());
  13.     });
  14. }
复制代码
执行结果:
  1. 父线程获取数据:6
  2. 第一次从线程池中获取数据:6
  3. 第二次从线程池中获取数据:6
复制代码
由于这个例子中利用了单例线程池,固定线程数是1。
第一次submit任务的时候,该线程池会自动创建一个线程。由于利用了InheritableThreadLocal,所以创建线程时,会调用它的init方法,将父线程中的inheritableThreadLocals数据复制到子线程中。所以我们看到,在主线程中将数据设置成6,第一次从线程池中获取了正确的数据6。
之后,在主线程中又将数据改成7,但在第二次从线程池中获取数据却依然是6。
由于第二次submit任务的时候,线程池中已经有一个线程了,就直接拿过来复用,不会再重新创建线程了。所以不会再调用线程的init方法,所以第二次其实没有获取到最新的数据7,还是获取的老数据6。
那么,这该怎么办呢?
答:利用TransmittableThreadLocal,它并非JDK自带的类,而是阿里巴巴开源jar包中的类。
可以通过如下pom文件引入该jar包:
  1. <dependency>
  2.    <groupId>com.alibaba</groupId>
  3.    <artifactId>transmittable-thread-local</artifactId>
  4.    <version>2.11.0</version>
  5.    <scope>compile</scope>
  6. </dependency>
复制代码
代码调整如下:
  1. private static void fun2() throws Exception {
  2.     TransmittableThreadLocal<Integer> threadLocal = new TransmittableThreadLocal<>();
  3.     threadLocal.set(6);
  4.     System.out.println("父线程获取数据:" + threadLocal.get());
  5.     ExecutorService ttlExecutorService = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(1));
  6.     threadLocal.set(6);
  7.     ttlExecutorService.submit(() -> {
  8.         System.out.println("第一次从线程池中获取数据:" + threadLocal.get());
  9.     });
  10.     threadLocal.set(7);
  11.     ttlExecutorService.submit(() -> {
  12.         System.out.println("第二次从线程池中获取数据:" + threadLocal.get());
  13.     });
  14. }
复制代码
执行结果:
  1. 父线程获取数据:6
  2. 第一次从线程池中获取数据:6
  3. 第二次从线程池中获取数据:7
复制代码
我们看到,利用了TransmittableThreadLocal之后,第二次从线程中也能正确获取最新的数据7了。
nice。
如果你仔细观察这个例子,你可能会发现,代码中除了利用TransmittableThreadLocal类之外,还利用了TtlExecutors.getTtlExecutorService方法,去创建ExecutorService对象。
这是非常重要的地方,如果没有这一步,TransmittableThreadLocal在线程池中共享数据将不会起作用。
创建ExecutorService对象,底层的submit方法会TtlRunnable或TtlCallable对象。
以TtlRunnable类为例,它实现了Runnable接口,同时还实现了它的run方法:
  1. public void run() {
  2.     Map<TransmittableThreadLocal<?>, Object> copied = (Map)this.copiedRef.get();
  3.     if (copied != null && (!this.releaseTtlValueReferenceAfterRun || this.copiedRef.compareAndSet(copied, (Object)null))) {
  4.         Map backup = TransmittableThreadLocal.backupAndSetToCopied(copied);
  5.         try {
  6.             this.runnable.run();
  7.         } finally {
  8.             TransmittableThreadLocal.restoreBackup(backup);
  9.         }
  10.     } else {
  11.         throw new IllegalStateException("TTL value reference is released after run!");
  12.     }
  13. }
复制代码
这段代码的重要逻辑如下:

  • 把当时的ThreadLocal做个备份,然后将父类的ThreadLocal拷贝过来。
  • 执行真正的run方法,可以获取到父类最新的ThreadLocal数据。
  • 从备份的数据中,恢复当时的ThreadLocal数据。
如果你想进一步相识ThreadLocal的工作原理,可以看看我的另一篇文章《ThreadLocal夺命11连问》
6.OOM问题

众所周知,利用多线程可以提拔代码执行效率,但也不是绝对的。
对于一些耗时的操作,利用多线程,确实可以提拔代码执行效率。
但线程不是创建越多越好,如果线程创建多了,也可能会导致OOM异常。
例如:
  1. Caused by: 
  2. java.lang.OutOfMemoryError: unable to create new native thread
复制代码
在JVM中创建一个线程,默认需要占用1M的内存空间。
如果创建了过多的线程,一定会导致内存空间不足,从而出现OOM异常。
除此之外,如果利用线程池的话,特别是利用固定巨细线程池,即利用Executors.newFixedThreadPool方法创建的线程池。
该线程池的核心线程数和最大线程数是一样的,是一个固定值,而存放消息的队列是LinkedBlockingQueue。
该队列的最大容量是Integer.MAX_VALUE,也就是说如果利用固定巨细线程池,存放了太多的任务,有可能也会导致OOM异常。
  1. java.lang.OutOfMemeryError:Java heap space
复制代码
7.CPU利用率飙高

不知道你有没有做过excel数据导入功能,需要将一批excel的数据导入到系统中。
每条数据都有些业务逻辑,如果单线程导入所有的数据,导入效率会非常低。
于是改成了多线程导入。
如果excel中有大量的数据,很可能会出现CPU利用率飙高的问题。
我们都知道,如果代码出现死循环,cpu利用率会飚的许多高。由于代码不停在某个线程中循环,没法切换到其他线程,cpu不停被占用着,所以会导致cpu利用率不停高居不下。
而多线程导入大量的数据,虽说没有死循环代码,但由于多个线程不停在不停的处置处罚数据,导致占用了cpu很长的时间。
也会出现cpu利用率很高的问题。
那么,如何办理这个问题呢?
答:利用Thread.sleep休眠一下。
在线程中处置处罚完一条数据,休眠10毫秒。
固然CPU利用率飙高的缘故原由许多,多线程处置处罚数据和死循环只是此中两种,还有比如:频仍GC、正则匹配、频仍序列化和反序列化等。
后面我会写一篇先容CPU利用率飙高的缘故原由的专题文章,感爱好的小伙伴,可以关注一下我后续的文章。
8.事务问题

在实际项目开发中,多线程的利用场景还是挺多的。如果spring事务用在多线程场景中,会有问题吗?
例如:
  1. @Slf4j
  2. @Service
  3. public class UserService {
  4.     @Autowired
  5.     private UserMapper userMapper;
  6.     @Autowired
  7.     private RoleService roleService;
  8.     @Transactional
  9.     public void add(UserModel userModel) throws Exception {
  10.         userMapper.insertUser(userModel);
  11.         new Thread(() -> {
  12.             roleService.doOtherThing();
  13.         }).start();
  14.     }
  15. }
  16. @Service
  17. public class RoleService {
  18.     @Transactional
  19.     public void doOtherThing() {
  20.         System.out.println("保存role表数据");
  21.     }
  22. }
复制代码
从上面的例子中,我们可以看到事务方法add中,调用了事务方法doOtherThing,但是事务方法doOtherThing是在另外一个线程中调用的。
这样会导致两个方法不在同一个线程中,获取到的数据库连接不一样,从而是两个不同的事务。如果想doOtherThing方法中抛了异常,add方法也回滚是不可能的。
如果看过spring事务源码的朋侪,可能会知道spring的事务是通过数据库连接来实现的。当火线程中保存了一个map,key是数据源,value是数据库连接。
  1. private static final ThreadLocal<Map<Object, Object>> resources =
  2.   new NamedThreadLocal<>("Transactional resources");
复制代码
我们说的同一个事务,其实是指同一个数据库连接,只有拥有同一个数据库连接才气同时提交和回滚。如果在不同的线程,拿到的数据库连接肯定是不一样的,所以是不同的事务。
   所以不要在事务中开启另外的线程,去处置处罚业务逻辑,这样会导致事务失效。
  9.导致服务挂掉

利用多线程会导致服务挂掉,这不是危言耸听,而是确有其事。
假设现在有这样一种业务场景:在mq的消耗者中需要调用订单查询接口,查到数据之后,写入业务表中。
原来是没啥问题的。
突然有一天,mq生产者跑了一个批量数据处置处罚的job,导致mq服务器上堆积了大量的消息。
此时,mq消耗者的处置处罚速度,远远跟不上mq消息的生产速度,导致的结果是出现了大量的消息堆积,对用户有很大的影响。
为相识决这个问题,mq消耗者改成多线程处置处罚,直接利用了线程池,并且最大线程数配置成了20。
这样调整之后,消息堆积问题确实得到相识决。
但带来了另外一个更严肃的问题:订单查询接口并发量太大了,有点扛不住压力,导致部分节点的服务直接挂掉。

为相识决问题,不得不暂时加服务节点。
   在mq的消耗者中利用多线程,调用接口时,肯定要评估好接口能够承受的最大访问量,防止由于压力过大,而导致服务挂掉的问题。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

八卦阵

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表