详解Java多线程编程中LockSupport类的线程阻塞用法

详解Java多线程编程中LockSupport类的线程阻塞用法

LockSupport类是Java6(JSR166-JUC)引入的一个类,提供了基本的线程同步原语。LockSupport实际上是调用了Unsafe类里的函数,归结到Unsafe里,只有两个函数:

  1. public native void unpark(Thread jthread);  
  2. public native void park(boolean isAbsolute, long time);  

isAbsolute参数是指明时间是*对的,还是相对的。

仅仅两个简单的接口,就为上层提供了强大的同步原语。

先来解析下两个函数是做什么的。

unpark函数为线程提供“许可(permit)”,线程调用park函数则等待“许可”。这个有点像信号量,但是这个“许可”是不能叠加的,“许可”是一次性的。

比如线程B连续调用了三次unpark函数,当线程A调用park函数就使用掉这个“许可”,如果线程A再次调用park,则进入等待状态。

注意,unpark函数可以先于park调用。比如线程B调用unpark函数,给线程A发了一个“许可”,那么当线程A调用park时,它发现已经有“许可”了,那么它会马上再继续运行。

实际上,park函数即使没有“许可”,有时也会无理由地返回,这点等下再解析。

park和unpark的灵活之处

上面已经提到,unpark函数可以先于park调用,这个正是它们的灵活之处。

一个线程它有可能在别的线程unPark之前,或者之后,或者同时调用了park,那么因为park的特性,它可以不用担心自己的park的时序问题,否则,如果park必须要在unpark之前,那么给编程带来很大的麻烦!!

考虑一下,两个线程同步,要如何处理?

在Java5里是用wait/notify/notifyAll来同步的。wait/notify机制有个很蛋疼的地方是,比如线程B要用notify通知线程A,那么线程B要确保线程A已经在wait调用上等待了,否则线程A可能永远都在等待。编程的时候就会很蛋疼。

另外,是调用notify,还是notifyAll?

notify只会唤醒一个线程,如果错误地有两个线程在同一个对象上wait等待,那么又悲剧了。为了安全起见,貌似只能调用notifyAll了。

park/unpark模型真正解耦了线程之间的同步,线程之间不再需要一个Object或者其它变量来存储状态,不再需要关心对方的状态。

 

HotSpot里park/unpark的实现

每个java线程都有一个Parker实例,Parker类是这样定义的:

  1. class Parker : public os::PlatformParker {  
  2. private:  
  3.   volatile int _counter ;  
  4.   …
  5. public:  
  6.   void park(bool isAbsolute, jlong time);  
  7.   void unpark();  
  8.   …
  9. }
  10. class PlatformParker : public CHeapObj<mtInternal> {  
  11.   protected:  
  12.     pthread_mutex_t _mutex [1] ;
  13.     pthread_cond_t  _cond  [1] ;
  14.     …
  15. }

可以看到Parker类实际上用Posix的mutex,condition来实现的。在Parker类里的_counter字段,就是用来记录所谓的“许可”的。

