慢吞云雾缓吐愁 发表于 2025-1-1 23:38:16

04、JUC并发编程之:简单概述(四)

JUC并发编程之:简单概述(四)

##本章内容:
无锁并发--乐观锁(非阻塞)

·CAS与volatile
·原子整数
·原子引用
·原子数组
·字段更新器
·原子累加器
·Unsafe
一、CAS与volatile

1、掩护共享资源

·有一个账户,有1万元,现在有1000个线程,每个线程每次取10元,最后会剩下0元

##错误实现 与加锁实现
@Slf4j
public class CasAndVolatile01 {

    public static void main(String[] args) {

      //unsafe实现
      Account account1 = new AccountUnsafe(10000);
      Account.demo(account1);

      //加锁实现
      Account account2 = new AccountLock(10000);
      Account.demo(account2);


    }
}

//加锁实现
class AccountLock implements Account{

    Integer balance;

    public AccountLock(Integer balance) {
      this.balance = balance;
    }

    @Override
    public Integer getBalance() {
      synchronized (this){
            return this.balance;
      }
    }

    @Override
    public void withDraw(Integer amount) {
      synchronized (this){
            this.balance -= amount;
      }
    }
}

//错误实现
class AccountUnsafe implements Account{

    Integer balance;

    public AccountUnsafe(Integer balance) {
      this.balance = balance;
    }

    @Override
    public Integer getBalance() {
      return this.balance;
    }

    @Override
    public void withDraw(Integer amount) {
      this.balance -= amount;
    }
}

interface Account{

    //获取余额
    Integer getBalance();

    //取款
    void withDraw(Integer amount);

    //方法内启动1000个线程,每个线程-10元
    //如果总额为10000,那么余额将会为0
    static void demo(Account account){
      List<Thread> ts = new ArrayList<>();
      for(int i=0;i<1000;i++){
            ts.add(new Thread(()->{
                account.withDraw(10);
            }));
      }
      //计算起始时间
      long start = System.nanoTime();
      ts.forEach(Thread::start);
      ts.forEach(t->{
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
      });
      long end = System.nanoTime();
      System.out.println(account.getBalance()
      +"cost : "+(end-start)/1000_000+"ms");

    }
}
##结果:
310cost : 80ms##错误实现
0cost : 69ms##加锁实现

##无锁实现
//无锁实现
class AccountCas implements Account{

    AtomicInteger balance;

    public AccountCas(Integer balance) {
      this.balance = new AtomicInteger(balance);
    }

    @Override
    public Integer getBalance() {
      return balance.get(); //底层是volatile
    }

    @Override
    public void withDraw(Integer amount) {
      while(true){
            //余额最新值
            Integer prev = balance.get();
            //修改后的余额
            Integer next = prev - amount;
            //同步到主存--比较并设置
            if(balance.compareAndSet(prev,next)){
                break;
            }
      }
    }
}
##结果:
160cost : 85ms ##错误实现
0cost : 73ms   ##加锁实现
0cost : 65ms   ##无锁实现
2、CAS与volatile

CAS:
·上面AtomicInteger的解决方法,内部并没有使用锁来保护共享变量的线程安全,它的实现原理:

@Override
public void withDraw(Integer amount) {
    while(true){
      //余额最新值
      Integer prev = balance.get();
      //修改后的余额
      Integer next = prev - amount;
      //同步到主存---比较并设置
      if(balance.compareAndSet(prev,next)){
              break;
      }
    }
}

其中compareAndSet,它的简称就是CAS(也称 compare and swap),它必须是原子操作
https://i-blog.csdnimg.cn/direct/1c3cc9fd7fef47f4a5000c48a96933d6.png#pic_center
·CAS底层是lock cmpxchg指令(X86架构),在单个CPU和多核CPU下都能够保证【比较-交换】的原子


·在多核状态下,某个执行到带lock的指令时,CPU会让总线锁住,当这个核把此指令执行完毕,再开启
总线,这个过程中不会被线程的调度机制所打断,保证了多个线程对内存操作的准确性,是原子的。

【CAS会在修改前判断 自己共享变量修改之前读到的数据和当前的数据是否一致,如果不一致则返回
·false,重新修改,如果一致则返回true修改成功】
volatile:
·获取共享变量时,为了保证该变量的可见性,需要使用volatile修饰。

·它可以用来修饰成员变量和静态成员变量,它可以避免线程从自己的工作缓存中查找变量的值,必须到
主存中获取他的值,线程操作volatile变量都是直接操作主存。即一个线程对volatile变量的修改,
对另一个线程可见。

