Java 中自旋锁的实现


Java中初始是使用mutex互斥锁,因为互斥锁是会线程等待挂起,而对获取锁后的操作时间比较短暂的应用场景来说,这样的锁会让竞争锁的线程不停的park,unpark 的操作,这样的系统的调用性能是非常糟糕的,为了提高锁的性能,java 在6 默认使用了自旋锁。

在Linux中本身就已经提供了自旋锁的系统调用,在glibc-2.9中就有它的比较简单的实现方法

  1. int pthread_spin_lock (lock) pthread_spinlock_t *lock;  
  2. {  
  3. asm ("\n"  
  4. "1:\t" LOCK_PREFIX "decl %0\n\t"  
  5. "jne 2f\n\t"  
  6. ".subsection 1\n\t"  
  7. ".align 16\n"  
  8. "2:\trep; nop\n\t"  
  9. "cmpl $0, %0\n\t"  
  10. "jg 1b\n\t"  
  11. "jmp 2b\n\t"  
  12. ".previous"  
  13. "=m" (*lock)  
  14. "m" (*lock));  
  15. return 0;  
  16. }  
通过总线锁把参数-1保证了减法的原子性,如果减后的值是(0)的代表获得锁,其他线程的线程自旋直到参数变成初始值(1),继续竞争锁,直到获得这把锁。

Java 并没有使用系统自带的自旋锁,自己重写了自旋锁的逻辑,并且增加了自旋的次数的控制。详细见-XX:+UseSpinning 和 -XX:PreBlockSpin=xx

让我们具体来看是如何实现的,注意这是mutex锁中所实现的lock,而并不是synchinized 的锁的spin lock的实现(这个你可以参考synchronizer.cpp里的方法TrySpin_VaryDuration)

  1. int Monitor::TrySpin (Thread * const Self) {  
  2.   if (TryLock())    return 1 ;  
  3.   if (!os::is_MP()) return 0 ;    
  4.   
  5.   int Probes  = 0 ;  
  6.   int Delay   = 0 ;  
  7.   int Steps   = 0 ;  
  8.   int SpinMax = NativeMonitorSpinLimit ;  
  9.   int flgs    = NativeMonitorFlags ;  
  10.   for (;;) {  
  11.     intptr_t v = _LockWord.FullWord;  
  12.     if ((v & _LBIT) == 0) {  
  13.       if (CASPTR (&_LockWord, v, v|_LBIT) == v) {  
  14.         return 1 ;  
  15.       }  
  16.       continue ;  
  17.     }  
  18.   
  19.     if ((flgs & 8) == 0) {  
  20.       SpinPause () ;  
  21.     }  
  22.   
  23.     // Periodically increase Delay -- variable Delay form   
  24.     // conceptually: delay *= 1 + 1/Exponent   
  25.     ++ Probes;  
  26.     if (Probes > SpinMax) return 0 ;  
  27.   
  28.     if ((Probes & 0x7) == 0) {  
  29.       Delay = ((Delay << 1)|1) & 0x7FF ;  
  30.       // CONSIDER: Delay += 1 + (Delay/4); Delay &= 0x7FF ;   
  31.     }  
  32.   
  33.     if (flgs & 2continue ;  
  34.   
  35.     // Consider checking _owner's schedctl state, if OFFPROC abort spin.   
  36.     // If the owner is OFFPROC then it's unlike that the lock will be dropped   
  37.     // in a timely fashion, which suggests that spinning would not be fruitful   
  38.     // or profitable.   
  39.   
  40.     // Stall for "Delay" time units - iterations in the current implementation.   
  41.     // Avoid generating coherency traffic while stalled.   
  42.     // Possible ways to delay:   
  43.     //   PAUSE, SLEEP, MEMBAR #sync, MEMBAR #halt,   
  44.     //   wr %g0,%asi, gethrtime, rdstick, rdtick, rdtsc, etc. ...   
  45.     // Note that on Niagara-class systems we want to minimize STs in the   
  46.     // spin loop.  N1 and brethren write-around the L1$ over the xbar into the L2$.   
  47.     // Furthermore, they don't have a W$ like traditional SPARC processors.   
  48.     // We currently use a Marsaglia Shift-Xor RNG loop.   
  49.     Steps += Delay ;  
  50.     if (Self != NULL) {  
  51.       jint rv = Self->rng[0] ;  
  52.       for (int k = Delay ; --k >= 0; ) {  
  53.         rv = MarsagliaXORV (rv) ;  
  54.         if ((flgs & 4) == 0 && SafepointSynchronize::do_call_back()) return 0 ;  
  55.       }  
  56.       Self->rng[0] = rv ;  
  57.     } else {  
  58.       Stall (Delay) ;  
  59.     }  
  60.   }  
  61. }  

a. os::is_MP() 判断系统是否是多核的系统,在单核下,自旋锁是没有意义的。

b. CASPTR 使用了 Atomic::cmpxchg_ptr 原子语义 cmpxchg 比较替换,如果比较的值相等就替换成需要的值并且返回去比较的值,如果不相同返回被比较的值的内容。

在这里的语义是比较_LockWord.FullWord 和 _Lockword 的值是否相同,如果相同就把_Lockword 的值置换成v|_LBIT(_LBIT的值是1)。

自旋锁的逻辑:判断_LockWord.FullWord bit 0 是否是0,如果是0代表没有占有锁,那就尝试去占有锁,通过原子替换置bit0 为1,如果置换成功那么代表拥有锁,没有则进入自旋。

SpinPause ()  函数
在linux_x86 64位机器上 定义了
 .globl SpinPause
        .align 16
        .type  SpinPause,@function
SpinPause:
        rep
        nop
        movq   $1, %rax
        ret

主要在rep, nop 的指令经过编译器后的指令是pause,是用于提高cpu性能的,在官方上描述pase指令是为了避免memory order violation ,有种说法就是cpu是流水线的处理指令的,当原子指令store的时候,而如果有线程同时也在load他的值,那么load 必须等到store 执行成功,这样cpu就无法进行流水线作业了。但我更觉的这是个加强版的nop 也就是多增加几个空的机器周期,一来省电,二来本身spin lock就需要cpu空运行,并且不需要访问内存。

c. SafepointSynchronize::do_call_back()这是一个安全点,提供一个停止自旋锁的切入点,比如vm thread,在做线程dump, 内存 dump的时候,是需要让 自旋锁提前停止的。
 
d. if (Probes > SpinMax) return 0 ; 当大于自旋的次数的时候,自旋自动退出,也就是前面所说的参数 -XX:PreBlockSpin

最后这里还有个比较有意思的方法MarsagliaXORV (rv) ; 是算随机数的,不清楚为什么java让cpu自旋的过程中计算随机数的意义何在,为了不让cpu空转?感觉用spinpase 更合理一点。

相关内容