无锁

锁能保证同一时刻只能有一个线程访问临界区的资源,从而实现线程安全。然而,锁虽然有效,但采用的是一种悲观的策略。它假设每一次对临界区资源的访问都会发生冲突,当有一个线程访问资源,其他线程就必须等待,所以锁是会阻塞线程执行的。

无锁就是一种乐观的策略,它假设线程对资源的访问是没有冲突的,同时所有的线程执行都不需要等待,可以持续执行。如果遇到冲突的话,就使用一种叫做CAS (比较交换) 的技术来鉴别线程冲突,如果检测到冲突发生,就重试当前操作到没有冲突为止。

CAS比较交换

CAS的全称是 Compare-and-Swap,也就是比较并交换,是并发编程中一种常用的算法。它包含了三个参数:V,A,B。其中,

  • V:表示要读写的内存位置

  • A:表示旧的预期值

  • B:表示新值

CAS指令执行时,当且仅当V的值等于预期值A时,才会将V的值设为B,如果V和A不同,说明可能是其他线程做了更新,那么当前线程就什么都不做,最后,CAS返回的是V的真实值。

而在多线程的情况下,当多个线程同时使用CAS操作一个变量时,只有一个会成功并更新值,其余线程均会失败,但失败的线程不会被挂起,而是不断的再次循环重试。正是基于这样的原理,CAS即时没有使用锁,也能发现其他线程对当前线程的干扰,从而进行及时的处理。

CAS的缺点

循环的时间开销大

在getAndAddInt的方法中,我们可以看到,只是简单的设置一个值却调用了循环,如果CAS失败,会一直进行尝试。如果CAS长时间不成功,那么循环就会不停的跑,无疑会给系统造成很大的开销。

下面是Java源码中关于getAndAddInt方法的实现,可以看到是一个do while循环,不停地尝试。

  • jdk.internal.misc.Unsafe
1
2
3
4
5
6
7
8
@HotSpotIntrinsicCandidate
public final int getAndAddInt(Object o, long offset, int delta) {
    int v;
    do {
        v = getIntVolatile(o, offset);
    } while (!weakCompareAndSetInt(o, offset, v, v + delta));
    return v;
}

ABA问题

CAS判断变量操作成功的条件是V的值和A是一致的,这个逻辑有个小小的缺陷,就是如果V的值一开始为A,在准备修改为新值前的期间曾经被改成了B,后来又被改回为A,经过两次的线程修改对象的值还是旧值,那么CAS操作就会误任务该变量从来没被修改过。这就是CAS中的“ABA”问题。

Java并发包中提供了一个带有时间戳的对象引用 AtomicStampedReference,其内部不仅维护了一个对象值,还维护了一个时间戳,当AtomicStampedReference对应的数值被修改时,除了更新数据本身,还需要更新时间戳,只有对象值和时间戳都满足期望值,才能修改成功。

原子操作类

java.util.concurrent.atomic 这个包里面提供了一组原子类。

image-20210902155751377

其基本的特性就是在多线程环境下,当有多个线程同时执行这些类的实例包含的方法时,具有排他性,即当某个线程进入方法,执行其中的指令时,不会被其他线程打断,而别的线程就像自旋锁一样,一直等到该方法执行完成,才由 JVM 从等待队列中选择另一个线程进入,这只是一种逻辑上的理解。

  • **原子类:**AtomicBoolean,AtomicInteger,AtomicLong,AtomicReference

  • **原子数组:**AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray

  • **原子属性更新器:**AtomicLongFieldUpdater,AtomicIntegerFieldUpdater,AtomicReferenceFieldUpdater

  • **解决 ABA 问题的原子类:**AtomicMarkableReference(通过引入一个 boolean来反映中间有没有变过),AtomicStampedReference(通过引入一个 int 来累加来反映中间有没有变过)。

下面是一个AtomicInteger使用的例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class AtomicIntegerTest {
    /** 如果使用注释掉的int,则最后total的值会明显小于100*10000,
     *  因为虽然 total++ 是一条语句,但在虚拟机中会被翻译成多条,并不是线程安全的。
     */
    // private static int total = 0;
    private static AtomicInteger total = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {

        List<Thread> threadList = new ArrayList<>();

        // 100个线程,每个线程加10000次
        for (int i = 0; i < 100; i++) {
            Thread t = new Thread(() -> {
                for (int j = 0; j < 10000; j++) {
                    // total++;
                    total.incrementAndGet();
                }
            });
            threadList.add(t);
            t.start();
        }

        for (Thread t : threadList) {
            t.join();
        }

        // assert total = 100 * 10000
        System.out.println("total = " + total);
    }
}

JDK高并发容器

JDK提供的这些容器大部分在java.util.concurrent包中,这里先提纲挈领地介绍一下它们,初次露脸,大家只需要知道他们的作用即可。有关具体的实现和注意事项,在后面我会慢慢道来。

  • **ConcurrentHashMap:**这是一个高效的并发HashMap。你可以理解为一个线程安全的HashMap。

  • **CopyOnWriteArrayList:**这是一个List,从名字看就是和ArrayList是一族的。在读多写少的场合,这个List的性能非常好,远远好于Vector。

  • **ConcurrentLinkedQueue:**高效的并发队列,使用链表实现。可以看做一个线程安全的LinkedList。

  • **BlockingQueue:**这是一个接口,JDK内部通过链表、数组等方式实现了这个接口。表示阻塞队列,非常适合用于作为数据共享的通道。

  • **ConcurrentSkipListMap:**跳表的实现。这是一个Map,使用跳表的数据结构进行快速查找。

除了以上并发包中的专有数据结构外,java.util下的Vector是线程安全的(虽然性能和上述专用工具没得比),另外Collections工具类可以帮助我们将任意集合包装成线程安全的集合。

ConcurrentHashMap

  1. 底层数据结构: JDK1.7的 ConcurrentHashMap 底层采用 分段的数组+链表 实现,JDK1.8 采用的数据结构跟HashMap1.8的结构一样,数组+链表/红黑二叉树。Hashtable 和 JDK1.8 之前的 HashMap 的底层数据结构类似都是采用 数组+链表 的形式,数组是 HashMap 的主体,链表则是主要为了解决哈希冲突而存在的;

  2. 实现线程安全的方式(重要):

    • 在JDK1.7的时候,ConcurrentHashMap(分段锁) 对整个桶数组进行了分割分段(Segment),每一把锁只锁容器其中一部分数据,多线程访问容器里不同数据段的数据,就不会存在锁竞争,提高并发访问率。(默认分配16个Segment,比Hashtable效率提高16倍。) 到了 JDK1.8 的时候已经摒弃了Segment的概念,而是直接用 Node 数组+链表+红黑树的数据结构来实现,并发控制使用 synchronized 和 CAS 来操作。(JDK1.6以后 对 synchronized锁做了很多优化) 整个看起来就像是优化过且线程安全的 HashMap,虽然在JDK1.8中还能看到 Segment 的数据结构,但是已经简化了属性,只是为了兼容旧版本;

    • Hashtable(同一把锁) :使用 synchronized 来保证线程安全,效率非常低下。当一个线程访问同步方法时,其他线程也访问同步方法,可能会进入阻塞或轮询状态,如使用 put 添加元素,另一个线程不能使用 put 添加元素,也不能使用 get,竞争会越来越激烈效率越低。

CopyOnWriteArrayList

CopyOnWriteArrayList 是 ArrayList 的线程安全的实现,所有的可变操作(add, set等等)都会通过拷贝一份新的数据来完成。这通常代价巨大,但对于那些遍历操作(读取)显著多余修改操作(写)的情况,这种方式可以很好地避免频繁加锁带来的性能损耗。

  • java.util.concurrent.CopyOnWriteArrayList
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
 * ArrayList的线程安全的实现,所有的可变操作(add, set等等)都会通过拷贝一份新的数据来完成。
 * 这通常代价巨大,但对于那些遍历操作(读取)显著多余修改操作(写)的情况,这种方式可以很好地避免频繁加锁带来的性能损耗。
 */
public class CopyOnWriteArrayList<E>
    implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
    
    //...
    
    public E get(int index) {
        return elementAt(getArray(), index);
    }

    public E set(int index, E element) {
        synchronized (lock) {
            Object[] elements = getArray();
            E oldValue = elementAt(elements, index);

            if (oldValue != element) {
                int len = elements.length;
                Object[] newElements = Arrays.copyOf(elements, len);
                newElements[index] = element;
                setArray(newElements);
            } else {
                // Not quite a no-op; ensures volatile write semantics
                setArray(elements);
            }
            return oldValue;
        }
    }
    
    /**
     * 如果数组的长度比较大,每次添加元素都会执行copy操作,将数据复制一份,这对性能将会
     * 造成很大影响,因此,CopyOnWriteArrayList不适合频繁修改的操作场景。
     */
    public boolean add(E e) {
        synchronized (lock) {
            Object[] elements = getArray();
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;
            setArray(newElements);
            return true;
        }
    }
}

线程组与线程池

img

Executor框架

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads);
public static ExecutorService newSingleThreadExecutor();
public static ExecutorService newCachedThreadPool();
public static ScheduledExecutorService newSingleThreadScheduledExecutor();
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize);

以上工厂方法分别返回具有不同工作特性的线程池。这些线程池工厂方法的具体说明如下。

  • **newFixedThreadPool()方法:**该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。

  • **newSingleThreadExecutor()方法:**该方法返回一个只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。

  • **newCachedThreadPool()方法:**该方法返回一个可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。

  • **newSingleThreadScheduledExecutor()方法:**该方法返回一个ScheduledExecutor Service对象,线程池大小为1。ScheduledExecutorService接口在ExecutorService接口之上扩展了在给定时间执行某任务的功能,如在某个固定的延时之后执行,或者周期性执行某个任务。

  • **newScheduledThreadPool()方法:**该方法也返回一个ScheduledExecutorService对象,但该线程池可以指定线程数量。

ThreadPoolExecutor

对于核心的几个线程池,无论是newFixedThreadPool()方法、newSingleThreadExecutor()还是newCachedThreadPool()方法,虽然看起来创建的线程有着完全不同的功能特点,但其内部实现均使用了ThreadPoolExecutor实现。

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)                         

函数的参数含义如下。

  • corePoolSize:指定了线程池中的线程数量。
  • maximumPoolSize:指定了线程池中的最大线程数量。
  • keepAliveTime:当线程池线程数量超过corePoolSize时,多余的空闲线程的存活时间。即,超过corePoolSize的空闲线程,在多长时间内,会被销毁。
  • unit:keepAliveTime的单位。
  • workQueue:任务队列,被提交但尚未被执行的任务。
  • threadFactory:线程工厂,用于创建线程,一般用默认的即可。
  • handler:拒绝策略。当任务太多来不及处理,如何拒绝任务。

以上参数中,大部分都很简单,只有workQueue和handler需要进行详细说明。

image-20210808165449584

对于饱和策略,Java默认提供了4中策略:Abort、CallerRuns、DiscardOldes、Discard。

image-20210808165808982

  • Abort:默认,队列满了丢任务抛出异常。
  • CallerRuns:如果添加到线程池失败,那么主线程自己去执行该任务。
  • DiscardOldes:将最早进入队列的任务删除,之后再尝试加入队列。
  • Discard:队列满了丢任务不抛出异常。

锁优化

减小锁持有时间

尽量减少锁持有的时间,例如在一个方法中,只有少部分地方会涉及到资源竞争,那就尽量少地锁定,而不是简单地将整个方法加上synchronized关键字。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// 减少锁持有的时间
public synchronized void syncMethod(){
    othercode1();
    mutextMethod();
    othercode2();
}

public void syncMethod2(){
    othercode1();
    synchronized(this){
        mutextMethod();
    }
    othercode2();
}

减小锁粒度

减小锁粒度也是一种削弱多线程锁竞争的有效手段。这种技术典型的使用场景就是ConcurrentHashMap类的实现。

对于HashMap来说,最重要的两个方法就是get()和put()。一种最自然的想法就是对整个HashMap加锁,必然可以得到一个线程安全的对象。但是这样做,我们就认为加锁粒度太大。对于ConcurrentHashMap,它内部进一步细分了若干个小的HashMap,称之为段(SEGMENT)。默认情况下,一个ConcurrentHashMap被进一步细分为16个段。

这种思想被称为分段加锁的思想,例如数据库的行锁粒度就比表锁的粒度小,正确处理好行锁所带来的并发量肯定比简单锁住整张表要大得多。

读写分离锁来替换独占锁

对于读多写少的情况,使用读写锁来替换独占锁可以取得很好的性能提升,例如Java中的ReadWriteLock。

  • java.util.concurrent.locks.ReadWriteLock
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
public interface ReadWriteLock {
    /**
     * Returns the lock used for reading.
     */
    Lock readLock();

    /**
     * Returns the lock used for writing.
     */
    Lock writeLock();
}

ReadWriteLock管理一组锁,一个是只读的锁,一个是写锁。Java并发库中ReetrantReadWriteLock实现了ReadWriteLock接口并添加了可重入的特性。

下面是一个使用ReentrentReadWriteLock的例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
public class ReadWriteLockTest {

    /**
     * 可重入的读写锁
     */
    private static ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();

    private static String message = "hello liwenbo!";

    private static class ReadThread extends Thread {
        @Override
        public void run() {
            rwLock.readLock().lock();
            try {
                System.out.println(this.getName() + " say " + message + " at " + nowSec());
                sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            rwLock.readLock().unlock();
        }
    }