##注意:
volatile仅保证了共享变量的可见性,让其他线程能够看到最新值,但不能解决指令交错问题
(不能保证原子性)

·【CAS必须借助volatile才能读取到共享变量的最新值来实现【比较并交换】的效果】

##AtomicInteger部分源码:

public class AtomicInteger{
private volatile int value;
}
3、CAS效率分析

##为什么无锁效率高?

·无锁情况下,即使重试失败,线程始终在高速运行,没有停歇,而synchronized会让线程在没有获得
锁的时候,发生上下文切换,进入阻塞
(打个比方:线程就好像高速跑道上的赛车,高速运行时,速度超快,一旦发生上下文切换,就好比赛车减
速,熄火,等被唤醒又得重新打火、启动、加速恢复到高速运行,代价比较大)

·但无锁情况下因为线程要保持运行,需要额外CPU的支持,CPU在这里就好比高速跑道,没有额外的跑
道,线程想高速运行也无从谈起,虽然不会进入阻塞,但由于没有分到时间片,仍然会进入可运行状态,
还是会导致上下文切换。
4、CAS特点

·CAS结合volatile可以实现无锁并发,适用于线程数少,多核CPU的场景下:

>CAS是基于乐观锁的思想:最乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,我吃点亏
再重试呗

>synchronized是基于悲观锁的思想:最悲观的估计,得防着其他线程来修改共享变量,我上了锁你们
都别想改,我改完了解开锁,你们才有机会

>CAS体现的是无锁并发,无阻塞并发:
>>因为没有使用synchronized,所以县城不会陷入阻塞,这是效率提升的因素之一
>>但如果竞争激烈,可以想到重试必然发生,反而会影响效率
二、原子整数

##JUC并发包提供了一些供CAS实现工具类:
·AtomicBoolean
·AtomicInteger
·AtomicLong

##ActomicInteger常用方法:
@Slf4j
public class ActomicIntegerTest {
    public static void main(String[] args) {
      AtomicInteger i = new AtomicInteger(0);

      System.out.println(i.incrementAndGet());//++i
      System.out.println(i.getAndIncrement());//i++

      System.out.println(i.addAndGet(5));//先+5然后get
      System.out.println(i.getAndAdd(5));//先get然后+5

      System.out.println(i.updateAndGet(x->x*5));//先乘以5后get
      System.out.println(i.getAndUpdate(x->x*5));//先get然后乘以5

      System.out.println(i.get());
    }
}
updateAndGet( )底层源码:
public final int updateAndGet(IntUnaryOperator updateFunction) {
    int prev, next;
    do {
      prev = get();
      next = updateFunction.applyAsInt(prev);
    } while (!compareAndSet(prev, next));
    return next;
}
三、原子引用

##由于我们的共享变量并不一定都是基本数据类型,比如说BigDecimal,我们就可以使用原子引用
保证该共享变量的线程安全

·AtomicReference
·AtomicMarkableReference
·AtomicStampedReference有版本号的
3.1、AtomicReference

public class CasAndVolatile02 {

    public static void main(String[] args) {
      DecimalAccount account = new BigDecimalAccountCas(new BigDecimal("10000"));
      DecimalAccount.demo(account);
    }
}

class BigDecimalAccountCas implements DecimalAccount{

    private AtomicReference<BigDecimal> balance;

    public BigDecimalAccountCas(BigDecimal balance) {
      this.balance = new AtomicReference<>(balance);
    }

    @Override
    public BigDecimal getBalance() {
      return balance.get();
    }

    @Override
    public void withDraw(BigDecimal amount) {
      while (true){
            BigDecimal prev = balance.get();
            BigDecimal next = prev.subtract(amount);
            if(balance.compareAndSet(prev,next)){
                break;
            }
      }
    }
}

interface DecimalAccount{

    //获取余额
    BigDecimal getBalance();

    //取款
    void withDraw(BigDecimal amount);

    //方法内启动1000个线程,每个线程-10元
    //如果总额为10000,那么余额将会为0
    static void demo(DecimalAccount account2){
      List<Thread> ts = new ArrayList<>();
      for(int i=0;i<1000;i++){
            ts.add(new Thread(()->{
                account2.withDraw(BigDecimal.TEN);
            }));
      }
      //计算起始时间
      long start = System.nanoTime();
      ts.forEach(Thread::start);
      ts.forEach(t->{
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
      });
      long end = System.nanoTime();
      System.out.println(account2.getBalance()
                +"cost : "+(end-start)/1000_000+"ms");

    }
}
3.2、ABA问题:

