Author: david.lloyd(a)jboss.com
Date: 2008-03-21 18:13:09 -0400 (Fri, 21 Mar 2008)
New Revision: 3730
Added:
remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/State.java
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundContext.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundRequest.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundService.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundContext.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundRequest.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundService.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppConnection.java
remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java
remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/AtomicStateMachine.java
Log:
Enhance atomic state machine to use new lock, and detect invalid transitions
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java 2008-03-21
22:12:18 UTC (rev 3729)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java 2008-03-21
22:13:09 UTC (rev 3730)
@@ -5,6 +5,7 @@
import java.util.List;
import java.util.Set;
import java.util.Iterator;
+import java.util.EnumSet;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import org.jboss.cx.remoting.Endpoint;
@@ -49,10 +50,21 @@
Logger.getLogger("org.jboss.cx.remoting").info("JBoss Remoting
version %s", Version.VERSION);
}
- private enum State {
+ private enum State implements org.jboss.cx.remoting.util.State<State> {
INITIAL,
UP,
- DOWN,
+ DOWN;
+
+ public boolean isReachable(final State dest) {
+ switch (this) {
+ case INITIAL:
+ return dest != INITIAL;
+ case UP:
+ return dest == DOWN;
+ default:
+ return false;
+ }
+ }
}
public CoreEndpoint(final String name, final RequestListener<?, ?>
rootListener) {
@@ -158,6 +170,12 @@
}
public Session openSession(final URI uri, final AttributeMap attributeMap) throws
RemotingException {
+ if (uri == null) {
+ throw new NullPointerException("uri is null");
+ }
+ if (attributeMap == null) {
+ throw new NullPointerException("attributeMap is null");
+ }
final String scheme = uri.getScheme();
if (scheme == null) {
throw new RemotingException("No scheme on remote endpoint
URI");
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundContext.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundContext.java 2008-03-21
22:12:18 UTC (rev 3729)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundContext.java 2008-03-21
22:13:09 UTC (rev 3730)
@@ -6,6 +6,7 @@
import org.jboss.cx.remoting.RemotingException;
import org.jboss.cx.remoting.log.Logger;
import org.jboss.cx.remoting.util.AtomicStateMachine;
+import org.jboss.cx.remoting.util.State;
import static org.jboss.cx.remoting.util.CollectionUtil.synchronizedHashSet;
import static org.jboss.cx.remoting.util.AtomicStateMachine.start;
@@ -23,11 +24,15 @@
private ContextClient contextClient;
- private enum State {
+ private enum State implements org.jboss.cx.remoting.util.State<State> {
NEW,
UP,
STOPPING,
- DOWN,
+ DOWN;
+
+ public boolean isReachable(final State dest) {
+ return compareTo(dest) < 0;
+ }
}
public CoreInboundContext(final RequestListener<I, O> requestListener, final
Executor executor) {
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundRequest.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundRequest.java 2008-03-21
22:12:18 UTC (rev 3729)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundRequest.java 2008-03-21
22:13:09 UTC (rev 3730)
@@ -6,6 +6,7 @@
import org.jboss.cx.remoting.RequestContext;
import org.jboss.cx.remoting.RequestListener;
import org.jboss.cx.remoting.util.AtomicStateMachine;
+import org.jboss.cx.remoting.util.State;
import org.jboss.cx.remoting.log.Logger;
import java.util.concurrent.Executor;
import java.util.Set;
@@ -51,11 +52,15 @@
this.executor = executor;
}
- private enum State {
+ private enum State implements org.jboss.cx.remoting.util.State<State> {
INITIAL,
UNSENT,
SENT,
- TERMINATED,
+ TERMINATED;
+
+ public boolean isReachable(final State dest) {
+ return compareTo(dest) < 0;
+ }
}
public void initialize(final RequestClient<O> requestClient) {
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundService.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundService.java 2008-03-21
22:12:18 UTC (rev 3729)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundService.java 2008-03-21
22:13:09 UTC (rev 3730)
@@ -4,6 +4,7 @@
import org.jboss.cx.remoting.RequestListener;
import org.jboss.cx.remoting.RemotingException;
import org.jboss.cx.remoting.util.AtomicStateMachine;
+import org.jboss.cx.remoting.util.State;
import java.util.concurrent.Executor;
/**
@@ -16,9 +17,13 @@
private final AtomicStateMachine<State> state = start(State.INITIAL);
- private enum State {
+ private enum State implements org.jboss.cx.remoting.util.State<State> {
INITIAL,
- UP,
+ UP;
+
+ public boolean isReachable(final State dest) {
+ return compareTo(dest) < 0;
+ }
}
public CoreInboundService(final RequestListener<I, O> requestListener, final
Executor executor) {
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundContext.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundContext.java 2008-03-21
22:12:18 UTC (rev 3729)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundContext.java 2008-03-21
22:13:09 UTC (rev 3730)
@@ -12,6 +12,7 @@
import org.jboss.cx.remoting.core.util.QueueExecutor;
import org.jboss.cx.remoting.util.AtomicStateMachine;
import org.jboss.cx.remoting.util.CollectionUtil;
+import org.jboss.cx.remoting.util.State;
import org.jboss.cx.remoting.log.Logger;
/**
@@ -39,11 +40,15 @@
state.releaseExclusive();
}
- private enum State {
+ private enum State implements org.jboss.cx.remoting.util.State<State> {
INITIAL,
UP,
STOPPING,
- DOWN,
+ DOWN,;
+
+ public boolean isReachable(final State dest) {
+ return compareTo(dest) < 0;
+ }
}
// Getters
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundRequest.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundRequest.java 2008-03-21
22:12:18 UTC (rev 3729)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundRequest.java 2008-03-21
22:13:09 UTC (rev 3730)
@@ -12,6 +12,7 @@
import org.jboss.cx.remoting.RequestCompletionHandler;
import org.jboss.cx.remoting.RemotingException;
import org.jboss.cx.remoting.util.AtomicStateMachine;
+import org.jboss.cx.remoting.util.State;
import org.jboss.cx.remoting.log.Logger;
/**
@@ -53,12 +54,22 @@
return requestClient;
}
- private enum State {
+ private enum State implements org.jboss.cx.remoting.util.State<State> {
WAITING,
DONE,
EXCEPTION,
CANCELLED,
- TERMINATED,
+ TERMINATED,;
+
+ public boolean isReachable(final State dest) {
+ switch (this) {
+ case WAITING:
+ case DONE:
+ return compareTo(dest) < 0;
+ default:
+ return false;
+ }
+ }
}
/**
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundService.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundService.java 2008-03-21
22:12:18 UTC (rev 3729)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundService.java 2008-03-21
22:13:09 UTC (rev 3730)
@@ -5,6 +5,7 @@
import org.jboss.cx.remoting.RemotingException;
import org.jboss.cx.remoting.CloseHandler;
import org.jboss.cx.remoting.util.AtomicStateMachine;
+import org.jboss.cx.remoting.util.State;
import org.jboss.cx.remoting.log.Logger;
import java.util.concurrent.Executor;
import java.io.Serializable;
@@ -26,11 +27,15 @@
this.executor = executor;
}
- private enum State {
+ private enum State implements org.jboss.cx.remoting.util.State<State> {
INITIAL,
UP,
CLOSING,
- DOWN
+ DOWN;
+
+ public boolean isReachable(final State dest) {
+ return compareTo(dest) < 0;
+ }
}
// Getters
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java 2008-03-21
22:12:18 UTC (rev 3729)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java 2008-03-21
22:13:09 UTC (rev 3730)
@@ -28,6 +28,7 @@
import org.jboss.cx.remoting.spi.ByteMessageOutput;
import org.jboss.cx.remoting.spi.ObjectMessageInput;
import org.jboss.cx.remoting.util.CollectionUtil;
+import org.jboss.cx.remoting.util.State;
import org.jboss.cx.remoting.spi.ObjectMessageOutput;
import org.jboss.cx.remoting.log.Logger;
import org.jboss.cx.remoting.spi.protocol.ContextIdentifier;
@@ -135,11 +136,16 @@
if (protocolHandler == null) {
throw new NullPointerException("protocolHandler is null");
}
+ boolean ok = false;
state.requireTransitionExclusive(State.NEW, State.CONNECTING);
try {
doInitialize(protocolHandler, rootContext);
+ ok = true;
} finally {
state.releaseExclusive();
+ if (! ok) {
+ state.transition(State.DOWN);
+ }
}
}
@@ -147,11 +153,16 @@
if (protocolHandlerFactory == null) {
throw new NullPointerException("protocolHandlerFactory is null");
}
+ boolean ok = false;
state.requireTransitionExclusive(State.NEW, State.CONNECTING);
try {
doInitialize(protocolHandlerFactory.createHandler(protocolContext, remoteUri,
attributeMap), rootContext);
+ ok = true;
} finally {
state.releaseExclusive();
+ if (! ok) {
+ state.transition(State.DOWN);
+ }
}
}
@@ -165,12 +176,16 @@
// State mgmt
- private enum State {
+ private enum State implements org.jboss.cx.remoting.util.State<State> {
NEW,
CONNECTING,
UP,
STOPPING,
- DOWN,
+ DOWN;
+
+ public boolean isReachable(final State dest) {
+ return compareTo(dest) < 0;
+ }
}
// Context mgmt
Modified:
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppConnection.java
===================================================================
---
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppConnection.java 2008-03-21
22:12:18 UTC (rev 3729)
+++
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppConnection.java 2008-03-21
22:13:09 UTC (rev 3730)
@@ -9,6 +9,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.EnumMap;
+import java.util.EnumSet;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.mina.common.AttributeKey;
@@ -26,6 +28,7 @@
import org.jboss.cx.remoting.spi.ObjectMessageInput;
import org.jboss.cx.remoting.util.CollectionUtil;
import org.jboss.cx.remoting.util.WeakHashSet;
+import org.jboss.cx.remoting.util.State;
import org.jboss.cx.remoting.jrpp.id.JrppContextIdentifier;
import org.jboss.cx.remoting.jrpp.id.JrppRequestIdentifier;
import org.jboss.cx.remoting.jrpp.id.JrppServiceIdentifier;
@@ -100,7 +103,7 @@
return connection.getIoHandler();
}
- private enum State {
+ private enum State implements org.jboss.cx.remoting.util.State<State> {
/** Initial state - unconnected */
NEW,
/** Client side, waiting to receive protocol version info */
@@ -118,7 +121,12 @@
/** Session is shutting down or closed */
CLOSED,
/** Session failed to connect */
- FAILED,
+ FAILED;
+
+ public boolean isReachable(final State dest) {
+ // not perfect but close enough for now
+ return compareTo(dest) < 0;
+ }
}
private final AtomicStateMachine<State> state =
AtomicStateMachine.start(State.NEW);
@@ -351,8 +359,9 @@
}
public void waitForUp() throws IOException {
+
while (! state.in(State.UP, State.FAILED)) {
- state.waitForAny();
+// state.waitForAny(); todo
}
if (state.in(State.FAILED)) {
throw failureReason;
Modified: remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java
===================================================================
---
remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java 2008-03-21
22:12:18 UTC (rev 3729)
+++
remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java 2008-03-21
22:13:09 UTC (rev 3730)
@@ -6,6 +6,7 @@
import org.jboss.cx.remoting.log.Logger;
import org.jboss.cx.remoting.core.CoreEndpoint;
import org.jboss.cx.remoting.core.protocol.LocalProtocolHandlerFactory;
+import org.jboss.cx.remoting.jrpp.JrppProtocolSupport;
/**
*
@@ -22,6 +23,11 @@
try {
final Endpoint userEndpoint = coreEndpoint.getUserEndpoint();
LocalProtocolHandlerFactory.addTo(userEndpoint);
+ final JrppProtocolSupport jrppProtocolSupport = new JrppProtocolSupport();
+ jrppProtocolSupport.setEndpoint(userEndpoint);
+ jrppProtocolSupport.setExecutor(executorService);
+ jrppProtocolSupport.create();
+ jrppProtocolSupport.start();
userEndpoint.addCloseHandler(new CloseHandler<Endpoint>() {
public void handleClose(final Endpoint closed) {
executorService.shutdown();
Modified:
remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/AtomicStateMachine.java
===================================================================
---
remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/AtomicStateMachine.java 2008-03-21
22:12:18 UTC (rev 3729)
+++
remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/AtomicStateMachine.java 2008-03-21
22:13:09 UTC (rev 3730)
@@ -10,16 +10,13 @@
/**
*
*/
-public final class AtomicStateMachine<T extends Enum<T>> {
+public final class AtomicStateMachine<T extends Enum<T> & State<T>>
{
// protected by {@code lock}
private T state;
- private final ReadWriteLock lock = new ReentrantReadWriteLock();
- private final Lock readLock = lock.readLock();
- private final Lock writeLock = lock.writeLock();
- private final Condition cond = writeLock.newCondition();
+ private final StateLock stateLock = new StateLock();
- public static <T extends Enum<T>> AtomicStateMachine<T> start(final
T initialState) {
+ public static <T extends Enum<T> & State<T>>
AtomicStateMachine<T> start(final T initialState) {
return new AtomicStateMachine<T>(initialState);
}
@@ -31,19 +28,18 @@
}
public boolean transition(final T state) {
- writeLock.lock();
+ if (state == null) {
+ throw new NullPointerException("state is null");
+ }
+ stateLock.lockExclusive();
try {
- if (state == null) {
- throw new NullPointerException("state is null");
- }
if (this.state == state) {
return false;
}
this.state = state;
- cond.signalAll();
return true;
} finally {
- writeLock.unlock();
+ stateLock.unlockExclusive();
}
}
@@ -67,17 +63,16 @@
if (state == null) {
throw new NullPointerException("state is null");
}
- writeLock.lock();
+ stateLock.lockExclusive();
try {
if (this.state == state) {
return false;
}
this.state = state;
- cond.signalAll();
- readLock.lock();
+ stateLock.lockShared();
return true;
} finally {
- writeLock.unlock();
+ stateLock.unlockExclusive();
}
}
@@ -85,12 +80,11 @@
if (state == null) {
throw new NullPointerException("state is null");
}
- writeLock.lock();
+ stateLock.lockExclusive();
if (this.state == state) {
return false;
}
this.state = state;
- cond.signalAll();
return true;
}
@@ -98,16 +92,16 @@
* Release a held state. Must be called from the same thread that is holding the
state.
*/
public void release() {
- readLock.unlock();
+ stateLock.unlockShared();
}
public void releaseExclusive() {
- writeLock.unlock();
+ stateLock.unlockExclusive();
}
public void releaseDowngrade() {
- readLock.lock();
- writeLock.unlock();
+ stateLock.lockShared();
+ stateLock.unlockExclusive();
}
public boolean transition(final T fromState, final T toState) {
@@ -117,16 +111,15 @@
if (toState == null) {
throw new NullPointerException("toState is null");
}
- writeLock.lock();
+ stateLock.lockExclusive();
try {
if (state != fromState) {
return false;
}
state = toState;
- cond.signalAll();
return true;
} finally {
- writeLock.unlock();
+ stateLock.unlockExclusive();
}
}
@@ -137,17 +130,16 @@
if (toState == null) {
throw new NullPointerException("toState is null");
}
- writeLock.lock();
+ stateLock.lockExclusive();
try {
if (state != fromState) {
return false;
}
state = toState;
- cond.signalAll();
- readLock.lock();
+ stateLock.lockShared();
return true;
} finally {
- writeLock.unlock();
+ stateLock.unlockExclusive();
}
}
@@ -158,19 +150,18 @@
if (toState == null) {
throw new NullPointerException("toState is null");
}
- writeLock.lock();
+ stateLock.lockExclusive();
boolean ok = false;
try {
if (state != fromState) {
return false;
}
state = toState;
- cond.signalAll();
ok = true;
return true;
} finally {
if (! ok) {
- writeLock.unlock();
+ stateLock.unlockExclusive();
}
}
}
@@ -205,348 +196,151 @@
}
}
-
public void waitInterruptiblyFor(final T state) throws InterruptedException {
- if (in(state)) {
- return;
- }
- writeLock.lockInterruptibly();
- try {
- while (this.state != state) {
- cond.await();
+ stateLock.lockShared();
+ while (this.state != state) {
+ if (this.state.isReachable(state)) {
+ stateLock.yieldShared();
+ } else try {
+ throw new IllegalStateException("Destination state " + state +
" is unreachable from " + this.state);
+ } finally {
+ stateLock.unlockShared();
}
- } finally {
- writeLock.unlock();
}
+ stateLock.unlockShared();
+ return;
}
public void waitFor(final T state) {
- if (in(state)) {
- return;
+ if (state == null) {
+ throw new NullPointerException("state is null");
}
- writeLock.lock();
- try {
- while (this.state != state) {
- cond.awaitUninterruptibly();
+ stateLock.lockShared();
+ while (this.state != state) {
+ if (this.state.isReachable(state)) {
+ stateLock.yieldShared();
+ } else try {
+ throw new IllegalStateException("Destination state " + state +
" is unreachable from " + this.state);
+ } finally {
+ stateLock.unlockShared();
}
- } finally {
- writeLock.unlock();
}
+ stateLock.unlockShared();
+ return;
}
public void waitForHold(final T state) {
- if (inHold(state)) {
- return;
+ if (state == null) {
+ throw new NullPointerException("state is null");
}
- writeLock.lock();
- try {
- while (this.state != state) {
- cond.awaitUninterruptibly();
+ stateLock.lockShared();
+ while (this.state != state) {
+ if (this.state.isReachable(state)) {
+ stateLock.yieldShared();
+ } else try {
+ throw new IllegalStateException("Destination state " + state +
" is unreachable from " + this.state);
+ } finally {
+ stateLock.unlockShared();
}
- readLock.lock();
- } finally {
- writeLock.unlock();
}
+ return;
}
- public void waitForAny() {
- writeLock.lock();
- try {
- waitForNot(state);
- } finally {
- writeLock.unlock();
- }
- }
-
- public boolean waitInterruptiblyFor(final T state, final long timeout, final TimeUnit
timeUnit) throws InterruptedException {
- if (in(state)) {
- return true;
- }
- final long timeoutMillis = timeUnit.toMillis(timeout);
- final long startTime = System.currentTimeMillis();
- final long endTime = startTime + timeoutMillis < 0 ? Long.MAX_VALUE :
startTime + timeoutMillis;
- final Date deadline = new Date(endTime);
- writeLock.lockInterruptibly();
- try {
- while (this.state != state) {
- if (! cond.awaitUntil(deadline)) {
- return false;
- }
- }
- return true;
- } finally {
- writeLock.unlock();
- }
- }
-
- public T waitInterruptiblyForNot(final T state) throws InterruptedException {
- final T current = getState();
- if (current != state) {
- return current;
- }
- writeLock.lockInterruptibly();
- try {
- while (this.state == state) {
- cond.await();
- }
- return this.state;
- } finally {
- writeLock.unlock();
- }
- }
-
public T waitInterruptiblyForNotHold(final T state) throws InterruptedException {
- final T current = getStateHold();
- if (current != state) {
- return current;
+ if (state == null) {
+ throw new NullPointerException("state is null");
}
- release();
- writeLock.lockInterruptibly();
- try {
- while (this.state == state) {
- cond.await();
- }
- readLock.lockInterruptibly();
- return this.state;
- } finally {
- writeLock.unlock();
+ stateLock.lockShared();
+ while (this.state == state) {
+ stateLock.yieldShared();
}
+ return this.state;
}
public T waitForNot(final T state) {
- final T current = getState();
- if (current != state) {
- return current;
+ if (state == null) {
+ throw new NullPointerException("state is null");
}
- writeLock.lock();
+ stateLock.lockShared();
+ while (this.state == state) {
+ stateLock.yieldShared();
+ }
try {
- while (this.state == state) {
- cond.awaitUninterruptibly();
- }
return this.state;
} finally {
- writeLock.unlock();
+ stateLock.unlockShared();
}
}
public T waitForNotHold(final T state) {
- final T current = getStateHold();
- if (current != state) {
- return current;
+ if (state == null) {
+ throw new NullPointerException("state is null");
}
- release();
- writeLock.lock();
- try {
- while (this.state == state) {
- cond.awaitUninterruptibly();
- }
- readLock.lock();
- return this.state;
- } finally {
- writeLock.unlock();
+ stateLock.lockShared();
+ while (this.state == state) {
+ stateLock.yieldShared();
}
+ return this.state;
}
+ @Deprecated
public T waitForNotExclusive(final T state) {
- writeLock.lock();
+ if (state == null) {
+ throw new NullPointerException("state is null");
+ }
+ stateLock.lockExclusive();
while (this.state == state) {
- cond.awaitUninterruptibly();
+ stateLock.awaitExclusive();
}
return this.state;
}
- public T waitInterruptiblyForNot(final T state, final long timeout, final TimeUnit
timeUnit) throws InterruptedException {
- final T current = getState();
- if (current != state) {
- return current;
- }
- final long timeoutMillis = timeUnit.toMillis(timeout);
- final long startTime = System.currentTimeMillis();
- final long endTime = startTime + timeoutMillis < 0 ? Long.MAX_VALUE :
startTime + timeoutMillis;
- final Date deadLine = new Date(endTime);
- writeLock.lockInterruptibly();
- try {
- while (this.state == state) {
- cond.awaitUntil(deadLine);
- }
- return this.state;
- } finally {
- writeLock.unlock();
- }
- }
-
-
public T waitInterruptiblyForNotHold(final T state, final long timeout, final
TimeUnit timeUnit) throws InterruptedException {
- final T current = getStateHold();
- if (current != state) {
- return current;
- }
- release();
- final long timeoutMillis = timeUnit.toMillis(timeout);
- final long startTime = System.currentTimeMillis();
- final long endTime = startTime + timeoutMillis < 0 ? Long.MAX_VALUE :
startTime + timeoutMillis;
- final Date deadLine = new Date(endTime);
- boolean waiting = true;
- writeLock.lockInterruptibly();
- try {
- while (this.state == state) {
- if (waiting) {
- waiting = cond.awaitUntil(deadLine);
- } else {
- break;
- }
- }
- readLock.lockInterruptibly();
- return this.state;
- } finally {
- writeLock.unlock();
- }
+ throw new RuntimeException("TODO - Implement");
}
public T waitForNotHold(final T state, final long timeout, final TimeUnit timeUnit)
{
- final T current = getStateHold();
- if (current != state) {
- return current;
- }
- release();
- final long timeoutMillis = timeUnit.toMillis(timeout);
- final long startTime = System.currentTimeMillis();
- final long endTime = startTime + timeoutMillis < 0 ? Long.MAX_VALUE :
startTime + timeoutMillis;
- final Date deadLine = new Date(endTime);
- boolean intr = false;
- try {
- boolean waiting = true;
- writeLock.lock();
- try {
- while (this.state == state) {
- if (waiting) {
- try {
- waiting = cond.awaitUntil(deadLine);
- } catch (InterruptedException e) {
- intr = Thread.currentThread().isInterrupted();
- }
- } else {
- break;
- }
- }
- readLock.lock();
- return this.state;
- } finally {
- writeLock.unlock();
- }
- } finally {
- if (intr) Thread.currentThread().interrupt();
- }
+ throw new RuntimeException("TODO - Implement");
}
- public T waitForNot(final T state, final long timeout, final TimeUnit timeUnit) {
- final T current = getState();
- if (current != state) {
- return current;
- }
- final long timeoutMillis = timeUnit.toMillis(timeout);
- final long startTime = System.currentTimeMillis();
- final long endTime = startTime + timeoutMillis < 0 ? Long.MAX_VALUE :
startTime + timeoutMillis;
- final Date deadLine = new Date(endTime);
- boolean intr = false;
- writeLock.lock();
- try {
- while (this.state == state) {
- try {
- if (! cond.awaitUntil(deadLine)) {
- break;
- }
- } catch (InterruptedException e) {
- intr = true;
- }
- }
- return this.state;
- } finally {
- if (intr) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
public T getState() {
- readLock.lock();
+ stateLock.lockShared();
try {
return state;
} finally {
- readLock.unlock();
+ stateLock.unlockShared();
}
}
public T getStateHold() {
- readLock.lock();
+ stateLock.lockShared();
return state;
}
public T getStateExclusive() {
- writeLock.lock();
+ stateLock.lockExclusive();
return state;
}
- public boolean inHoldExclusive(T state) {
- writeLock.lock();
- boolean ok = false;
- try {
- ok = this.state == state;
- return ok;
- } finally {
- if (! ok) {
- writeLock.unlock();
- }
- }
- }
-
- public boolean inHoldExclusive(T... states) {
- if (states == null) {
- throw new NullPointerException("states is null");
- }
- writeLock.lock();
- for (T state : states) {
- if (this.state == state) {
- return true;
- }
- }
- writeLock.unlock();
- return false;
- }
-
public boolean inHold(T state) {
- readLock.lock();
+ stateLock.lockShared();
boolean ok = false;
try {
ok = this.state == state;
return ok;
} finally {
if (! ok) {
- readLock.unlock();
+ stateLock.unlockShared();
}
}
}
- public boolean inHold(T... states) {
- if (states == null) {
- throw new NullPointerException("states is null");
- }
- readLock.lock();
- for (T state : states) {
- if (this.state == state) {
- return true;
- }
- }
- readLock.unlock();
- return false;
- }
-
public boolean in(T state) {
- readLock.lock();
+ stateLock.lockShared();
try {
return this.state == state;
} finally {
- readLock.unlock();
+ stateLock.unlockShared();
}
}
@@ -554,7 +348,7 @@
if (states == null) {
throw new NullPointerException("states is null");
}
- readLock.lock();
+ stateLock.lockShared();
try {
for (T state : states) {
if (this.state == state) {
@@ -563,62 +357,32 @@
}
return false;
} finally {
- readLock.unlock();
+ stateLock.unlockShared();
}
}
- public void require(T state) {
- if (state == null) {
- throw new NullPointerException("state is null");
- }
- readLock.lock();
- try {
- if (this.state != state) {
- throw new IllegalStateException("Invalid state (expected " +
state + ", but current state is " + this.state + ")");
- }
- } finally {
- readLock.unlock();
- }
- }
-
public void requireHold(T state) {
if (state == null) {
throw new NullPointerException("state is null");
}
boolean ok = false;
- readLock.lock();
+ stateLock.lockShared();
try {
if (this.state != state) {
throw new IllegalStateException("Invalid state (expected " +
state + ", but current state is " + this.state + ")");
}
ok = true;
} finally {
- if (! ok) readLock.unlock();
+ if (! ok) stateLock.unlockShared();
}
}
- public void requireExclusive(T state) {
- if (state == null) {
- throw new NullPointerException("state is null");
- }
- boolean ok = false;
- writeLock.lock();
- try {
- if (this.state != state) {
- throw new IllegalStateException("Invalid state (expected " +
state + ", but current state is " + this.state + ")");
- }
- ok = true;
- } finally {
- if (! ok) writeLock.unlock();
- }
- }
-
public String toString() {
- readLock.lock();
+ stateLock.lockShared();
try {
return "State = " + state;
} finally {
- readLock.unlock();
+ stateLock.unlockShared();
}
}
}
Added: remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/State.java
===================================================================
--- remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/State.java
(rev 0)
+++ remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/State.java 2008-03-21
22:13:09 UTC (rev 3730)
@@ -0,0 +1,8 @@
+package org.jboss.cx.remoting.util;
+
+/**
+ *
+ */
+public interface State<T extends Enum<T> & State<T>> {
+ boolean isReachable(T dest);
+}