//
// ========================================================================
-// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
+// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
*/
public class SharedBlockingCallback
{
- private static final Logger LOG = Log.getLogger(SharedBlockingCallback.class);
+ static final Logger LOG = Log.getLogger(SharedBlockingCallback.class);
+
+ final ReentrantLock _lock = new ReentrantLock();
+ final Condition _idle = _lock.newCondition();
+ final Condition _complete = _lock.newCondition();
private static Throwable IDLE = new Throwable()
}
};
- final Blocker _blocker;
+ Blocker _blocker;
public SharedBlockingCallback()
{
- this(new Blocker());
+ _blocker=new Blocker();
}
- protected SharedBlockingCallback(Blocker blocker)
+ protected long getIdleTimeout()
{
- _blocker=blocker;
+ return -1;
}
public Blocker acquire() throws IOException
{
- _blocker._lock.lock();
+ _lock.lock();
+ long idle = getIdleTimeout();
try
{
while (_blocker._state != IDLE)
- _blocker._idle.await();
+ {
+ if (idle>0 && (idle < Long.MAX_VALUE/2))
+ {
+ // Wait a little bit longer than the blocker might block
+ if (!_idle.await(idle*2,TimeUnit.MILLISECONDS))
+ throw new IOException(new TimeoutException());
+ }
+ else
+ _idle.await();
+ }
_blocker._state = null;
}
catch (final InterruptedException e)
{
- throw new InterruptedIOException()
- {
- {
- initCause(e);
- }
- };
+ throw new InterruptedIOException();
}
finally
{
- _blocker._lock.unlock();
+ _lock.unlock();
}
return _blocker;
}
+ protected void notComplete(Blocker blocker)
+ {
+ LOG.warn("Blocker not complete {}",blocker);
+ if (LOG.isDebugEnabled())
+ LOG.debug(new Throwable());
+ }
/* ------------------------------------------------------------ */
/** A Closeable Callback.
* Uses the auto close mechanism to check block has been called OK.
*/
- public static class Blocker implements Callback, Closeable
+ public class Blocker implements Callback, Closeable
{
- final ReentrantLock _lock = new ReentrantLock();
- final Condition _idle = _lock.newCondition();
- final Condition _complete = _lock.newCondition();
Throwable _state = IDLE;
-
- public Blocker()
+
+ protected Blocker()
{
}
_state = SUCCEEDED;
_complete.signalAll();
}
- else if (_state == IDLE)
- throw new IllegalStateException("IDLE");
+ else
+ throw new IllegalStateException(_state);
}
finally
{
{
if (_state == null)
{
- // TODO remove when feedback received on 435322
if (cause==null)
- LOG.warn("null failed cause (please report stack trace) ",new Throwable());
- _state = cause==null?FAILED:cause;
+ _state=FAILED;
+ else if (cause instanceof BlockerTimeoutException)
+ // Not this blockers timeout
+ _state=new IOException(cause);
+ else
+ _state=cause;
_complete.signalAll();
}
- else if (_state == IDLE)
- throw new IllegalStateException("IDLE");
+ else
+ throw new IllegalStateException(_state);
}
finally
{
LOG.warn("Blocking a NonBlockingThread: ",new Throwable());
_lock.lock();
+ long idle = getIdleTimeout();
try
{
while (_state == null)
{
- // TODO remove this debug timout!
- // This is here to help debug 435322,
- if (!_complete.await(10,TimeUnit.MINUTES))
+ if (idle>0 && (idle < Long.MAX_VALUE/2))
{
- IOException x = new IOException("DEBUG timeout");
- LOG.warn("Blocked too long (please report!!!) "+this, x);
- _state=x;
+ // Wait a little bit longer than expected callback idle timeout
+ if (!_complete.await(idle+idle/2,TimeUnit.MILLISECONDS))
+ // The callback has not arrived in sufficient time.
+ // We will synthesize a TimeoutException
+ _state=new BlockerTimeoutException();
}
+ else
+ _complete.await();
}
if (_state == SUCCEEDED)
}
catch (final InterruptedException e)
{
- throw new InterruptedIOException()
- {
- {
- initCause(e);
- }
- };
+ throw new InterruptedIOException();
}
finally
{
if (_state == IDLE)
throw new IllegalStateException("IDLE");
if (_state == null)
- LOG.debug("Blocker not complete",new Throwable());
+ notComplete(this);
}
finally
{
- _state = IDLE;
- _idle.signalAll();
- _lock.unlock();
+ try
+ {
+ // If the blocker timed itself out, remember the state
+ if (_state instanceof BlockerTimeoutException)
+ // and create a new Blocker
+ _blocker=new Blocker();
+ else
+ // else reuse Blocker
+ _state = IDLE;
+ _idle.signalAll();
+ _complete.signalAll();
+ }
+ finally
+ {
+ _lock.unlock();
+ }
}
}
_lock.lock();
try
{
- return String.format("%s@%x{%s}",SharedBlockingCallback.class.getSimpleName(),hashCode(),_state);
+ return String.format("%s@%x{%s}",Blocker.class.getSimpleName(),hashCode(),_state);
}
finally
{
}
}
}
+
+ private static class BlockerTimeoutException extends TimeoutException
+ {
+ }
}