##主线程将共享变量从A---修改成---->C
##但在主线程修改过程中t线程将共享变量从A---修改成---->B---又修改成--->A
##主线程还是会修改成功,但主线程是无法感知共享变量的变化的

##代码如下:
@Slf4j
public class AtomicReferenceABA01 {

    static AtomicReference<String> at = new AtomicReference("A");
    public static void main(String[] args) {
      log.debug("main start...");
      String prev = at.get();
      other();
      try {
            Thread.sleep(1000);
      } catch (InterruptedException e) {
            e.printStackTrace();
      }
      log.debug("main change A-->C : {}",at.compareAndSet(prev,"C"));
    }

    private static void other(){
      new Thread(()->{
            log.debug("t1 change A-->B : {}",at.compareAndSet("A","B"));
      },"t1").start();

      try {
            Thread.sleep(500);
      } catch (InterruptedException e) {
            e.printStackTrace();
      }

      new Thread(()->{
            log.debug("t1 change B-->A : {}",at.compareAndSet("B","A"));
      },"t2").start();
    }
}
##结果:
10:06:54.024 DEBUG xxx - main start...
10:06:54.119 DEBUG xxx - t1 change A-->B : true
10:06:54.620 DEBUG xxx - t1 change B-->A : true
10:06:55.634 DEBUG xxx - main change A-->C : true

##如何才能让main线程感知到 共享变量被修改呢?
##只要有其他线程动过了共享变量,那么自己的CAS就算失败,这时仅比较值是不够的,
需要再增加一个版本号
3.3、AtomicStampedReference

@Slf4j
public class AtomicStampedReferenceABA02 {

    static AtomicStampedReference<String> asr =new AtomicStampedReference<>("A",0);
    public static void main(String[] args) {
      log.debug("main start....");
      String prev = asr.getReference();
      int stamp = asr.getStamp();
      log.debug("main prev:{} , stamp:{}",prev,stamp);
      other();
      try {
            Thread.sleep(1000);
      } catch (InterruptedException e) {
            e.printStackTrace();
      }
      log.debug("main change A-->C : {}",asr.compareAndSet(prev,"C",stamp,stamp+1));
    }

    private static void other(){
      new Thread(()->{
            String prev = asr.getReference();
            int stamp = asr.getStamp();
            log.debug("t1 prev:{} , stamp:{}",prev,stamp);
            log.debug("t1 change A-->B : {}",asr.compareAndSet(prev,"B",stamp,stamp+1));
      },"t1").start();
      try {
            Thread.sleep(1000);
      } catch (InterruptedException e) {
            e.printStackTrace();
      }
      new Thread(()->{
            String prev = asr.getReference();
            int stamp = asr.getStamp();
            log.debug("t2 prev:{} , stamp:{}",prev,stamp);
            log.debug("t2 change B-->A : {}",asr.compareAndSet(prev,"A",stamp,stamp+1));
      },"t2").start();
    }
}
##结果:
10:25:43.158 DEBUG xxx - main start....
10:25:43.172 DEBUG xxx - main prev:A , stamp:0
10:25:43.217 DEBUG xxx - t1 prev:A , stamp:0
10:25:43.217 DEBUG xxx - t1 change A-->B : true
10:25:44.227 DEBUG xxx - t2 prev:B , stamp:1
10:25:44.227 DEBUG xxx - t2 change B-->A : true
10:25:45.242 DEBUG xxx - main change A-->C : false
3.4、AtomicMarkableReference

##AtomicStampedReference 通过版本号 可以记录 是否被修改了,和修改了几次
##AtomicMarkableReference 通过boolean标记 是否被修改
@Slf4j
public class AtomicMarkableReferenceTest {

    public static void main(String[] args) {
      GarbageBag bag = new GarbageBag("一个满了的垃圾袋");
      AtomicMarkableReference<GarbageBag> amr = new AtomicMarkableReference<>(bag,true);
      log.debug("main start...");
      GarbageBag prev = amr.getReference();
      log.debug("main prev:{}",prev);

      new Thread(()->{
            log.debug("清洁阿姨 更换垃圾袋: {}",amr.compareAndSet(prev,new GarbageBag("清洁阿姨放了一个新的垃圾袋"),true,false));
      },"清洁阿姨").start();


      try {
            Thread.sleep(1000);
      } catch (InterruptedException e) {
            e.printStackTrace();
      }

      log.debug("main 更换垃圾袋: {}",amr.compareAndSet(prev,new GarbageBag("main放了一个新的垃圾袋"),true,false));

    }

}

