你得先从操作系统原理中的进程中得到启示,然后类比到多线程环境,最后延申到分布式的情况。本文会详细介绍锁,主要是Java中对锁的实现,以及锁的各种优化。本文是JUC的朴素解释的基础。

故事如果从早古时代的单进程系统开始讲起的话那就会又臭又长。总结起来就是由于硬件的升级,资源的利用率不太高,于是发明了支持多进程多线程操作系统。
是的,多进程多线程都是为了使计算机一直处于“忙”这个状态而存在的,如果你有一个无止境的有效计算任务在一个单cpu的计算机上跑着,那你不需要这些优化。但是理论上不存在这样的任务,人们对现代计算机的处理速度实际上是缺乏认知的,记得我刚出来工作的时候有个面试官出了这么一个题:typescript写颗二叉搜索树,要求10w节点的插入在10s内。当时我的垃圾笔记本电脑写个基本的搜索树结果出来只有20ms,那时候我意识到如果是纯粹的计算与内存操作实际上是非常非常快的,不然也不至于与要求题目相差如此巨大。拖慢系统的是一些其他原因,并且大多数原因都会归结到人的身上。

一、多进程交互与冲突

假如所有的进程都只是执行自己的任务,不会和其他的进程发生交互,那自然是最优状态,但是实际情况是通常一个进程通常需要与其他进程合作才能完成一些工作(或者是进程A的工作受到进程B的制约,或者是进程A与进程B存在竞争关系)。而协调这些关系必然需要沟通,沟通必然需要遵守同一套约定。于是问题就变成了如何制定一套这样的规范来协调多进程之间的交互与冲突。我们很容易想到何不让他们之间互相发送消息表示自己正在等待某个条件或者自己已经把某个条件变成的true,这样收到消息的双方就很容易协调先后顺序。于是就引出了三个术语:wait():等待函数,表示在等待某个条件(信号)、signal():释放函数,表示释放了某个条件(信号;意思是将一个条件变为了true)、Semaphore:信号量,是对一个条件(亦可以是资源)的抽象,wait()与signal()都属于这个抽象条件的行为。但是实际情况不可能只是进程A需要进程B完成一些工作,也可能是进程A与进程B同时需要第三个互斥资源(其实这种情况可以将第三个互斥资源抽象为进程C,这样模型就可以简化为进程A与B都怕需要进程C完成一些工作)。于是我们得到了这样一个基本模型:

  • 情况1

    // 初始信号量为0
    进程A Semaphore.wait()等待(消费)一个信号好继续往下执行。
    进程B Semaphore.signal()释放一信号(并且通知所有等待进程)
    进程A 收到信号,从wait()方法返回继续向下执行。
  • 情况2

    // 初始信号量为N , N>0 ,这里是1
    进程A Semaphore.wait()等待(消费)一个信号好继续往下执行。| 进程B Semaphore.wait()等待(消费)一个信号好继续往下执行。
    进程A Semaphore.wait()等待(消费)一个信号好继续往下执行。| 进程B 抢先获得这个信号,然后进程B向下执行。
    进程A 收到信号,从wait()方法返回继续向下执行。            | 进程B Semaphore.signal()释放一个信号,通常是表示资源已经使用完。

    情况1的Semaphore称为Private Semaphore(私有信号量),情况2的Semaphore称为共有信号量。你可以想象到如果情况2,Semaphore所代表的是可以有两个进程同时使用的资源时,那么进程A和进程B的wait()都可以获得一个信号,这样A与B会同时往下执行。
    是的,实际情况的实现也是这样,Semaphore并非代表一个独占资源;准确点说,它代表的是一个临界资源,而从wait到signal这一段代码称为临界区。在Linux中实现(sem.c)是这样定义的

    /* 信号量的结构体 */
    struct sem {
      int semval;        /* 当前信号量值 */
      /*
       * 最后一个修改此信号量的进程pid,在Linux系统中他们可能是通过以下几个操作实现修改的:
       *  - semop PV操作
       *  - semctl 直接控制信号量 SETVAL 设置一个信号量值 and SETALL设置一个信号量集的信号量值.
       *  - exit_sem undo操作时归还一个信号量.
           *  - 另外还有一个获得信号量的操作semget,获得后的信号量在第一次使用前应该用semctl设置初始值。
       */
      struct pid *sempid;
      spinlock_t    lock;    /* 细粒度自旋锁*/
      struct list_head pending_alter; /* 改变信号量的待定单次操作(其实我看不懂什么意思) */
      struct list_head pending_const; /* 不改变信号量的待定单次操作(其实我看不懂什么意思) */
      time64_t     sem_otime;    /* 上次改变的时间(其实我看不懂) */
    } ____cacheline_aligned_in_smp;

    代码中semctl的SETALL操作是对一个信号量集(在sem.c定义有sem_array和sem_queue)统一设置值,这个信号量集代表着一组临界资源,代表着一组不可分割的操作。这和事务是非常像的,可以类比过去。另外可以看到这里是使用一个int来表示信号量值的。semop操作与semctl的不同在于semop是一个原子的可阻塞操作,而semctl相当于外挂直接改变信号量。

sem.c在linux定义为进程通信的一种方式,而进程通信其实隐含这进程同步关系,进程互斥则可以通过转换视角转换为进程同步。

  • 进程互斥
    上文中情况2这样的同时等待一个资源的进程A与进程B的进程关系叫做进程互斥。正如上文提到的,假如将共同等待资源看作是进程C,则进程A与进程C的关系就是进程同步(比如进程A和进程B同时需要打印机资源C则他们共同依赖的打印机驱动程序则可看作是进程C)。
  • 进程同步
    上文中情况1这样的进程A与进程B需要合作(按照一定次序)完成工作的程序关系称叫做进程同步。这是一种合作关系,如我其他文章所说,合作最大的的问题是沟通,而沟通最大问题是信任,在计算机科学中为了达到信任条件我们需要建立一系列的约定(协议)。
  • 进程通信
    进程通信隐含同步的意思,应该说进程通信的目的就是为了达到多进程的高效合作这一目标。Linux上有多种通信方式,上文中的信号量、信号量集就是其中一种。它们的介绍如下:
    1.共享内存(shared memory):总结来说就是允许多个进程访问同一块内存,通过对这块内存的读写达到通信的目的,你可以想象到你得自己协调各个不同进程对这块内存使用这样才不会出错。ipc/shm.c是Linux对共享内存的实现。
    2.管道(pipe):它有读端和写端之分,信息只会从读端流向写端,也就是说是单向传递的。有匿名管道与命名管道之分。匿名管道用于父子进程进行通信;命名管道用于任意两个进程进行通信,它与某个file相互绑定。你可以想象到这个命名管道之所以可以在两个不相关的进程之间进行通信正是因为它与某个file相互绑定,毕竟一个无法被定位找到的东西是无法被共享的,前文的提到的shm.c中共享内存的实现也是与某个file挂钩的。另外管道流这种思想很常见,无论是Linux | 符还是MongoDB的聚合管道亦或是Java中的stream都是同一个思想,前一个操作的输出是后一个操作的输入,同时前一个操作的中间数据不会保留给后一个操作。管道是不用用户自己协调多进程的顺序的。
    3.socket:你对这个一定很熟悉,总得来说就是开启一个套接字,在我认为这是一种最完美的多进程通信方式,不仅可以跨进程还能跨系统跨主机。你依然这种方式你依然需要自己协调进程关系,尽管服务端是被动接受连接后才能通信,但是连接成功后是可以相互发数据的。你一定听过在Linux中everything is file,socket连接也是可以表示为文件比如docker会在在启动服务后创建/var/run/docker.sock文件,我们实际上可以通过这个问题与docker服务通信,nginx和mysql在服务启动后也有类似的sock文件。
    4.信号(signal):实际是一种中断(后面会继续涉及这个问题,Java中的取消操作是通过响应中断实现的)方式,kill -l可以查看信号列表,比如kill -15 pid(后台程序使用)和ctrl+c(前台程序使用)都是发送一个取消操作的信号,应用程序通过注册对这些信号的响应来到达通信效果。
    5.信号量(Semaphore):是一种同步工具,前文已经阐述过了,这里不再赘述。上文中的sem.c是linux对信号量的实现,只复制了一个结构体。注意,生成信号量的key(semget函数有个参数)的时候依然和前面的命名管道等等共享操作一样需要提供一个与file绑定参数。
    6.消息队列(message queue):和我们熟悉的消息队列同一个用处,它有一个好处是它的数据是有结构的。具体我个人也不太了解,使用步骤上都是信号量差不多。

