[jboss-cvs] JBoss Messaging SVN: r3772 - in trunk/src/main/org/jboss: messaging/util and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Feb 22 11:48:44 EST 2008
Author: timfox
Date: 2008-02-22 11:48:43 -0500 (Fri, 22 Feb 2008)
New Revision: 3772
Removed:
trunk/src/main/org/jboss/messaging/util/Future.java
Modified:
trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java
Log:
Some improvements to ClientConsumerImpl and remove old Future
Modified: trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java 2008-02-22 15:34:18 UTC (rev 3771)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java 2008-02-22 16:48:43 UTC (rev 3772)
@@ -22,6 +22,8 @@
package org.jboss.jms.client.impl;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import org.jboss.jms.client.api.MessageHandler;
import org.jboss.jms.client.remoting.MessagingRemotingConnection;
@@ -32,7 +34,6 @@
import org.jboss.messaging.core.remoting.wireformat.CloseMessage;
import org.jboss.messaging.core.remoting.wireformat.ConsumerFlowTokenMessage;
import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
-import org.jboss.messaging.util.Future;
import org.jboss.messaging.util.Logger;
import org.jboss.messaging.util.MessagingException;
@@ -54,6 +55,8 @@
private static final Logger log = Logger.getLogger(ClientConsumerImpl.class);
private static final boolean trace = log.isTraceEnabled();
+
+ private static final long CLOSE_TIMEOUT_SECONDS = 10;
// Attributes
// -----------------------------------------------------------------------------------
@@ -66,16 +69,12 @@
private volatile Thread receiverThread;
- private MessageHandler handler;
+ private volatile MessageHandler handler;
private volatile boolean closed;
-
- private Object mainLock = new Object();
-
+
private ExecutorService sessionExecutor;
- private boolean listenerRunning;
-
private MessagingRemotingConnection remotingConnection;
private long ignoreDeliveryMark = -1;
@@ -110,85 +109,82 @@
// ClientConsumer implementation
// -----------------------------------------------------------------
- public Message receive(long timeout) throws MessagingException
+ public synchronized Message receive(long timeout) throws MessagingException
{
checkClosed();
- synchronized (mainLock)
+ if (handler != null)
{
- if (handler != null)
- {
- throw new MessagingException(MessagingException.ILLEGAL_STATE, "Cannot call receive(...) - a MessageHandler is set");
- }
+ throw new MessagingException(MessagingException.ILLEGAL_STATE, "Cannot call receive(...) - a MessageHandler is set");
+ }
- receiverThread = Thread.currentThread();
+ receiverThread = Thread.currentThread();
- if (timeout == 0)
- {
- //Effectively infinite
- timeout = Long.MAX_VALUE;
- }
-
- long start = System.currentTimeMillis();
-
- long toWait = timeout;
+ if (timeout == 0)
+ {
+ //Effectively infinite
+ timeout = Long.MAX_VALUE;
+ }
+
+ long start = System.currentTimeMillis();
+
+ long toWait = timeout;
- try
+ try
+ {
+ while (true)
{
- while (true)
+ while (!closed && buffer.isEmpty() && toWait > 0)
{
- while (!closed && buffer.isEmpty() && toWait > 0)
+ try
{
- try
+ wait(toWait);
+ }
+ catch (InterruptedException e)
+ {
+ }
+
+ long now = System.currentTimeMillis();
+
+ toWait -= now - start;
+
+ start = now;
+ }
+
+ if (!closed && !buffer.isEmpty())
+ {
+ DeliverMessage m = buffer.removeFirst();
+
+ boolean expired = m.getMessage().isExpired();
+
+ session.delivered(m.getDeliveryID(), expired);
+
+ flowControl();
+
+ if (expired)
+ {
+ if (toWait > 0)
{
- mainLock.wait(toWait);
+ continue;
}
- catch (InterruptedException e)
+ else
{
+ return null;
}
-
- long now = System.currentTimeMillis();
-
- toWait -= now - start;
-
- start = now;
}
-
- if (!closed && !buffer.isEmpty())
- {
- DeliverMessage m = buffer.removeFirst();
-
- boolean expired = m.getMessage().isExpired();
-
- session.delivered(m.getDeliveryID(), expired);
-
- flowControl();
-
- if (expired)
- {
- if (toWait > 0)
- {
- continue;
- }
- else
- {
- return null;
- }
- }
-
- return m.getMessage();
- }
- else
- {
- return null;
- }
+
+ return m.getMessage();
}
+ else
+ {
+ return null;
+ }
}
- finally
- {
- receiverThread = null;
- }
}
+ finally
+ {
+ receiverThread = null;
+ }
}
public Message receiveImmediate() throws MessagingException
@@ -211,18 +207,10 @@
{
throw new MessagingException(MessagingException.ILLEGAL_STATE,"Cannot set MessageHandler - consumer is in receive(...)");
}
-
- synchronized (mainLock)
- {
- this.handler = handler;
-
- if (handler != null && !buffer.isEmpty())
- {
- listenerRunning = true;
-
- queueRunner();
- }
- }
+
+ waitForOnMessageToComplete();
+
+ this.handler = handler;
}
public void close() throws MessagingException
@@ -231,25 +219,26 @@
{
return;
}
-
+
try
{
- // We set the handler to null so the next ListenerRunner won't run
- handler = null;
-
// Now we wait for any current handler runners to run.
waitForOnMessageToComplete();
- synchronized (mainLock)
+ closed = true;
+
+ if (receiverThread != null)
{
- closed = true;
-
- if (receiverThread != null)
- {
+ synchronized (this)
+ {
// Wake up any receive() thread that might be waiting
- mainLock.notify();
+ notify();
}
}
+
+ handler = null;
+
+ receiverThread = null;
remotingConnection.send(id, new CloseMessage());
@@ -258,7 +247,7 @@
finally
{
session.removeConsumer(this);
- }
+ }
}
public boolean isClosed()
@@ -276,76 +265,74 @@
public void handleMessage(final DeliverMessage message) throws Exception
{
- synchronized (mainLock)
+ if (closed)
{
- if (closed)
+ // This is ok - we just ignore the message
+ return;
+ }
+
+ if (ignoreDeliveryMark >= 0)
+ {
+ long delID = message.getDeliveryID();
+
+ if (delID > ignoreDeliveryMark)
{
- // This is ok - we just ignore the message
+ // Ignore - the session is recovering and these are inflight
+ // messages
return;
}
-
- if (ignoreDeliveryMark >= 0)
+ else
{
- long delID = message.getDeliveryID();
-
- if (delID > ignoreDeliveryMark)
- {
- // Ignore - the session is recovering and these are inflight
- // messages
- return;
- }
- else
- {
- // We have hit the begining of the recovered messages - we can
- // stop ignoring
- ignoreDeliveryMark = -1;
- }
+ // We have hit the begining of the recovered messages - we can
+ // stop ignoring
+ ignoreDeliveryMark = -1;
}
-
- // Add it to the buffer
- Message coreMessage = message.getMessage();
-
- buffer.addLast(message, coreMessage.getPriority());
-
- if (receiverThread != null)
+ }
+
+ if (handler != null)
+ {
+ if (direct)
{
- mainLock.notify();
- }
- else if (handler != null)
- {
- if (direct)
- {
- //Dispatch it directly on remoting thread
-
- boolean expired = message.getMessage().isExpired();
+ //Dispatch it directly on remoting thread
+
+ boolean expired = message.getMessage().isExpired();
- session.delivered(message.getDeliveryID(), expired);
-
- flowControl();
+ session.delivered(message.getDeliveryID(), expired);
+
+ flowControl();
- if (!expired)
- {
- handler.onMessage(message.getMessage());
- }
- }
- else if (!listenerRunning)
+ if (!expired)
{
- listenerRunning = true;
-
- queueRunner();
+ handler.onMessage(message.getMessage());
}
}
+ else
+ {
+ //Execute using executor
+
+ buffer.addLast(message, message.getMessage().getPriority());
+
+ sessionExecutor.execute(new Runnable() { public void run() { callOnMessage(); } } );
+ }
}
+ else
+ {
+ // Add it to the buffer
+
+ buffer.addLast(message, message.getMessage().getPriority());
+
+ synchronized (this)
+ {
+ notify();
+ }
+ }
}
- public void recover(long lastDeliveryID)
+ public synchronized void recover(long lastDeliveryID)
{
- synchronized (mainLock)
- {
- ignoreDeliveryMark = lastDeliveryID;
+ ignoreDeliveryMark = lastDeliveryID;
- buffer.clear();
- }
+ buffer.clear();
}
// Public
@@ -377,26 +364,36 @@
private void waitForOnMessageToComplete()
{
- // Wait for any onMessage() executions to complete
-
+ if (handler == null)
+ {
+ return;
+ }
+
if (Thread.currentThread() == onMessageThread)
{
// If called from inside onMessage then return immediately - otherwise would block forever
return;
}
- Future result = new Future();
+ Future<?> future = sessionExecutor.submit(new Runnable() { public void run() {} });
- sessionExecutor.execute(new Closer(result));
-
- result.getResult();
+ long start = System.currentTimeMillis();
+ try
+ {
+ future.get(CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ }
+ catch (Exception e)
+ {
+ //Ignore
+ }
+ long end = System.currentTimeMillis();
+
+ if (end - start >= CLOSE_TIMEOUT_SECONDS * 1000)
+ {
+ log.warn("Timed out waiting for handler to complete processing");
+ }
}
- private void queueRunner()
- {
- sessionExecutor.execute(new ListenerRunner());
- }
-
private void checkClosed() throws MessagingException
{
if (closed)
@@ -405,89 +402,48 @@
}
}
- private void onMessageLoop()
+ private void callOnMessage()
{
- try
- {
- onMessageThread = Thread.currentThread();
-
- DeliverMessage msg = null;
-
- MessageHandler theListener = null;
-
- synchronized (mainLock)
- {
- if (handler == null || buffer.isEmpty())
- {
- listenerRunning = false;
-
- return;
- }
-
- theListener = handler;
-
- msg = buffer.removeFirst();
- }
-
- if (msg != null)
- {
- boolean expired = msg.getMessage().isExpired();
-
- session.delivered(msg.getDeliveryID(), expired);
+ try
+ {
+ if (closed)
+ {
+ return;
+ }
+
+ //We pull the message from the buffer from inside the Runnable so we can ensure priority
+ //ordering. If we just added a Runnable with the message to the executor immediately as we get it
+ //we could not do that
+
+ DeliverMessage message = buffer.removeFirst();
+
+ if (message != null)
+ {
+ boolean expired = message.getMessage().isExpired();
+
+ session.delivered(message.getDeliveryID(), expired);
flowControl();
-
+
if (!expired)
{
- theListener.onMessage(msg.getMessage());
+ onMessageThread = Thread.currentThread();
+
+ handler.onMessage(message.getMessage());
}
- }
-
- synchronized (mainLock)
- {
- if (!buffer.isEmpty())
- {
- queueRunner();
- }
- else
- {
- listenerRunning = false;
- }
- }
- }
- catch (MessagingException e)
- {
- log.error("Failure in ListenerRunner", e);
- }
+ }
+ }
+ catch (MessagingException e)
+ {
+ log.error("Failed to execute", e);
+ }
+ catch (RuntimeException e)
+ {
+ log.error("RuntimeException thrown from handler", e);
+ }
}
// Inner classes
// --------------------------------------------------------------------------------
- /*
- * This class is used to put on the handler executor to wait for onMessage
- * invocations to complete when closing
- */
- private class Closer implements Runnable
- {
- Future result;
-
- Closer(Future result)
- {
- this.result = result;
- }
-
- public void run()
- {
- result.setResult(null);
- }
- }
-
- private class ListenerRunner implements Runnable
- {
- public void run()
- {
- onMessageLoop();
- }
- }
}
Deleted: trunk/src/main/org/jboss/messaging/util/Future.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/Future.java 2008-02-22 15:34:18 UTC (rev 3771)
+++ trunk/src/main/org/jboss/messaging/util/Future.java 2008-02-22 16:48:43 UTC (rev 3772)
@@ -1,85 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.util;
-
-
-/**
- * A Future
-
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision$</tt>
- *
- * $Id$
- *
- */
-public class Future
-{
- private static final Logger log = Logger.getLogger(Future.class);
-
- private Object result;
-
- private boolean isException;
-
- private boolean resultSet;
-
- public synchronized Object getResult()
- {
- while (!resultSet)
- {
- try
- {
- wait();
- }
- catch (InterruptedException e)
- {
- log.warn("Thread interrupted", e);
- }
- }
- return result;
- }
-
- public synchronized void setResult(Object theResult)
- {
- result = theResult;
-
- resultSet = true;
-
- notify();
- }
-
- public synchronized void setException(Throwable t)
- {
- result = t;
-
- isException = true;
-
- resultSet = true;
-
- notify();
- }
-
- public boolean isException()
- {
- return isException;
- }
-}
-
More information about the jboss-cvs-commits
mailing list