@Data
@ToString
@AllArgsConstructor
class GarbageBag{
    private String desc;
}
##结果:
10:41:53.070 DEBUG xxx - main start...
10:41:53.086 DEBUG xxx - main prev:GarbageBag(desc=一个满了的垃圾袋)
10:41:53.150 [清洁阿姨] DEBUG xxx - 清洁阿姨 更换垃圾袋: true
10:41:54.149 DEBUG xxx - main 更换垃圾袋: false
四、原子数组

·AtomicIntegerArray
·AtomicLongArray
·AtomicReferenceArray

##原子数组可以在多线程中保护数组里的元素
@Slf4j
public class AtomicIntegerArrayTest {

    public static void main(String[] args) {
      //创建了容量为10个int的AtomicIntegerArray
      AtomicIntegerArray array = new AtomicIntegerArray(10);
      log.debug("第2个值:{}",array.get(1));


      int[] array2 = new int[]{1,2,3,4,5,6,7,8,9,10};
      AtomicIntegerArray array3 = new AtomicIntegerArray(array2);
      log.debug("第2个值:{}",array3.get(1));
      log.debug("第2个值+1:{}",array3.getAndIncrement(1));
      log.debug("第2个值:{}",array3.get(1));
      log.debug("第3个值+5:{}",array3.getAndAdd(2,5));
      log.debug("第3个值:{}",array3.get(2));

      log.debug("第4个值*7:{}",array3.getAndUpdate(3,t->t*7));
      log.debug("第3个值:{}",array3.get(3));
      
      log.debug("修改第5个值为999");
      array3.set(4,999);
      log.debug("第5个值:{}",array3.get(4));
    }
}
##结果:
11:45:37.301 DEBUG xxx - 第2个值:0
11:45:37.316 DEBUG xxx - 第2个值:2
11:45:37.317 DEBUG xxx - 第2个值+1:2
11:45:37.317 DEBUG xxx - 第2个值:3
11:45:37.317 DEBUG xxx - 第3个值+5:3
11:45:37.317 DEBUG xxx - 第3个值:8
11:45:37.361 DEBUG xxx - 第4个值*7:4
11:45:37.361 DEBUG xxx - 第3个值:28
11:47:45.114 DEBUG xxx - 修改第5个值为999
11:47:45.114 DEBUG xxx - 第5个值:999
五、字段更新器

·AtomicReferenceFieldUpdater
·AtomicIntegerFieldUpdater
·AtomicLongFieldUpdater

##字段更新器可以在多线程中保护对象的某个属性\成员变量
@Slf4j
public class AtomicReferenceFieldUpdaterTest {
    public static void main(String[] args) {
      Student st = new Student("李四");

      //参数:类,要更新字段的类型,要更新字段的名称
      AtomicReferenceFieldUpdater updater = AtomicReferenceFieldUpdater.newUpdater(Student.class,String.class,"name");

      log.debug("更新名称为张三:{}",updater.compareAndSet(st,"李四","张三"));
      log.debug("st : {}",st);
    }
}

@Data
@ToString
@AllArgsConstructor
class Student{
    volatile String name;
}
六、原子累加器

##ActomicLong和LongAdder累加的性能比较
@Slf4j
public class LongAdderTest {
    public static void main(String[] args) {
      for (int i = 0; i <5 ; i++) {
            demo(
                  ()->new AtomicLong(0),
                  (adder)->adder.getAndIncrement()
            );
      }
      log.debug("-------------");
      for (int i = 0; i <5 ; i++) {
            demo(
                  ()->new LongAdder(),
                  (adder)->adder.increment()
            );
      }
    }