总得来说,除了信号,本质上都是对一个共享数据进行访问,通过对这个数据的读写向外界释放出自己状态的信息,然后外界(也就是其他合作进程),通过对这个数据变化做出响应达到协调效果。这点非常重要,这个思想是锁的基石。

对该数据变化的响应会体现在进程的状态变化上(三态模型)。下面是五态模型,可以这么说,进程线程都是遵守这个模型的状态流转过程的。

进程状态

新建态:初始化完成但是未就绪还不能处理任务。(对应线程就是new出来还未start)
就绪态:已经准备好,等待时间片分配。(对应线程就是start还未被分配cpu,被notify()也会进入就绪态)
运行态:正在执行。(对应线程就是已经在跑run()了)
阻塞态:某个必须条件为false,等待条件变为true上文Semaphore.wait()无法获取信号量时就进入了阻塞态。(对应线程就是object.wait(),Thread.sleep()或者或者自旋也算因为它向下执行)
终止态:进程到达了自然结束点,或是出现了无法克服的错误,或是被操作系统所终结,或是被其他有终止权的进程所终结(来自百度百科)。

一般程序会在就绪态,运行态和阻塞态待很久,并且在这三个状态相互切换,我们的主要精力也是在这上面,我们最终的目的是尽量让程序在运行态做正确的任务。

二、锁

前文的进程同步思想和进程模型与我们接下来的多线程环境是类似的,这也是上面扯这么多关于进程的原因。现在回到锁,所谓锁,就是一种同步机制/同步方式。实现同步的方式有很多,现代锁的实现是基于monitors(管程)理论实现的,你可以查看这篇文章了解关于monitors与条件变量的相关研究。

监控器可以被认为是一个概念性的盒子。程序员可以把函数/程序/方法放到这个盒子里,监控器为他提供一个非常简单的保证:每次只有一个函数在监控器内执行--相互排斥将得到保证。

总得来说monitors是一种程序结构(一种模式),它能够保证这个结构内多个线程访问共享资源是互斥的。它定义了互斥资源(抽象)的访问方式以及资源不可用时的阻塞策略以及资源变为可用时的唤醒策略。于是我们可以粗略的抽象出这个monitors结构:

//注:这里的List也可以是queue也可以是set,总之是一种线性存储结构用来存放多个线程的
waitList //不满足条件时的等待队列
entryList //等待运行的就绪队列
signalList // 信号队列
mutex //互斥锁,代表资源是否可用
run{a,b,c} //代表你写函数、代码、逻辑。a/b/c代表你期望互斥访问的共享资源

enter monitors
{
    thread enter entryList // 进入该结构的线程首先会进入就绪队列
    if(!thread lock(mutex)){ // 占用资源
        thread enter waitList and thread exit entryList // 资源不可用时进入等待队列且退出就绪队列
    }
    thread exit entryList // 退出就绪队列
    thread do run{a,b,c} // 如果资源可用则执行业务逻辑
    thread enter signalList // 执行完成如果你释放了某些互斥资源则唤醒wait队列里面的线程使他们进入entry队列
    thread unlock(mutex) // 释放资源
    thread exit signalList // 退出signal队列,
}
exit monitors

这是个粗略的样子,还有很多细节上的东西还未解决其中唤醒和等待就是一重要的细节。你可以想象得到当线程A释放了一个资源的时候应该唤醒在这个资源(条件)上等待的队列,使它们进入就绪态及时获得资源并向下执行逻辑。但是线程A是应该先执行完后续逻辑再唤醒还是唤醒后再执行完呢?围绕这个问题有三种语义:
Mesa语义
过程:线程A进入monitor ---> 线程A发现不满足执行条件进入等待队列(这里会释放锁) ---> 线程B进入monitor --> 线程B释放X,线程A执行条件变化,唤醒线程A(也就是由等待队列移入就绪队列) --> 线程B执行完毕释放锁(mutex) ---> 线程A进入monitor --> 线程A执行完毕释放锁。
monitor_mesa.jpg

这种语义的问题是,线程A所等待的条件X可能在线程B执行完后就再次为false了(你不知道线程B还需要执行多久),导致线程A再次检查X时发现X不可用而再次进入等待队列,浪费时间。

Hoare语义
过程:线程A进入monitor ---> 线程A发现不满足执行条件进入等待队列,这里会释放锁mutex ---> 线程B进入monitor --> 线程B释放资源,线程A执行条件X变化,线程B进入信号队列并释放锁 ---> 线程A直接由等待队列进入monitor --> 线程A执行完毕释放锁 --> 线程B从信号队列继续执行完毕,释放锁。
monitor_hoare.jpg

这个语义的问题在于过早释放的锁,这可能会导致线程A如果将线程B接下来的条件Y变为不可用。

Brinch Hanson语义
过程:线程A进入monitor ---> 线程A发现不满足执行条件进入等待队列,这里会释放锁mutex ---> 线程B进入monitor --> 线程B释放资源,线程B执行完毕释放锁 --> 线程B唤醒线程A(就是由等待队列移入就绪队列) ---> 线程A执行完毕并释放锁。
monitor_bh.jpg

这种语义的问题在于,理论上线程B已经退出了monitor,也就是说它已经不在享有占用那些由这个monitor保护的资源X的权限了,也就是说它这个唤醒操作理论上是不合法的。退一步说,已经退出monitor的线程需要释放的信号所依据的条件X的状态也是不可靠的(这一点在后文讲原子性的时候会再提到)。

前文中提到的阻塞(wait)与唤醒(signal)策略严格来说不属于monitors负责的范围,因为Monitor原则上只负责多线程执行一段代码时互斥。这就引出了条件变量(condition variables)的概念:

一个条件变量就是一个线程队列(queue), 其中的线程正等待某个条件变为真。每个条件变量c关联着一个断言P(c). 当一个线程等待一个条件变量,该线程不算作占用了该管程,因而其它线程可以进入该管程执行,改变管程的状态,通知条件变量c其关联的断言P(c)在当前状态下为真.

从上面的描述中可以大概推出一个条件变量的结构:

waitList // 正在等待同一个条件的等待队列

wait() // 当前线程进入等待队列释放锁
signal() // 条件可用,唤醒一个线程向下执行
broadcast() //条件可用,唤醒所有等待队列的线程

上文中如果线程A与线程B分别在不同的条件变量上等待,则可以将它们分开,而不是使用同一个monitor,不同独占资源使用不同的条件变量可以减少竞争提高资源利用率。

注:Linux里面锁的实现,里面包括了mutex(互斥锁),spinlock(自旋锁)和semaphore(信号量)等等一系列同步机制的实现。你需要理解的是这里的锁是给线程使用的而非给进程,这也是为什么上面进程通信里有信号量,这里又有信号量。另外前文中也阐述了通信隐含同步。

三、synchronized、volatile、CAS

很多人经过对monitor的学习与理解后后很容易理解同步的原子性,但是却会忽略可见性这个重要特性。接下来主要说明Java同步相关的东西,我这里假设你是Java老司机。

使用Java语言你很容易使用synchronized关键字来做到多线程同步,也很容易通过Thread类来创建线程:

public class TestMain {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
        new Thread(()->{
            try {
                cyclicBarrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
            setKeyValue(2,"first");
        }).start();
        new Thread(()->{
            try {
                cyclicBarrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
            setKeyValue(3,"second");
        }).start();

    }
    private static int value = 1;
    private static String key = "2";
    private static byte[] b = new byte[0];
    private static void setKeyValue(int value,String key){
        System.out.println(value+key);
        synchronized (b){
            value = value;
            key = key;
        }
        System.out.println(value+key);
    }
}

其中最关键的synchronized同步块,同步块里面的数据由b对象保护,它被编译它会被编译成monitorenter和monitorexit指令,如下:

   L5
    // 源码第33行
    LINENUMBER 33 L5
    //获取b
    GETSTATIC com/bigbrootherlee/demo/TestMain.b : [B
    // 压两次,因为下面要操作两次
    DUP
    // b存入局部变量2 (这里其实是将栈顶数据存入局部变量2)
    ASTORE 2
    // 对b进行MONITORENTER指令操作(这里其实是使用栈顶数据进行MONITORENTER,上面DUP压了两次栈,所以这里还是b)
    MONITORENTER
   L0
    LINENUMBER 34 L0
    ILOAD 0
    ISTORE 0
   L6
    LINENUMBER 35 L6
    ALOAD 1
    ASTORE 1
   L7
    LINENUMBER 36 L7
    // 载入局部变量2到栈顶,这里是b
    ALOAD 2
    // 对栈顶数据进行monitorexit
    MONITOREXIT
   L1
    GOTO L8
   L2
   FRAME FULL [I java/lang/String java/lang/Object] [java/lang/Throwable]
    ASTORE 3
    ALOAD 2
    MONITOREXIT
   L3
    ALOAD 3
    ATHROW

需要注意的是上面代码使用对b进行monitorenter操作来描述而不是对b进行加锁来描述,这是有原因的,在上文有提到我们所说的锁是基于monitor(管程)理论实现的,虽然这里的monitorenter和monitorexit组合已经符合我们管程模型了,但是为了下文中描述锁升级过程描述无锁、偏向锁、轻量锁、重量锁你不会理解混乱,这里还是使用monitorenter操作来描述,毕竟这一个指令包含多种情况。
在此,我们先放下Java自带锁,先探讨一下Java线程:
在jvm定义有以下几类线程:

// Class hierarchy
// - Thread
//   - JavaThread
//     - various subclasses eg CompilerThread, ServiceThread
//   - NonJavaThread
//     - NamedThread
//       - VMThread
//       - ConcurrentGCThread
//       - WorkerThread
//     - WatcherThread
//     - JfrThreadSampler
//     - LogAsyncWriter

大致可分为JavaThread与nonJavaThread,他们是使用is_Java_thread()方法返回值是true还是false来区分的,源码定义,语义上来说给Java开发者能够创建的就是JavaThread,当然,我们Java中的Thread绑定的不是JavaThread而是下面同名的Thread.c (模块化让他变成了这样),这个类的头文件java_lang_Thread.h我找了半天没找到,谷歌的Android的git仓库倒是有一个java_lang_Thread.h。Thread.c如下:

#include "jni.h"
#include "jvm.h"

#include "java_lang_Thread.h"

#define THD "Ljava/lang/Thread;"
#define OBJ "Ljava/lang/Object;"
#define STE "Ljava/lang/StackTraceElement;"
#define STR "Ljava/lang/String;"

#define ARRAY_LENGTH(a) (sizeof(a)/sizeof(a[0]))
// 绑定对应的方法,实现在jvm.cpp https://github.com/openjdk/jdk/blob/master/src/hotspot/share/prims/jvm.cpp

static JNINativeMethod methods[] = {
    {"start0",           "()V",        (void *)&JVM_StartThread},
    {"stop0",            "(" OBJ ")V", (void *)&JVM_StopThread},
    {"isAlive",          "()Z",        (void *)&JVM_IsThreadAlive},
    {"suspend0",         "()V",        (void *)&JVM_SuspendThread},
    {"resume0",          "()V",        (void *)&JVM_ResumeThread},
    {"setPriority0",     "(I)V",       (void *)&JVM_SetThreadPriority},
    {"yield",            "()V",        (void *)&JVM_Yield},
    {"sleep",            "(J)V",       (void *)&JVM_Sleep},
    {"currentThread",    "()" THD,     (void *)&JVM_CurrentThread},
    {"interrupt0",       "()V",        (void *)&JVM_Interrupt},
    {"holdsLock",        "(" OBJ ")Z", (void *)&JVM_HoldsLock},
    {"getThreads",        "()[" THD,   (void *)&JVM_GetAllThreads},
    {"dumpThreads",      "([" THD ")[[" STE, (void *)&JVM_DumpThreads},
    {"setNativeName",    "(" STR ")V", (void *)&JVM_SetNativeThreadName},
};

#undef THD
#undef OBJ
#undef STE
#undef STR

JNIEXPORT void JNICALL
Java_java_lang_Thread_registerNatives(JNIEnv *env, jclass cls)
{
    (*env)->RegisterNatives(env, cls, methods, ARRAY_LENGTH(methods));
}

JNIEXPORT void JNICALL
Java_java_lang_Thread_clearInterruptEvent(JNIEnv *env, jclass cls)
{
#if defined(_WIN32)
    // Need to reset the interrupt event used by Process.waitFor
    ResetEvent((HANDLE) JVM_GetThreadInterruptEvent());
#endif
}

具体创建过程可以参考这篇文章,值得注意的是,JavaThread是在java的Thread对象调用start()后才会构造创建的。以下是JavaThread的部分创建过程:

JavaThread::JavaThread(ThreadFunction entry_point, size_t stack_sz) : JavaThread() {
  _jni_attach_state = _not_attaching_via_jni;
  set_entry_point(entry_point);
  // Create the native thread itself.
  // %note runtime_23
  os::ThreadType thr_type = os::java_thread;
  thr_type = entry_point == &CompilerThread::thread_entry ? os::compiler_thread :
                                                            os::java_thread;
  // 这里会创建操作系统线程与当前JavaThread绑定
  os::create_thread(this, thr_type, stack_sz);
}

同时Java定义了一个标准流程:

// 线程执行流程与动作(行为):
// 对于所有类型的线程:
//  - thread_native_entry  // 每个操作系统的本地入口
//    - stack initialization //堆栈初始化
//    - other OS-level initialization (signal masks etc) //其他操作系统层面的初始化(Linux信号掩码等等)
//    - handshake with creating thread (if not started suspended)//与创建线程握手(如果没有开始中止)
//    - this->call_run()  // 通用的共享入口
//      - shared common initialization // 通用初始化
//      - this->pre_run()  // 各类线程的初始化,虚函数
//      - this->run()      // 各类线程的主逻辑,虚函数
//      - shared common tear-down // 通用回收
//      - this->post_run()  // 各类线程的回收逻辑,虚函数
//      - // 'this' no longer referenceable // this不再是可引用的
//    - OS-level tear-down (minimal) //操作系统级别的拆解(最小)
//    - final logging // 记录日志
//
// 对于JavaThread类型的线程:
//   - this->run()  // 实现Thread定义的虚函数,通常JavaThread的子类不会不会被覆盖JavaThread的逻辑
//     - this->thread_main_inner()  // 额外的调用级别,以确保正确的堆栈计算
//       - this->entry_point()  // 对每一种JavaThread的设置都不同,这是我们的业务逻辑,是对Runnable的封装在底层抽象为ThreadFunction对象(c++)

对于Thread.java定义有以下几种状态,基本上与进程状态差不多:

    public enum State {
        /**
         * new之后还未start. ( 也就是还未调用start()方法 )
         */
        NEW,

        /**
         * 可执行状态,不一定已经在执行中,可能正在等待时间片。但是一旦获得时间片就可以执行。
         * 它其实包括就绪态与执行态两种状态(ready、running)。从running到ready使用yield()方法切换。
         */
        RUNNABLE,

        /**
         * 指的是未获取到锁(monitor)或者调用wait()方法后返回需要重新获取锁的状态
         */
        BLOCKED,

        /**
         * 调用以下方法进入等待状态:Object.wait()、Thread.join()、LockSupport.park();
         * 与阻塞态不同的是:阻塞态等待的是一个临界资源/锁,而等待态等待的是其他线程执行一个特殊的动作/行为(action)
         * 比如:wait()是等待其他线程调用notify()或者调用notifyAll();join()是等待其他线程进入终止态;park()等待其他线程调用unpark();
         */
        WAITING,

        /**
         * 超时等待与普通的等待状态不同的是,该等待状态多了一个等待的东西 —— 时间。而超时等待特指等待时间的状态
         * 调用以下方法进入超时等待状态:Thread.sleep() 、 Object.wait(long) 、Thread.join(long) 、
         *  LockSupport.parkNanos() 、  LockSupport.parkUntil()
         */
        TIMED_WAITING,

        /**
         * 线程执行完则进入终止态
         */
        TERMINATED;
    }

关于线程到这里浅尝辄止,现在回到Java内置锁。
在我们广泛使用的jdk1.8,JVM加锁不再是早古时代的直接底层锁住Object关联的ObjectMonitor对象,而是有一个锁升级过程(不能降级)。这个过程伴随着java对象头的数据变化,Java对象头解析可以参考这文章。一个对象的内存占用大概如下:

object

值得注意的是数组类型的对象与普通对象的的内存占用模型不是完全一样的,并且这是个抽象的模型,并不代表它在真实物理内存分布上长这个模样。在本文中主要关注的是Mark Work(对象头,object header),它在jdk中是一个C++类,源码在这里。它内部使用一个uintptr_t类型(其实就是个无符号的长整型,unsigned long int)的属性value存储位信息,并且在64位机器上面他是64位的而在32位机器上面是32位的。通过对象的这个结构你可以自行计算出一个对象所占用的内存是多大。markword大致位信息含义如下(随着锁升级过程而不断变化):

mark word

注:在一个对象初始化完成后调用Object::hashCode()或者调用System.identityHashCode(object)会获取到该对象的哈希码存到该markword的对应位置,这意味着获取了哈希码的不能变为偏向锁,因为偏向锁mark word存储hash的位被thread id和epoch占用了,一个object的hash是保持不变的,而偏向锁会丢失hash。一个原本带有偏向锁标记的对象在计算hash后会直接膨胀为重量级锁。

偏向锁:偏向锁是为了优化无竞争状态下轻量级锁的复制mark word、cas、自旋步骤,这是一种这样的机制,假如在某个时间段内,某对象O的mark word是线程A,且在这个时间段内一直无其他线程竞争,则该线程A在访问O保护的同步资源时不需要做其他同步操作。这比轻量锁少了复制mark word,自旋等等一些步骤,在无竞争状态下节省了时间和空间。
轻量级锁:轻量级锁为了优化重量级锁的内核态与用户态切换问题,他是这样一种机制,在O存在竞争的情况下,线程A先访问获取锁:在当前线程A栈帧中开辟一个称为锁记录(Lock Record)的空间,将mark word复制到该空间(称为Displaced Mark Word),将该空间的指针存到(CAS)原来O的Mark Word的对应位置(上图有描述),三步完成则获取轻量级锁完成。线程B竞争资源:如果已经被其他锁占用,则自旋等待线程A释放锁,如果两个线程同时复制到Mark Word到自己的锁记录空间,在cas将指针存入O的mark word过程中被A抢占,则等待A解锁后重新执行上述步骤。自适应自旋 + CAS 保证活跃与互斥。如果有线程C竞争O则会升级为重量级锁。
重量级锁:重量级锁与objectMonitor对象相关联,这个类数据结构大致如下:

class ObjectMonitor : public CHeapObj<mtObjectMonitor> {
  void* volatile _owner;            // 当前monitor拥有者
  volatile markWord _header;        // displaced mark word
  volatile intx _recursions;        // 重入次数
  ObjectWaiter* volatile _EntryList;  // 就绪队列
 protected:
  ObjectWaiter* volatile _WaitSet;  // 等待队列
  volatile int  _waiters;           // 等待队列上的线程数
 private:
  volatile int _WaitSetLock;        // protects Wait Queue - simple spinlock
}

_owner存储了此时持有锁的线程;_header存储了monitor指针占用的数据,也就是说原本的hash,age等等并不会丢失,而是存储在了与Java对象O相关联c++的objectMonitor对象中;_recursions存储了该monitor被重入的次数;_EntryList是一个就绪队列,他们正在阻塞等待锁;_WaitSet是一个等待队列,它们正在等待锁和某些条件变量。重锁之所以被称为重是因为线程需要在内核态与用户态切换,且monitor本身与系统互斥量(mutex)相关联。
实际上在底层CAS与其他工具大量使用,下面代码是切换monitor owner的逻辑:

inline void* ObjectMonitor::try_set_owner_from(void* old_value, void* new_value) {
  // cas
  void* prev = Atomic::cmpxchg(&_owner, old_value, new_value);
  if (prev == old_value) {
    log_trace(monitorinflation, owner)("try_set_owner_from(): mid="
                                       INTPTR_FORMAT ", prev=" INTPTR_FORMAT
                                       ", new=" INTPTR_FORMAT, p2i(this),
                                       p2i(prev), p2i(new_value));
  }
  return prev;
}

这里贴出jvm同步相关联的类:objecMonitor.cpp(jvm对管程模型的抽象)basicLock.cppatomic.hpp(原子操作/cas的抽象)synchronizer.cpp(同步器;锁升级过程通过它操作)。还有mutex.cpp是对系统互斥量(mutex的封装)、mutexLocker.cpp包装了集中类型的锁,但是我没有细看在哪使用到了。

锁升级过程

上述Java内置锁各方面都挺好的,使用标准的编程模板就可以保证线程之间可以正确同步,并且jvm内部为了提高吞吐量也做了很多优化。内置锁有两个比较大的劣势,一是锁只能升级不能降级,二是它只有一个WaitSet这意味着等待不同条件变量的线程会互相竞争与干扰。在后文显式锁中会详细描述。

目前synchronized的讨论到此告一段落,JMM相关的内容后续会补充在Java内存模型与JVM内存模型的朴素解释中。现在我们再来讨论volatile。

可见性:前文中提到人们经常忽略可见性这个重要的特性,在Java中,synchronized、volatile和final三个关键字都能能保证可见性。
所谓可见性,就是一个线程修改了一个共享变量V,其他线程可以立即得知这个V被修改的最新值。不同的关键字实现可见性的原理是不一样的,synchronized关键字是因为jvm中lock和unlock指令规则:在对变量执行unlock之前必须先对变量执行store和write将此变量最新值同步到主内存,而对变量执行lock会清空变量的值,一个变量只能同时被一条线程lock。volatile关键字是因为jvm对volatile变量定义了特殊的规则:在使用volatile变量之前必须先从主内存获取最新值,在修改volatile变量时必须立即同步到主内存。而final关键字修饰的变量,在构造器没有将this引用传递出去的情况下,一旦初始化完成,那么其他线程可以看到final变量的值(这是基于final不可变这一语义的)。
在极其特殊的情况下,32位java虚拟机里面的long类型的数据不能保证原子性,long和double类型的普通变量在Java中没有规定原子性,这点在Java语言规范中有说明。这是因为64位的变量的读写的时候可能读写一半(前32位或者后32位)。同样volatile和final修饰的变量可以完全避免这个问题,但其实现在64位系统为主流的今天,已经没有这个烦恼了。

指令重排序:指令重排序其实是一个优化手段,但是在多线程环境下带来了一些意想不到的结果。
Java只定义了在当前线程上下文环境下,所有的执行结果是正确的,但是没有规定其中的执行过程。如下:

// ①
int i = 1;
// ②
int j = 2;
// ③
int x = i;

Java规定了在单线程环境下其执行结果应当是正确的,所以最终结果会是等于1,j等于2,x等于1。而只要满足正确性,jvm可以对代码进行合理的优化,也就是说,②可能在①③之间执行也可能在①③之后执行,也可能在①③之前执行,执行顺序和代码的编写顺序不是一致的。注意:这个例子是不严谨的,变量的初始化、赋值、链接不是一步操作,并没有规定原子性,同时源代码不能代表字节码,这个例子只是为了解释指令重排序这一现象。在Java中volatile与synchronized可以保证有序性。synchronized的实现原理是“一个变量在同一时刻只能有一条线程对其进行lock操作”这个规则衍生的,因为这个规则表示同一时刻只能由一个线程进入同步代码块(这就与我们上文提到的monitor理论契合了),volatile实现有序的原理是禁止指令重排序,底层是通过内存屏障指令实现的禁止重排序,不同处理器的内存屏障指令是不一样的,且有4种不同类型的内存屏障,内存屏障表示不能把屏障之后的指令重排序到屏障之前的位置或者或者不能将屏障之前的指令重排序到屏障之后或者完全禁止重排序。在《深入理解Java虚拟机》一书中提到有lock xxx 这样的带有lock前缀的内存屏障指令,它的作用是将本处理器的缓存写入到内存,同时使其他处理器的缓存失效(无效化),这样也保证了可见性。

注:内存屏障相关的详细内容可以参考这篇文章

volatile修饰的变量是Java提供的最轻的同步机制,它保证该变量的变更会及时同步给所有线程,同时通过禁止指令重排优化保证了volatile变量不会因为重排序优化而使指令有序性失效。这样,结合cas就可以保证对volatile进行原子修改。

CAS:是compare and swap的缩写,是一种需要硬件支持的技术(假如没有硬件支持还是走阻塞同步的话那就没多大意义了),在一般情况下,我们的变量测试操作与设置操作是两个不同的指令不同的操作,没有原子性,而cas将比较与设置值操作作为一步操作执行,属于一条指令,目前,不同的处理器架构都对这样的场景有支持。x86指令集中的cmpxchg指令就是这样的(源码在这里):

template<>
template<typename T>
inline T Atomic::PlatformCmpxchg<8>::operator()(T volatile* dest,
                                                T compare_value,
                                                T exchange_value,
                                                atomic_memory_order /* order */) const {
  STATIC_ASSERT(8 == sizeof(T));
  __asm__ __volatile__ ("lock cmpxchgq %1,(%3)"
                        : "=a" (exchange_value)
                        : "r" (exchange_value), "a" (compare_value), "r" (dest)
                        : "cc", "memory");
  return exchange_value;
}

而在Java中将该操作封装在了sun.misc.Unsafe类里面:

    @CallerSensitive
    public static Unsafe getUnsafe() {
        // 限制调用方的ClassLoader为Bootstrap ClassLoader
        Class<?> caller = Reflection.getCallerClass();
        if (!VM.isSystemDomainLoader(caller.getClassLoader()))
            throw new SecurityException("Unsafe");
        return theUnsafe;
    }
    @ForceInline
    public final boolean compareAndSwapObject(Object o, long offset,
                                              Object expected,
                                              Object x) {
        return theInternalUnsafe.compareAndSetReference(o, offset, expected, x);
    }
    @ForceInline
    public final boolean compareAndSwapInt(Object o, long offset,
                                           int expected,
                                           int x) {
        return theInternalUnsafe.compareAndSetInt(o, offset, expected, x);
    }
    @ForceInline
    public final boolean compareAndSwapLong(Object o, long offset,
                                            long expected,
                                            long x) {
        return theInternalUnsafe.compareAndSetLong(o, offset, expected, x);
    }
    public long objectFieldOffset(Class<?> c, String name) {
        if (c == null || name == null) {
            throw new NullPointerException();
        }

        return objectFieldOffset1(c, name);
    }
    @IntrinsicCandidate
    public final native boolean compareAndSetReference(Object o, long offset,
                                                       Object expected,
                                                       Object x);

对应的compareAndSetXXX方法有四个参数,修改的属性所属的对象,该属性的offset,期望原值,新值。其中offset可以通过unsafe的objectFieldOffset方法获得。注意,由于getUnsafe要求只有Java类库才能调用(如源码所示。通过限制调用方的ClassLoader为Bootstrap ClassLoader),所以你无法使用常规手段直接获取unsafe实例,你需要通过反射或者其他类库间接调用。在JDK 9之后开放了一个VarHandle类来支持用户的CAS操作。
CAS通常结合自旋保证原子更新,下面是AtomicInteger对CAS的典型使用:

    @IntrinsicCandidate
    public final int getAndAddInt(Object o, long offset, int delta) {
        int v;
        do {
            v = getIntVolatile(o, offset);
        } while (!weakCompareAndSetInt(o, offset, v, v + delta));
        return v;
    }

显式锁
上面内置锁提到内置锁有线程内核态与用户态切换及共享条件变量的问题。同时内置锁必然是非公平锁。在JUC库中的显式锁解决了这些问题,JUC会在JUC的朴素解释中详细说明。
前文中提到volatile变量的有序性与可见性,再加上cas+自旋达到的互斥效果。基于这几个特性Doug Lea创造了JUC,一个高效的同步工具。它实现了锁、线程池、以及以及一些其他的同步工具,具体会在JUC的朴素解释中细说。和synchronized内置锁一样,在JUC的Lock与Condition接口也是为了实现管程以及条件变量。归根到底是为了实现上文提到的那种互斥的编程范式,就像synchronized可以保护代码块一样,Lock也可以保护lock与unlock方法之间的代码是互斥的:

public class TestMain4 {
    public static void main(String[] args) {
        TestMain4 testMain4 = new TestMain4();
        ReentrantLock lock = new ReentrantLock();
        CyclicBarrier cyclicBarrier = new CyclicBarrier(20);
        CountDownLatch downLatch = new CountDownLatch(20);
        for(int i = 0;i<20;i++){
            new Thread(()->{
                try {
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                try {
                    if(lock.tryLock(30L, TimeUnit.SECONDS)){
                        testMain4.a();
                    }
                }catch (InterruptedException e){
                    System.out.println(e.getMessage());
                }finally {
                    lock.unlock();
                }
                downLatch.countDown();
            }).start();
        }
        try {
            downLatch.await();
        }catch (InterruptedException e){}
        System.out.println(testMain4.i);
    }
    private int i= 0;
    private void a(){
        int i1 = this.i;
        for (int j=0;j<10000;j++){
            ++this.i;
        }
    }
}

同样的,对于不同的条件变量,可以创建不同的condition对象来表示:

public class TestMain4 {
    public static void main(String[] args) {
        TestMain4 testMain4 = new TestMain4();
        ReentrantLock lock = new ReentrantLock();
        Condition condition0 = lock.newCondition();

        new Thread(()->{
            try {
                if(lock.tryLock()){
                    while (testMain4.i != 200000){
                        System.out.println(testMain4.i);
                        condition0.await();
                    }
                    System.out.println(testMain4.i);
                }
            }catch (Exception e){}finally {
                lock.unlock();
            }
        }).start();

        for(int i = 0;i<20;i++){
            new Thread(()->{
                try {
                    if(lock.tryLock(30L, TimeUnit.SECONDS)){
                        testMain4.a();
                        condition0.signal();
                    }
                }catch (Exception e){
                    System.out.println(e.getMessage());
                }finally {
                    lock.unlock();
                }
            }).start();
        }
    }
    private int i= 0;
    private void a(){
        int i1 = this.i;
        for (int j=0;j<10000;j++){
            ++this.i;
        }
    }
}

上文中提到三种条件变量语义:Mesa语义、Hoare语义与Brinch Hanson语义。Java中的锁都是Mesa语义变形而来,它们都遵循这样一种规则:条件变量(condition)的等待方法(await()/wait())与唤醒方法(notify()/signal())都是需要在持有锁的时候才允许被调用,等待方法调用后会释放锁,而唤醒方法在调用后也不应该做其他的逻辑而是应当尽快释放锁以免信号丢失。由于内置锁只有一个条件变量,且需要更高的切换成本所以内置锁应当严格遵守编程范式的规则。

ReentrantLock与内置锁一样是一个可重入锁,在线程已经持有锁的情况下进入同一个锁保护的代码块的时候可以直接重入。同时它实现了公平锁,默认是非公平锁,可以通过传入构造参数true或者false构造一个公平或者非公平锁。公平锁与非公平锁的区别在于,如果是非公平锁,线程在尝试加锁时,锁是空闲的,那么该线程会直接尝试占有锁(插队),而如果是公平锁,线程会先进入队列排队。绝大多数时候我们都是使用非公平锁。ReentrantReadWriteLock实现了读写锁,读锁是共享锁,写锁是互斥锁。

JUC的同步工具(除了原子类)基本上都会使用到AQS(AbstractQueuedSynchronizer、AbstractQueuedLongSynchronizer),Doug Lea经过精妙的设计将它设计成了一套框架并且开放API是我们可以借助这个框架快速实现自己的同步工具。具体细节这里不便展开,会在其他文章中补充。整个JUC是基于volatile + cas + 自旋的,三者结合相辅相成实现了锁的原子性、可见性与有序性。

至此,我们基本上解决了在常规单机环境下的多线程同步问题(通过内置锁与显式锁)。而在分布式应用大行其道的今天,只在单机环境下进行同步显然是无法让面试官满意的。

分布式锁

前文已经对锁理论进行了详细阐述了,总结起来就是要实现原子性、可见性与有序性。需要以下东西,锁标识,等待队列,阻塞队列,条件变量实现。而分布式问题正如我在面试的逻辑与应对方法文中所阐述的,分布式问题基本的解决思路是要么通过一致性算法保证一致,要么引入第三方服务来保证一致。市面上实现分布式锁的方案大致有两种:基于redis的分布式锁和基于zookeeper的分布式锁。当然使用mysql(数据库),etcd或者你自己写个服务用来处理锁操作请求也是可以的,只要最终能实现monitor的模型都可以。它们都属于引入第三方服务来保证一致。本文主要阐述基于redis的分布式锁的实现。其实就是实现锁理论。

理论基础:redis可以保证key唯一性,而zookeeper可以保证node节点的唯一性,且是放在内存里面的,保证服务端尽可能高效。

一个基于redis setnx指令的分布式锁的基本模式如下:

package com.example;

import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;

public class LockRedis {
    public static void main(String[] args) {
        RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration();
        redisStandaloneConfiguration.setDatabase(0);
        redisStandaloneConfiguration.setHostName("localhost");
        redisStandaloneConfiguration.setPort(6379);
        LettuceConnectionFactory lettuceConnectionFactory = new LettuceConnectionFactory(redisStandaloneConfiguration);
        lettuceConnectionFactory.afterPropertiesSet();
        RedisConnection connection = lettuceConnectionFactory.getConnection();
        CountDownLatch countDownLatch = new CountDownLatch(20);
        byte[] lockKey = "lock".getBytes(StandardCharsets.UTF_8);
        LockRedis testMain6 = new LockRedis();
        for(int i = 0;i<20;i++){
            new Thread(()->{
                try {
                    while (!connection.setNX(lockKey,Long.valueOf(Thread.currentThread().getId()).toString().getBytes(StandardCharsets.UTF_8))){ }
                    if(connection.expire(lockKey,10)){
                        testMain6.a();
                        countDownLatch.countDown();
                    }
                }catch (Exception e){
                    System.out.println(e.getMessage());
                }finally {
                    connection.del(lockKey);
                }
            }).start();
        }
        try {
            countDownLatch.await();
        }catch (Exception e){}
        System.out.println(testMain6.i);
    }
    private int i= 0;
    private void a(){
        for (int j=0;j<10000;j++){
            ++this.i;
        }
    }
}

setnx是SET if Not eXists的缩写(和cas一样将判断与设置值两步操作做成了一个原子指令),通过这条指令配合expire指令、del指令和自旋可以实现互斥上锁,添加锁超时与解锁操作。我在番茄工作的时候公司使用封装setnx做分布式锁,也就是说他是在生产环境基本可用的。上文代码有个bug,解锁应该是只能由占用线程解锁。更合理的方式是使用SET resource_name my_random_value NX PX 30000,可以在一条命令上进行判断设置值和设置超时时间。

从 Redis 2.6.12 版本开始, SET 命令的行为可以通过一系列参数来修改:

EX second :设置键的过期时间为 second 秒。 SET key value EX second 效果等同于 SETEX key second value 。
PX millisecond :设置键的过期时间为 millisecond 毫秒。 SET key value PX millisecond 效果等同于 PSETEX key millisecond value 。
NX :只在键不存在时,才对键进行设置操作。 SET key value NX 效果等同于 SETNX key value 。
XX :只在键已经存在时,才对键进行设置操作。

Redisson是基于Redis封装的一个分布式锁的组件。解决了锁占用超时自动而提前释放锁的问题,这种机制在内部封装称为watch dog。Redisson基本用法如下:

package com.example;

import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.config.Config;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LockRedisson {
    public static void main(String[] args) {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        //如果超时还在锁还被占用会延期30s(默认也是30s)
        config.setLockWatchdogTimeout(30000L);
        CountDownLatch countDownLatch = new CountDownLatch(20);
        LockRedisson testMain5 = new LockRedisson();
        for(int i = 0;i<20;i++){
            new Thread(()->{
                try {
                    RLock rLock = Redisson.create(config).getLock("lock-redisson");
                    if(rLock.tryLock(3000L,2L , TimeUnit.SECONDS)){
                        Thread.sleep(TimeUnit.SECONDS.toMillis(3));
                        testMain5.a();
                        countDownLatch.countDown();
                        rLock.unlock();
                    }
                }catch (Exception e){
                }
            }).start();
        }
        try {
            countDownLatch.await();
        }catch (Exception e){}
        System.out.println(testMain5.i);
    }
    private int i= 0;
    private void a(){
        for (int j=0;j<10000;j++){
            ++this.i;
        }
    }
}

Redisson使用JUC中的Lock接口,封装了一个RLock接口,所以在使用上基本与显式锁的用法一致。Redisson是基于Lua脚本的,在redis中使用一个脚本解释器去执行所有脚本,这可以保证每个lua脚本执行的原子性同时为将复杂的锁功能封装提供了可能。加锁与解锁代码如下:

    // 上锁
    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
    }
    // 解锁
    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                        "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                        "else " +
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return nil;",
                Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
    }

锁是可重入的,使用hincrby指令记录重入次数,同时publish指令发布消息。Redisson执行脚本依赖redis eval指令,而大多数云服务商提供的redis缓存会禁止该命令。同时Redissson不支持Condition相关的操作。也就是说Redisson没有实现条件变量与等待队列相关的东西。它只提供互斥访问这个monitor的基本要求,理论上你可以组合多个Lock达到条件变量的效果,但是在分布式情况下做等待队列的代价是非常大的基本上没有应用场景。

Redisson的默认使用的RedissonLock是通过事件获取(pub/sub)锁状态变更,RedissonSpinLock使用自旋获取锁状态变更,这两种实现都是非公平锁,RedissonFairLock是公平锁,绝大多时候用不上,RedissonReadLock是一个共享锁(读锁)而RedissonWriteLock是一个互斥锁(写锁),RedissonMultiLock可以关联管理多个RLock锁,RedissonMultiLock一个子类RedissonRedLock实现了RedLock理论(红锁)。红锁详细分析可以参考这篇文章

红锁简述:假设你有N(N最好是奇数)个Redis服务,则客户端对所有服务端发送加锁请求,如果成功上锁服务>=N/2+1且加锁时间未超过设置的锁超时时间,则表示加锁成功。否则视为加锁失败需要立即释放已经成功的节点。

我个人不太理解为什么用红锁,弄多台服务开销大,维护成本高,IO成本也高。

zookeeper作为一个分布式协调组件,天然拥有作为分布式锁的优势。网上大部分是使用zookeeper提供的临时有序节点实现分布式锁,可以保证锁标识在断开连接后删除,同时保证了在一个节点上不会有过多的watcher竞争,这样的实现是一个公平锁。基本实现逻辑如下:

package com.example;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;

import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class LockZookeeper {
    public static void main(String[] args) {
        LockZookeeper testMain7 = new LockZookeeper();
        CountDownLatch latch = new CountDownLatch(20);

        for (int i=0;i<20;i++){
            new Thread(()->{
                String lockNode = "/locks/lock";
                List<ACL> acls = ZooDefs.Ids.OPEN_ACL_UNSAFE;
                CreateMode ephemeral = CreateMode.EPHEMERAL_SEQUENTIAL;
                ZooKeeper zooKeeper =null;
                String s = null;
                try {
                    zooKeeper = new ZooKeeper("localhost:2181", 3000, event -> {});
                    s = zooKeeper.create(lockNode, Long.valueOf(Thread.currentThread().getId()).toString().getBytes(StandardCharsets.UTF_8),acls , ephemeral);
                    s = Paths.get(s).getFileName().toString();
                }catch (Exception e){
                    e.printStackTrace();
                    latch.countDown();
                    return;
                }
                try {
                    CountDownLatch countDownLatch = new CountDownLatch(1);
                    System.out.println(s);
                    List<String> children = zooKeeper.getChildren("/locks",null);
                    Collections.sort(children);
                    // 本节点是最小节点
                    if(s.equals(children.get(0))){
                        // 则上锁成功
                        countDownLatch.countDown();
                    }else {
                        // 上个节点
                        int index = children.indexOf(s) - 1;
                        String watchPath = children.get(index);
                        zooKeeper.addWatch("/locks/"+watchPath,event -> {
                            System.out.println(event);
                            if(event.getType() == Watcher.Event.EventType.NodeDeleted){
                                countDownLatch.countDown();
                            }
                        }, AddWatchMode.PERSISTENT_RECURSIVE);
                        Stat exists = zooKeeper.exists("/locks/"+watchPath, null);
                        System.out.println("watchPath:/locks/" + watchPath);
                        // 在添加watcher过程中节点被删除了
                        if(exists == null){
                            // 上锁成功
                            countDownLatch.countDown();
                        }
                    }
                    countDownLatch.await();
                    testMain7.a();
                    latch.countDown();
                    System.out.println(s+"--success");
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    // 解锁
                    try {
                        byte[] data = zooKeeper.getData("/locks/"+s, null, null);
                        if(Long.valueOf(Thread.currentThread().getId()).toString().equals(new String(data,StandardCharsets.UTF_8))){
                            zooKeeper.delete("/locks/"+s,-1);
                        }
                    }catch (Exception e){}
                }
            }).start();
        }
        try {
            latch.await();
        }catch (Exception e){}
        System.out.println(testMain7.i);
    }
    private int i= 0;
    private void a(){
        for (int j=0;j<10000;j++){
            ++this.i;
        }
    }
}

和redis一样,zookeeper也有一个开源组件Apache Curator实现了使用zookeeper作为分布式锁,可以大大简化开发。如下:

package com.example;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.RetryNTimes;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LockZookeeperCurator {
    public static void main(String[] args) {
        LockZookeeperCurator testMain8 = new LockZookeeperCurator();
        CountDownLatch countDownLatch = new CountDownLatch(20);
        String lockNode = "/locks/lock2";

        for(int i=0 ; i<20 ; i++){
            new Thread(()->{
                try {
                    CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new RetryNTimes(10, 1000));
                    client.start();
                    InterProcessMutex interProcessMutex = new InterProcessMutex(client, lockNode);
                    if(interProcessMutex.acquire(30, TimeUnit.SECONDS)){
                        testMain8.a();
                        countDownLatch.countDown();
                        interProcessMutex.release();
                    }
                }catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
        try {
            countDownLatch.await();
        }catch (InterruptedException e){}
        System.out.println(testMain8.i);
    }
    private int i= 0;
    private void a(){
        for (int j=0;j<10000;j++){
            ++this.i;
        }
    }
}

Curator框架实现的分布式锁比我们自己手写的要完善点,内部使用LockData存有所有者线程、锁路径与重入次数:

private static class LockData
    {
        final Thread owningThread;
        final String lockPath;
        final AtomicInteger lockCount = new AtomicInteger(1);

        private LockData(Thread owningThread, String lockPath)
        {
            this.owningThread = owningThread;
            this.lockPath = lockPath;
        }
    }

Curator框架一样也是使用临时有序节点作为锁标识:

    @Override
    public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
    {
        String ourPath;
        if ( lockNodeBytes != null )
        {
            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
        }
        else
        {
            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
        }
        return ourPath;
    }

和Redisson一样Curator框架也是没有实现条件变量和等待队列相关的东西。究其原因还是是因为在分布式情况下开销太大、且难以维护。同样的,除了InterProcessMutex实现之外,Curator也提供了InterProcessMultiLock由于管理多个锁(同时加锁,同时释放),InterProcessReadWriteLock分布式读写锁,InterProcessSemaphoreV2分布式信号量等等。

到此,我们从内置锁到显示锁再到分布式锁都涉猎到了。内置锁在长时间阻塞或者等待,竞争激烈的资源上使用其实还是合理的,假如你知道自己的同步代码计算密集的,那你应该优先使用内置锁,减少线程状态切换成本,分布式锁在多系统协调的现在被广泛使用,一般情况下都是使用Redisson,功能更加完善,目前我待过的公司没有使用Curator框架做锁的。但是其实能实现锁效果(monitor理论)都一样,工具只是工具怎么用还是得根据实际情况。条件变量与等待队列在我们生产中其实也非常少用到,内置锁只有一个条件变量,在等待多个条件变量的时候会提高竞争,且会引入不必要的竞争,显示锁通过Condition解决了这一点,分布式锁没有提供条件变量功能(没有wait与signal),因为绝大多是时候分布式锁是业务需要的,而业务一般情况下不会有等待其他系统完成后再抢占的情况,再实际生产中业务也基本上不会遇到需要条件变量的情况。

四、优化

多线程会带来额外的上下文切换、内存同步和阻塞开销。在通常情况下,这种开销是值得的,多线程带来了快的响应与更高的吞吐量,可以同时处理多个问题。前文提到多线程的问题集中在对共享资源的访问带来的竞争,我们要认识这种竞争识别出哪些是必要的哪些是不必要的竞争,同时使用同步机制保证在必要竞争使得程序不出错。正确比执行效率更加重要,保证程序逻辑正确是一切优化的前提。
多线程共享资源问题一句话总结起来就是:在不正确的时机共享了一个不可变对象或者共享了一个缺少同步的对象。前者在双检锁单例和构造方法的编写的时候需要特别注意,前者可能导致单例被构造多次(如果该对象是有状态的,那情况将变得更加糟糕),后者可能导致一个尚未构造完全得对象被发布出去。
前人总结有一些经验来保证多线程得安全:
1.约束对象的生命周期与使用范围:在绝大多数非珍贵对象的使用上,用完即垃圾回收是一个良好的实践。正如Servlet API设计的Application,Session,Request一样,严格控制这几个对象保证了整个Servlet容器的安全性,想象Request复用会带来怎样的糟糕后果。
2.不可变对象:这里我所说的不可变对象包括final修饰的对象、事实不变对象与无状态对象。不可变的数据保证了其被共享出去不会在不同的对象表现出不一样的状态与行为,final对象是通过禁止实例化后修改做到的、事实不变对象依赖各个线程约定不修改其状态做到,而无状态对象则是本身就没有数据修改做到不变的。
注:你可以看出来,之所以会有多线程问题是因为多个线程会对一个共享对象进行修改。假如所有线程都只是读取,自然不会有并发问题。
3.线程封闭:这应该是实践最多也最重要的解决多线程问题的技术。本质上来说就是禁止多线程共享数据。最常用的有三种手段:栈封闭、ThreadLocal和串行线程封闭。栈封闭依赖Java分配在线程栈局部变量的对象对其他线程不可见这一特性(这不是必然的,当形成闭包时也是会共享的,不过会要求局部变量用final修饰)。ThreadLocal将数据与线程对象相绑定保证读写都只能由绑定线程来操作来保证安全性。串行线程封闭要求数据的对象发布线程与使用线程必然在不同的时刻操作数据,典型的场景是阻塞队列的实现,它保证的生产线程和消费线程在操作Task的实际是不同的从而保证安全(这也不是必然的,取决于程序员如何使用共享的对象,当然如果按照规范和语义来自然是没有问题)。
在《Java并发编程实践》一书中总结了一些策略:线程封闭、只读共享封装同步以实现线程安全共享、公开保护对对象的锁与文档让使用方自行构建合适的同步。可以看出来共享一个可被多线程更改的对象是无法避免的,也是十分必要的,所以我们上文花了大量篇幅说明锁同步机制。多线程提高了资源利用率提高了程序性能,也引入了很多问题,锁解决了多线程同步的问题(解决了多线程安全性问题),但是无法解决多线程的活跃性问题。

1.死锁:死锁指的是一种阻塞僵持状态。
最最常见的死锁是顺序死锁,指的是A线程与B线程同时需要Ⅰ和Ⅱ两把锁,而加锁顺序却不一致,同时不放弃已经持有的锁;导致A取得Ⅰ锁在获取Ⅱ锁的时候阻塞了,而B获得的Ⅱ锁却阻塞在了Ⅰ锁。这可能并非有意为之:比如用户A给用户B打钱而同时用户B给用户A打钱,实现上要求先给打款方加锁然后给收款方加锁,逻辑上本没有错误但是顺序死锁在这种情况下还是发生了(动态顺序死锁)。同样的,假如M1方法业务上要求对1进行加锁然后对2进行加锁,而M2方法的业务却恰好相反,这也会发生死锁(协作死锁)。再有,假如A业务获取了文件F的锁等待打印机P的锁,而B业务获取了打印机P的锁却等待文件F的锁这又会发生死锁(资源死锁)。还有,如果我们写了一个类要求这个类的对象需要同时拥有l1和l2两把锁才能使用,而获取锁的逻辑是交给客户端的,这也极有可能导致死锁(开放调用死锁)。正因为导致死锁又多种情况这使得死锁成为了引入同步导致的最常见的问题。死锁是同步保证安全性而引入的活跃性问题。

2.活锁:活锁指的是一直做无用功。
它不会导致阻塞,却也无法使业务往下继续正常执行。比如队列消费者在消费过程中发现task由于不满足无法继续,于是将task推回到队列,如此往复,看似一直在处理这个task却始终在做无用功,再比如线程等待一个必然不满足的条件,导致一直在某处死循环无法推进业务。活锁的问题实际上与死循环问题都是因为没有设置好完善的终止条件,无论是失败还是成功都应该有个终态。

3.饥饿:饥饿指的是一些资源无法得到满足。比如一个低优先级的线程在竞争激烈情况下始终无法获取到CPU的时间片资源,导致该线程始终处于不活跃状态。在绝大多数时候,一个线程被创建出来我们是希望它能够执行,饥饿明显违背这个初衷,不过我们很少会犯这样的错误。参考

4.信号丢失与过早唤醒:过早唤醒指的是线程A唤醒一个等待队列,而后等待队列上的线程却发现条件变量未得到满足于是继续等待。这个现象在Java内置锁是很常见的,因为所有线程共享一个条件变量一个等待队列。信号丢失指的是在进入等待队列之前没有检查条件变量导致线程可能错过这个信号,这可能是永远的错过了,是一种糟糕的情况。所以在wait和notify的标准编程范式是wait在check条件变量的循环中,而notify应该是释放锁前的最后逻辑。

        try{
            if(lock.tryLock(10, TimeUnit.SECONDS)){
                while (conditionV){
                    condition.await(3,TimeUnit.SECONDS);
                }
                // do something

                condition.signalAll();
            }
        }catch (InterruptedException e){
            // handle InterruptedException or other exception
        }finally {
            if(lock.isLocked()){
                lock.unlock();
            }
        }

遵守编程范式可以避开很多不必要的麻烦。

上述问题在实践中是有方法可以优化的:
1.锁顺序一致:
约定加锁的固定顺序规则可以大大减少死锁的发生,比如上文规定先获取Ⅰ锁再获取Ⅱ锁就不会发生死锁,但是其实这是非常难限制的,所以我们需要对对象的加锁与解锁操作就行封装;如ConcurrentHashMap那样,直接使用就行而不需要关注里面的加解锁过程同时它也没有公开锁。你无法限制其他进程加锁顺序所以自然无法处理分布式的情况,当然你可以提供封装好的SDK分发给各进程使用以保证对多个限制资源访问时保证锁顺序一致。这也是我推荐的方式。也是我推荐的标准先行,边界明确的的一个实践。

2.在正确的条件变量上等待:
上文提到过早唤醒可以明确加入等待不太条件变量的等待队列以避免过早唤醒情况的发生。明确线程所等待的条件变量,可以大大减少竞争。JUC的Lock Api的一个优势就是引入了Condition,使得线程不必像内置锁一样等待同一个条件变量。

3.设置超时与最大重试次数:
信号丢失的问题可以通过设置超时时间恢复过来,最大重试次数可以有效解决活锁问题。任何操作都应该是有终态的,不论是等待还是还是重试,无休止的操作时不符合程序定义的。

这里与缓存设置失效时间的目的是不一样的,缓存失效是为了程序可以刷新状态,操作不当设置缓存失效时间可能会发生缓存击穿与雪崩。而这里设置时为了程序不会死去也不会无休止执行。

4.减少锁竞争:
首先我们能想到的是尽可能缩短持有锁的时间,这个道理很浅显,每个人做核酸的时间短了,大家等待时间就短了,自然积压的队伍就短了。程序串行部分越少在多线程环境下自然越有利。在一些你不得不使用一个大同步块保护代码逻辑的时候可以考虑锁分解技术,锁分解就是将一个锁保护的代码分解为多段不同锁保护的代码,这需要我们识别一个锁保护的是哪个具体的互斥资源,这些互斥资源之间应当是相互独立的。结合上文顺序死锁可以预见的时锁分解提高了死锁风险,同时如果在竞争不激烈的资源上进行锁分解会降低效率(因为需要频繁加锁解锁)。 在一些情况下锁分解技术可以进一步变成锁分段技术,在ConcurrentHashMap的实现上就使用到了锁分段技术,内部初始化一个16长度的的锁数组,每个锁保护的哈希桶(bucket),key通过hash之后就会分布在这16个bucket之中的其中一个,于是乎,如果hash合理,可以同时承载16个线程的并发写入,相比传统的HashTable,ConcurrentHashMap的性能要好很多。当然ConcurrentHashMap在扩容的时候则需要获取所有的锁,也就和HashTable一样锁住了整个容器。缓存热点数据也是一个减少竞争的方式,避免了访问一些数据的时候需要反复运算,ConcurrentHashMap使用了16个计数器来保存哈希桶的size信息,如果需要获取整个map的size则将它们加起来,而不需要对整个map进行遍历计数。

做核酸 : 从2019年开始到今天2022年4月12日新冠疫情这几年,核酸检测已经十分普遍了。

5.使用JUC里面的工具:
里面的原子类、显示锁、并发集合、并发工具、线程池等等,懂得都懂。这里就不展开了。

注:尽量做到无锁。

五、总结

本文重心在于说明管程及条件变量理论,基于这个理论java实现了有内置锁与显式锁。至于它们如何选择则需要根据场景,使用显式锁的时候要要注意符合编程范式与中断处理,这可以避免犯错,相比于显式锁内置锁使用更加简单且更容易排查问题(因为内置锁比如锁住一个块,必然与某个栈帧相关联),同时内置锁可以被JVM优化,比如对线程封闭的锁对象进行锁消除优化。在分布式环境下必然得使用分布式锁,同样,分布式锁也没有脱离管程理论,在加锁与解锁这两个操作之间形成了一个概念盒子,在盒子里面的代码时同步的(串行执行的)。锁的的理解与使用只是个开始,要应用到实际则需要分析业务,找出这个业务的前置条件(precondition),不变性条件(Invariant,译为不变式比如X+Y=10)与后置条件(post-condition);以一个简化的转账为例,前者条件有打钱账户不能小于打钱金额、双方账户额度都可用等等,不变性条件包括打款方减少的金额应该等于收款方增加金额等等,后置条件包括转账前后的余额之和不变等等。而我们使用锁就是为了保护这些条件不会在多线程环境下遭到破坏。

注:转载请注明作者与出处。意见或者建议请留言评论。

标签: 多线程, JVM, 并发

添加新评论