博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
深入理解Java中的底层阻塞原理及实现
阅读量:6356 次
发布时间:2019-06-23

本文共 11911 字,大约阅读时间需要 39 分钟。

谈到阻塞,相信大家都不会陌生了。阻塞的应用场景真的多得不要不要的,比如 生产-消费模式,限流统计等等。什么 ArrayBlockingQueue、 LinkedBlockingQueue、DelayQueue 等等,都是阻塞队列的实现啊,多简单!

阻塞,一般有两个特性很亮眼:1. 不耗 CPU 等待;2. 线程安全;

额,要这么说也 OK 的。毕竟,我们遇到的问题,到这里就够解决了。但是有没有想过,这容器的阻塞又是如何实现的呢?

好吧,翻开源码,也很简单了:(比如 ArrayBlockingQueue 的 take、put….)

// ArrayBlockingQueue

/**

  • Inserts the specified element at the tail of this queue, waiting

  • for space to become available if the queue is full.

  • @throws InterruptedException {@inheritDoc}

  • @throws NullPointerException {@inheritDoc}

    */

public void put(E e) throws InterruptedException {

checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {    while (count == items.length)        // 阻塞的点        notFull.await();    enqueue(e);} finally {    lock.unlock();}

}

/**

  • Inserts the specified element at the tail of this queue, waiting

  • up to the specified wait time for space to become available if

  • the queue is full.

  • @throws InterruptedException {@inheritDoc}

  • @throws NullPointerException {@inheritDoc}

    */

public boolean offer(E e, long timeout, TimeUnit unit)

throws InterruptedException {checkNotNull(e);long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {    while (count == items.length) {        if (nanos <= 0)            return false;        // 阻塞的点        nanos = notFull.awaitNanos(nanos);    }    enqueue(e);    return true;} finally {    lock.unlock();}

}

public E take() throws InterruptedException {

final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {    while (count == 0)        // 阻塞的点        notEmpty.await();    return dequeue();} finally {    lock.unlock();}

}

看来,最终都是依赖了 AbstractQueuedSynchronizer 类(著名的AQS)的 await 方法,看起来像那么回事。那么这个同步器的阻塞又是如何实现的呢?

Java的代码总是好跟踪的:

// AbstractQueuedSynchronizer.await()

/**

  • Implements interruptible condition wait.

  • <ol>

  • <li> If current thread is interrupted, throw InterruptedException.

  • <li> Save lock state returned by {@link #getState}.

  • <li> Invoke {@link #release} with saved state as argument,

  • throwing IllegalMonitorStateException if it fails.

  • <li> Block until signalled or interrupted.

  • <li> Reacquire by invoking specialized version of

  • {@link #acquire} with saved state as argument.

  • <li> If interrupted while blocked in step 4, throw InterruptedException.

  • </ol>

    */

public final void await() throws InterruptedException {

if (Thread.interrupted())    throw new InterruptedException();Node node = addConditionWaiter();int savedState = fullyRelease(node);int interruptMode = 0;while (!isOnSyncQueue(node)) {    // 此处进行真正的阻塞    LockSupport.park(this);    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)        break;}if (acquireQueued(node, savedState) && interruptMode != THROW_IE)    interruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelled    unlinkCancelledWaiters();if (interruptMode != 0)    reportInterruptAfterWait(interruptMode);

}

如上,可以看到,真正的阻塞工作又转交给了另一个工具类: LockSupport 的 park 方法了,这回跟锁扯上了关系,看起来已经越来越接近事实了:

// LockSupport.park()

/**

  • Disables the current thread for thread scheduling purposes unless the

  • permit is available.

  • <p>If the permit is available then it is consumed and the call returns

  • immediately; otherwise

  • the current thread becomes disabled for thread scheduling

  • purposes and lies dormant until one of three things happens:

  • <ul>

  • <li>Some other thread invokes {@link #unpark unpark} with the

  • current thread as the target; or

  • <li>Some other thread {@linkplain Thread#interrupt interrupts}

  • the current thread; or

  • <li>The call spuriously (that is, for no reason) returns.

  • </ul>

  • <p>This method does <em>not</em> report which of these caused the

  • method to return. Callers should re-check the conditions which caused

  • the thread to park in the first place. Callers may also determine,

  • for example, the interrupt status of the thread upon return.

  • @param blocker the synchronization object responsible for this

  • thread parking

  • @since 1.6

    */

public static void park(Object blocker) {

Thread t = Thread.currentThread();setBlocker(t, blocker);UNSAFE.park(false, 0L);setBlocker(t, null);

}

看得出来,这里的实现就比较简洁了,先获取当前线程,设置阻塞对象,阻塞,然后解除阻塞。

好吧,到底什么是真正的阻塞,我们还是不得而知!

UNSAFE.park(false, 0L); 是个什么东西? 看起来就是这一句起到了最关键的作用呢!但由于这里已经是 native 代码,我们已经无法再简单的查看源码了!那咋整呢?

那不行就看C/C++的源码呗,看一下 parker 的定义(park.hpp):

class Parker : public os::PlatformParker {

private:

volatile int _counter ;

Parker * FreeNext ;

JavaThread * AssociatedWith ; // Current association

public:

Parker() : PlatformParker() {

_counter       = 0 ;FreeNext       = NULL ;AssociatedWith = NULL ;

}

protected:

~Parker() { ShouldNotReachHere(); }

public:

// For simplicity of interface with Java, all forms of park (indefinite,

// relative, and absolute) are multiplexed into one call. c中暴露出两个方法给java调用

void park(bool isAbsolute, jlong time);

void unpark();

// Lifecycle operators

static Parker Allocate (JavaThread t) ;

static void Release (Parker * e) ;

private:

static Parker * volatile FreeList ;

static volatile int ListLock ;

};

那 park() 方法到底是如何实现的呢? 其实是继承的 os::PlatformParker 的功能,也就是平台相关的私有实现,以 Linux 平台实现为例(os_linux.hpp):

// Linux中的parker定义

class PlatformParker : public CHeapObj<mtInternal> {

protected:

enum {    REL_INDEX = 0,    ABS_INDEX = 1};int _cur_index;  // which cond is in use: -1, 0, 1pthread_mutex_t _mutex [1] ;pthread_cond_t  _cond  [2] ; // one for relative times and one for abs.

public: // TODO-FIXME: make dtor private

~PlatformParker() { guarantee (0, "invariant") ; }

public:

PlatformParker() {  int status;  status = pthread_cond_init (&_cond[REL_INDEX], os::Linux::condAttr());  assert_status(status == 0, status, "cond_init rel");  status = pthread_cond_init (&_cond[ABS_INDEX], NULL);  assert_status(status == 0, status, "cond_init abs");  status = pthread_mutex_init (_mutex, NULL);  assert_status(status == 0, status, "mutex_init");  _cur_index = -1; // mark as unused}

};

看到 park.cpp 中没有重写 park() 和 unpark() 方法,也就是说阻塞实现完全交由特定平台代码处理了(os_linux.cpp):

// park方法的实现,依赖于 _counter, _mutex[1], _cond[2]

void Parker::park(bool isAbsolute, jlong time) {

// Ideally we'd do something useful while spinning, such

// as calling unpackTime().

// Optional fast-path check:

// Return immediately if a permit is available.

// We depend on Atomic::xchg() having full barrier semantics

// since we are doing a lock-free update to _counter.

if (Atomic::xchg(0, &_counter) > 0) return;

Thread* thread = Thread::current();

assert(thread->is_Java_thread(), "Must be JavaThread");

JavaThread jt = (JavaThread )thread;

// Optional optimization -- avoid state transitions if there's an interrupt pending.

// Check interrupt before trying to wait

if (Thread::is_interrupted(thread, false)) {

return;

}

// Next, demultiplex/decode time arguments

timespec absTime;

if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all

return;

}

if (time > 0) {

unpackTime(&absTime, isAbsolute, time);

}

// Enter safepoint region

// Beware of deadlocks such as 6317397.

// The per-thread Parker:: mutex is a classic leaf-lock.

// In particular a thread must never block on the Threads_lock while

// holding the Parker:: mutex. If safepoints are pending both the

// the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock.

ThreadBlockInVM tbivm(jt);

// Don't wait if cannot get lock since interference arises from

// unblocking. Also. check interrupt before trying wait

if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {

return;

}

int status ;

if (_counter > 0) { // no wait needed

_counter = 0;status = pthread_mutex_unlock(_mutex);assert (status == 0, "invariant") ;// Paranoia to ensure our locked and lock-free paths interact// correctly with each other and Java-level accesses.OrderAccess::fence();return;

}

#ifdef ASSERT

// Don't catch signals while blocked; let the running threads have the signals.

// (This allows a debugger to break into the running thread.)

sigset_t oldsigs;

sigset_t* allowdebug_blocked = os::Linux::allowdebug_blocked_signals();

pthread_sigmask(SIG_BLOCK, allowdebug_blocked, &oldsigs);

#endif

OSThreadWaitState osts(thread->osthread(), false / not Object.wait() /);

jt->set_suspend_equivalent();

// cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()

assert(_cur_index == -1, "invariant");

if (time == 0) {

_cur_index = REL_INDEX; // arbitrary choice when not timedstatus = pthread_cond_wait (&_cond[_cur_index], _mutex) ;

} else {

_cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ;if (status != 0 && WorkAroundNPTLTimedWaitHang) {  pthread_cond_destroy (&_cond[_cur_index]) ;  pthread_cond_init    (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr());}

}

_cur_index = -1;

assert_status(status == 0 || status == EINTR ||

status == ETIME || status == ETIMEDOUT,            status, "cond_timedwait");

#ifdef ASSERT

pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);

#endif

_counter = 0 ;

status = pthread_mutex_unlock(_mutex) ;

assert_status(status == 0, status, "invariant") ;

// Paranoia to ensure our locked and lock-free paths interact

// correctly with each other and Java-level accesses.

OrderAccess::fence();

// If externally suspended while waiting, re-suspend

if (jt->handle_special_suspend_equivalent_condition()) {

jt->java_suspend_self();

}

}

// unpark 实现,相对简单些

void Parker::unpark() {

int s, status ;

status = pthread_mutex_lock(_mutex);

assert (status == 0, "invariant") ;

s = _counter;

_counter = 1;

if (s < 1) {

// thread might be parkedif (_cur_index != -1) {  // thread is definitely parked  if (WorkAroundNPTLTimedWaitHang) {    status = pthread_cond_signal (&_cond[_cur_index]);    assert (status == 0, "invariant");    status = pthread_mutex_unlock(_mutex);    assert (status == 0, "invariant");  } else {    // must capture correct index before unlocking    int index = _cur_index;    status = pthread_mutex_unlock(_mutex);    assert (status == 0, "invariant");    status = pthread_cond_signal (&_cond[index]);    assert (status == 0, "invariant");  }} else {  pthread_mutex_unlock(_mutex);  assert (status == 0, "invariant") ;}

} else {

pthread_mutex_unlock(_mutex);assert (status == 0, "invariant") ;

}

}

从上面代码可以看出,阻塞主要借助于三个变量,_cond、_mutex、_counter, 调用 Linux 系统的 pthread_cond_wait、pthread_mutex_lock、pthread_mutex_unlock (一组 POSIX 标准的阻塞接口)等平台相关的方法进行阻塞了!

而 park.cpp 中,则只有 Allocate、Release 等的一些常规操作!

// 6399321 As a temporary measure we copied & modified the ParkEvent::

// allocate() and release() code for use by Parkers. The Parker:: forms

// will eventually be removed as we consolide and shift over to ParkEvents

// for both builtin synchronization and JSR166 operations.

volatile int Parker::ListLock = 0 ;

Parker * volatile Parker::FreeList = NULL ;

Parker Parker::Allocate (JavaThread t) {

guarantee (t != NULL, "invariant") ;

Parker * p ;

// Start by trying to recycle an existing but unassociated

// Parker from the global free list.

// 8028280: using concurrent free list without memory management can leak

// pretty badly it turns out.

Thread::SpinAcquire(&ListLock, "ParkerFreeListAllocate");

{

p = FreeList;if (p != NULL) {  FreeList = p->FreeNext;}

}

Thread::SpinRelease(&ListLock);

if (p != NULL) {

guarantee (p->AssociatedWith == NULL, "invariant") ;

} else {

// Do this the hard way -- materialize a new Parker..p = new Parker() ;

}

p->AssociatedWith = t ; // Associate p with t

p->FreeNext = NULL ;

return p ;

}

void Parker::Release (Parker * p) {

if (p == NULL) return ;

guarantee (p->AssociatedWith != NULL, "invariant") ;

guarantee (p->FreeNext == NULL , "invariant") ;

p->AssociatedWith = NULL ;

Thread::SpinAcquire(&ListLock, "ParkerFreeListRelease");

{

p->FreeNext = FreeList;FreeList = p;

}

Thread::SpinRelease(&ListLock);

}

综上源码,在进行阻塞的时候,底层并没有(并不一定)要用 while 死循环来阻塞,更多的是借助于操作系统的实现来进行阻塞的。当然,这也更符合大家的猜想!

从上的代码我们也发现一点,底层在做许多事的时候,都不忘考虑线程中断,也就是说,即使在阻塞状态也是可以接收中断信号的,这为上层语言打开了方便之门。

如果要细说阻塞,其实还远没完,不过再往操作系统层面如何实现,就得再下点功夫,去翻翻资料了,把底线压在操作系统层面,大多数情况下也够用了!

转载于:https://blog.51cto.com/14158311/2357080

你可能感兴趣的文章
HTML
查看>>
CENTOS7下编译安装PHP-5.4以及配置phpMyAdmin
查看>>
磁盘显示无法访问拒绝访问,里面的资料怎样找到
查看>>
Java之品优购课程讲义_day07(5)
查看>>
Java的新项目学成在线笔记-day3(八)
查看>>
路由简单的实验
查看>>
好程序员web前端教程分享js reduce方法使用教程
查看>>
零基础学习大数据Hadoop需要什么准备?Hadoop如何发展起来的?
查看>>
前端程序员需要具备的几个软实力,你具备了吗
查看>>
RHEL系列网络配置2015083101
查看>>
c# 基本值类型及其默认值
查看>>
服务器端解决JS跨域调用问题
查看>>
MySql中添加用户,新建数据库,用户授权,删除用户,修改密码
查看>>
雨巷-戴望舒
查看>>
OpenCms创建网站过程图解——献给OpenCms的初学者们
查看>>
C++ 异常处理机制的实现
查看>>
Freebsd的ports命令
查看>>
分布式系统---幂等性设计
查看>>
【转】时钟周期,机器周期,指令周期的区别
查看>>
MYSQL 更新时间自己主动同步与创建时间默认值共存问题
查看>>