当前位置 博文首页 > 吃肉不长肉的小灏哥:synchronized的jvm源码分析聊锁的意义

    吃肉不长肉的小灏哥:synchronized的jvm源码分析聊锁的意义

    作者:吃肉不长肉的小灏哥 时间:2021-01-27 21:20

      上篇写完了ReentrantLock源码实现,从我们的角度分析设计锁,在对比大神的实现,顺道拍了一波道哥的马屁,虽然他看不到,哈哈。这一篇我们来聊一聊synchronized的源码实现,并对比reentrantLock的实现,相信认真看完一定会对锁的理解更加深入。

      废话不多说先来一段代码:

    static String s = new String();
        static int a = 1;
    
        public static void main(String[] args) {
            synchronized (s) {
                a++;
            }
        }

    我们一般写加锁也就这么写,为啥一个synchronized关键字就能做到加锁的效果呢,我们来看下这段代码main方法里的字节码:

    0: getstatic     #2                  // Field s:Ljava/lang/String;
           3: dup
           4: astore_1
           5: monitorenter
           6: getstatic     #3                  // Field a:I
           9: iconst_1
          10: iadd
          11: putstatic     #3                  // Field a:I
          14: aload_1
          15: monitorexit
          16: goto          24
          19: astore_2
          20: aload_1
          21: monitorexit
          22: aload_2
          23: athrow
          24: return

    我们主要看一下标红的几行,5,15,21,其中monitorenter是进入临界区的操作,monitorexit是退出临界区时的操作,而为啥又俩monitorexit,其中15行是正常退出的,而21行是异常退出的,毕竟我们之前写lock是放在finally中的,而这里当然也要通过手段保证退出临界区必须要释放锁。知道了synchronized其实就是monitorenter和monitorexit,我们还需要了解的就是对象头,这里我特意补了一篇对象头的介绍:https://www.cnblogs.com/gmt-hao/p/14151951.html,接下来的synchronized源码还是比较依赖对象头的理解的。

      在翻看synchronized的源码时,找入口就花了我很长的时间,有的文章说是InterpreterRuntime::monitorenter方法,有的文章写的是bytecodeInterpreter.cpp里的CASE(_monitorenter)方法后来我发现后者里面其实是调用了前者的,所以倾向于后者看了下去,但是我下载的jdk8里面的代码是这样的:

    CASE(_monitorenter): {
            oop lockee = STACK_OBJECT(-1); //取到锁对象
            // derefing's lockee ought to provoke implicit null check
            CHECK_NULL(lockee); //判空
            // find a free monitor or one already allocated for this object
            // if we find a matching object then we need a new monitor
            // since this is recursive enter
            BasicObjectLock* limit = istate->monitor_base();
            BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
            BasicObjectLock* entry = NULL;
            while (most_recent != limit ) {
              if (most_recent->obj() == NULL) entry = most_recent;
              else if (most_recent->obj() == lockee) break;
              most_recent++;
            }
            if (entry != NULL) {
              entry->set_obj(lockee);
              markOop displaced = lockee->mark()->set_unlocked();  //复制一份锁对象并将其设置为无锁状态
              entry->lock()->set_displaced_header(displaced);
              if (Atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) {
                // Is it simple recursive case?
                if (THREAD->is_lock_owned((address) displaced->clear_lock_bits())) {
                  entry->lock()->set_displaced_header(NULL);
                } else {
                  CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
                }
              }
              UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
            } else {
              istate->set_msg(more_monitors);
              UPDATE_PC_AND_RETURN(0); // Re-execute
            }
          }

    对,就这么点就没了,网上也有很多文章是按照这个写的,这让我看的时候很困惑,而且这些文章关于偏向锁的解释我都感觉和代码对不上,有种硬生生强塞进去的感觉,我当时就很好奇,这种文章的作者自己真的理解了吗,写这些文章不怕误导读者吗。这里我要推荐一篇我觉得写的非常好的,我也从中借鉴了很多,看了这篇博客之后,很多之前没理通的逻辑都弄明白了,非常感谢,这里贴上链接:https://www.jianshu.com/p/4758852cbff4(找资料的过程中发现很多博客都是抄的这篇文章的),关于文章中写到的为什么jdk8的某版本及之前的版本里面代码无法解释偏向锁的问题,也是看了文章中推荐的R大的文章才算是有所理解,在此非常感谢这些真正传道授业解惑的人,也贴一下链接:https://book.douban.com/annotation/31407691/,我就不再赘述(主要是理解的比较浅显),希望想了解synchronized源码的小伙伴还是看一下上面提供的链接文章。

      下面才是源码解析正式开始,由于bytecodeInterpreter.cpp中的CASE(_monitorenter)代码和真正模板解释器的汇编代码逻辑基本一致(主要是我太菜了看不懂),这里就用这里的实现来解释synchronized源码,贴上代码:

    1.偏向锁的获取

    CASE(_monitorenter): {
            //获取锁对象
            oop lockee = STACK_OBJECT(-1);//栈帧当中生成一个lock record 记录
            BasicObjectLock* limit = istate->monitor_base();
            BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
            BasicObjectLock* entry = NULL;
            while (most_recent != limit ) {
              if (most_recent->obj() == NULL) entry = most_recent;
              else if (most_recent->obj() == lockee) break;
              most_recent++;
            }
            if (entry != NULL) {
              //将lock record中的obj指向锁对象
              entry->set_obj(lockee);
              int success = false;
              uintptr_t epoch_mask_in_place = (uintptr_t)markOopDesc::epoch_mask_in_place;
              //获取锁对象头信息
              markOop mark = lockee->mark();
              intptr_t hash = (intptr_t) markOopDesc::no_hash;
              // implies UseBiasedLocking
              //判断是否禁用偏向锁
              if (mark->has_bias_pattern()) {
                uintptr_t thread_ident;
                uintptr_t anticipated_bias_locking_value;
                //获取线程id
                thread_ident = (uintptr_t)istate->thread();
                //计算是否偏向自己
                anticipated_bias_locking_value =
                  (((uintptr_t)lockee->klass()->prototype_header() | thread_ident) ^ (uintptr_t)mark) &
                  ~((uintptr_t) markOopDesc::age_mask_in_place);
                //1.判断是否偏向自己
                if  (anticipated_bias_locking_value == 0) {
                  // already biased towards this thread, nothing to do
                  if (PrintBiasedLockingStatistics) {
                    (* BiasedLocking::biased_lock_entry_count_addr())++;
                  }
                  success = true;
                }
                //2.判断是否可偏向,不可偏向则尝试撤销
                else if ((anticipated_bias_locking_value & markOopDesc::biased_lock_mask_in_place) != 0) {
                  // 拿到锁对象的原型对象头
                  markOop header = lockee->klass()->prototype_header();
                  if (hash != markOopDesc::no_hash) {
                    header = header->copy_set_hash(hash);
                  }
                  //其实这里是将锁对象的mark word替换为没有偏向的,即撤销偏向
                  if (lockee->cas_set_mark(header, mark) == mark) {
                    if (PrintBiasedLockingStatistics)
                      (*BiasedLocking::revoked_lock_entry_count_addr())++;
                  }
                }
                //3.判断是否过期,即判断对象头中epoch是否不一致,若不一致则尝试重偏向
                else if ((anticipated_bias_locking_value & epoch_mask_in_place) !=0) {
                  // try rebias
                  //基于lockee对象构造一个偏向当前线程的mark word
                  markOop new_header = (markOop) ( (intptr_t) lockee->klass()->prototype_header() | thread_ident);
                  if (hash != markOopDesc::no_hash) {
                    new_header = new_header->copy_set_hash(hash);
                  }
                  //cas替换mark word为上面偏向当前线程的
                  if (lockee->cas_set_mark(new_header, mark) == mark) {
                    if (PrintBiasedLockingStatistics)
                      (* BiasedLocking::rebiased_lock_entry_count_addr())++;
                  }
                  else { //替换失败则说明有多个线程同时竞争,锁升级
                    CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
                  }
                  success = true;
                }
                else {//4.走到这里要么是偏向其他线程且没有过期的,要么就是匿名偏向(即没有保存线程信息)
                  // try to bias towards thread in case object is anonymously biased
                  //这里构造一个当前mark word的匿名偏向对象头
                  markOop header = (markOop) ((uintptr_t) mark & ((uintptr_t)markOopDesc::biased_lock_mask_in_place |
                                                                  (uintptr_t)markOopDesc::age_mask_in_place |
                                                                  epoch_mask_in_place));
                  if (hash != markOopDesc::no_hash) {
                    header = header->copy_set_hash(hash);
                  }
                  //这是将mark word偏向当前线程
                  markOop new_header = (markOop) ((uintptr_t) header | thread_ident);
                  // debugging hint
                  DEBUG_ONLY(entry->lock()->set_displaced_header((markOop) (uintptr_t) 0xdeaddead);)
                  //这里会尝试将偏向当前线程的mark word替换到锁对象中,
                  //若是匿名偏向则可以cas成功,若已经偏向其他线程,
                  //或有可能刚好被其他线程先修改了,都说明有多个线程竞争,
                  //则会cas失败
                  if (lockee->cas_set_mark(new_header, header) == header) {
                    if (PrintBiasedLockingStatistics)
                      (* BiasedLocking::anonymously_biased_lock_entry_count_addr())++;
                  }
                  else {//替换失败则说明有多个线程同时竞争,锁升级
                    CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
                  }
                  success = true;
                }
              }
              // traditional lightweight locking
              if (!success) { //轻量锁,当偏向锁未开启或失败
                //构造无锁mark word
                markOop displaced = lockee->mark()->set_unlocked();
                // 将上面构造的lock record指向该无锁mark word
                entry->lock()->set_displaced_header(displaced);
                bool call_vm = UseHeavyMonitors;
                // 使用重量级锁或者轻量级锁加锁失败,结果都会导致使用重量级锁
                //这里if条件不满足的话则说明轻量级锁加锁成功直接结束
                if (call_vm || lockee->cas_set_mark((markOop)entry, displaced) != displaced) {
                  // Is it simple recursive case?
                  //锁重入
                  if (!call_vm && THREAD->is_lock_owned((address) displaced->clear_lock_bits())) {
                    entry->lock()->set_displaced_header(NULL);
                  } else { //锁升级
                    CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
                  }
                }
              }
              UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
            } else {
              istate->set_msg(more_monitors);
              UPDATE_PC_AND_RETURN(0); // Re-execute
            }
          }

    上面的代码我都加了注释,基本逻辑应该是可以看的懂得,主要解释一下几个难以理解的点,这里我分别用红黄蓝三色标记了,下面一一解释:

    红:这里的BasicObjectLock的定义在basicLock.hpp中:

    class BasicObjectLock {
      friend class VMStructs;
     private:
      BasicLock _lock;                                    // the lock, must be double word aligned
      oop       _obj;  

    里面分别包含了一个BasicLock和一个oop对象,BasicLock的定义也在当前文件中:

    class BasicLock {
      friend class VMStructs;
      friend class JVMCIVMStructs;
     private:
      volatile markOop _displaced_header;
     public:

    其实就是一个markOop对象头,也就是说BasicObjectLock其实就是一个对象本身和对象头的组合,也叫做lock record。

    了解完这些我们再看代码,其实就是从当前调用方法栈的most_recent(栈底)搜索到limit(栈顶)遍历查找,直到找到一个空闲的或者之前就指向当前锁对象的lock record。

    黄:黄色部分代码还是挺复杂的,首先看 (uintptr_t)lockee->klass()->prototype_header() | thread_ident) ,这个其实是那当前锁对象的class原型对象头和当前线程进行或运算(其实相当于对象头记录下当前线程信息),。

    再看后面  ^ (uintptr_t)mark,其实就是那生成的偏向当前线程的mark word和当前锁对象的进行异或运算看两者的区别。

    再看后面& ~((uintptr_t) markOopDesc::age_mask_in_place) ,~((uintptr_t) markOopDesc::age_mask_in_place) 这个是将拿到mark word为...00001111000取反后变为...11110000111,再和前面进行与运算,可以排除掉gc年龄的干扰,就可以将不同点集中到偏向线程、偏向状态以及锁状态上,如果上面代码中步骤1等于0成立则说明和锁对象一样,可以得出偏向自己,步骤2和偏向锁状态与运算不等于0说明偏向状态标志位为0,没有开启偏向模式,步骤3也一样,只是计算epoch值是否相等,可判断是否需要重偏向。

    蓝:上面也提到了这块通过判断epoch值是否相同,但是原因我找过很多资料,最终发现有一篇问题下的一个回答说的比较好,附上链接:https://www.zhihu.com/question/56582060/answer/155398235 防止回答失效,原文摘抄了过来:

    对于存在明显多线程竞争的场景下使用偏向锁是不合适的,比如生产者-消费者队列。生产者线程获得了偏向锁,消费者线程再去获得锁的时候,就涉及到这个偏向锁的撤销(revoke)操作,而这个撤销是比较昂贵的。那么怎么判断这些对象是否适合偏向锁呢?jvm采用以类为单位的做法,其内部为每个类维护一个偏向锁计数器,对其对象进行偏向锁的撤销操作进行计数。当这个值达到指定阈值的时候,jvm就认为这个类的偏向锁有问题,需要进行重偏向(rebias)。对所有属于这个类的对象进行重偏向的操作叫批量重偏向(bulk rebias),之前的做法是对heap进行遍历,后来引入epoch。当需要bulk rebias时,对这个类的epcho值加1,以后分配这个类的对象的时候mark字段里就是这个epoch值了,同时还要对当前已经获得偏向锁的对象的epoch值加1,这些锁数据记录在方法栈里。这样判断这个对象是否获得偏向锁的条件就是:mark字段后3位是101,thread字段跟当前线程相同,epoch字段跟所属类的epoch值相同。如果epoch值不一样,即使thread字段指向当前线程,也是无效的,相当于进行过了rebias,只是没有对对象的mark字段进行更新。如果这个类的revoke计数器继续增加到一个阈值,那个jvm就认为这个类不适合偏向锁了,就要进行bulk revoke。于是多了一个判断条件,要查看所属类的字段,看看是否允许对这个类使用偏向锁。

      上面主要介绍了偏向锁的加锁和轻量锁的部分加锁流程,流程可参考下图:

      

     

    2.偏向锁批量撤销和重偏向

       接下来主要是偏向锁撤销和重偏向流程,先看InterpreterRuntime::monitorenter代码:

    IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))if (PrintBiasedLockingStatistics) {
        Atomic::inc(BiasedLocking::slow_path_entry_count_addr());
      }
      Handle h_obj(thread, elem->obj());
      assert(Universe::heap()->is_in_reserved_or_null(h_obj()),
             "must be NULL or an object");
      if (UseBiasedLocking) { //开启偏向锁
        // Retry fast entry if bias is revoked to avoid unnecessary inflation
        ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);
      } else {
        ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
      }

    这里主要就两个方法开启偏向锁时执行的fast_enter和未开启偏向锁执行的slow_enter,我们先看fast_enter方法:

    void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock,
                                        bool attempBiasedLocking::revoke_and_rebiast_rebias, TRAPS) {
      if (UseBiasedLocking) { //使用偏向锁
        if (!SafepointSynchronize::is_at_safepoint()) { //不在安全点(安全点指所有java线程都停在安全点,只有vm线程运行)
          BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);
          if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {
            return;
          }
        } else {
          BiasedLocking::revoke_at_safepoint(obj);
        }
        assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
      }
    
      slow_enter(obj, lock, THREAD);
    }

    这里再次判断了UseBiasedLocking,由于fast_enter方法里面也会执行slow_enter方法,个人感觉上一层的if-else是多余的,不过也有可能是为了可读性更强吧。

    这里的is_at_safepoint我找了下源码里的解释,在safepoint.cpp文件里可以看到: