原子引用
JUC 并发包提供了:
- AtomicReference
- AtomicMarkableReference
- AtomicStampedReference
AtomicReference 使用举例
- public interface DecimalAccount {
- // 获取余额
- BigDecimal getBalance();
- // 取款
- void withdraw(BigDecimal amount);
- /**
- * 方法内会启动 1000 个线程,每个线程做 -10 元 的操作
- * 如果初始余额为 10000 那么正确的结果应当是 0
- */
- static void demo(DecimalAccount account) {
- List<Thread> ts = new ArrayList<>();
- for (int i = 0; i < 1000; i++) {
- ts.add(new Thread(() -> {
- account.withdraw(BigDecimal.TEN);
- }));
- }
- ts.forEach(Thread::start);
- ts.forEach(t -> {
- try {
- t.join();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- });
- System.out.println(account.getBalance());
- }
- }
复制代码 怎样提供 DecimalAccount 实现,实现安全的取款操作
不安全实现
- class DecimalAccountUnsafe implements DecimalAccount {
- BigDecimal balance;
-
- public DecimalAccountUnsafe(BigDecimal balance) {
- this.balance = balance;
- }
- @Override
- public BigDecimal getBalance() {
- return balance;
- }
- @Override
- public void withdraw(BigDecimal amount) {
- BigDecimal balance = this.getBalance();
- this.balance = balance.subtract(amount);
- }
- }
复制代码 安全实现-使用synchronized
- class DecimalAccountSafeLock implements DecimalAccount {
- private final Object lock = new Object();
- BigDecimal balance;
-
- public DecimalAccountSafeLock(BigDecimal balance) {
- this.balance = balance;
- }
- @Override
- public BigDecimal getBalance() {
- return balance;
- }
- @Override
- public void withdraw(BigDecimal amount) {
- synchronized (lock) {
- BigDecimal balance = this.getBalance();
- this.balance = balance.subtract(amount);
- }
- }
- }
复制代码 安全实现-使用CAS
- class DecimalAccountSafeCas implements DecimalAccount {
-
- AtomicReference<BigDecimal> ref;
-
- public DecimalAccountSafeCas(BigDecimal balance) {
- ref = new AtomicReference<>(balance);
- }
-
- @Override
- public BigDecimal getBalance() {
- return ref.get();
- }
- @Override
- public void withdraw(BigDecimal amount) {
- while (true) {
- BigDecimal prev = ref.get();
- BigDecimal next = prev.subtract(amount);
- if (ref.compareAndSet(prev, next)) {
- break;
- }
- }
- }
- }
复制代码 测试代码
- DecimalAccount.demo(new DecimalAccountUnsafe(new BigDecimal("10000")));
- DecimalAccount.demo(new DecimalAccountSafeLock(new BigDecimal("10000")));
- DecimalAccount.demo(new DecimalAccountSafeCas(new BigDecimal("10000")));
复制代码 运行结果
- 4310 cost: 425 ms
- 0 cost: 285 ms
- 0 cost: 274 ms
复制代码 AtomicStampedReference 使用举例
使用 AtomicReference 时线程仅能判定出共享变量的值与最初值 A 是否类似,不能感知到从 A 改为 B 又改回 A 的情况,假如主线程 希望: 只要有其它线程【动过了】共享变量,那么自己的 cas 就算失败,这时,仅比力值是不够的,须要再加一个版本号,使用 AtomicStampedReference 可以实现
- static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);
- public static void main(String[] args) throws InterruptedException {
- log.debug("main start...");
- // 获取值 A
- String prev = ref.getReference();
- // 获取版本号
- int stamp = ref.getStamp();
- log.debug("版本 {}", stamp);
- // 如果中间有其它线程干扰,发生了 ABA 现象
- other();
- sleep(1);
- // 尝试改为 C
- log.debug("change A->C {}", ref.compareAndSet(prev, "C", stamp, stamp + 1));
- }
- private static void other() {
- new Thread(() -> {
- log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), "B",
- ref.getStamp(), ref.getStamp() + 1));
- log.debug("更新版本为 {}", ref.getStamp());
- }, "t1").start();
- sleep(0.5);
- new Thread(() -> {
- log.debug("change B->A {}", ref.compareAndSet(ref.getReference(), "A",
- ref.getStamp(), ref.getStamp() + 1));
- log.debug("更新版本为 {}", ref.getStamp());
- }, "t2").start();
- }
复制代码 输出如下,最后因为版本号不同等没有修改成功
- 15:21:34.891 c.Test36 [main] - main start...
- 15:21:34.894 c.Test36 [main] - 版本 0
- 15:21:34.956 c.Test36 [t1] - change A->B true
- 15:21:34.956 c.Test36 [t1] - 更新版本为 1
- 15:21:35.457 c.Test36 [t2] - change B->A true
- 15:21:35.457 c.Test36 [t2] - 更新版本为 2
- 15:21:36.457 c.Test36 [main] - change A->C false
复制代码 AtomicMarkableReference 使用举例
AtomicStampedReference 可以给原子引用加上版本号,追踪原子引用整个的变化过程,如: A -> B -> A -> C ,通过AtomicStampedReference,我们可以知道,引用变量中途被更改了几次。但是有时间,并不关心引用变量更改了几次,只是单纯的关心是否更改过,所以就有了 AtomicMarkableReference。
- class GarbageBag {
- String desc;
- public GarbageBag(String desc) {
- this.desc = desc;
- }
- public void setDesc(String desc) {
- this.desc = desc;
- }
- @Override
- public String toString() {
- return super.toString() + " " + desc;
- }
- }
复制代码- @Slf4j
- public class TestABAAtomicMarkableReference {
- public static void main(String[] args) throws InterruptedException {
- GarbageBag bag = new GarbageBag("装满了垃圾");
- // 参数2 mark 可以看作一个标记,表示垃圾袋满了
- AtomicMarkableReference<GarbageBag> ref = new AtomicMarkableReference<>(bag, true);
- log.debug("主线程 start...");
- GarbageBag prev = ref.getReference();
- log.debug(prev.toString());
- new Thread(() -> {
- log.debug("打扫卫生的线程 start...");
- bag.setDesc("空垃圾袋");
- while (!ref.compareAndSet(bag, bag, true, false)) {}
- log.debug(bag.toString());
- }).start();
- Thread.sleep(1000);
- log.debug("主线程想换一只新垃圾袋?");
- boolean success = ref.compareAndSet(prev, new GarbageBag("空垃圾袋"), true, false);
- log.debug("换了么?" + success);
- log.debug(ref.getReference().toString());
- }
- }
复制代码 输出
- 15:30:09.264 [main] 主线程 start...
- 15:30:09.270 [main] cn.itcast.GarbageBag@5f0fd5a0 装满了垃圾
- 15:30:09.293 [Thread-1] 打扫卫生的线程 start...
- 15:30:09.294 [Thread-1] cn.itcast.GarbageBag@5f0fd5a0 空垃圾袋
- 15:30:10.294 [main] 主线程想换一只新垃圾袋?
- 15:30:10.294 [main] 换了么?false
- 15:30:10.294 [main] cn.itcast.GarbageBag@5f0fd5a0 空垃圾袋
复制代码 原子数组
- AtomicIntegerArray
- AtomicLongArray
- AtomicReferenceArray
AtomicIntegerArray 使用举例
有如下方法
- /**
- 参数1,提供数组、可以是线程不安全数组或线程安全数组
- 参数2,获取数组长度的方法
- 参数3,自增方法,回传 array, index
- 参数4,打印数组的方法
- */
- // supplier 提供者 无中生有 ()->结果
- // function 函数 一个参数一个结果 (参数)->结果 , BiFunction (参数1,参数2)->结果
- // consumer 消费者 一个参数没结果 (参数)->void, BiConsumer (参数1,参数2)-> void
- private static <T> void demo(
- Supplier<T> arraySupplier,
- Function<T, Integer> lengthFun,
- BiConsumer<T, Integer> putConsumer,
- Consumer<T> printConsumer ) {
- List<Thread> ts = new ArrayList<>();
- T array = arraySupplier.get();
- int length = lengthFun.apply(array);
- for (int i = 0; i < length; i++) {
- // 每个线程对数组作 10000 次操作
- ts.add(new Thread(() -> {
- for (int j = 0; j < 10000; j++) {
- putConsumer.accept(array, j%length);
- }
- }));
- }
- ts.forEach(t -> t.start()); // 启动所有线程
- ts.forEach(t -> {
- try {
- t.join();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }); // 等所有线程结束
- printConsumer.accept(array);
- }
复制代码 不安全的数组
- demo(
- ()->new int[10],
- (array)->array.length,
- (array, index) -> array[index]++,
- array-> System.out.println(Arrays.toString(array))
- );
复制代码 结果
- [9870, 9862, 9774, 9697, 9683, 9678, 9679, 9668, 9680, 9698]
复制代码 安全的数组
- demo(
- ()-> new AtomicIntegerArray(10),
- (array) -> array.length(),
- (array, index) -> array.getAndIncrement(index),
- array -> System.out.println(array)
- );
复制代码 结果
- [10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000]
复制代码 字段更新器
- AtomicReferenceFieldUpdater // 域 字段
- AtomicIntegerFieldUpdater
- AtomicLongFieldUpdater
使用字段更新器,可以针对对象的某个域(Field)举行原子操作,只能配合 volatile 修饰的字段使用,否则会出现 IllegalArgumentException 异常
AtomicReferenceFieldUpdater 使用举例
- public class Test {
- @Test
- public void test() {
- A a = new A();
- a.setNum(0);
- AtomicReferenceFieldUpdater<A, Integer> num = AtomicReferenceFieldUpdater.newUpdater(A.class, Integer.class, "num");
- List<Thread> ts = new ArrayList<>();
- for (int i = 0; i < 20; i++) {
- // 每个线程对数组作 10000 次操作
- ts.add(new Thread(() -> {
- for (int j = 0; j < 10000; j++) {
- while (true) {
- Integer prev = num.get(a);
- Integer next = prev + 1;
- if (num.compareAndSet(a, prev, next)) {
- break;
- }
- }
- }
- }));
- }
- ts.forEach(Thread::start);
- ts.forEach(t -> {
- try {
- t.join();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- });
- System.out.println(num.get(a));
- }
- }
- @Data
- class A{
- volatile Integer num;
- }
复制代码 结果
200000
执行对比:
- public class Test {
-
- @Test
- public void test() {
- A a = new A();
- a.setNum(new AtomicInteger(0));
- List<Thread> ts = new ArrayList<>();
- for (int i = 0; i < 20; i++) {
- // 每个线程对数组作 10000 次操作
- ts.add(new Thread(() -> {
- for (int j = 0; j < 1000; j++) {
- a.getNum().addAndGet(1);
- }
- }));
- }
- ts.forEach(Thread::start);
- ts.forEach(t -> {
- try {
- t.join();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- });
- System.out.println(a.getNum().get());
- }
- }
- @Data
- class A {
- AtomicInteger num;
- }
复制代码- public class Test {
- @Test
- public void test() {
- A a = new A();
- a.setNum(new AtomicReference<>(0));
- List<Thread> ts = new ArrayList<>();
- for (int i = 0; i < 20; i++) {
- // 每个线程对数组作 10000 次操作
- ts.add(new Thread(() -> {
- for (int j = 0; j < 10000; j++) {
- while (true) {
- Integer prev = a.getNum().get();
- Integer next = prev + 1;
- if (a.getNum().compareAndSet(prev, next)) {
- break;
- }
- }
- }
- }));
- }
- ts.forEach(Thread::start);
- ts.forEach(t -> {
- try {
- t.join();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- });
- System.out.println(a.getNum().get());
- }
- }
- @Data
- class A {
- AtomicReference<Integer> num;
- }
复制代码 AI 解释:
- 在第三个示例中,使用的是AtomicReference<Integer>,并且在每次递增时,需要显式地进行一个循环,在这个循环中不断尝试使用compareAndSet方法来安全地更新共享变量。在高并发的情况下,多个线程可能会竞争更新同一个变量,这可能导致多次失败的compareAndSet尝试,因而增加了循环的迭代次数。
- 在第二个示例中,使用的是AtomicInteger,这是一个针对整数值的原子类。在这个示例中,使用了addAndGet方法,它是一个原子操作,无需显式循环来尝试更新。addAndGet方法会自动处理并发更新的问题,使得代码更加简洁,并且在性能上可能更优,因为它减少了失败更新尝试的可能性。
- 在效率和性能方面,第二种方法(使用AtomicInteger和addAndGet)通常比第一种方法(使用AtomicReference<Integer>和显式循环)更优,原因包括:
- 更少的重试次数:AtomicInteger的addAndGet方法通常会有更少的重试次数,因为它是专为数值运算设计的原子操作。
- 原子操作的优化:AtomicInteger的方法可能会更好地利用底层硬件的原子性支持,如使用CPU的原子指令集(比如compareAndSwap),从而提高操作的效率。
- 更简洁的代码:AtomicInteger提供了直接的方法来进行原子性数值操作,使得代码更简洁、更易于理解和维护。
- 但是,实际性能的差异还是需要通过具体的性能测试来确定,因为它可能受到多种因素的影响,包括JVM实现、底层硬件、操作系统以及其他并发任务的特性。
- --------------------------------------------------------------------------------------------
- 这第一段代码使用了AtomicReferenceFieldUpdater来以一种线程安全的方式更新A类的num字段。与前面的例子相比,这里有几个关键点需要注意:
- AtomicReferenceFieldUpdater的使用:AtomicReferenceFieldUpdater是一个原子更新器,它允许你以线程安全的方式更新指定类的指定volatile字段。这是一个高级工具,通常用于在不直接使用原子类(例如AtomicInteger)时优化内存使用和性能,因为它可以在不为每个字段创建额外对象的情况下实现字段的原子操作。
- 字段定义:num字段定义为volatile类型的Integer,这意味着其修改对其他线程立即可见,但请注意volatile并不能保证复合(多步)操作的原子性。
- 性能:尽管使用了AtomicReferenceFieldUpdater,但每次递增操作仍然需要显式的循环来尝试更新,类似于第三个示例中的AtomicReference<Integer>。这意味着高并发下可能会有多次失败的compareAndSet尝试,这可能导致性能上的开销。
- 在性能和效率方面,尽管AtomicReferenceFieldUpdater提供了一种更轻量级的原子操作方式,这段代码的性能可能与第三个示例(使用AtomicReference<Integer>)相似,因为它也依赖于显式的重试循环来成功更新值。虽然AtomicReferenceFieldUpdater可能会比为每个变量创建一个AtomicReference对象更节省内存,但由于有失败重试的可能性,其性能可能不如第二个示例(使用AtomicInteger和addAndGet方法)。
- 需要注意的是,AtomicReferenceFieldUpdater要求字段必须是volatile类型的,而且对于基本数据类型,比如int,应该使用相应的AtomicIntegerFieldUpdater。在此代码示例中,可能需要将Integer字段替换为int类型,并使用AtomicIntegerFieldUpdater,因为AtomicReferenceFieldUpdater通常是用于引用类型的字段。如果保持为Integer类型,可能会产生额外的装箱和拆箱操作,这将进一步影响性能。
复制代码 原子累加器
LongAdder 使用举例
- private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action) {
- T adder = adderSupplier.get();
- List<Thread> ts = new ArrayList<>();
- // 4 个线程,每人累加 50 万
- for (int i = 0; i < 40; i++) {
- ts.add(new Thread(() -> {
- for (int j = 0; j < 500000; j++) {
- action.accept(adder);
- }
- }));
- }
- ts.forEach(t -> t.start());
- ts.forEach(t -> {
- try {
- t.join();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- });
- long end = System.nanoTime();
- System.out.println(adder + " cost:" + (end - start)/1000_000);
- }
复制代码 比力 AtomicLong 与 LongAdder
- for (int i = 0; i < 5; i++) {
- demo(() -> new LongAdder(), adder -> adder.increment());
- }
- for (int i = 0; i < 5; i++) {
- demo(() -> new AtomicLong(), adder -> adder.getAndIncrement());
- }
复制代码 测试结果:LongAdder 的累加性能更优,因为 LongAdder 在有竞争时,设置多个累加单元,Therad-0 累加 Cell[0],而 Thread-1 累加 Cell[1]… 总Cell个数不超过CPU核心数,最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此淘汰了 CAS 重试失败,从而进步性能
Unsafe类
Unsafe 对象提供了非常底层的,操作内存、线程的方法,Unsafe 对象不能直接调用,只能通过反射得到,前面的原子整数、原子引用这些类的底层都是调用 Unsafe,Unsafe 底层会调用 CPU 的 lock 打头的指令之类
得到 Unsafe 对象的代码:
- public class UnsafeAccessor {
- static Unsafe unsafe;
- static {
- try {
- Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe 是private的");
- // 因为 theUnsafe 是 private 的
- theUnsafe.setAccessible(true);
- // 因为 theUnsafe 是静态的,从属于类,不属于对象,所以传null即可获得字段值
- unsafe = (Unsafe) theUnsafe.get(null);
- } catch (NoSuchFieldException | IllegalAccessException e) {
- throw new Error(e);
- }
- }
- static Unsafe getUnsafe() {
- return unsafe;
- }
- }
复制代码 使用 Unsafe 对象实现 CAS 操作
- @Data
- class Student {
- volatile int id;
- volatile String name;
- }
复制代码- Unsafe unsafe = UnsafeAccessor.getUnsafe();
- Field id = Student.class.getDeclaredField("id");
- Field name = Student.class.getDeclaredField("name");
- // 获得成员变量的偏移量
- long idOffset = UnsafeAccessor.unsafe.objectFieldOffset(id);
- long nameOffset = UnsafeAccessor.unsafe.objectFieldOffset(name);
- Student student = new Student();
- // 使用 cas 方法替换成员变量的值
- UnsafeAccessor.unsafe.compareAndSwapInt(student, idOffset, 0, 20); // 返回 true
- UnsafeAccessor.unsafe.compareAndSwapObject(student, nameOffset, null, "张三"); // 返回 true
- System.out.println(student);
复制代码 输出
Student(id=20, name=张三)
怎样自己实现线程安全的整数自减操作?
- class MyAtomicInteger {
- private volatile int num;
- static final Unsafe unsafe;
- static final long DATA_OFFSET;
-
- static {
- unsafe = UnsafeAccessor.getUnsafe();
- try {
- // num 属性在 DataContainer 对象中的偏移量,用于 Unsafe 直接访问该属性
- DATA_OFFSET = unsafe.objectFieldOffset(AtomicData.class.getDeclaredField("num"));
- } catch (NoSuchFieldException e) {
- throw new Error(e);
- }
- }
-
- public MyAtomicInteger(int num) {
- this.num = num;
- }
- public void decrease(int amount) {
- while(true) {
- int prev = this.num;
- int next = prev - amount;
- // cas 尝试修改 num 为 旧值 + amount,如果期间旧值被别的线程改了,返回 false
- if (unsafe.compareAndSwapInt(this, DATA_OFFSET, prev, next)) {
- return;
- }
- }
- }
- public int getNum() {
- return num;
- }
- }
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |