CAS和原子类

Mr.LR2022年5月15日
大约 11 分钟

cas和原子类

JUC中多数类是通过volatile和CAS来实现的,CAS本质上提供的是一种无锁方案,而Synchronized和Lock是互斥锁方案; java原子类本质上使用的是CAS,而CAS底层是通过Unsafe类实现的。所以本章将对CAS, Unsafe和原子类详解。

cas是什么

compare and swap的缩写,中文翻译成比较并交换,实现并发算法时常用到的一种技术。它包含三个操作数——内存位置、预期原值及更新值。执行CAS操作的时候,将内存位置的值与预期原值比较:

  • 如果相匹配,那么处理器会自动将该位置值更新为新值,
  • 如果不匹配,处理器不做任何操作,多个线程同时执行CAS操作只有一个会成功。

CAS有3个操作数,位置内存值V,旧的预期值A,要修改的更新值B。 当且仅当旧的预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做或重来

image-20220515163227289

CAS是JDK提供的非阻塞原子性操作,它通过硬件保证了比较-更新的原子性。它是非阻塞的且自身原子性,也就是说cas效率更高且通过硬件保证,说明它更可靠。

示例

public static void main(String[] args) throws InterruptedException {
    
    AtomicInteger atomicInteger = new AtomicInteger(5);
    //如果是5 ,则替换为2020
    System.out.println(atomicInteger.compareAndSet(5, 2020) + "\t" + atomicInteger.get());
    //如果是5 ,1024
    System.out.println(atomicInteger.compareAndSet(5, 1024) + "\t" + atomicInteger.get());
}
输出
true	2020
false	2020

cas底层原理,UnSafe类

UnSafe是什么

UnSafe是CAS的核心类,由于Java方法无法直接访问底层系统,需要通过本地(native)方法来访问,Unsafe相当于一个后门,基于该类可以直接操作特定内存的数据。Unsafe类存在于sun.misc包中,其内部方法操作可以像C的指针一样直接操作内存,因为Java中CAS操作的执行依赖于Unsafe类的方法。

这个类尽管里面的方法都是 public 的,但是并没有办法使用它们,JDK API 文档也没有提供任何关于这个类的方法的解释。总而言之,对于 Unsafe 类的使用都是受限制的,只有授信的代码才能获得该类的实例,当然 JDK 库里面的类是可以随意使用的。

例如atomicInteger.getAndIncrement()

  • 源码分析
public final int getAndIncrement() {
    return unsafe.getAndAddInt(this, valueOffset, 1);
}
public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}

假设线程A和线程B两个线程同时执行getAndAddInt操作:

  1. AtomicInteger里面的value原始值为3,即主内存中AtomicInteger的value为3,根据JMM模型,线程A和线程B各自持有一份值为3的value的副本分别到各自的工作内存。

  2. 线程A通过getIntVolatile(var1, var2)拿到value值3,这时线程A被挂起。

  3. 线程B也通过getIntVolatile(var1, var2)方法获取到value值3,此时刚好线程B没有被挂起并执行compareAndSwapInt方法比较内存值也为3,成功修改内存值为4,线程B成功修改结束。

  4. 这时线程A恢复,执行compareAndSwapInt方法比较,发现自己手里的值数字3和主内存的值数字4不一致,说明该值已经被其它线程抢先一步修改过了,那A线程本次修改失败,只能重新读取重新来一遍了。

  5. 线程A重新获取value值,因为变量value被volatile修饰,所以其它线程对它的修改,线程A总是能够看到,线程A继续执行compareAndSwapInt进行比较替换,直到成功。

cas缺点

CAS 方式为乐观锁,synchronized 为悲观锁。因此使用 CAS 解决并发问题通常情况下性能更优。

但使用 CAS 方式也会有几个问题:

ABA问题

因为CAS需要在操作值的时候,检查值有没有发生变化,比如没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时则会发现它的值没有发生变化,但是实际上却变化了。

ABA问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加1,那么A->B->A就会变成1A->2B->3A。

JDK的Atomic包里提供了一个类AtomicStampedReference来解决ABA问题。这个类的compareAndSet方法的作用是首先检查当前引用是否等于预期引用,并且检查当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。

解决ABA问题示例

public class test {

    static AtomicInteger atomicInteger = new AtomicInteger(100);
    static AtomicStampedReference atomicStampedReference = new AtomicStampedReference(100,1);

