Replace the pollActive flag in MessageQueue with a fair, single-permit
Semaphore so that only one caller drains the queue at a time. The permit is
released only by the code path that actually acquired it.

Index: errai-bus/src/main/java/org/jboss/errai/bus/server/MessageQueue.java
===================================================================
--- errai-bus/src/main/java/org/jboss/errai/bus/server/MessageQueue.java	(revision 1218)
+++ errai-bus/src/main/java/org/jboss/errai/bus/server/MessageQueue.java	(working copy)
@@ -21,6 +21,7 @@
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
 import static java.lang.System.currentTimeMillis;
@@ -46,7 +47,6 @@
     private int lastQueueSize = 0;
     private boolean throttleIncoming = false;
     private boolean queueRunning = true;
-    private volatile boolean pollActive = false;
 
     private boolean _windowPolling = false;
     private boolean windowPolling = false;
@@ -58,6 +58,12 @@
     private ServerMessageBus bus;
     private volatile TimedTask task;
 
+    /**
+     * Fair, single-permit semaphore guarding queue draining. It is held while a caller
+     * takes messages off the queue; {@link #isActive()} reports true while the permit is taken.
+     */
+    private final Semaphore lock = new Semaphore(1, true);
+
     /**
      * Initializes the message queue with an initial size and a specified bus
      *
@@ -84,18 +90,22 @@
         checkSession();
 
         try {
-            MarshalledMessage m;
+            MarshalledMessage m = null;
+
+            // Only one caller may drain the queue at a time. A caller that cannot
+            // get the permit leaves m as null and is answered with the heartbeat.
+            if (lock.tryAcquire(0, TimeUnit.SECONDS)) {
+                try {
             if (wait) {
-                if (pollActive) {
-                    throw new RuntimeException("concurrent polling not allowed!");
-                }
-                pollActive = true;
                 m = queue.poll(45, TimeUnit.SECONDS);
-                pollActive = false;
             } else {
                 m = queue.poll();
             }
+                } finally {
+                    lock.release();
+                }
+            }
 
             int payLoadSize = 0;
 
             Payload p = new Payload(m == null ? heartBeat : m);
@@ -104,6 +114,10 @@
                 windowPolling = true;
                 _windowPolling = false;
             } else if (windowPolling) {
+                // Acquire BEFORE entering the try block, so that the finally clause
+                // releases the permit only when this caller actually obtained it.
+                if (lock.tryAcquire(0, TimeUnit.SECONDS)) {
+                    try {
                 while (!queue.isEmpty() && payLoadSize < MAXIMUM_PAYLOAD_SIZE
                         && !isWindowExceeded()) {
                     p.addMessage(queue.poll());
@@ -117,6 +131,10 @@
                         // just resume.
                     }
                 }
+                    } finally {
+                        lock.release();
+                    }
+                }
 
                 if (!throttleIncoming && queue.size() > lastQueueSize) {
                     if (transmissionWindow < MAX_TRANSMISSION_WINDOW) {
@@ -282,7 +300,7 @@
      * @return true if the queue is stale
      */
     public boolean isStale() {
-        return !queueRunning || (!pollActive && (currentTimeMillis() - lastTransmission) > TIMEOUT);
+        return !queueRunning || (!isActive() && (currentTimeMillis() - lastTransmission) > TIMEOUT);
     }
 
     /**
@@ -291,7 +309,7 @@
      * @return true if the queue is actively polling
      */
     public boolean isActive() {
-        return pollActive;
+        return lock.availablePermits() == 0;
     }
 
     /**