    private static class WriteThread extends Thread {
        @Override
        public void run() {
            rwLock.writeLock().lock();
            try {
                message = "hello hanzhu!";
                System.out.println(this.getName() + " say " + message + " at " + nowSec());
                sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            rwLock.writeLock().unlock();
        }
    }

    private static long nowSec() {
        return System.currentTimeMillis() / 1000;
    }

    public static void main(String[] args) throws InterruptedException {

        ReadThread t0 = new ReadThread();
        ReadThread t1 = new ReadThread();

        t0.start();
        t1.start();

        t0.join();
        t1.join();

        System.out.println("t0 & t1 done at " + nowSec());

        ReadThread t2 = new ReadThread();
        WriteThread t3 = new WriteThread();
        t2.start();
        Thread.sleep(1000);
        t3.start();

        t2.join();
        t3.join();

        System.out.println("t2 & t3 done at " + nowSec());
    }
}

// 示例输出:
Thread-0 say hello liwenbo! at 1630575690
Thread-1 say hello liwenbo! at 1630575690
t0 & t1 done at 1630575693
Thread-2 say hello liwenbo! at 1630575693
Thread-3 say hello hanzhu! at 1630575696
t2 & t3 done at 1630575697

根据输出的结果可以很明显的看出,readlock不会阻塞readlock,readlock会阻塞writelock。

锁粗化

虽然前面讲过要减小锁持有的时间,但有的时候也需要根据实际情况合理地使用锁,例如下面的情况

1
2
3
4
5
6
7
8
9
public void demoMethod(){
    synchronized(lock){
        //do sth.
    }
    //做其他不需要的同步的工作,但能很快执行完毕
    synchronized(lock){
        //do sth.
    }
}

像这样的情况,完全没有必要请求两次锁。在开发过程中,也应该有意识地在合理的场合进行锁的粗化,尤其当在循环内请求锁时。以下是一个循环内请求锁的例子,在这种情况下,意味着每次循环都有申请锁和释放锁的操作。但在这种情况下,显然是没有必要的。

1
2
3
4
5
for(int i=0;i<CIRCLE;i++){
    synchronized(lock){
		// do something
    }
}

一种更加合理的做法应该是在外层只请求一次锁:

1
2
3
4
5
synchronized(lock){
    for(int i=0;i<CIRCLE;i++){
		// do something
    }
}

JVM的锁优化

锁偏向

锁偏向是一种针对加锁操作的优化手段。它的核心思想是:如果一个线程获得了锁,那么锁就进入偏向模式。当这个线程再次请求锁时,无须再做任何同步操作。这样就节省了大量有关锁申请的操作,从而提高了程序性能。因此,对于几乎没有锁竞争的场合,偏向锁有比较好的优化效果,因为连续多次极有可能是同一个线程请求相同的锁。而对于锁竞争比较激烈的场合,其效果不佳。因为在竞争激烈的场合,最有可能的情况是每次都是不同的线程来请求相同的锁。这样偏向模式会失效,因此还不如不启用偏向锁。使用Java虚拟机参数-XX:+UseBiasedLocking可以开启偏向锁。

轻量级锁

如果偏向锁失败,虚拟机并不会立即挂起线程。它还会使用一种称为轻量级锁的优化手段。轻量级锁的操作也很轻便,它只是简单地将对象头部作为指针,指向持有锁的线程堆栈的内部,来判断一个线程是否持有对象锁。如果线程获得轻量级锁成功,则可以顺利进入临界区。如果轻量级锁加锁失败,则表示其他线程抢先争夺到了锁,那么当前线程的锁请求就会膨胀为重量级锁。

自旋锁

锁膨胀后,虚拟机为了避免线程真实地在操作系统层面挂起,虚拟机还会在做最后的努力——自旋锁。由于当前线程暂时无法获得锁,但是什么时候可以获得锁是一个未知数。也许在几个CPU时钟周期后,就可以得到锁。如果这样,简单粗暴地挂起线程可能是一种得不偿失的操作。因此,系统会进行一次赌注:它会假设在不久的将来,线程可以得到这把锁。因此,虚拟机会让当前线程做几个空循环(这也是自旋的含义),在经过若干次循环后,如果可以得到锁,那么就顺利进入临界区。如果还不能获得锁,才会真实地将线程在操作系统层面挂起。

锁销除

锁消除是一种更彻底的锁优化。Java虚拟机在JIT编译时,通过对运行上下文的扫描,去除不可能存在共享资源竞争的锁。通过锁消除,可以节省毫无意义的请求锁时间。