[Jboss-cvs] JBoss Messaging SVN: r1317 - in	branches/Branch_1_0/src/main/org/jboss/jms: client/container	client/state tx
    jboss-cvs-commits at lists.jboss.org 
    jboss-cvs-commits at lists.jboss.org
       
    Tue Sep 19 00:12:52 EDT 2006
    
    
  
Author: ovidiu.feodorov at jboss.com
Date: 2006-09-19 00:12:49 -0400 (Tue, 19 Sep 2006)
New Revision: 1317
Modified:
   branches/Branch_1_0/src/main/org/jboss/jms/client/container/SessionAspect.java
   branches/Branch_1_0/src/main/org/jboss/jms/client/container/TransactionAspect.java
   branches/Branch_1_0/src/main/org/jboss/jms/client/state/SessionState.java
   branches/Branch_1_0/src/main/org/jboss/jms/tx/MessagingXAResource.java
Log:
fix for http://jira.jboss.org/jira/browse/JBMESSAGING-410, http://jira.jboss.org/jira/browse/JBMESSAGING-520
Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/container/SessionAspect.java	2006-09-19 04:08:23 UTC (rev 1316)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/container/SessionAspect.java	2006-09-19 04:12:49 UTC (rev 1317)
@@ -44,6 +44,7 @@
  * This aspect is PER_VM
  *
  * @author <a href="mailto:tim.fox at jboss.com>Tim Fox</a>
+ * @author <a href="mailto:ovidiu at jboss.com>Ovidiu Feodorov</a>
  *
  * $Id$
  */
