当前位置 博文首页 > 吃肉不长肉的小灏哥:synchronized的jvm源码分析聊锁的意义
上篇写完了ReentrantLock源码实现,从我们的角度分析设计锁,在对比大神的实现,顺道拍了一波道哥的马屁,虽然他看不到,哈哈。这一篇我们来聊一聊synchronized的源码实现,并对比reentrantLock的实现,相信认真看完一定会对锁的理解更加深入。
废话不多说先来一段代码:
static String s = new String(); static int a = 1; public static void main(String[] args) { synchronized (s) { a++; } }
我们一般写加锁也就这么写,为啥一个synchronized关键字就能做到加锁的效果呢,我们来看下这段代码main方法里的字节码:
0: getstatic #2 // Field s:Ljava/lang/String; 3: dup 4: astore_1 5: monitorenter 6: getstatic #3 // Field a:I 9: iconst_1 10: iadd 11: putstatic #3 // Field a:I 14: aload_1 15: monitorexit 16: goto 24 19: astore_2 20: aload_1 21: monitorexit 22: aload_2 23: athrow 24: return
我们主要看一下标红的几行,5,15,21,其中monitorenter是进入临界区的操作,monitorexit是退出临界区时的操作,而为啥又俩monitorexit,其中15行是正常退出的,而21行是异常退出的,毕竟我们之前写lock是放在finally中的,而这里当然也要通过手段保证退出临界区必须要释放锁。知道了synchronized其实就是monitorenter和monitorexit,我们还需要了解的就是对象头,这里我特意补了一篇对象头的介绍:https://www.cnblogs.com/gmt-hao/p/14151951.html,接下来的synchronized源码还是比较依赖对象头的理解的。
在翻看synchronized的源码时,找入口就花了我很长的时间,有的文章说是InterpreterRuntime::monitorenter方法,有的文章写的是bytecodeInterpreter.cpp里的CASE(_monitorenter)方法后来我发现后者里面其实是调用了前者的,所以倾向于后者看了下去,但是我下载的jdk8里面的代码是这样的:
CASE(_monitorenter): { oop lockee = STACK_OBJECT(-1); //取到锁对象 // derefing's lockee ought to provoke implicit null check CHECK_NULL(lockee); //判空 // find a free monitor or one already allocated for this object // if we find a matching object then we need a new monitor // since this is recursive enter BasicObjectLock* limit = istate->monitor_base(); BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base(); BasicObjectLock* entry = NULL; while (most_recent != limit ) { if (most_recent->obj() == NULL) entry = most_recent; else if (most_recent->obj() == lockee) break; most_recent++; } if (entry != NULL) { entry->set_obj(lockee); markOop displaced = lockee->mark()->set_unlocked(); //复制一份锁对象并将其设置为无锁状态 entry->lock()->set_displaced_header(displaced); if (Atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) { // Is it simple recursive case? if (THREAD->is_lock_owned((address) displaced->clear_lock_bits())) { entry->lock()->set_displaced_header(NULL); } else { CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception); } } UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1); } else { istate->set_msg(more_monitors); UPDATE_PC_AND_RETURN(0); // Re-execute } }
对,就这么点就没了,网上也有很多文章是按照这个写的,这让我看的时候很困惑,而且这些文章关于偏向锁的解释我都感觉和代码对不上,有种硬生生强塞进去的感觉,我当时就很好奇,这种文章的作者自己真的理解了吗,写这些文章不怕误导读者吗。这里我要推荐一篇我觉得写的非常好的,我也从中借鉴了很多,看了这篇博客之后,很多之前没理通的逻辑都弄明白了,非常感谢,这里贴上链接:https://www.jianshu.com/p/4758852cbff4(找资料的过程中发现很多博客都是抄的这篇文章的),关于文章中写到的为什么jdk8的某版本及之前的版本里面代码无法解释偏向锁的问题,也是看了文章中推荐的R大的文章才算是有所理解,在此非常感谢这些真正传道授业解惑的人,也贴一下链接:https://book.douban.com/annotation/31407691/,我就不再赘述(主要是理解的比较浅显),希望想了解synchronized源码的小伙伴还是看一下上面提供的链接文章。
下面才是源码解析正式开始,由于bytecodeInterpreter.cpp中的CASE(_monitorenter)代码和真正模板解释器的汇编代码逻辑基本一致(主要是我太菜了看不懂),这里就用这里的实现来解释synchronized源码,贴上代码:
1.偏向锁的获取
CASE(_monitorenter): { //获取锁对象 oop lockee = STACK_OBJECT(-1);//栈帧当中生成一个lock record 记录 BasicObjectLock* limit = istate->monitor_base(); BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base(); BasicObjectLock* entry = NULL; while (most_recent != limit ) { if (most_recent->obj() == NULL) entry = most_recent; else if (most_recent->obj() == lockee) break; most_recent++; } if (entry != NULL) { //将lock record中的obj指向锁对象 entry->set_obj(lockee); int success = false; uintptr_t epoch_mask_in_place = (uintptr_t)markOopDesc::epoch_mask_in_place; //获取锁对象头信息 markOop mark = lockee->mark(); intptr_t hash = (intptr_t) markOopDesc::no_hash; // implies UseBiasedLocking //判断是否禁用偏向锁 if (mark->has_bias_pattern()) { uintptr_t thread_ident; uintptr_t anticipated_bias_locking_value; //获取线程id thread_ident = (uintptr_t)istate->thread(); //计算是否偏向自己 anticipated_bias_locking_value = (((uintptr_t)lockee->klass()->prototype_header() | thread_ident) ^ (uintptr_t)mark) & ~((uintptr_t) markOopDesc::age_mask_in_place); //1.判断是否偏向自己 if (anticipated_bias_locking_value == 0) { // already biased towards this thread, nothing to do if (PrintBiasedLockingStatistics) { (* BiasedLocking::biased_lock_entry_count_addr())++; } success = true; } //2.判断是否可偏向,不可偏向则尝试撤销 else if ((anticipated_bias_locking_value & markOopDesc::biased_lock_mask_in_place) != 0) { // 拿到锁对象的原型对象头 markOop header = lockee->klass()->prototype_header(); if (hash != markOopDesc::no_hash) { header = header->copy_set_hash(hash); } //其实这里是将锁对象的mark word替换为没有偏向的,即撤销偏向 if (lockee->cas_set_mark(header, mark) == mark) { if (PrintBiasedLockingStatistics) (*BiasedLocking::revoked_lock_entry_count_addr())++; } } //3.判断是否过期,即判断对象头中epoch是否不一致,若不一致则尝试重偏向 else if ((anticipated_bias_locking_value & epoch_mask_in_place) !=0) { // try rebias //基于lockee对象构造一个偏向当前线程的mark word markOop new_header = (markOop) ( (intptr_t) lockee->klass()->prototype_header() | thread_ident); if (hash != markOopDesc::no_hash) { new_header = new_header->copy_set_hash(hash); } //cas替换mark word为上面偏向当前线程的 if (lockee->cas_set_mark(new_header, mark) == mark) { if (PrintBiasedLockingStatistics) (* BiasedLocking::rebiased_lock_entry_count_addr())++; } else { //替换失败则说明有多个线程同时竞争,锁升级 CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception); } success = true; } else {//4.走到这里要么是偏向其他线程且没有过期的,要么就是匿名偏向(即没有保存线程信息) // try to bias towards thread in case object is anonymously biased //这里构造一个当前mark word的匿名偏向对象头 markOop header = (markOop) ((uintptr_t) mark & ((uintptr_t)markOopDesc::biased_lock_mask_in_place | (uintptr_t)markOopDesc::age_mask_in_place | epoch_mask_in_place)); if (hash != markOopDesc::no_hash) { header = header->copy_set_hash(hash); } //这是将mark word偏向当前线程 markOop new_header = (markOop) ((uintptr_t) header | thread_ident); // debugging hint DEBUG_ONLY(entry->lock()->set_displaced_header((markOop) (uintptr_t) 0xdeaddead);) //这里会尝试将偏向当前线程的mark word替换到锁对象中, //若是匿名偏向则可以cas成功,若已经偏向其他线程, //或有可能刚好被其他线程先修改了,都说明有多个线程竞争, //则会cas失败 if (lockee->cas_set_mark(new_header, header) == header) { if (PrintBiasedLockingStatistics) (* BiasedLocking::anonymously_biased_lock_entry_count_addr())++; } else {//替换失败则说明有多个线程同时竞争,锁升级 CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception); } success = true; } } // traditional lightweight locking if (!success) { //轻量锁,当偏向锁未开启或失败 //构造无锁mark word markOop displaced = lockee->mark()->set_unlocked(); // 将上面构造的lock record指向该无锁mark word entry->lock()->set_displaced_header(displaced); bool call_vm = UseHeavyMonitors; // 使用重量级锁或者轻量级锁加锁失败,结果都会导致使用重量级锁 //这里if条件不满足的话则说明轻量级锁加锁成功直接结束 if (call_vm || lockee->cas_set_mark((markOop)entry, displaced) != displaced) { // Is it simple recursive case? //锁重入 if (!call_vm && THREAD->is_lock_owned((address) displaced->clear_lock_bits())) { entry->lock()->set_displaced_header(NULL); } else { //锁升级 CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception); } } } UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1); } else { istate->set_msg(more_monitors); UPDATE_PC_AND_RETURN(0); // Re-execute } }
上面的代码我都加了注释,基本逻辑应该是可以看的懂得,主要解释一下几个难以理解的点,这里我分别用红黄蓝三色标记了,下面一一解释:
红:这里的BasicObjectLock的定义在basicLock.hpp中:
class BasicObjectLock { friend class VMStructs; private: BasicLock _lock; // the lock, must be double word aligned oop _obj;
里面分别包含了一个BasicLock和一个oop对象,BasicLock的定义也在当前文件中:
class BasicLock { friend class VMStructs; friend class JVMCIVMStructs; private: volatile markOop _displaced_header; public:
其实就是一个markOop对象头,也就是说BasicObjectLock其实就是一个对象本身和对象头的组合,也叫做lock record。
了解完这些我们再看代码,其实就是从当前调用方法栈的most_recent(栈底)搜索到limit(栈顶)遍历查找,直到找到一个空闲的或者之前就指向当前锁对象的lock record。
黄:黄色部分代码还是挺复杂的,首先看 (uintptr_t)lockee->klass()->prototype_header() | thread_ident) ,这个其实是那当前锁对象的class原型对象头和当前线程进行或运算(其实相当于对象头记录下当前线程信息),。
再看后面 ^ (uintptr_t)mark,其实就是那生成的偏向当前线程的mark word和当前锁对象的进行异或运算看两者的区别。
再看后面& ~((uintptr_t) markOopDesc::age_mask_in_place) ,~((uintptr_t) markOopDesc::age_mask_in_place) 这个是将拿到mark word为...00001111000取反后变为...11110000111,再和前面进行与运算,可以排除掉gc年龄的干扰,就可以将不同点集中到偏向线程、偏向状态以及锁状态上,如果上面代码中步骤1等于0成立则说明和锁对象一样,可以得出偏向自己,步骤2和偏向锁状态与运算不等于0说明偏向状态标志位为0,没有开启偏向模式,步骤3也一样,只是计算epoch值是否相等,可判断是否需要重偏向。
蓝:上面也提到了这块通过判断epoch值是否相同,但是原因我找过很多资料,最终发现有一篇问题下的一个回答说的比较好,附上链接:https://www.zhihu.com/question/56582060/answer/155398235 防止回答失效,原文摘抄了过来:
上面主要介绍了偏向锁的加锁和轻量锁的部分加锁流程,流程可参考下图:
2.偏向锁批量撤销和重偏向
接下来主要是偏向锁撤销和重偏向流程,先看InterpreterRuntime::monitorenter代码:
IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))if (PrintBiasedLockingStatistics) { Atomic::inc(BiasedLocking::slow_path_entry_count_addr()); } Handle h_obj(thread, elem->obj()); assert(Universe::heap()->is_in_reserved_or_null(h_obj()), "must be NULL or an object"); if (UseBiasedLocking) { //开启偏向锁 // Retry fast entry if bias is revoked to avoid unnecessary inflation ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK); } else { ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK); }
这里主要就两个方法开启偏向锁时执行的fast_enter和未开启偏向锁执行的slow_enter,我们先看fast_enter方法:
void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempBiasedLocking::revoke_and_rebiast_rebias, TRAPS) { if (UseBiasedLocking) { //使用偏向锁 if (!SafepointSynchronize::is_at_safepoint()) { //不在安全点(安全点指所有java线程都停在安全点,只有vm线程运行) BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD); if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) { return; } } else { BiasedLocking::revoke_at_safepoint(obj); } assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now"); } slow_enter(obj, lock, THREAD); }
这里再次判断了UseBiasedLocking,由于fast_enter方法里面也会执行slow_enter方法,个人感觉上一层的if-else是多余的,不过也有可能是为了可读性更强吧。
这里的is_at_safepoint我找了下源码里的解释,在safepoint.cpp文件里可以看到: