[jboss-cvs] jboss-jms/src/main/org/jboss/jms/client/container ...
Timothy Fox
tim.fox at jboss.com
Mon Jul 17 13:14:44 EDT 2006
User: timfox
Date: 06/07/17 13:14:44
Modified: src/main/org/jboss/jms/client/container AsfAspect.java
ConsumerAspect.java ProducerAspect.java
SessionAspect.java StateCreationAspect.java
TransactionAspect.java
Log:
Many changes including implementation of prefetch, SEDAisation of server, changing of recovery
Revision Changes Path
1.12 +4 -2 jboss-jms/src/main/org/jboss/jms/client/container/AsfAspect.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: AsfAspect.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/client/container/AsfAspect.java,v
retrieving revision 1.11
retrieving revision 1.12
diff -u -b -r1.11 -r1.12
--- AsfAspect.java 6 May 2006 05:05:39 -0000 1.11
+++ AsfAspect.java 17 Jul 2006 17:14:44 -0000 1.12
@@ -51,7 +51,7 @@
*
* @author <a href="mailto:tim.fox at jboss.com>Tim Fox</a>
*
- * $Id: AsfAspect.java,v 1.11 2006/05/06 05:05:39 ovidiu Exp $
+ * $Id: AsfAspect.java,v 1.12 2006/07/17 17:14:44 timfox Exp $
*/
public class AsfAspect
{
@@ -162,7 +162,9 @@
if (trace) { log.trace("sending " + holder.msg + " to the message listener" ); }
- MessageCallbackHandler.callOnMessage(holder.consumerDelegate, del, sessionListener, holder.consumerID, false, holder.msg, ackMode);
+ MessageCallbackHandler.callOnMessage(holder.consumerDelegate, del,
+ sessionListener, holder.consumerID, false,
+ holder.msg, ackMode);
}
return null;
1.13 +34 -10 jboss-jms/src/main/org/jboss/jms/client/container/ConsumerAspect.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: ConsumerAspect.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/client/container/ConsumerAspect.java,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -b -r1.12 -r1.13
--- ConsumerAspect.java 6 May 2006 05:05:39 -0000 1.12
+++ ConsumerAspect.java 17 Jul 2006 17:14:44 -0000 1.13
@@ -24,13 +24,17 @@
import org.jboss.aop.joinpoint.Invocation;
import org.jboss.aop.joinpoint.MethodInvocation;
import org.jboss.jms.client.delegate.DelegateSupport;
-import org.jboss.jms.client.remoting.MessageCallbackHandler;
import org.jboss.jms.client.remoting.CallbackManager;
+import org.jboss.jms.client.remoting.MessageCallbackHandler;
import org.jboss.jms.client.state.ConnectionState;
import org.jboss.jms.client.state.ConsumerState;
import org.jboss.jms.client.state.SessionState;
import org.jboss.jms.delegate.ConsumerDelegate;
import org.jboss.jms.delegate.SessionDelegate;
+import org.jboss.jms.server.endpoint.ServerBrowserEndpoint;
+import org.jboss.logging.Logger;
+
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
/**
*
@@ -40,14 +44,17 @@
*
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 1.12 $</tt>
+ * @version <tt>$Revision: 1.13 $</tt>
*
- * $Id: ConsumerAspect.java,v 1.12 2006/05/06 05:05:39 ovidiu Exp $
+ * $Id: ConsumerAspect.java,v 1.13 2006/07/17 17:14:44 timfox Exp $
*/
public class ConsumerAspect
{
// Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(ConsumerAspect.class);
+
+
// Static --------------------------------------------------------
// Attributes ----------------------------------------------------
@@ -71,32 +78,50 @@
SessionDelegate sessionDelegate = (SessionDelegate)invocation.getTargetObject();
ConsumerState consumerState = (ConsumerState)((DelegateSupport)consumerDelegate).getState();
int consumerID = consumerState.getConsumerID();
+ int prefetchSize = consumerState.getPrefetchSize();
+ QueuedExecutor sessionExecutor = sessionState.getExecutor();
MessageCallbackHandler messageHandler =
new MessageCallbackHandler(isCC, sessionState.getAcknowledgeMode(),
- sessionState.getExecutor(), connectionState.getPooledExecutor(),
- sessionDelegate, consumerDelegate, consumerID);
+ sessionDelegate, consumerDelegate, consumerID,
+ prefetchSize, sessionExecutor);
+
+ sessionState.addCallbackHandler(messageHandler);
CallbackManager cm = connectionState.getRemotingConnection().getCallbackManager();
cm.registerHandler(consumerID, messageHandler);
consumerState.setMessageCallbackHandler(messageHandler);
+ //Now we have finished creating the client consumer, we can tell the SCD
+ //we are ready
+ consumerDelegate.more();
+
return consumerDelegate;
}
public Object handleClosing(Invocation invocation) throws Throwable
{
+ //First we make sure closing is called on the ServerConsumerEndpoint
+ //This ensures that any in transit messages are flushed out to the client side
+ Object res = invocation.invokeNext();
+
ConsumerState consumerState = getState(invocation);
- ConnectionState connectionState = (ConnectionState)consumerState.getParent().getParent();
+ SessionState sessionState = (SessionState)consumerState.getParent();
+
+ ConnectionState connectionState = (ConnectionState)sessionState.getParent();
+ //Then we call close on the messagecallbackhandler which waits for onMessage invocations
+ //to complete and then cancels anything in the client buffer
consumerState.getMessageCallbackHandler().close();
+ sessionState.removeCallbackHandler(consumerState.getMessageCallbackHandler());
+
CallbackManager cm = connectionState.getRemotingConnection().getCallbackManager();
cm.unregisterHandler(consumerState.getConsumerID());
- return invocation.invokeNext();
+ return res;
}
public Object handleGetDestination(Invocation invocation) throws Throwable
@@ -114,7 +139,6 @@
return getState(invocation).getSelector();
}
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
1.18 +18 -2 jboss-jms/src/main/org/jboss/jms/client/container/ProducerAspect.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: ProducerAspect.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/client/container/ProducerAspect.java,v
retrieving revision 1.17
retrieving revision 1.18
diff -u -b -r1.17 -r1.18
--- ProducerAspect.java 6 May 2006 05:05:39 -0000 1.17
+++ ProducerAspect.java 17 Jul 2006 17:14:44 -0000 1.18
@@ -53,9 +53,9 @@
* This aspect is PER_INSTANCE.
*
* @author <a href="mailto:tim.fox at jboss.com>Tim Fox</a>
- * @version <tt>$Revision: 1.17 $</tt>
+ * @version <tt>$Revision: 1.18 $</tt>
*
- * $Id: ProducerAspect.java,v 1.17 2006/05/06 05:05:39 ovidiu Exp $
+ * $Id: ProducerAspect.java,v 1.18 2006/07/17 17:14:44 timfox Exp $
*/
public class ProducerAspect
{
@@ -169,6 +169,8 @@
JBossMessage messageToSend;
boolean foreign = false;
+ boolean doCopy = false;
+
if (!(m instanceof MessageProxy))
{
// it's a foreign message
@@ -211,6 +213,12 @@
// get the actual message
MessageProxy proxy = (MessageProxy)m;
messageToSend = proxy.getMessage();
+
+ if (proxy.isSent())
+ {
+ doCopy = true;
+ }
+
messageToSend.doAfterSend();
proxy.setSent();
}
@@ -224,6 +232,14 @@
messageToSend.setJMSMessageID(null);
messageToSend.setMessageId(id);
+ //If the message has already been sent we need to make a shallow copy since if we are invm then we do not
+ //want to change the ids of messages already sent - which would happen if we were sending the same
+ //underlying instance
+ if (doCopy)
+ {
+ messageToSend = messageToSend.doShallowCopy();
+ }
+
// now that we know the messageID, set it also on the foreign message, if is the case
if (foreign)
{
1.10 +181 -14 jboss-jms/src/main/org/jboss/jms/client/container/SessionAspect.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: SessionAspect.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/client/container/SessionAspect.java,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -b -r1.9 -r1.10
--- SessionAspect.java 27 Apr 2006 17:38:09 -0000 1.9
+++ SessionAspect.java 17 Jul 2006 17:14:44 -0000 1.10
@@ -21,14 +21,20 @@
*/
package org.jboss.jms.client.container;
+import java.util.LinkedList;
+import java.util.List;
+
import javax.jms.IllegalStateException;
import javax.jms.Session;
import org.jboss.aop.joinpoint.Invocation;
import org.jboss.aop.joinpoint.MethodInvocation;
-import org.jboss.jms.client.state.SessionState;
import org.jboss.jms.client.delegate.DelegateSupport;
+import org.jboss.jms.client.remoting.MessageCallbackHandler;
+import org.jboss.jms.client.state.SessionState;
import org.jboss.jms.delegate.SessionDelegate;
+import org.jboss.jms.message.MessageProxy;
+import org.jboss.jms.tx.AckInfo;
import org.jboss.logging.Logger;
import org.jboss.messaging.util.Util;
@@ -39,7 +45,7 @@
*
* @author <a href="mailto:tim.fox at jboss.com>Tim Fox</a>
*
- * $Id: SessionAspect.java,v 1.9 2006/04/27 17:38:09 ovidiu Exp $
+ * $Id: SessionAspect.java,v 1.10 2006/07/17 17:14:44 timfox Exp $
*/
public class SessionAspect
{
@@ -57,47 +63,208 @@
// Public --------------------------------------------------------
+ public Object handlePreDeliver(Invocation invocation) throws Throwable
+ {
+ MethodInvocation mi = (MethodInvocation)invocation;
+
+ SessionState state = getState(invocation);
+
+ int ackMode = state.getAcknowledgeMode();
+
+ if (ackMode == Session.CLIENT_ACKNOWLEDGE || ackMode == Session.AUTO_ACKNOWLEDGE ||
+ ackMode == Session.DUPS_OK_ACKNOWLEDGE)
+ {
+ SessionDelegate del = (SessionDelegate)mi.getTargetObject();
+
+ //We store the ack in a list for later acknowledgement or recovery
+
+ Object[] args = mi.getArguments();
+
+ MessageProxy mp = (MessageProxy)args[0];
+
+ int consumerID = ((Integer)args[1]).intValue();
+
+ AckInfo info = new AckInfo(mp, consumerID);
+
+ state.getToAck().add(info);
+
+ if (trace) { log.trace("ack mode is " + Util.acknowledgmentModeToString(ackMode)+ ", acknowledged on " + del); }
+ }
+
+ return invocation.invokeNext();
+ }
+
+ public Object handleAcknowledgeAll(Invocation invocation) throws Throwable
+ {
+ MethodInvocation mi = (MethodInvocation)invocation;
+
+ SessionState state = getState(invocation);
+
+ SessionDelegate del = (SessionDelegate)mi.getTargetObject();
+
+ if (!state.getToAck().isEmpty())
+ {
+ del.acknowledgeBatch(state.getToAck());
+
+ state.getToAck().clear();
+ }
+
+ return null;
+ }
public Object handlePostDeliver(Invocation invocation) throws Throwable
{
MethodInvocation mi = (MethodInvocation)invocation;
- int ackMode = getState(invocation).getAcknowledgeMode();
+ SessionState state = getState(invocation);
+
+ int ackMode = state.getAcknowledgeMode();
- if (ackMode != Session.SESSION_TRANSACTED && ackMode != Session.CLIENT_ACKNOWLEDGE)
+ if (ackMode == Session.AUTO_ACKNOWLEDGE || ackMode == Session.DUPS_OK_ACKNOWLEDGE)
{
SessionDelegate del = (SessionDelegate)mi.getTargetObject();
//We acknowledge immediately
- del.acknowledge();
+
+ if (!state.isRecoverCalled())
+ {
+ //We don't acknowledge the message if recover() was called
+
+ Object[] args = mi.getArguments();
+
+ MessageProxy proxy = (MessageProxy)args[0];
+
+ int consumerID = ((Integer)args[1]).intValue();
+
+ AckInfo ack = new AckInfo(proxy, consumerID);
+
+ del.acknowledge(ack);
+
+ state.getToAck().clear();
+ }
+ else
+ {
+ state.setRecoverCalled(false);
+ }
if (trace) { log.trace("ack mode is " + Util.acknowledgmentModeToString(ackMode)+ ", acknowledged on " + del); }
}
return null;
}
-
+ /*
+ * Called when session.recover is called
+ */
public Object handleRecover(Invocation invocation) throws Throwable
{
if (trace) { log.trace("recover called"); }
- int ackMode = getState(invocation).getAcknowledgeMode();
+ MethodInvocation mi = (MethodInvocation)invocation;
+
+ SessionState state = getState(invocation);
+
+ int ackMode = state.getAcknowledgeMode();
if (ackMode == Session.SESSION_TRANSACTED)
{
throw new IllegalStateException("Cannot recover a transacted session");
}
- //Tell the server to redeliver any un-acked messages
- if (trace) { log.trace("redelivering messages"); }
+ if (trace) { log.trace("recovering the session"); }
+
+ //Call redeliver
+ SessionDelegate del = (SessionDelegate)mi.getTargetObject();
+
+ del.redeliver(state.getToAck());
+
+ state.getToAck().clear();
+
+ state.setRecoverCalled(true);
+
+ return null;
+ }
+
+ /*
+ * Redelivery occurs in two situations:
+ * 1) When session.recover() is called (JMS1.1 4.4.11)
+ * "A session's recover method is used to stop a session and restart it with its first
+ * unacknowledged message. In effect, the session's series of delivered messages
+ * is reset to the point after its last acknowledged message."
+ * An important note here is that session recovery is LOCAL to the session.
+ * Session recovery DOES NOT result in delivered messages being cancelled back
+ * to the channel where they can be redelivered - since that may result in them being
+ * picked up by another session, which would break the semantics of recovery as described
+ * in the spec.
+ * 2) When session rollback occurs (JMS1.1 4.4.7)
+ * On rollback of a session the spec is clear that session recovery occurs:
+ * "If a transaction rollback is done, its produced messages
+ * are destroyed and its consumed messages are automatically recovered. For
+ * more information on session recovery, see Section 4.4.11 Message
+ * Acknowledgment."
+ * So on rollback we do session recovery (local redelivery) in the same way as if
+ * session.recover() was called.
+ *
+ * There is a conflict here though. It seems a CTS test requires messages to be available to OTHER
+ * sessions on rollback - see CTSMiscellaneousTest.testContestedQueueOnRollback()
+ * Which seems in direct contradiction to the spec.
+ *
+ * In order to satisfy the test, on session recovery, if there are no local consumers available
+ * to consume the message, we cancel the message back to the channel.
+ */
+ public Object handleRedeliver(Invocation invocation) throws Throwable
+ {
+ if (trace) { log.trace("redeliver called"); }
MethodInvocation mi = (MethodInvocation)invocation;
- SessionDelegate del = (SessionDelegate)mi.getTargetObject();
+ SessionState state = getState(invocation);
- if (trace) { log.trace("Calling sessiondelegate.redeliver()"); }
+ //We put the messages back in the front of their appropriate consumer buffers and
+ //set JMSRedelivered to true
- del.cancelDeliveries();
+ List toRedeliver = (List)mi.getArguments()[0];
+
+ LinkedList toCancel = new LinkedList();
+
+ //Need to be recovered in reverse order
+ for (int i = toRedeliver.size() - 1; i >= 0; i--)
+ {
+ AckInfo info = (AckInfo)toRedeliver.get(i);
+
+ MessageProxy proxy = info.getMessage();
+
+ proxy.setJMSRedelivered(true);
+
+ //TODO delivery count although optional should be global
+ //so we need to send it back to the server
+ //but this has performance hit so perhaps we just don't support it?
+ proxy.incDeliveryCount();
+
+ MessageCallbackHandler handler = state.getCallbackHandler(info.getConsumerID());
+
+ if (handler == null)
+ {
+ //This is ok.
+
+ //The original consumer has closed, this message will get cancelled
+ //back to the channel
+
+ toCancel.addFirst(info);
+ }
+ else
+ {
+ handler.addToFrontOfBuffer(proxy);
+ }
+ }
+
+ if (!toCancel.isEmpty())
+ {
+ //Cancel the messages that can't be redelivered locally
+
+ SessionDelegate del = (SessionDelegate)mi.getTargetObject();
+
+ del.cancelDeliveries(toCancel);
+ }
return null;
}
1.18 +9 -4 jboss-jms/src/main/org/jboss/jms/client/container/StateCreationAspect.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: StateCreationAspect.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/client/container/StateCreationAspect.java,v
retrieving revision 1.17
retrieving revision 1.18
diff -u -b -r1.17 -r1.18
--- StateCreationAspect.java 23 May 2006 18:25:07 -0000 1.17
+++ StateCreationAspect.java 17 Jul 2006 17:14:44 -0000 1.18
@@ -26,6 +26,7 @@
import org.jboss.aop.Advised;
import org.jboss.aop.joinpoint.Invocation;
import org.jboss.aop.joinpoint.MethodInvocation;
+import org.jboss.aop.metadata.SimpleMetaData;
import org.jboss.jms.client.delegate.ClientConnectionDelegate;
import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
import org.jboss.jms.client.delegate.ClientProducerDelegate;
@@ -58,7 +59,7 @@
* @author <a href="mailto:tim.fox at jboss.com>Tim Fox</a>
* @author <a href="mailto:ovidiu at jboss.org>Ovidiu Feodorov</a>
*
- * $Id: StateCreationAspect.java,v 1.17 2006/05/23 18:25:07 ovidiu Exp $
+ * $Id: StateCreationAspect.java,v 1.18 2006/07/17 17:14:44 timfox Exp $
*/
public class StateCreationAspect
{
@@ -131,13 +132,17 @@
boolean noLocal = ((Boolean)mi.getArguments()[2]).booleanValue();
boolean connectionConsumer = ((Boolean)mi.getArguments()[4]).booleanValue();
+ SimpleMetaData md = ((Advised)consumerDelegate)._getInstanceAdvisor().getMetaData();
+
int consumerID =
- ((Integer)((Advised)consumerDelegate)._getInstanceAdvisor().getMetaData().
- getMetaData(MetaDataConstants.JMS, MetaDataConstants.CONSUMER_ID)).intValue();
+ ((Integer)md.getMetaData(MetaDataConstants.JMS, MetaDataConstants.CONSUMER_ID)).intValue();
+
+ int prefetchSize =
+ ((Integer)md.getMetaData(MetaDataConstants.JMS, MetaDataConstants.PREFETCH_SIZE)).intValue();
ConsumerState consumerState =
new ConsumerState(sessionState, consumerDelegate, dest, selector,
- noLocal, consumerID, connectionConsumer);
+ noLocal, consumerID, connectionConsumer, prefetchSize);
delegate.setState(consumerState);
return consumerDelegate;
1.15 +21 -8 jboss-jms/src/main/org/jboss/jms/client/container/TransactionAspect.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: TransactionAspect.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/client/container/TransactionAspect.java,v
retrieving revision 1.14
retrieving revision 1.15
diff -u -b -r1.14 -r1.15
--- TransactionAspect.java 6 May 2006 05:05:39 -0000 1.14
+++ TransactionAspect.java 17 Jul 2006 17:14:44 -0000 1.15
@@ -32,8 +32,10 @@
import org.jboss.jms.client.state.HierarchicalState;
import org.jboss.jms.client.state.SessionState;
import org.jboss.jms.delegate.ConnectionDelegate;
+import org.jboss.jms.message.MessageProxy;
import org.jboss.jms.tx.AckInfo;
import org.jboss.jms.tx.LocalTx;
+import org.jboss.jms.tx.TxState;
/**
* This aspect handles transaction related logic
@@ -42,7 +44,7 @@
*
* @author <a href="mailto:tim.fox at jboss.com>Tim Fox</a>
*
- * $Id: TransactionAspect.java,v 1.14 2006/05/06 05:05:39 ovidiu Exp $
+ * $Id: TransactionAspect.java,v 1.15 2006/07/17 17:14:44 timfox Exp $
*/
public class TransactionAspect
{
@@ -124,6 +126,13 @@
ConnectionState connState = (ConnectionState)state.getParent();
ConnectionDelegate conn = (ConnectionDelegate)connState.getDelegate();
+ TxState tx = connState.getResourceManager().getTx(state.getCurrentTxId());
+
+ if (tx == null)
+ {
+ throw new IllegalStateException("Cannot find tx:" + state.getCurrentTxId());
+ }
+
try
{
connState.getResourceManager().rollbackLocal((LocalTx)state.getCurrentTxId(), conn);
@@ -179,10 +188,14 @@
{
MethodInvocation mi = (MethodInvocation)invocation;
- long messageID = ((Long)mi.getArguments()[0]).longValue();
+ MessageProxy proxy = (MessageProxy)mi.getArguments()[0];
+
+ //long messageID = proxy.getMessage().getMessageID();
int consumerID = ((Integer)mi.getArguments()[1]).intValue();
+ AckInfo info = new AckInfo(proxy, consumerID);
+
Object txID = state.getCurrentTxId();
if (txID == null)
@@ -194,7 +207,7 @@
//Add the acknowledgement to the transaction
- connState.getResourceManager().addAck(txID, new AckInfo(messageID, consumerID));
+ connState.getResourceManager().addAck(txID, info);
}
return null;
More information about the jboss-cvs-commits
mailing list