@@ -79,24 +80,25 @@
    public Object handlePreDeliver(Invocation invocation) throws Throwable
    { 
       MethodInvocation mi = (MethodInvocation)invocation;
-      
       SessionState state = getState(invocation);
       
       int ackMode = state.getAcknowledgeMode();
       
-      if (ackMode == Session.CLIENT_ACKNOWLEDGE || ackMode == Session.AUTO_ACKNOWLEDGE ||
-          ackMode == Session.DUPS_OK_ACKNOWLEDGE)
+      if (ackMode == Session.CLIENT_ACKNOWLEDGE ||
+          ackMode == Session.AUTO_ACKNOWLEDGE ||
+          ackMode == Session.DUPS_OK_ACKNOWLEDGE ||
+          state.getCurrentTxId() == null)
       {
+         // We collect acknowledgments (and not transact them) for CLIENT, AUTO and DUPS_OK, and
+         // also for XA sessions not enrolled in a global transaction.
+
          SessionDelegate del = (SessionDelegate)mi.getTargetObject();
          
-         //We store the ack in a list for later acknowledgement or recovery
+         // We store the ack in a list for later acknowledgement or recovery
     
          Object[] args = mi.getArguments();
-         
          MessageProxy mp = (MessageProxy)args[0];
-         
          int consumerID = ((Integer)args[1]).intValue();
-         
          AckInfo info = new AckInfo(mp, consumerID);
          
          state.getToAck().add(info);
@@ -110,9 +112,7 @@
    public Object handleAcknowledgeAll(Invocation invocation) throws Throwable
    {    
       MethodInvocation mi = (MethodInvocation)invocation;
-      
       SessionState state = getState(invocation);
-      
       SessionDelegate del = (SessionDelegate)mi.getTargetObject();
     
       if (!state.getToAck().isEmpty())
@@ -128,42 +128,34 @@
    public Object handlePostDeliver(Invocation invocation) throws Throwable
    { 
       MethodInvocation mi = (MethodInvocation)invocation;
-      
       SessionState state = getState(invocation);
       
       int ackMode = state.getAcknowledgeMode();
       
-      if (ackMode == Session.AUTO_ACKNOWLEDGE || ackMode == Session.DUPS_OK_ACKNOWLEDGE)
+      if (ackMode == Session.AUTO_ACKNOWLEDGE ||
+          ackMode == Session.DUPS_OK_ACKNOWLEDGE ||
+          ackMode != Session.CLIENT_ACKNOWLEDGE && state.getCurrentTxId() == null)
       {
-         SessionDelegate del = (SessionDelegate)mi.getTargetObject();
-         
-         // We acknowledge immediately
-         
+         // We acknowledge immediately on a non-transacted session that does not want to
+         // CLIENT_ACKNOWLEDGE, or an XA session not enrolled in a global transaction.
+
+         SessionDelegate sd = (SessionDelegate)mi.getTargetObject();
+
          if (!state.isRecoverCalled())
          {
-            //We don't acknowledge the message if recover() was called
-            
-            //Object[] args = mi.getArguments();
-            
-            //MessageProxy proxy = (MessageProxy)args[0];
-                   
-            //int consumerID = ((Integer)args[1]).intValue();
+            if (trace) { log.trace("acknowledging NON-transactionally"); }
 
-            //AckInfo ack = new AckInfo(proxy, consumerID);
-            
             List acks = state.getToAck();
-            
             AckInfo ack = (AckInfo)acks.get(0);
-            
-            del.acknowledge(ack);   
-               
-            state.getToAck().clear();            
+            sd.acknowledge(ack);
+            state.getToAck().clear();
          }
          else
          {
+            if (trace) { log.trace("recover called, so NOT acknowledging"); }
+
             state.setRecoverCalled(false);
          }
-         if (trace) { log.trace("ack mode is " + Util.acknowledgmentModeToString(ackMode)+ ", acknowledged on " + del); }
       }
 
       return null;
Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/container/TransactionAspect.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/container/TransactionAspect.java	2006-09-19 04:08:23 UTC (rev 1316)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/container/TransactionAspect.java	2006-09-19 04:12:49 UTC (rev 1317)
@@ -37,6 +37,7 @@
 import org.jboss.jms.tx.LocalTx;
 import org.jboss.jms.tx.TxState;
 import org.jboss.jms.tx.ResourceManager;
+import org.jboss.logging.Logger;
 
 /**
  * This aspect handles transaction related logic
@@ -44,6 +45,7 @@
  * This aspect is PER_VM.
  * 
  * @author <a href="mailto:tim.fox at jboss.com>Tim Fox</a>
+ * @author <a href="mailto:ovidiu at jboss.com>Ovidiu Feodorov</a>
  *
  * $Id$
  */
@@ -51,8 +53,12 @@
 {
    // Constants -----------------------------------------------------
 
+   private static final Logger log = Logger.getLogger(TransactionAspect.class);
+
    // Attributes ----------------------------------------------------
 
+   private boolean trace = log.isTraceEnabled();
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -151,62 +157,50 @@
 
    public Object handleSend(Invocation invocation) throws Throwable
    {
-      SessionState sessionState = (SessionState)getState(invocation);
+      SessionState state = (SessionState)getState(invocation);
+      Object txID = state.getCurrentTxId();
 
-      if (sessionState.isTransacted())
+      if (txID != null)
       {
-         // Session is transacted - so we add message to tx instead of sending now
+         // the session is non-XA and transacted, or XA and enrolled in a global transaction, so
+         // we add the message to a transaction instead of sending it now. An XA session that has
+         // not been enrolled in a global transaction behaves as a non-transacted session.
 
-         Object txID = sessionState.getCurrentTxId();
-
-         if (txID == null)
-         {
-            throw new IllegalStateException("Attempt to send message in tx, but txId is null, XA?" + sessionState.isXA());
-         }
-
-         ConnectionState connState = (ConnectionState)sessionState.getParent();
-
+         ConnectionState connState = (ConnectionState)state.getParent();
          MethodInvocation mi = (MethodInvocation)invocation;
-
          Message m = (Message)mi.getArguments()[0];
 
+         if (trace) { log.trace("sending message " + m + " transactionally, queueing on resource manager"); }
+
          connState.getResourceManager().addMessage(txID, m);
 
          // ... and we don't invoke any further interceptors in the stack
          return null;
       }
-      else
-      {
-         return invocation.invokeNext();
-      }
+
+      if (trace) { log.trace("sending message NON-transactionally"); }
+
+      return invocation.invokeNext();
    }
 
    public Object handlePreDeliver(Invocation invocation) throws Throwable
    {
       SessionState state = (SessionState)getState(invocation);
+      Object txID = state.getCurrentTxId();
 
-      if (state.isTransacted())
+      if (txID != null)
       {
-         MethodInvocation mi = (MethodInvocation)invocation;
+         // the session is non-XA and transacted, or XA and enrolled in a global transaction. An
+         // XA session that has not been enrolled in a global transaction behaves as a
+         // non-transacted session.
 
+         MethodInvocation mi = (MethodInvocation)invocation;
          MessageProxy proxy = (MessageProxy)mi.getArguments()[0];
-
-         //long messageID = proxy.getMessage().getMessageID();
-
          int consumerID = ((Integer)mi.getArguments()[1]).intValue();
-
          AckInfo info = new AckInfo(proxy, consumerID);
-
-         Object txID = state.getCurrentTxId();
-
-         if (txID == null)
-         {
-            throw new IllegalStateException("Attempt to send message in tx, but txId is null, XA?" + state.isXA());
-         }
-
          ConnectionState connState = (ConnectionState)state.getParent();
 
-         //Add the acknowledgement to the transaction
+         if (trace) { log.trace("sending acknowlegment transactionally, queueing on resource manager"); }
 
          connState.getResourceManager().addAck(txID, info);
       }
Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/state/SessionState.java	2006-09-19 04:08:23 UTC (rev 1316)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/state/SessionState.java	2006-09-19 04:12:49 UTC (rev 1317)
@@ -73,25 +73,27 @@
       children = new HashSet();
       this.acknowledgeMode = ackMode;
       this.transacted = transacted;
-      this.xa = xa;      
+      this.xa = xa;
+
       if (xa)
       {
          // Create an XA resource
          xaResource = new MessagingXAResource(parent.getResourceManager(), this);                            
       }
-      if (transacted)
+
+      // If session is transacted and XA, the currentTxId will be updated when the XAResource will
+      // be enrolled with a global transaction.
+
+      if (transacted & !xa)
       {
          // Create a local tx
          currentTxId = parent.getResourceManager().createLocalTx();        
       }
+
       executor = new QueuedExecutor(new LinkedQueue());
       
-      if (ackMode == Session.CLIENT_ACKNOWLEDGE || ackMode == Session.AUTO_ACKNOWLEDGE ||
-          ackMode == Session.DUPS_OK_ACKNOWLEDGE)
-      {
-         toAck = new ArrayList();
-      }
-      
+      toAck = new ArrayList();
+
       // TODO could optimise this to use the same map of callbackmanagers (which holds refs
       // to callbackhandlers) in the connection, instead of maintaining another map
       callbackHandlers = new HashMap();
Modified: branches/Branch_1_0/src/main/org/jboss/jms/tx/MessagingXAResource.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/tx/MessagingXAResource.java	2006-09-19 04:08:23 UTC (rev 1316)
+++ branches/Branch_1_0/src/main/org/jboss/jms/tx/MessagingXAResource.java	2006-09-19 04:12:49 UTC (rev 1317)
@@ -114,6 +114,11 @@
       if (trace) { log.trace(this + " committing " + xid + (onePhase ? " (one phase)" : " (two phase)")); }
       
       rm.commit(xid, onePhase, connection);
+
+      // leave the session in a 'clean' state, the currentTxId will be set when the XAResource will
+      // be enrolled with a new transaction.
+
+      setCurrentTransactionId(null);
    }
 
    public void end(Xid xid, int flags) throws XAException
@@ -162,8 +167,8 @@
    {
       if (trace) { log.trace(this + " rolling back " + xid); }
 
-      //TODO Hmmm on rollback should we also stop and start the consumers to remove any transient messages
-      //Like we do on local session rollback??
+      // TODO Hmmm on rollback should we also stop and start the consumers to remove any transient
+      // messages, like we do on local session rollback??
       
       rm.rollback(xid, connection);
    }
@@ -190,10 +195,11 @@
             case TMNOFLAGS :
                if (convertTx)
                {    
-                  //If I commit/rollback the tx, then there is a short period of time between the AS (or whoever)
-                  //calling commit on the tx and calling start to enrolling the session in a new tx.
-                  //If the session has any listeners then in that period, messages can be received asychronously
-                  //but we want them to be received in the context of a tx, so we convert.
+                  // If I commit/rollback the tx, then there is a short period of time between the
+                  // AS (or whoever) calling commit on the tx and calling start to enrolling the
+                  // session in a new tx. If the session has any listeners then in that period,
+                  // messages can be received asychronously but we want them to be received in the
+                  // context of a tx, so we convert.
                   setCurrentTransactionId(rm.convertTx((LocalTx)sessionState.getCurrentTxId(), xid));
                }
                else
@@ -237,11 +243,6 @@
    
    private void setCurrentTransactionId(final Xid xid)
    {
-      if (xid == null)
-      {
-         throw new NullPointerException("null xid");
-      }
-
       if (trace) { log.trace(this + " setting current xid to " + xid + ",  previous " + sessionState.getCurrentTxId()); }
 
       sessionState.setCurrentTxId(xid);
    
    
More information about the jboss-cvs-commits
mailing list