当调用park时,先尝试直接能否直接拿到“许可”,即_counter>0时,如果成功,则把_counter设置为0,并返回:

  1. void Parker::park(bool isAbsolute, jlong time) {  
  2.   // Ideally we’d do something useful while spinning, such  
  3.   // as calling unpackTime().  
  4.   // Optional fast-path check:  
  5.   // Return immediately if a permit is available.  
  6.   // We depend on Atomic::xchg() having full barrier semantics  
  7.   // since we are doing a lock-free update to _counter.  
  8.   if (Atomic::xchg(0, &_counter) > 0) return;  

如果不成功,则构造一个ThreadBlockInVM,然后检查_counter是不是>0,如果是,则把_counter设置为0,unlock mutex并返回:

  1. ThreadBlockInVM tbivm(jt);
  2. if (_counter > 0)  { // no wait needed  
  3.   _counter = 0;
  4.   status = pthread_mutex_unlock(_mutex);

否则,再判断等待的时间,然后再调用pthread_cond_wait函数等待,如果等待返回,则把_counter设置为0,unlock mutex并返回:

  1. if (time == 0) {  
  2.   status = pthread_cond_wait (_cond, _mutex) ;
  3. }
  4. _counter = 0 ;
  5. status = pthread_mutex_unlock(_mutex) ;
  6. assert_status(status == 0, status, “invariant”) ;  
  7. OrderAccess::fence();

当unpark时,则简单多了,直接设置_counter为1,再unlock mutext返回。如果_counter之前的值是0,则还要调用pthread_cond_signal唤醒在park中等待的线程:

  1. void Parker::unpark() {  
  2.   int s, status ;  
  3.   status = pthread_mutex_lock(_mutex);
  4.   assert (status == 0, “invariant”) ;  
  5.   s = _counter;
  6.   _counter = 1;
  7.   if (s < 1) {  
  8.      if (WorkAroundNPTLTimedWaitHang) {  
  9.         status = pthread_cond_signal (_cond) ;
  10.         assert (status == 0, “invariant”) ;  
  11.         status = pthread_mutex_unlock(_mutex);
  12.         assert (status == 0, “invariant”) ;  
  13.      } else {  
  14.         status = pthread_mutex_unlock(_mutex);
  15.         assert (status == 0, “invariant”) ;  
  16.         status = pthread_cond_signal (_cond) ;
  17.         assert (status == 0, “invariant”) ;  
  18.      }
  19.   } else {  
  20.     pthread_mutex_unlock(_mutex);
  21.     assert (status == 0, “invariant”) ;  
  22.   }
  23. }

简而言之,是用mutex和condition保护了一个_counter的变量,当park时,这个变量置为了0,当unpark时,这个变量置为1。
值得注意的是在park函数里,调用pthread_cond_wait时,并没有用while来判断,所以posix condition里的”Spurious wakeup”一样会传递到上层Java的代码里。关于”Spurious wakeup”,参考上一篇

  1. if (time == 0) {  
  2.   status = pthread_cond_wait (_cond, _mutex) ;
  3. }

这也就是为什么Java dos里提到,当下面三种情况下park函数会返回:

  • Some other thread invokes unpark with the current thread as the target; or
  • Some other thread interrupts the current thread; or
  • The call spuriously (that is, for no reason) returns.

相关的实现代码在:

http://hg.openjdk.java.net/jdk7/jdk7/hotspot/file/81d815b05abb/src/share/vm/runtime/park.hpp
http://hg.openjdk.java.net/jdk7/jdk7/hotspot/file/81d815b05abb/src/share/vm/runtime/park.cpp
http://hg.openjdk.java.net/jdk7/jdk7/hotspot/file/81d815b05abb/src/os/linux/vm/os_linux.hpp
http://hg.openjdk.java.net/jdk7/jdk7/hotspot/file/81d815b05abb/src/os/linux/vm/os_linux.cpp

其它的一些东东:

Parker类在分配内存时,使用了一个技巧,重载了new函数来实现了cache line对齐。

  1. // We use placement-new to force ParkEvent instances to be  
  2. // aligned on 256-byte address boundaries.  This ensures that the least  
  3. // significant byte of a ParkEvent address is always 0.  
  4. void * operator new (size_t sz) ;  

Parker里使用了一个无锁的队列在分配释放Parker实例:

  1. volatile int Parker::ListLock = 0 ;  
  2. Parker * volatile Parker::FreeList = NULL ;  
  3. Parker * Parker::Allocate (JavaThread * t) {
  4.   guarantee (t != NULL, “invariant”) ;  
  5.   Parker * p ;
  6.   // Start by trying to recycle an existing but unassociated  
  7.   // Parker from the global free list.  
  8.   for (;;) {  
  9.     p = FreeList ;
  10.     if (p  == NULL) break ;  
  11.     // 1: Detach  
  12.     // Tantamount to p = Swap (&FreeList, NULL)  
  13.     if (Atomic::cmpxchg_ptr (NULL, &FreeList, p) != p) {  
  14.        continue ;  
  15.     }
  16.     // We’ve detached the list.  The list in-hand is now  
  17.     // local to this thread.   This thread can operate on the  
  18.     // list without risk of interference from other threads.  
  19.     // 2: Extract — pop the 1st element from the list.  
  20.     Parker * List = p->FreeNext ;
  21.     if (List == NULL) break ;  
  22.     for (;;) {  
  23.         // 3: Try to reattach the residual list  
  24.         guarantee (List != NULL, “invariant”) ;  
  25.         Parker * Arv =  (Parker *) Atomic::cmpxchg_ptr (List, &FreeList, NULL) ;
  26.         if (Arv == NULL) break ;  
  27.         // New nodes arrived.  Try to detach the recent arrivals.  
  28.         if (Atomic::cmpxchg_ptr (NULL, &FreeList, Arv) != Arv) {  
  29.             continue ;  
  30.         }
  31.         guarantee (Arv != NULL, “invariant”) ;  
  32.         // 4: Merge Arv into List  
  33.         Parker * Tail = List ;
  34.         while (Tail->FreeNext != NULL) Tail = Tail->FreeNext ;  
  35.         Tail->FreeNext = Arv ;
  36.     }
  37.     break ;  
  38.   }
  39.   if (p != NULL) {  
  40.     guarantee (p->AssociatedWith == NULL, “invariant”) ;  
  41.   } else {  
  42.     // Do this the hard way — materialize a new Parker..  
  43.     // In rare cases an allocating thread might detach  
  44.     // a long list — installing null into FreeList –and  
  45.     // then stall.  Another thread calling Allocate() would see  
  46.     // FreeList == null and then invoke the ctor.  In this case we  
  47.     // end up with more Parkers in circulation than we need, but  
  48.     // the race is rare and the outcome is benign.  
  49.     // Ideally, the # of extant Parkers is equal to the  
  50.     // maximum # of threads that existed at any one time.  
  51.     // Because of the race mentioned above, segments of the  
  52.     // freelist can be transiently inaccessible.  At worst  
  53.     // we may end up with the # of Parkers in circulation  
  54.     // slightly above the ideal.  
  55.     p = new Parker() ;  
  56.   }
  57.   p->AssociatedWith = t ;          // Associate p with t  
  58.   p->FreeNext       = NULL ;
  59.   return p ;  
  60. }
  61. void Parker::Release (Parker * p) {  
  62.   if (p == NULL) return ;  
  63.   guarantee (p->AssociatedWith != NULL, “invariant”) ;  
  64.   guarantee (p->FreeNext == NULL      , “invariant”) ;  
  65.   p->AssociatedWith = NULL ;
  66.   for (;;) {  
  67.     // Push p onto FreeList  
  68.     Parker * List = FreeList ;
  69.     p->FreeNext = List ;
  70.     if (Atomic::cmpxchg_ptr (p, &FreeList, List) == List) break ;  
  71.   }
  72. }

总结与扯谈

JUC(Java Util Concurrency)仅用简单的park, unpark和CAS指令就实现了各种高级同步数据结构,而且效率很高,令人惊叹。

在C++程序员各种自制轮子的时候,Java程序员则有很丰富的并发数据结构,如lock,latch,queue,map等信手拈来。

要知道像C++直到C++11才有标准的线程库,同步原语,但离高级的并发数据结构还有很远。boost库有提供一些线程,同步相关的类,但也是很简单的。Intel的tbb有一些高级的并发数据结构,但是国内boost都用得少,更别说tbb了。

*开始研究无锁算法的是C/C++程序员,但是后来很多Java程序员,或者类库开始自制各种高级的并发数据结构,经常可以看到有分析Java并发包的文章。反而C/C++程序员总是在分析无锁的队列算法。高级的并发数据结构,比如并发的HashMap,没有看到有相关的实现或者分析的文章。在C++11之后,这种情况才有好转。

因为正确高效实现一个Concurrent Hash Map是很困难的,要对内存CPU有深刻的认识,而且还要面对CPU不断升级带来的各种坑。

我认为真正值得信赖的C++并发库,只有Intel的tbb和微软的PPL。

 

详解Java多线程编程中LockSupport

详解Java多线程编程中LockSupport

LockSupport是用来创建锁和其他同步类的基本线程阻塞原语。
LockSupport中的park() 和 unpark() 的作用分别是阻塞线程和解除阻塞线程,而且park()和unpark()不会遇到“Thread.suspend 和 Thread.resume所可能引发的死锁”问题。
因为park() 和 unpark()有许可的存在;调用 park() 的线程和另一个试图将其 unpark() 的线程之间的竞争将保持活性。

基本用法
LockSupport 很类似于二元信号量(只有1个许可证可供使用),如果这个许可还没有被占用,当前线程获取许可并继 续 执行;如果许可已经被占用,当前线 程阻塞,等待获取许可。

1
2
3
4
5
public static void main(String[] args)
{
   LockSupport.park();
   System.out.println("block.");
}

运行该代码,可以发现主线程一直处于阻塞状态。因为 许可默认是被占用的 ,调用park()时获取不到许可,所以进入阻塞状态。

如下代码:先释放许可,再获取许可,主线程能够正常终止。LockSupport许可的获取和释放,一般来说是对应的,如果多次unpark,只有一次park也不会出现什么问题,结果是许可处于可用状态。

1
2
3
4
5
6
7
public static void main(String[] args)
{
   Thread thread = Thread.currentThread();
   LockSupport.unpark(thread);//释放许可
   LockSupport.park();// 获取许可
   System.out.println("b");
}

LockSupport是可不重入 的,如果一个线程连续2次调用 LockSupport .park(),那么该线程一定会一直阻塞下去。

1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) throws Exception
{
 Thread thread = Thread.currentThread();
 
 LockSupport.unpark(thread);
 
 System.out.println("a");
 LockSupport.park();
 System.out.println("b");
 LockSupport.park();
 System.out.println("c");
}

这段代码打印出a和b,不会打印c,因为第二次调用park的时候,线程无法获取许可出现死锁。

下面我们来看下LockSupport对应中断的响应性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public static void t2() throws Exception
{
 Thread t = new Thread(new Runnable()
 {
  private int count = 0;
  @Override
  public void run()
  {
   long start = System.currentTimeMillis();
   long end = 0;
   while ((end - start) <= 1000)
   {
    count++;
    end = System.currentTimeMillis();
   }
   System.out.println("after 1 second.count=" + count);
  //等待或许许可
   LockSupport.park();
   System.out.println("thread over." + Thread.currentThread().isInterrupted());
  }
 });
 t.start();
 Thread.sleep(2000);
 // 中断线程
 t.interrupt();
 
 System.out.println("main over");
}

*终线程会打印出thread over.true。这说明 线程如果因为调用park而阻塞的话,能够响应中断请求(中断状态被设置成true),但是不会抛出InterruptedException 。

LockSupport函数列表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 返回提供给*近一次尚未解除阻塞的 park 方法调用的 blocker 对象,如果该调用不受阻塞,则返回 null。
static Object getBlocker(Thread t)
// 为了线程调度,禁用当前线程,除非许可可用。
static void park()
// 为了线程调度,在许可可用之前禁用当前线程。
static void park(Object blocker)
// 为了线程调度禁用当前线程,*多等待指定的等待时间,除非许可可用。
static void parkNanos(long nanos)
// 为了线程调度,在许可可用前禁用当前线程,并*多等待指定的等待时间。
static void parkNanos(Object blocker, long nanos)
// 为了线程调度,在指定的时限前禁用当前线程,除非许可可用。
static void parkUntil(long deadline)
// 为了线程调度,在指定的时限前禁用当前线程,除非许可可用。
static void parkUntil(Object blocker, long deadline)
// 如果给定线程的许可尚不可用,则使其可用。
static void unpark(Thread thread)


LockSupport示例
对比下面的“示例1”和“示例2”可以更清晰的了解LockSupport的用法。
示例1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class WaitTest1 {
  public static void main(String[] args) {
    ThreadA ta = new ThreadA("ta");
    synchronized(ta) { // 通过synchronized(ta)获取“对象ta的同步锁”
      try {
        System.out.println(Thread.currentThread().getName()+" start ta");
        ta.start();
        System.out.println(Thread.currentThread().getName()+" block");
        // 主线程等待
        ta.wait();
        System.out.println(Thread.currentThread().getName()+" continue");
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }
  static class ThreadA extends Thread{
    public ThreadA(String name) {
      super(name);
    }
    public void run() {
      synchronized (this) { // 通过synchronized(this)获取“当前对象的同步锁”
        System.out.println(Thread.currentThread().getName()+" wakup others");
        notify();  // 唤醒“当前对象上的等待线程”
      }
    }
  }
}

示例2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import java.util.concurrent.locks.LockSupport;
public class LockSupportTest1 {
  private static Thread mainThread;
  public static void main(String[] args) {
    ThreadA ta = new ThreadA("ta");
    // 获取主线程
    mainThread = Thread.currentThread();
    System.out.println(Thread.currentThread().getName()+" start ta");
    ta.start();
    System.out.println(Thread.currentThread().getName()+" block");
    // 主线程阻塞
    LockSupport.park(mainThread);
    System.out.println(Thread.currentThread().getName()+" continue");
  }
  static class ThreadA extends Thread{
    public ThreadA(String name) {
      super(name);
    }
    public void run() {
      System.out.println(Thread.currentThread().getName()+" wakup others");
      // 唤醒“主线程”
      LockSupport.unpark(mainThread);
    }
  }
}

运行结果:

1
2
3
4
main start ta
main block
ta wakup others
main continue

说明:park和wait的区别。wait让线程阻塞前,必须通过synchronized获取同步锁。

LockSupport的park和unpark的基本使用,以及对线程中断的响应性

LockSupport的park和unpark的基本使用,以及对线程中断的响应性

  LockSupport是JDK中比较底层的类,用来创建锁和其他同步工具类的基本线程阻塞原语。java锁和同步器框架的核心AQS:AbstractQueuedSynchronizer,就是通过调用LockSupport.park()和LockSupport.unpark()实现线程的阻塞和唤醒的。LockSupport很类似于二元信号量(只有1个许可证可供使用),如果这个许可还没有被占用,当前线程获取许可并继续执行;如果许可已经被占用,当前线程阻塞,等待获取许可。

  1. public static void main(String[] args)  
  2. {
  3.      LockSupport.park();
  4.      System.out.println(“block.”);  
  5. }

运行该代码,可以发现主线程一直处于阻塞状态。因为许可默认是被占用的,调用park()时获取不到许可,所以进入阻塞状态。

 

如下代码:先释放许可,再获取许可,主线程能够正常终止。LockSupport许可的获取和释放,一般来说是对应的,如果多次unpark,只有一次park也不会出现什么问题,结果是许可处于可用状态。

  1. public static void main(String[] args)  
  2. {
  3.      Thread thread = Thread.currentThread();
  4.      LockSupport.unpark(thread);//释放许可  
  5.      LockSupport.park();// 获取许可  
  6.      System.out.println(“b”);  
  7. }

 

LockSupport是不可重入的,如果一个线程连续2次调用LockSupport.park(),那么该线程一定会一直阻塞下去。

 

  1. public static void main(String[] args) throws Exception  
  2. {
  3.     Thread thread = Thread.currentThread();
  4.     LockSupport.unpark(thread);
  5.     System.out.println(“a”);  
  6.     LockSupport.park();
  7.     System.out.println(“b”);  
  8.     LockSupport.park();
  9.     System.out.println(“c”);  
  10. }

这段代码打印出a和b,不会打印c,因为第二次调用park的时候,线程无法获取许可出现死锁。

 

下面我们来看下LockSupport对应中断的响应性

 

  1. public static void t2() throws Exception  
  2. {
  3.     Thread t = new Thread(new Runnable()  
  4.     {
  5.         private int count = 0;  
  6.         @Override  
  7.         public void run()  
  8.         {
  9.             long start = System.currentTimeMillis();  
  10.             long end = 0;  
  11.             while ((end – start) <= 1000)  
  12.             {
  13.                 count++;
  14.                 end = System.currentTimeMillis();
  15.             }
  16.             System.out.println(“after 1 second.count=” + count);  
  17.         //等待或许许可  
  18.             LockSupport.park();
  19.             System.out.println(“thread over.” + Thread.currentThread().isInterrupted());  
  20.         }
  21.     });
  22.     t.start();
  23.     Thread.sleep(2000);  
  24.     // 中断线程  
  25.     t.interrupt();
  26.     System.out.println(“main over”);  
  27. }

*终线程会打印出thread over.true。这说明线程如果因为调用park而阻塞的话,能够响应中断请求(中断状态被设置成true),但是不会抛出InterruptedException

Java并发编程之锁机制之LockSupport工具

LockSupport类

了解线程的阻塞和唤醒,我们需要查看LockSupport类。具体代码如下:

  1. public class LockSupport {
  2. private LockSupport() {} // Cannot be instantiated.
  3. private static void setBlocker(Thread t, Object arg) {
  4. U.putObject(t, PARKBLOCKER, arg);
  5. }
  6. public static void unpark(Thread thread) {
  7. if (thread != null)
  8. U.unpark(thread);
  9. }
  10. public static void park(Object blocker) {
  11. Thread t = Thread.currentThread();
  12. setBlocker(t, blocker);
  13. U.park(false, 0L);
  14. setBlocker(t, null);
  15. }
  16. public static void parkNanos(Object blocker, long nanos) {
  17. if (nanos > 0) {
  18. Thread t = Thread.currentThread();
  19. setBlocker(t, blocker);
  20. U.park(false, nanos);
  21. setBlocker(t, null);
  22. }
  23. }
  24. public static void parkUntil(Object blocker, long deadline) {
  25. Thread t = Thread.currentThread();
  26. setBlocker(t, blocker);
  27. U.park(true, deadline);
  28. setBlocker(t, null);
  29. }
  30. public static Object getBlocker(Thread t) {
  31. if (t == null)
  32. throw new NullPointerException();
  33. return U.getObjectVolatile(t, PARKBLOCKER);
  34. }
  35. public static void park() {
  36. U.park(false, 0L);
  37. }
  38. public static void parkNanos(long nanos) {
  39. if (nanos > 0)
  40. U.park(false, nanos);
  41. }
  42. public static void parkUntil(long deadline) {
  43. U.park(true, deadline);
  44. }
  45. //省略部分代码
  46. private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
  47. private static final long PARKBLOCKER;
  48. private static final long SECONDARY;
  49. static {
  50. try {
  51. PARKBLOCKER = U.objectFieldOffset
  52. (Thread.class.getDeclaredField(“parkBlocker”));
  53. SECONDARY = U.objectFieldOffset
  54. (Thread.class.getDeclaredField(“threadLocalRandomSecondarySeed”));
  55. } catch (ReflectiveOperationException e) {
  56. throw new Error(e);
  57. }
  58. }
  59. }
  60. 复制代码

从上面的代码中,我们可以知道LockSupport中的对外提供的方法都是静态方法。这些方法提供了*基本的线程阻塞和唤醒功能,在LockSupport类中定义了一组以park开头的方法用来阻塞当前线程。以及unPark(Thread thread)方法来唤醒一个被阻塞的线程。关于park开头的方法具体描述如下表所示:

 

park.png

 

 

其中park(Object blocker)parkNanos(Object blocker, long nanos)parkUntil(Object blocker, long deadline)三个方法是Java 6中新增加的方法。其中参数blocker是用来标识当前线程等待的对象(下文简称为阻塞对象),该对象主要用于问题排查和系统监控

由于在Java 5之前,当线程阻塞时(使用synchronized关键字)在一个对象上时,通过线程dump能够查看到该线程的阻塞对象。方便问题定位,而Java 5退出的Lock等并发工具却遗漏了这一点,致使在线程dump时无法提供阻塞对象的信息。因此,在Java 6中,LockSupport新增了含有阻塞对象的park方法。用以替代原有的park方法。

LockSupport中的blocker

可能有很多读者对Blocker的原理有点好奇,既然线程都被阻塞了,是通过什么办法将阻塞对象设置到线程中去的呢? 不急不急,我们继续查看含有阻塞对象(Object blocker)的park方法。 我们发现内部都调用了setBlocker(Thread t, Object arg)方法。具体代码如下所示:

  1. private static void setBlocker(Thread t, Object arg) {
  2. U.putObject(t, PARKBLOCKER, arg);
  3. }
  4. 复制代码

其中 U为sun.misc.包下的Unsafe类。而其中的PARKBLOCKER是在静态代码块中进行赋值的,也就是如下代码:

  1. private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
  2. static {
  3. try {
  4. PARKBLOCKER = U.objectFieldOffset
  5. (Thread.class.getDeclaredField(“parkBlocker”));
  6. //省略部分代码
  7. } catch (ReflectiveOperationException e) {
  8. throw new Error(e);
  9. }
  10. }
  11. 复制代码

Thread.class.getDeclaredField("parkBlocker")方法其实很好理解,就是获取线程中的parkBlocker字段。如果有则返回其对应的Field字段,如果没有则抛出NoSuchFieldException异常。那么关于Unsafe中的objectFieldOffset(Field f)方法怎么理解呢?

在描述该方法之前,需要给大家讲一个知识点。在JVM中,可以自由选择如何实现Java对象的"布局",也就Java对象的各个部分分别放在内存那个地方,JVM是可以感知和决定的。 在sun.misc.Unsafe中提供了objectFieldOffset()方法用于获取某个字段相对 Java对象的“起始地址”的偏移量,也提供了getInt、getLong、getObject之类的方法可以使用前面获取的偏移量来访问某个Java 对象的某个字段。

有可能大家理解起来比较困难,这里给大家画了一个图,帮助大家理解,具体如下图所示:

blocker.png

 

 

在上图中,我们创建了两个Thread对象,其中Thread对象1在内存中分配的地址为0x10000-0x10100,Thread对象2在内存中分配的地址为0x11000-0x11100,其中parkBlocker对应内存偏移量为2(这里我们假设相对于其对象的“起始位置”的偏移量为2)。那么通过objectFieldOffset(Field f)就能获取该字段的偏移量。需要注意的是某字段在其类中的内存偏移量总是相同的,也就是对于Thread对象1与Thread对象2,parkBlocker字段在其对象所在的内存偏移量始终是相同的。

那么我们再回到setBlocker(Thread t, Object arg)方法,当我们获取到parkBlocker字段在其对象内存偏移量后, 接着会调用U.putObject(t, PARKBLOCKER, arg);,该方法有三个参数,*个参数是操作对象,第二个参数是内存偏移量,第三个参数是实际存储值。该方法理解起来也很简单,就是操作某个对象中某个内存地址下的数据。那么结合我们上面所讲的。该方法的实际操作结果如下图所示:

blocker_set.png

 

 

到现在,我们就应该懂了,尽管当前线程已经阻塞,但是我们还是能直接操控线程中实际存储该字段的内存区域来达到我们想要的结果。

LockSupport底层代码实现

通过阅读源代码我们可以发现,LockSupport中关于线程的阻塞和唤醒,主要调用的是sun.misc.Unsafe 中的park(boolean isAbsolute, long time)unpark(Object thread)方法,也就是如下代码:

  1. private static final jdk.internal.misc.Unsafe theInternalUnsafe =
  2. jdk.internal.misc.Unsafe.getUnsafe();
  3. public void park(boolean isAbsolute, long time) {
  4. theInternalUnsafe.park(isAbsolute, time);
  5. }
  6. public void unpark(Object thread) {
  7. theInternalUnsafe.unpark(thread);
  8. }
  9. 复制代码

查看sun.misc.包下的Unsafe.java文件我们可以看出,内部其实调用的是jdk.internal.misc.Unsafe中的方法。继续查看jdk.internal.misc.中的Unsafe.java中对应的方法:

  1. @HotSpotIntrinsicCandidate
  2. public native void unpark(Object thread);
  3. @HotSpotIntrinsicCandidate
  4. public native void park(boolean isAbsolute, long time);
  5. 复制代码

通过查看方法,我们可以得出*终调用的是JVM中的方法,也就是会调用hotspot.share.parims包下的unsafe.cpp中的方法。继续跟踪。

  1. UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time)) {
  2. //省略部分代码
  3. thread->parker()->park(isAbsolute != 0, time);
  4. //省略部分代码
  5. } UNSAFE_END
  6. UNSAFE_ENTRY(void, Unsafe_Unpark(JNIEnv *env, jobject unsafe, jobject jthread)) {
  7. Parker* p = NULL;
  8. //省略部分代码
  9. if (p != NULL) {
  10. HOTSPOT_THREAD_UNPARK((uintptr_t) p);
  11. p->unpark();
  12. }
  13. } UNSAFE_END
  14. 复制代码

通过观察代码我们发现,线程的阻塞和唤醒其实是与hotspot.share.runtime中的Parker类相关。我们继续查看:

  1. class Parker : public os::PlatformParker {
  2. private:
  3. volatile int _counter ;//该变量非常重要,下文我们会具体描述
  4. //省略部分代码
  5. protected:
  6. ~Parker() { ShouldNotReachHere(); }
  7. public:
  8. // For simplicity of interface with Java, all forms of park (indefinite,
  9. // relative, and absolute) are multiplexed into one call.
  10. void park(bool isAbsolute, jlong time);
  11. void unpark();
  12. //省略部分代码
  13. }
  14. 复制代码

在上述代码中,volatile int _counter该字段的值非常重要,一定要注意其用volatile修饰(在下文中会具体描述,接着当我们通过SourceInsight工具(推荐大家阅读代码时,使用该工具)点击其park与unpark方法时,我们会得到如下界面:

 

parker.png

 

 

从图中红色矩形中我们可也看出,针对线程的阻塞和唤醒,不同操作系统有着不同的实现。众所周知Java是跨平台的。针对不同的平台,做出不同的处理。也是非常理解的。因为作者对windows与solaris操作系统不是特别了解。所以这里我选择对Linux下的平台下进行分析。也就是选择hotspot.os.posix包下的os_posix.cpp文件进行分析。

Linux下的park实现

为了方便大家理解Linux下的阻塞实现,在实际代码中我省略了一些不重要的代码,具体如下图所示:

  1. void Parker::park(bool isAbsolute, jlong time) {
  2. //(1)如果_counter的值大于0,那么直接返回
  3. if (Atomic::xchg(0, &_counter) > 0) return;
  4. //获取当前线程
  5. Thread* thread = Thread::current();
  6. JavaThread *jt = (JavaThread *)thread;
  7. //(2)如果当前线程已经中断,直接返回。
  8. if (Thread::is_interrupted(thread, false)) {
  9. return;
  10. }
  11. //(3)判断时间,如果时间小于0,或者在*对时间情况下,时间为0直接返回
  12. struct timespec absTime;
  13. if (time < 0 || (isAbsolute && time == 0)) { // don’t wait at all
  14. return;
  15. }
  16. //如果时间大于0,判断阻塞超时时间或阻塞截止日期,同时将时间赋值给absTime
  17. if (time > 0) {
  18. to_abstime(&absTime, time, isAbsolute);
  19. }
  20. //(4)如果当前线程已经中断,或者申请互斥锁失败,则直接返回
  21. if (Thread::is_interrupted(thread, false) ||
  22. pthread_mutex_trylock(_mutex) != 0) {
  23. return;
  24. }
  25. //(5)如果是时间等于0,那么就直接阻塞线程,
  26. if (time == 0) {
  27. _cur_index = REL_INDEX; // arbitrary choice when not timed
  28. status = pthread_cond_wait(&_cond[_cur_index], _mutex);
  29. assert_status(status == 0, status, “cond_timedwait”);
  30. }
  31. //(6)根据absTime之前计算的时间,阻塞线程相应时间
  32. else {
  33. _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
  34. status = pthread_cond_timedwait(&_cond[_cur_index], _mutex, &absTime);
  35. assert_status(status == 0 || status == ETIMEDOUT,
  36. status, “cond_timedwait”);
  37. }
  38. //省略部分代码
  39. //(7)当线程阻塞超时,或者到达截止日期时,直接唤醒线程
  40. _counter = 0;
  41. status = pthread_mutex_unlock(_mutex);
  42. //省略部分代码
  43. }
  44. 复制代码

从整个代码来看其实关于Linux下的park方法分为以下七个步骤:

  • (1)调用Atomic::xchg方法,将_counter的值赋值为0,其方法的返回值为之前_counter的值,如果返回值大于0(因为有其他线程操作过_counter的值,也就是其他线程调用过unPark方法),那么就直接返回。
  • (2)如果当前线程已经中断,直接返回。也就是说如果当前线程已经中断了,那么调用park()方法来阻塞线程就会无效。
  • (3) 判断其设置的时间是否合理,如果合理,判断阻塞超时时间阻塞截止日期,同时将时间赋值给absTime
  • (4) 在实际对线程进行阻塞前,再一次判断如果当前线程已经中断,或者申请互斥锁失败,则直接返回
  • (5) 如果是时间等于0(时间为0,表示一直阻塞线程,除非调用unPark方法唤醒),那么就直接阻塞线程,
  • (6)根据absTime之前计算的时间,并调用pthread_cond_timedwait方法阻塞线程相应的时间。
  • (7) 当线程阻塞相应时间后,通过pthread_mutex_unlock方法直接唤醒线程,同时将_counter赋值为0。

因为关于Linux的阻塞涉及到其内部函数,这里将用到的函数都进行了声明。大家可以根据下表所介绍的方法进行理解。具体方法如下表所示:

linux方法.png

 

 

Linux下的unpark实现

在了解了Linux的park实现后,再来理解Linux的唤醒实现就非常简单了,查看相应方法:

  1. void Parker::unpark() {
  2. int status = pthread_mutex_lock(_mutex);
  3. assert_status(status == 0, status, “invariant”);
  4. const int s = _counter;
  5. //将_counter的值赋值为1
  6. _counter = 1;
  7. // must capture correct index before unlocking
  8. int index = _cur_index;
  9. status = pthread_mutex_unlock(_mutex);
  10. assert_status(status == 0, status, “invariant”);
  11. //省略部分代码
  12. }
  13. 复制代码

其实从代码整体逻辑来讲,*终唤醒其线程的方法为pthread_mutex_unlock(_mutex)(关于该函数的作用,我已经在上表进行介绍了。大家可以参照Linux下的park实现中的图表进行理解)。同时将_counter的值赋值为1, 那么结合我们上文所讲的park(将线程进行阻塞)方法,那么我们可以得知整个线程的唤醒与阻塞,在Linux系统下,其实是受到Parker类中的_counter的值的影响的

LockSupport的使用

现在我们基本了解了LockSupport的基本原理。现在我们来看看它的基本使用吧。在例子中,为了方便大家顺便弄清blocker的作用,这里我调用了带blocker的park方法。具体代码如下所示:

  1. class LockSupportDemo {
  2. public static void main(String[] args) throws InterruptedException {
  3. Thread a = new Thread(new Runnable() {
  4. @Override
  5. public void run() {
  6. LockSupport.park(“线程a的blocker数据”);
  7. System.out.println(“我是被线程b唤醒后的操作”);
  8. }
  9. });
  10. a.start();
  11. //让当前主线程睡眠1秒,保证线程a在线程b之前执行
  12. Thread.sleep(1000);
  13. Thread b = new Thread(new Runnable() {
  14. @Override
  15. public void run() {
  16. String before = (String) LockSupport.getBlocker(a);
  17. System.out.println(“阻塞时从线程a中获取的blocker——>” + before);
  18. LockSupport.unpark(a);
  19. //这里睡眠是,保证线程a已经被唤醒了
  20. try {
  21. Thread.sleep(1000);
  22. String after = (String) LockSupport.getBlocker(a);
  23. System.out.println(“唤醒时从线程a中获取的blocker——>” + after);
  24. } catch (InterruptedException e) {
  25. e.printStackTrace();
  26. }
  27. }
  28. });
  29. b.start();
  30. }
  31. }
  32. 复制代码

代码中,创建了两个线程,线程a与线程b(线程a优先运行与线程b),在线程a中,通过调用LockSupport.park("线程a的blocker数据");给线程a设置了一个String类型的blocker,当线程a运行的时候,直接将线程a阻塞。在线程b中,先会获取线程a中的blocker,打印输出后。再通过LockSupport.unpark(a);唤醒线程a。当唤醒线程a后。*后输出并打印线程a中的blocker。 实际代码运行结果如下:

  1. 阻塞时从线程a中获取的blocker——>线程a的blocker数据
  2. 我是被线程b唤醒后的操作
  3. 唤醒时从线程a中获取的blocker——>null
  4. 复制代码

从结果中,我们可以看出,线程a被阻塞时,后续就不会再进行操作了。当线程a被线程b唤醒后。之前设置的blocker也变为null了。同时如果在线程a中park语句后还有额外的操作。那么会继续运行。关于为毛之前的blocker之前变为null,具体原因如下:

  1. public static void park(Object blocker) {
  2. Thread t = Thread.currentThread();
  3. setBlocker(t, blocker);
  4. U.park(false, 0L);//当线程被阻塞时,会阻塞在这里
  5. setBlocker(t, null);//线程被唤醒时,会将blocer置为null
  6. }
  7. 复制代码

通过上述例子,我们完全知道了blocker可以在线程阻塞的时候,获取数据。也就证明了当我们对线程进行问题排查和系统监控的时候blocker的有着非常重要的作用。