    public static void main(String[] args) {
/*        new Thread(() -> {
            atomicInteger.compareAndSet(100,101);
            atomicInteger.compareAndSet(101,100);
        },"t1").start();

        new Thread(() -> {
            //暂停一会儿线程
            try { Thread.sleep( 500 ); } catch (InterruptedException e) { e.printStackTrace(); };            System.out.println(atomicInteger.compareAndSet(100, 2019)+"\t"+atomicInteger.get());
        },"t2").start();

        //暂停一会儿线程,main彻底等待上面的ABA出现演示完成。
        try { Thread.sleep( 2000 ); } catch (InterruptedException e) { e.printStackTrace(); }*/

        System.out.println("============以下是ABA问题的解决=============================");

        new Thread(() -> {
            int stamp = atomicStampedReference.getStamp();
            System.out.println(Thread.currentThread().getName()+"\t 首次版本号:"+stamp);//1
            //暂停一会儿线程,
            try { Thread.sleep( 1000 ); } catch (InterruptedException e) { e.printStackTrace(); }
            atomicStampedReference.compareAndSet(100,101,atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1);
            System.out.println(Thread.currentThread().getName()+"\t 2次版本号:"+atomicStampedReference.getStamp());
            atomicStampedReference.compareAndSet(101,100,atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1);
            System.out.println(Thread.currentThread().getName()+"\t 3次版本号:"+atomicStampedReference.getStamp());
        },"t3").start();

        new Thread(() -> {
            int stamp = atomicStampedReference.getStamp();
            System.out.println(Thread.currentThread().getName()+"\t 首次版本号:"+stamp);//1
            //暂停一会儿线程,获得初始值100和初始版本号1,故意暂停3秒钟让t3线程完成一次ABA操作产生问题
            try { Thread.sleep( 3000 ); } catch (InterruptedException e) { e.printStackTrace(); }
            boolean result = atomicStampedReference.compareAndSet(100,2019,stamp,stamp+1);
            System.out.println(Thread.currentThread().getName()+"\t"+result+"\t"+atomicStampedReference.getReference());
        },"t4").start();
    }
}

循环时间长开销大

image-20220515173240215

我们可以看到getAndAddInt方法执行时,有个do while,如果CAS失败,会一直进行尝试。如果CAS长时间一直不成功,可能会给CPU带来很大的开销。

只能保证一个共享变量的原子操作

当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁。

还有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如,有两个共享变量i = 2,j = a,合并一下ij = 2a,然后用CAS来操作ij。

原子操作类

基本类型原子类

使用原子的方式更新基本类型,Atomic包提供了以下3个类。

  • AtomicBoolean: 原子更新布尔类型。
  • AtomicInteger: 原子更新整型。
  • AtomicLong: 原子更新长整型。

常用api

public final int get() //获取当前的值
public final int getAndSet(int newValue)//获取当前的值,并设置新的值
public final int getAndIncrement()//获取当前的值,并自增
public final int getAndDecrement() //获取当前的值,并自减
public final int getAndAdd(int delta) //获取当前的值,并加上预期的值
boolean compareAndSet(int expect, int update) //如果输入的数值等于预期值,则以原子方式将该值设置为输入值(update)

示例

public class AtomicInt {
    public static void main(String[] args) throws InterruptedException {
        MyNumber myNumber = new MyNumber();
        CountDownLatch countDownLatch = new CountDownLatch(100);
        for (int i = 1; i <=100; i++) {
            new Thread(() -> {
                try
                {
                    for (int j = 1; j <=5000; j++)
                    {
                        myNumber.addPlusPlus();
                    }
                }finally {
                    countDownLatch.countDown();
                }
            },String.valueOf(i)).start();
        }

        countDownLatch.await();

        System.out.println(myNumber.getAtomicInteger().get());
    }
}
class MyNumber {
    private AtomicInteger atomicInteger = new AtomicInteger();
    public void addPlusPlus()
    {
        atomicInteger.incrementAndGet();
    }
    public AtomicInteger getAtomicInteger() {
        return atomicInteger;
    }
    public void setAtomicInteger(AtomicInteger atomicInteger) {
        this.atomicInteger = atomicInteger;
    }
}

数组类型原子类

通过原子的方式更新数组里的某个元素,Atomic包提供了以下的3个类:

  • AtomicIntegerArray: 原子更新整型数组里的元素。
  • AtomicLongArray: 原子更新长整型数组里的元素。
  • AtomicReferenceArray: 原子更新引用类型数组里的元素。
public class AtomicArry {
    public static void main(String[] args) {
        AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[5]);
        //AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(5);
        //AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[]{1,2,3,4,5});

        for (int i = 0; i <atomicIntegerArray.length(); i++) {
            System.out.println(atomicIntegerArray.get(i));
        }
        System.out.println("------------------------------");
        int tmpInt = 0;
        tmpInt = atomicIntegerArray.getAndSet(0,1122);//将索引为0位置的元素 更新为1122 并返回原来的旧址
        System.out.println(tmpInt+"\t"+atomicIntegerArray.get(0));
        atomicIntegerArray.getAndIncrement(1);
        atomicIntegerArray.getAndIncrement(1);
        tmpInt = atomicIntegerArray.getAndIncrement(1);
        System.out.println(tmpInt+"\t"+atomicIntegerArray.get(1));
    }
}

输出

0
0
0
0
0
------------------------------
0	1122
2	3

引用类型原子类

Atomic包提供了以下三个类:

  • AtomicReference: 原子更新引用类型。
  • AtomicStampedReference:
    • 携带版本号的引用类型原子类,可以解决ABA问题。
    • 解决修改过几次,状态戳原子引用
  • AtomicMarkableReferce: 原子更新带有标记位的引用类型。
    • 原子更新带有标记位的引用类型对象
    • 解决是否修改过,即将状态戳简化为true|false

示例代码

  • AtomicReference使用示例
public class AtomicReferenceDemo {
    public static void main(String[] args)
    {
        User z3 = new User("user01",24);
        User li4 = new User("user02",26);

        AtomicReference<User> atomicReferenceUser = new AtomicReference<>();

        atomicReferenceUser.set(z3);
        System.out.println(atomicReferenceUser.compareAndSet(z3,li4)+"\t"+atomicReferenceUser.get().toString());
        System.out.println(atomicReferenceUser.compareAndSet(z3,li4)+"\t"+atomicReferenceUser.get().toString());
    }
}

输出

true	com.company.jucmax.atomic.User@60e53b93
false	com.company.jucmax.atomic.User@60e53b93
  • 利用AtomicReference实现自旋锁
public class SpinLockDemo {
    AtomicReference<Thread> atomicReference = new AtomicReference<>();

    public void myLock() {
        Thread thread = Thread.currentThread();
        System.out.println(Thread.currentThread().getName()+"\t come in");
        while(!atomicReference.compareAndSet(null,thread)) {

        }
    }

    public void myUnLock() {
        Thread thread = Thread.currentThread();
        atomicReference.compareAndSet(thread,null);
        System.out.println(Thread.currentThread().getName()+"\t myUnLock over");
    }

    public static void main(String[] args) {
        SpinLockDemo spinLockDemo = new SpinLockDemo();

        new Thread(() -> {
            spinLockDemo.myLock();
            //暂停一会儿线程
            try { TimeUnit.SECONDS.sleep( 5 ); } catch (InterruptedException e) { e.printStackTrace(); }
            spinLockDemo.myUnLock();
        },"A").start();
        //暂停一会儿线程,保证A线程先于B线程启动并完成
        try { TimeUnit.SECONDS.sleep( 1 ); } catch (InterruptedException e) { e.printStackTrace(); }

        new Thread(() -> {
            spinLockDemo.myLock();
            spinLockDemo.myUnLock();
        },"B").start();

    }
}

说明:可以理解为 A线程先调用myLock方法,持有锁,B调用myLock时,由于不是null,所以一直while循环等待,直到5秒后,A调用myUnLock方法释放锁,B才可以继续往后执行。

  • AtomicStampedReference解决ABA问题
public class ABADemo {
    static AtomicInteger atomicInteger = new AtomicInteger(100);
    static AtomicStampedReference atomicStampedReference = new AtomicStampedReference(100,1);

    public static void main(String[] args) {
        new Thread(() -> {
            int stamp = atomicStampedReference.getStamp();
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            atomicStampedReference.compareAndSet(100,101,stamp,stamp+1);
            atomicStampedReference.compareAndSet(101,100,atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1);
        },"t3").start();

        new Thread(() -> {
            int stamp = atomicStampedReference.getStamp();
            System.out.println("t4 ----第1次stamp  "+stamp);
            try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
            boolean result = atomicStampedReference.compareAndSet(100, 20210308, stamp, stamp + 1);
            System.out.println(Thread.currentThread().getName()+"\t"+result+"\t"+atomicStampedReference.getReference());
        },"t4").start();
    }
}    

说明:t3在调用两次atomicStampedReference.compareAndSet方法后,虽然值改回了100,但是版本号为3,此时t4是第一次修改,虽然目前发现版本号已经不是1,因此修改失败。

对象的属性修改原子类

Atomic包提供了3个类进行原子字段更新:

  • AtomicIntegerFieldUpdater原子更新对象中int类型字段的值
  • AtomicLongFieldUpdater原子更新对象中Long类型字段的值
  • AtomicReferenceFieldUpdater原子更新引用类型字段的值

使用方法:

  • 因为原子更新字段类都是抽象类,每次使用的时候必须使用静态方法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性。
  • 更新类的字段必须使用public volatile修饰

AtomicReferenceFieldUpdater示例

public class AtomicIntegerFieldUpdaterDemo {
    public static void main(String[] args) throws InterruptedException {
        AtomicReferenceFieldUpdater<Mycar,String> atomicReferenceFieldUpdater = AtomicReferenceFieldUpdater.newUpdater(Mycar.class,String.class,"no");
        Mycar myVar = new Mycar();
        atomicReferenceFieldUpdater.compareAndSet(myVar,null,"03");
        System.out.println(myVar);
    }
}
class Mycar {
    public volatile String no ;
    @Override
    public String toString() {
        return "Mycar{" +
                "no='" + no + '\'' +
                '}';
    }
}

AtomicReferenceFieldUpdater实现:多线程并发调用一个类的初始化方法,如果未被初始化过,将执行初始化工作,要求只能初始化一次

public class AtomicReferenceFieldUpdaterSing {
    public static void main(String[] args) throws InterruptedException {
        MyVar myVar = new MyVar();

        for (int i = 1; i <= 5; i++) {
            new Thread(() -> {
                myVar.init(myVar);
            }, String.valueOf(i)).start();
        }
    }
}

class MyVar {
    public volatile Boolean isInit = Boolean.FALSE;
    AtomicReferenceFieldUpdater<MyVar, Boolean> atomicReferenceFieldUpdater = AtomicReferenceFieldUpdater.newUpdater(MyVar.class, Boolean.class, "isInit");
    public void init(MyVar myVar) {
        if (atomicReferenceFieldUpdater.compareAndSet(myVar, Boolean.FALSE, Boolean.TRUE)) {
            System.out.println(Thread.currentThread().getName() + "\t" + "---init.....");
            //暂停几秒钟线程
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "\t" + "---init.....over");
        } else {
            System.out.println(Thread.currentThread().getName() + "\t" + "------其它线程正在初始化");
        }
    }
}

原子类增强对比

这里以一个案例对比各种原子类的性能区别

50个线程,每个线程100w次,计算总点赞数

public class LongAdderDemo {

    public static final int SIZE_THREAD = 50;

    public static final int _1W = 10000;
    
    public static void main(String[] args) throws Exception{
        ClickNumber clickNumber = new ClickNumber();

        CountDownLatch countDownLatch = new CountDownLatch(SIZE_THREAD);
        CountDownLatch countDownLatch2 = new CountDownLatch(SIZE_THREAD);
        CountDownLatch countDownLatch3 = new CountDownLatch(SIZE_THREAD);
        CountDownLatch countDownLatch4 = new CountDownLatch(SIZE_THREAD);
        CountDownLatch countDownLatch5 = new CountDownLatch(SIZE_THREAD);
        long starttime;
        long endtime;
        starttime = System.currentTimeMillis();
        for(int i=0;i<SIZE_THREAD;i++){

            new Thread(()->{
                try {
                    for(int j=0;j<100*_1W;j++){
                        clickNumber.add_Sy();
                    }
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    countDownLatch.countDown();
                }

            }).start();
        }
        countDownLatch.await();
        endtime = System.currentTimeMillis();
        System.out.println("synchronized--"+(endtime - starttime)+"毫秒  "+clickNumber.number);
        //另外四个写法基本相同
        //atomicInteger
        //atomicLong
        //longAdder
        //accumulator
    }

}


class ClickNumber{

    int number = 0;
    public synchronized void add_Sy(){//之前的加锁解决并发安全问题
        number++;
    }

    AtomicInteger atomicInteger = new AtomicInteger();
    public void add_atomicInteger(){
        atomicInteger.incrementAndGet();
    }

    AtomicLong atomicLong = new AtomicLong();
    public void add_atomicLong(){
        atomicLong.incrementAndGet();
    }

    LongAdder longAdder = new LongAdder();
    public void add_longAdder(){
        longAdder.increment();
    }

    LongAccumulator accumulator = new LongAccumulator((x,y)->{
        return x+y;
    },0);
    public void add_accumulator(){
        accumulator.accumulate(1);
    }
}

输出

synchronized--3560毫秒  50000000
AtomicInteger--1011毫秒  50000000
AtomicLong--1033毫秒  50000000
LongAdder--128毫秒  50000000
LongAccumulator--69毫秒  50000000

从结果可看出synchronized最慢,LongAdder速度快。

LongAdder为什么这么快?

后续补充-------

参考

上次编辑于: 2022/6/15 22:52:54
贡献者: liurui_60837