[jboss-cvs] jboss-jms/src/main/org/jboss/jms/client/remoting ...
Timothy Fox
tim.fox at jboss.com
Mon Jul 17 13:14:44 EDT 2006
User: timfox
Date: 06/07/17 13:14:44
Modified: src/main/org/jboss/jms/client/remoting
CallbackManager.java MessageCallbackHandler.java
Added: src/main/org/jboss/jms/client/remoting
HandleMessageResponse.java
Log:
Many changes including implementation of prefetch, SEDAisation of server, changing of recovery
Revision Changes Path
1.6 +7 -10 jboss-jms/src/main/org/jboss/jms/client/remoting/CallbackManager.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: CallbackManager.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/client/remoting/CallbackManager.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -b -r1.5 -r1.6
--- CallbackManager.java 24 Jun 2006 09:05:36 -0000 1.5
+++ CallbackManager.java 17 Jul 2006 17:14:44 -0000 1.6
@@ -21,12 +21,12 @@
*/
package org.jboss.jms.client.remoting;
+import java.util.List;
import java.util.Map;
import javax.management.MBeanServer;
-import org.jboss.jms.message.MessageProxy;
-import org.jboss.jms.server.endpoint.DeliveryRunnable;
+import org.jboss.jms.server.endpoint.ClientDelivery;
import org.jboss.jms.server.remoting.MessagingMarshallable;
import org.jboss.remoting.InvocationRequest;
import org.jboss.remoting.ServerInvocationHandler;
@@ -74,11 +74,11 @@
{
MessagingMarshallable mm = (MessagingMarshallable)ir.getParameter();
- DeliveryRunnable dr = (DeliveryRunnable)mm.getLoad();
+ ClientDelivery dr = (ClientDelivery)mm.getLoad();
int consumerID = dr.getConsumerID();
- MessageProxy del = dr.getMessageProxy();
+ List msgs = dr.getMessages();
MessageCallbackHandler handler =
(MessageCallbackHandler)callbackHandlers.get(new Integer(consumerID));
@@ -88,14 +88,11 @@
throw new IllegalStateException("Cannot find handler for consumer: " + consumerID);
}
- handler.handleMessage(del);
-
- return null;
+ return new MessagingMarshallable(mm.getVersion(), handler.handleMessage(msgs));
}
public void removeListener(InvokerCallbackHandler arg0)
{
-
}
public void setInvoker(ServerInvoker arg0)
1.71 +361 -458 jboss-jms/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: MessageCallbackHandler.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java,v
retrieving revision 1.70
retrieving revision 1.71
diff -u -b -r1.70 -r1.71
--- MessageCallbackHandler.java 26 Jun 2006 18:41:21 -0000 1.70
+++ MessageCallbackHandler.java 17 Jul 2006 17:14:44 -0000 1.71
@@ -26,6 +26,7 @@
import java.util.LinkedList;
import java.util.List;
+import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.MessageListener;
import javax.jms.Session;
@@ -33,18 +34,19 @@
import org.jboss.jms.delegate.ConsumerDelegate;
import org.jboss.jms.delegate.SessionDelegate;
import org.jboss.jms.message.MessageProxy;
+import org.jboss.jms.tx.AckInfo;
import org.jboss.logging.Logger;
+import org.jboss.messaging.util.Future;
import org.jboss.remoting.callback.HandleCallbackException;
-import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
/**
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox/a>
- * @version <tt>$Revision: 1.70 $</tt>
+ * @version <tt>$Revision: 1.71 $</tt>
*
- * $Id: MessageCallbackHandler.java,v 1.70 2006/06/26 18:41:21 timfox Exp $
+ * $Id: MessageCallbackHandler.java,v 1.71 2006/07/17 17:14:44 timfox Exp $
*/
public class MessageCallbackHandler
{
@@ -52,10 +54,6 @@
private static final Logger log;
- //TODO Make configurable
- private static final int CLOSE_TIMEOUT = 20000;
-
-
// Static --------------------------------------------------------
private static boolean trace;
@@ -66,6 +64,9 @@
trace = log.isTraceEnabled();
}
+ //Hardcoded for now
+ private static final int MAX_REDELIVERIES = 10;
+
public static void callOnMessage(ConsumerDelegate cons,
SessionDelegate sess,
MessageListener listener,
@@ -77,9 +78,15 @@
{
preDeliver(sess, consumerID, m, isConnectionConsumer);
+ int tries = 0;
+
+ while (true)
+ {
try
{
listener.onMessage(m);
+
+ break;
}
catch (RuntimeException e)
{
@@ -91,15 +98,35 @@
if (ackMode == Session.AUTO_ACKNOWLEDGE || ackMode == Session.DUPS_OK_ACKNOWLEDGE)
{
- // cancel the delivery - this means it will be immediately redelivered
- if (trace) { log.trace("cancelling " + id); }
- cons.cancelDelivery(id);
+ //We redeliver at certain number of times
+ if (tries < MAX_REDELIVERIES)
+ {
+ m.setJMSRedelivered(true);
+
+ //TODO delivery count although optional should be global
+ //so we need to send it back to the server
+ //but this has performance hit so perhaps we just don't support it?
+ m.incDeliveryCount();
+
+ tries++;
+ }
+ else
+ {
+ log.error("Max redeliveries has occurred for message: " + m.getJMSMessageID());
+
+ //TODO - Send to DLQ
+
+ break;
+ }
}
else
{
// Session is either transacted or CLIENT_ACKNOWLEDGE
// We just deliver next message
if (trace) { log.trace("ignoring exception on " + id); }
+
+ break;
+ }
}
}
@@ -116,7 +143,7 @@
// add anything to the tx for this session.
if (!isConnectionConsumer)
{
- sess.preDeliver(m.getMessage().getMessageID(), consumerID);
+ sess.preDeliver(m, consumerID);
}
}
@@ -130,151 +157,115 @@
// add anything to the tx for this session
if (!isConnectionConsumer)
{
- sess.postDeliver(m.getMessage().getMessageID(), consumerID);
+ sess.postDeliver(m, consumerID);
}
}
// Attributes ----------------------------------------------------
- protected LinkedList buffer;
+ private LinkedList buffer;
- protected SessionDelegate sessionDelegate;
+ private SessionDelegate sessionDelegate;
- protected ConsumerDelegate consumerDelegate;
+ private ConsumerDelegate consumerDelegate;
- protected int consumerID;
+ private int consumerID;
- protected boolean isConnectionConsumer;
+ private boolean isConnectionConsumer;
- protected volatile Thread receiverThread;
+ private volatile Thread receiverThread;
- protected MessageListener listener;
+ private MessageListener listener;
- protected int deliveryAttempts;
+ private int ackMode;
- protected int ackMode;
+ private boolean closed;
- // Executor used for executing onMessage methods - there is one per session
- protected QueuedExecutor onMessageExecutor;
+ private Object mainLock;
- // Executor for executing activateConsumer methods asynchronously, there is one pool per connection
- protected PooledExecutor activateConsumerExecutor;
+ private boolean serverSending;
- protected Object mainLock;
+ private int bufferSize;
- protected Object onMessageLock;
+ private QueuedExecutor sessionExecutor;
- protected boolean closed;
-
- protected volatile boolean closing;
-
- protected boolean gotLastMessage;
-
- //The id of the last message we received
- protected long lastMessageId = -1;
-
- protected volatile int activationCount;
-
- protected volatile boolean onMessageExecuting;
+ private boolean listenerRunning;
// Constructors --------------------------------------------------
- public MessageCallbackHandler(boolean isCC, int ackMode, QueuedExecutor onMessageExecutor,
- PooledExecutor activateConsumerExecutor,
- SessionDelegate sess, ConsumerDelegate cons, int consumerID)
+ public MessageCallbackHandler(boolean isCC, int ackMode,
+ SessionDelegate sess, ConsumerDelegate cons, int consumerID,
+ int bufferSize, QueuedExecutor sessionExecutor)
{
+ if (bufferSize < 1)
+ {
+ throw new IllegalArgumentException(this + " bufferSize must be > 0");
+ }
+
+ this.bufferSize = bufferSize;
+
buffer = new LinkedList();
isConnectionConsumer = isCC;
this.ackMode = ackMode;
- this.onMessageExecutor = onMessageExecutor;
-
- this.activateConsumerExecutor = activateConsumerExecutor;
-
this.sessionDelegate = sess;
this.consumerDelegate = cons;
this.consumerID = consumerID;
+ this.serverSending = true;
+
mainLock = new Object();
- onMessageLock = new Object();
+ this.sessionExecutor = sessionExecutor;
}
// Public --------------------------------------------------------
- public void handleMessage(MessageProxy md) throws HandleCallbackException
- {
- if (trace) { log.trace("receiving message " + md + " from the remoting layer"); }
- md = processMessage(md);
+ /**
+ * Handles a list of messages sent from the server
+ * @param msgs The list of messages
+ * @return The number of messages handled (placeholder for future - now we always accept all messages)
+ * or -1 if closed
+ */
+ public HandleMessageResponse handleMessage(List msgs) throws HandleCallbackException
+ {
+ if (trace) { log.trace(this + " receiving " + msgs.size() + " messages from the remoting layer"); }
synchronized (mainLock)
{
if (closed)
{
- //Sanity check
- //This should never happen
- //Part of the close procedure is to ensure that no more messages will be sent
- //If this happens it implies the close() procedure is not functioning correctly
- throw new IllegalStateException("Message has arrived after consumer is closed!");
+ //Ignore
+ return new HandleMessageResponse(false, 0);
}
- if (closing && gotLastMessage)
- {
- //Sanity check - this should never happen
- //No messages should arrive after the last one sent by the server consumer endpoint
- throw new IllegalStateException("Message has arrived after we have received the last one");
- }
+ //Put the messages in the buffer
+ //And notify any waiting receive()
- // We record the last message we received
- this.lastMessageId = md.getMessage().getMessageID();
+ processMessages(msgs);
- if (listener != null)
- {
- // Queue the message to be delivered by the session
- ClientDeliveryRunnable cdr = new ClientDeliveryRunnable(md);
+ buffer.addAll(msgs);
- onMessageExecuting = true;
+ if (trace) { log.trace(this + " added messages to the buffer"); }
- try
- {
- onMessageExecutor.execute(cdr);
- }
- catch (InterruptedException e)
- {
- //This should never happen
- throw new IllegalStateException("Thread interrupted in client delivery executor");
- }
- }
- else
- {
- //Put the message in the buffer
- //And notify any waiting receive()
- //On close any remaining messages will be cancelled
- //We do not wait for the message to be received before returning
+ boolean full = buffer.size() >= bufferSize;
- buffer.add(md);
- }
+ if (trace) { log.trace(this + " receiving messages from the remoting layer"); }
- if (closing)
- {
- //If closing then we may have the close() thread waiting for the last message as well as a receive
- //thread
- mainLock.notifyAll();
- }
- else
- {
- //Otherwise we will only have at most one receive thread waiting
- //We don't want to do notifyAll in both cases since notifyAll can have a perf penalty
- if (receiverThread != null)
+ messagesAdded();
+
+ if (full)
{
- mainLock.notify();
- }
+ serverSending = false;
}
+
+ //For now we always accept all messages - in the future this may change
+ return new HandleMessageResponse(full, msgs.size());
}
}
@@ -282,35 +273,24 @@
{
synchronized (mainLock)
{
- if (closed)
- {
- throw new JMSException("Cannot set MessageListener - consumer is closed");
- }
-
- // JMS consumer is single threaded, so it shouldn't be possible to set a MessageListener
- // while another thread is receiving
-
if (receiverThread != null)
{
// Should never happen
- throw new javax.jms.IllegalStateException("Consumer is currently in receive(..) Cannot set MessageListener");
+ throw new IllegalStateException("Consumer is currently in receive(..) Cannot set MessageListener");
}
- synchronized (onMessageLock)
- {
this.listener = listener;
- }
- log.debug("installed listener " + listener);
-
- activateConsumer();
+ if (!buffer.isEmpty())
+ {
+ listenerRunning = true;
+ this.queueRunner(new ListenerRunner());
+ }
}
}
public void close() throws JMSException
{
- try
- {
synchronized (mainLock)
{
log.debug(this + " closing");
@@ -320,107 +300,60 @@
return;
}
- closing = true;
-
- //We wait for any activation in progress to complete and the resulting message
- //(if any) to be returned and processed.
- //The ensures a clean, gracefully closure of the client side consumer, without
- //any messages in transit which might arrive after the consumer is closed and which
- //subsequently might be cancelled out of sequence causing message ordering problems
-
- if (activationCount > 0)
- {
- long waitTime = CLOSE_TIMEOUT;
-
- while (activationCount > 0 && waitTime > 0)
- {
- waitTime = waitOnLock(mainLock, waitTime);
- }
-
- if (activationCount > 0)
- {
- log.warn("Timed out waiting for activations to complete");
- }
- }
-
- //Now we know there are no activations in progress but the consumer may still be active so we call
- //deactivate which returns the id of the last message we should have received
- //if we have received this message then we know there is no possibility of any message still in
- //transit and we can close down with confidence
- //otherwise we wait for this message and timeout if it doesn't arrive which might be the case
- //if the connection to the server has been lost
-
- long lastMessageIDToExpect = deactivateConsumer();
-
- if (lastMessageIDToExpect != -1)
- {
- long waitTime = CLOSE_TIMEOUT;
-
- while (lastMessageIDToExpect != lastMessageId && waitTime > 0)
- {
- waitTime = waitOnLock(mainLock, waitTime);
- }
+ closed = true;
- if (lastMessageIDToExpect != lastMessageId)
+ if (receiverThread != null)
{
- log.warn("Timed out waiting for last message to arrive, last=" + lastMessageId +" expected=" + lastMessageIDToExpect);
- }
- }
-
- //We set this even if we timed out waiting since we do not want any more to arrive now
- gotLastMessage = true;
-
//Wake up any receive() thread that might be waiting
- if (trace) { log.trace("Notifying main lock"); }
mainLock.notify();
- if (trace) { log.trace("Notified main lock"); }
+ }
- //Now make sure that any onMessage of a listener has finished executing
+ //Wait for any on message executions to complete
- long waitTime = CLOSE_TIMEOUT;
+ Future result = new Future();
- synchronized (onMessageLock)
- {
- while (onMessageExecuting && waitTime > 0)
+ try
{
- waitTime = waitOnLock(onMessageLock, waitTime);
+ this.sessionExecutor.execute(new Closer(result));
+
+ result.getResult();
}
- if (onMessageExecuting)
+ catch (InterruptedException e)
{
- //Timed out waiting for last onMessage to be processed
- log.warn("Timed out waiting for last onMessage to be executed");
- }
+ log.warn("Thread interrupted", e);
}
- //Now we know that all messages have been received and processed
+ //Now we cancel anything left in the buffer
+ //The reason we do this now is that otherwise the deliveries wouldn't get cancelled
+ //until session close (since we don't cancel consumer's deliveries until then)
+ //which is too late - since we need to preserve the order of messages delivered in a session.
if (!buffer.isEmpty())
{
//Now we cancel any deliveries that might be waiting in our buffer
+ //This is because, otherwise the messages wouldn't get cancelled until
+ //the corresponding session died.
+ //So if another consumer in another session tried to consume from the channel
+ //before that session died it wouldn't receive those messages
Iterator iter = buffer.iterator();
- List ids = new ArrayList();
+ List ackInfos = new ArrayList();
while (iter.hasNext())
{
MessageProxy mp = (MessageProxy)iter.next();
- ids.add(new Long(mp.getMessage().getMessageID()));
- }
- cancelDeliveries(ids);
- }
-
- //Now we are done
- listener = null;
+ AckInfo ack = new AckInfo(mp, consumerID);
- receiverThread = null;
+ ackInfos.add(ack);
- closed = true;
}
+
+ sessionDelegate.cancelDeliveries(ackInfos);
+
+ buffer.clear();
}
- catch (InterruptedException e)
- {
- //Ignore
}
+
if (trace) { log.trace(this + " closed"); }
}
@@ -434,11 +367,13 @@
*/
public MessageProxy receive(long timeout) throws JMSException
{
+ MessageProxy m = null;
+
synchronized (mainLock)
{
if (trace) { log.trace(this + " receiving, timeout = " + timeout); }
- if (closed || closing)
+ if (closed)
{
//If consumer is closed or closing calling receive returns null
return null;
@@ -453,8 +388,6 @@
long startTimestamp = System.currentTimeMillis();
- MessageProxy m = null;
-
try
{
while(true)
@@ -511,7 +444,7 @@
{
if (trace) { log.trace("message " + m + " is not expired, pushing it to the caller"); }
- return m;
+ break;
}
if (trace)
@@ -524,11 +457,6 @@
{
timeout -= System.currentTimeMillis() - startTimestamp;
}
-
- if (closing)
- {
- return null;
- }
}
}
finally
@@ -536,21 +464,44 @@
receiverThread = null;
}
}
+
+ //This needs to be outside the lock
+ if (buffer.isEmpty() && !serverSending)
+ {
+ //The server has previously stopped sending because the buffer was full
+ //but now it is empty, so we tell the server to start sending again
+ consumerDelegate.more();
+ }
+
+ return m;
}
+
public MessageListener getMessageListener()
{
- synchronized (onMessageLock)
- {
return listener;
}
- }
public String toString()
{
return "MessageCallbackHandler[" + consumerID + "]";
}
+ public int getConsumerId()
+ {
+ return consumerID;
+ }
+
+ public void addToFrontOfBuffer(MessageProxy proxy)
+ {
+ synchronized (mainLock)
+ {
+ buffer.addFirst(proxy);
+
+ messagesAdded();
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -576,87 +527,22 @@
}
}
- protected void cancelDeliveries(List ids)
- {
- try
- {
- consumerDelegate.cancelDeliveries(ids);
- }
- catch (Exception e)
- {
- String msg = "Failed to cancel deliveries";
- log.warn(msg, e);
- }
- }
-
- protected void activateConsumer() throws JMSException
- {
- // We execute this on a separate thread to avoid the case where the asynchronous delivery
- // arrives before we have returned from the synchronus call, which would cause us to lose
- // the message.
-
- try
- {
- if (trace) { log.trace("initiating consumer endpoint activation"); }
- activationCount++;
- activateConsumerExecutor.execute(new ConsumerActivationRunnable());
- }
- catch (InterruptedException e)
- {
- // This should never happen
- throw new IllegalStateException("Activation executor thread interrupted");
- }
- }
-
- protected long deactivateConsumer() throws JMSException
- {
- return consumerDelegate.deactivate();
- }
-
- protected MessageProxy getMessageNow() throws JMSException
- {
- MessageProxy del = (MessageProxy)consumerDelegate.getMessageNow(false);
-
- if (del != null)
- {
- //We record the id of the last message delivered
- //No need to notify here since this will never be called while we
- //are closing
- lastMessageId = del.getMessage().getMessageID();
-
- return processMessage(del);
- }
- else
- {
- return null;
- }
- }
-
protected MessageProxy getMessage(long timeout) throws JMSException
{
- MessageProxy m = null;
-
- // If it's receiveNoWait then get the message directly
if (timeout == -1)
{
- m = getMessageNow();
+ //receiveNoWait so don't wait
}
else
{
- // ... otherwise we activate the server side consumer and wait for a message to arrive
- // asynchronously
- activateConsumer();
-
try
{
if (timeout == 0)
{
//Wait for ever potentially
- while (!closing && buffer.isEmpty())
+ while (!closed && buffer.isEmpty())
{
- if (trace) { log.trace("waiting on main lock"); }
mainLock.wait();
- if (trace) { log.trace("done waiting on main lock"); }
}
}
else
@@ -664,166 +550,182 @@
//Wait with timeout
long toWait = timeout;
- while (!closing && buffer.isEmpty() && toWait > 0)
+ while (!closed && buffer.isEmpty() && toWait > 0)
{
+ if (trace) { log.trace("Waiting on lock"); }
toWait = waitOnLock(mainLock, toWait);
+ if (trace) { log.trace("Done waiting on lock, empty?" + buffer.isEmpty()); }
}
}
-
- if (closing)
+ }
+ catch (InterruptedException e)
{
- m = null;
+ return null;
}
- else
+ }
+
+ if (closed)
{
+ return null;
+ }
+
+ MessageProxy m = null;
+
if (!buffer.isEmpty())
{
m = (MessageProxy)buffer.removeFirst();
+
+ if (trace) { log.trace("Got message:" + m); }
}
else
{
m = null;
}
- }
- }
- catch (InterruptedException e)
- {
- //interrupting receive thread should make it return null
- m = null;
- }
- finally
- {
- // We only need to call this if we timed out
- if (m == null)
- {
- deactivateConsumer();
- }
- }
- }
return m;
}
- protected MessageProxy processMessage(MessageProxy del)
+ protected void processMessages(List msgs)
{
+ Iterator iter = msgs.iterator();
+
+ while (iter.hasNext())
+ {
+ MessageProxy msg = (MessageProxy)iter.next();
+
//if this is the handler for a connection consumer we don't want to set the session delegate
//since this is only used for client acknowledgement which is illegal for a session
//used for an MDB
- if (!this.isConnectionConsumer)
- {
- del.setSessionDelegate(sessionDelegate);
- }
- del.setReceived();
+ msg.setSessionDelegate(sessionDelegate, isConnectionConsumer);
- return del;
+ msg.setReceived();
+ }
}
// Private -------------------------------------------------------
- // Inner classes -------------------------------------------------
-
- private class ClientDeliveryRunnable implements Runnable
+ private void queueRunner(ListenerRunner runner)
{
- private MessageProxy message;
-
- private ClientDeliveryRunnable(MessageProxy message)
+ try
{
- this.message = message;
+ this.sessionExecutor.execute(runner);
}
-
- public void run()
+ catch (InterruptedException e)
{
- // We synchronize here to prevent the message listener being set with a different one
- // between callOnMessage and activate being called
- synchronized (onMessageLock)
+ log.warn("Thread interrupted", e);
+ }
+ }
+
+ private void messagesAdded()
{
- if (closed)
+ //If we have a thread waiting on receive() we notify it
+ if (receiverThread != null)
{
- // Sanity check. This should never happen. Part of the close procedure is to ensure
- // there are no messages in the executor queue for delivery to the MessageListener.
- // If this happens it implies the close() procedure is not working properly.
- throw new IllegalStateException("Calling onMessage() but the consumer is closed!");
+ if (trace) { log.trace(this + " notifying receiver thread"); }
+ mainLock.notify();
}
- else
+ else if (listener != null)
{
- try
- {
- MessageCallbackHandler.callOnMessage(consumerDelegate, sessionDelegate, listener,
- consumerID, isConnectionConsumer, message, ackMode);
- if (!closing)
+ //We have a message listener
+ if (!listenerRunning)
{
- consumerDelegate.activate();
+ listenerRunning = true;
+ this.queueRunner(new ListenerRunner());
}
- onMessageExecuting = false;
-
- //The close() thread may be waiting for us to finish executing, so wake it up
- onMessageLock.notify();
- }
- catch (JMSException e)
- {
- log.error("Failed to deliver message", e);
+ //TODO - Execute onMessage on same thread for even better throughput
}
}
+
+ // Inner classes -------------------------------------------------
+
+ /*
+ * This class is used to put on the listener executor to wait for onMessage
+ * invocations to complete when closing
+ */
+ private class Closer implements Runnable
+ {
+ Future result;
+
+ Closer(Future result)
+ {
+ this.result = result;
}
+
+ public void run()
+ {
+ result.setResult(null);
}
}
- private class ConsumerActivationRunnable implements Runnable
+ /*
+ * This class handles the execution of onMessage methods
+ */
+ private class ListenerRunner implements Runnable
{
public void run()
{
- try
- {
- // We always try and return the message immediately, if available. This prevents an
- // extra network call to deliver the message. If the message is not available,
- // the consumer will stay active and the message will delivered asynchronously (pushed)
- // (that is what the boolean param is for)
+ MessageProxy mp = null;
- if (trace) { log.trace("activation runnable running, getting message now"); }
+ boolean again = false;
- try
+ synchronized (mainLock)
{
- MessageProxy m = (MessageProxy)consumerDelegate.getMessageNow(true);
-
- if (trace) { log.trace("got message " + m); }
+ //remove a message from the buffer
- if (m != null)
+ if (buffer.isEmpty())
{
- if (trace) { log.trace("handling " + m); }
- handleMessage(m);
+ listenerRunning = false;
}
+ else
+ {
+ mp = (MessageProxy)buffer.removeFirst();
- if (trace) { log.trace("activation runnable done"); }
+ if (mp == null)
+ {
+ throw new java.lang.IllegalStateException("Cannot find message in buffer!");
}
- finally
+
+ again = !buffer.isEmpty();
+
+ if (!again)
{
- activationCount--;
- // closing is volatile so we don't have to do the check inside the synchronized
- // (mainLock) {} block which should aid concurrency
- if (closing)
+ listenerRunning = false;
+ }
+ }
+ }
+
+ if (mp != null)
{
- synchronized (mainLock)
+ try
{
- mainLock.notifyAll();
+ callOnMessage(consumerDelegate, sessionDelegate, listener, consumerID, false, mp, ackMode);
}
+ catch (JMSException e)
+ {
+ log.error("Failed to deliver message", e);
}
}
+
+ if (again)
+ {
+ //Queue it up again
+ queueRunner(this);
}
- catch(Throwable t)
+ else
{
- log.error("Consumer endpoint activation failed", t);
- if (t.getCause() != null)
+ if (!serverSending)
{
- log.error("Cause:" + t.getCause());
- }
+ //Ask server for more messages
try
{
- close();
+ consumerDelegate.more();
}
catch (JMSException e)
{
- log.error("Failed to close consumer", e);
+ log.error("Failed to execute more()", e);
+ }
+ return;
}
}
}
@@ -831,3 +733,4 @@
}
+
1.1 date: 2006/07/17 17:14:44; author: timfox; state: Exp;jboss-jms/src/main/org/jboss/jms/client/remoting/HandleMessageResponse.java
Index: HandleMessageResponse.java
===================================================================
/*
* JBoss, Home of Professional Open Source
* Copyright 2005, JBoss Inc., and individual contributors as indicated
* by the @authors tag. See the copyright.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.jms.client.remoting;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
/**
* A HandleMessageResponse
*
* This is the response the server gets after delivering messages to a client consumer
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @version <tt>$Revision: 1.1 $</tt>
*
* $Id: HandleMessageResponse.java,v 1.1 2006/07/17 17:14:44 timfox Exp $
*
*/
public class HandleMessageResponse implements Externalizable
{
private static final long serialVersionUID = 2500443290413453569L;
private boolean full;
private int messagesAccepted;
public HandleMessageResponse()
{
}
public HandleMessageResponse(boolean full, int messagesAccepted)
{
this.full = full;
this.messagesAccepted = messagesAccepted;
}
public boolean clientIsFull()
{
return full;
}
public int getNumberAccepted()
{
return messagesAccepted;
}
// Externalizable implementation
// ---------------------------------------------------------------
public void writeExternal(ObjectOutput out) throws IOException
{
out.writeBoolean(full);
out.writeInt(messagesAccepted);
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
{
full = in.readBoolean();
messagesAccepted = in.readInt();
}
}
More information about the jboss-cvs-commits
mailing list