    private static <T> void demo(Supplier<T> supplier, Consumer<T> consumer){
      T adder = supplier.get();
      List<Thread> ts = new ArrayList<>();
      //4个线程,每人累加50万
      for (int i=0;i<4;i++){
         ts.add( new Thread(()->{
                for(int j=0;j<500000;j++){
                  consumer.accept(adder);
                }
         }));
      }

      long start = System.nanoTime();
      ts.forEach(t->t.start());
      ts.forEach(t->{
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
      });
      long end = System.nanoTime();
      log.debug("adder {}, cost : {} ms",adder,(end-start)/1000_000);

    }
}
##结果:
14:57:20.170 DEBUG xxx - adder 2000000, cost : 49 ms
14:57:20.228 DEBUG xxx - adder 2000000, cost : 43 ms
14:57:20.265 DEBUG xxx - adder 2000000, cost : 36 ms
14:57:20.303 DEBUG xxx - adder 2000000, cost : 37 ms
14:57:20.344 DEBUG xxx - adder 2000000, cost : 41 ms
14:57:20.344 DEBUG xxx - -------------
14:57:20.359 DEBUG xxx - adder 2000000, cost : 13 ms
14:57:20.364 DEBUG xxx - adder 2000000, cost : 5 ms
14:57:20.370 DEBUG xxx - adder 2000000, cost : 5 ms
14:57:20.377 DEBUG xxx - adder 2000000, cost : 6 ms
14:57:20.382 DEBUG xxx - adder 2000000, cost : 4 ms

##LongAdder性能提升的原因:

LongAdder性能提升的原因很简单,就是在有竞争时,设置多个累加单元,Thread-0累加Cell
而Thread-1累加Cell...最后将结果汇总,这样他们在累加时操作的不同的Cell变量,因此减
少了CAS重试失败,从而提高性能
七、Unsafe

·Unsafe对象提供了非常底层的,操作内存、线程的方法
·Unsafe对象不能直接调用,只能通过反射获得
获取Unsafe:
@Slf4j
public class UnsafeAccessor {
    public static void main(String[] args) {
      Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
      theUnsafe.setAccessible(true);
      Unsafe unsafe = (Unsafe) theUnsafe.get(null);
      log.debug("unsafe:{}",unsafe);
    }
}
Unsafe CAS操纵:
/**
* 使用Unsafe线程安全的操作Teacher对象的成员变量
* (之前使用AtomicReferenceFieldUpdater)
*/
@Slf4j
public class UnsafeAccessor {

    public static void main(String[] args) {
      try {
            //获取Unsafe
            Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
            theUnsafe.setAccessible(true);
            Unsafe unsafe = (Unsafe) theUnsafe.get(null);
            log.debug("unsafe:{}",unsafe);

            Teacher tc = new Teacher(1,"张三");
            //获取域的偏移地址
            long idOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("id"));
            long nameOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("name"));

            //执行CAS操作
            unsafe.compareAndSwapInt(tc,idOffset,1,2);
            unsafe.compareAndSwapObject(tc,nameOffset,"张三","李四");

            //检验
            log.debug("tc : {}",tc);

      } catch (NoSuchFieldException e) {
            e.printStackTrace();
      } catch (IllegalAccessException e) {
            e.printStackTrace();
      }
    }
}

@Data
@ToString
@AllArgsConstructor
class Teacher{
    volatile int id;
    volatile String name;
}
Unsafe模仿原子整数:
@Slf4j
public class UnsafeForAtomicInteger {

    public static void main(String[] args) {
      MyAtomicInteger mat = new MyAtomicInteger(10000);

      List<Thread> ts = new ArrayList<>();
      //1000个线程,每个线程减去10
      for(int i=0;i<1000;i++){
            ts.add(new Thread(()->{
                mat.decrement(10);
            }));
      }

      ts.forEach(t->t.start());
      ts.forEach(t->{
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
      });

      log.debug("mat : {}",mat.get());
    }
}

class MyAtomicInteger{

    private volatile Integer value;
    private static final long valueOffset;
    private static final Unsafe UNSAFE;

    static{
      try {
            Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
            theUnsafe.setAccessible(true);
            UNSAFE = (Unsafe) theUnsafe.get(null);

            valueOffset = UNSAFE.objectFieldOffset(MyAtomicInteger.class.getDeclaredField("value"));
      } catch (NoSuchFieldException e) {
            e.printStackTrace();
            throw new RuntimeException();
      } catch (IllegalAccessException e) {
            e.printStackTrace();
            throw new RuntimeException();
      }
    }

    public MyAtomicInteger(Integer value) {
      this.value = value;
    }

    public Integer get(){
      return value;
    }

    public void decrement(Integer num){
      while (true){
            Integer prev = this.value;
            Integer next = prev - num;
            if(UNSAFE.compareAndSwapObject(this,valueOffset,prev,next)){
                break;
            }
      }
    }
}


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 04、JUC并发编程之:简单概述(四)