[jboss-cvs] JBoss Messaging SVN: r3930 - branches/Branch_Stable/src/main/org/jboss/jms/client/container.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Mar 25 11:58:18 EDT 2008


Author: timfox
Date: 2008-03-25 11:58:18 -0400 (Tue, 25 Mar 2008)
New Revision: 3930

Modified:
   branches/Branch_Stable/src/main/org/jboss/jms/client/container/SessionAspect.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-1253


Modified: branches/Branch_Stable/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/client/container/SessionAspect.java	2008-03-25 15:46:12 UTC (rev 3929)
+++ branches/Branch_Stable/src/main/org/jboss/jms/client/container/SessionAspect.java	2008-03-25 15:58:18 UTC (rev 3930)
@@ -227,71 +227,75 @@
       MethodInvocation mi = (MethodInvocation)invocation;
       SessionState state = getState(invocation);
       
-      int ackMode = state.getAcknowledgeMode();
+      synchronized (state)
+      {
       
-      Object[] args = mi.getArguments();
-      DeliveryInfo info = (DeliveryInfo)args[0];
-      
-      if (ackMode == Session.CLIENT_ACKNOWLEDGE)
-      {
-         // We collect acknowledgments in the list
+         int ackMode = state.getAcknowledgeMode();
          
-         if (trace) { log.trace(this + " added to CLIENT_ACKNOWLEDGE list delivery " + info); }
+         Object[] args = mi.getArguments();
+         DeliveryInfo info = (DeliveryInfo)args[0];
          
-         // Sanity check
-         if (info.getConnectionConsumerSession() != null)
+         if (ackMode == Session.CLIENT_ACKNOWLEDGE)
          {
-            throw new IllegalStateException(
-               "CLIENT_ACKNOWLEDGE cannot be used with a connection consumer");
+            // We collect acknowledgments in the list
+            
+            if (trace) { log.trace(this + " added to CLIENT_ACKNOWLEDGE list delivery " + info); }
+            
+            // Sanity check
+            if (info.getConnectionConsumerSession() != null)
+            {
+               throw new IllegalStateException(
+                  "CLIENT_ACKNOWLEDGE cannot be used with a connection consumer");
+            }
+                     
+            state.getClientAckList().add(info);
          }
-                  
-         state.getClientAckList().add(info);
-      }
-      // if XA and there is no transaction enlisted on XA we will act as AutoAcknowledge
-      // However if it's a MDB (if there is a DistinguishedListener) we should behaved as transacted
-      else if (ackMode == Session.AUTO_ACKNOWLEDGE || isXAAndConsideredNonTransacted(state))
-      {
-         // We collect the single acknowledgement in the state. 
-                           
-         if (trace) { log.trace(this + " added " + info + " to session state"); }
-         
-         state.setAutoAckInfo(info);         
-      }
-      else if (ackMode == Session.DUPS_OK_ACKNOWLEDGE)
-      {
-         if (trace) { log.trace(this + " added to DUPS_OK_ACKNOWLEDGE list delivery " + info); }
-         
-         state.getClientAckList().add(info);
-         
-         //Also set here - this would be used for recovery in a message listener
-         state.setAutoAckInfo(info);
-      }
-      else
-      {             
-         Object txID = state.getCurrentTxId();
-   
-         if (txID != null)
+         // if XA and there is no transaction enlisted on XA we will act as AutoAcknowledge
+         // However if it's a MDB (if there is a DistinguishedListener) we should behaved as transacted
+         else if (ackMode == Session.AUTO_ACKNOWLEDGE || isXAAndConsideredNonTransacted(state))
          {
-            // the session is non-XA and transacted, or XA and enrolled in a global transaction. An
-            // XA session that has not been enrolled in a global transaction behaves as a
-            // transacted session.
+            // We collect the single acknowledgement in the state. 
+                              
+            if (trace) { log.trace(this + " added " + info + " to session state"); }
             
-            ConnectionState connState = (ConnectionState)state.getParent();
-   
-            if (trace) { log.trace("sending acknowlegment transactionally, queueing on resource manager"); }
-   
-            // If the ack is for a delivery that came through via a connection consumer then we use
-            // the connectionConsumer session as the session id, otherwise we use this sessions'
-            // session ID
+            state.setAutoAckInfo(info);         
+         }
+         else if (ackMode == Session.DUPS_OK_ACKNOWLEDGE)
+         {
+            if (trace) { log.trace(this + " added to DUPS_OK_ACKNOWLEDGE list delivery " + info); }
             
-            ClientSessionDelegate connectionConsumerDelegate =
-               (ClientSessionDelegate)info.getConnectionConsumerSession();
+            state.getClientAckList().add(info);
             
-            String sessionId = connectionConsumerDelegate != null ?
-               connectionConsumerDelegate.getID() : state.getSessionID();
-            
-            connState.getResourceManager().addAck(txID, sessionId, info);
-         }        
+            //Also set here - this would be used for recovery in a message listener
+            state.setAutoAckInfo(info);
+         }
+         else
+         {             
+            Object txID = state.getCurrentTxId();
+      
+            if (txID != null)
+            {
+               // the session is non-XA and transacted, or XA and enrolled in a global transaction. An
+               // XA session that has not been enrolled in a global transaction behaves as a
+               // transacted session.
+               
+               ConnectionState connState = (ConnectionState)state.getParent();
+      
+               if (trace) { log.trace("sending acknowlegment transactionally, queueing on resource manager"); }
+      
+               // If the ack is for a delivery that came through via a connection consumer then we use
+               // the connectionConsumer session as the session id, otherwise we use this sessions'
+               // session ID
+               
+               ClientSessionDelegate connectionConsumerDelegate =
+                  (ClientSessionDelegate)info.getConnectionConsumerSession();
+               
+               String sessionId = connectionConsumerDelegate != null ?
+                  connectionConsumerDelegate.getID() : state.getSessionID();
+               
+               connState.getResourceManager().addAck(txID, sessionId, info);
+            }        
+         }
       }
       
       return null;
@@ -302,85 +306,89 @@
       MethodInvocation mi = (MethodInvocation)invocation;
       SessionState state = getState(invocation);
       
-      int ackMode = state.getAcknowledgeMode();
-      
-      SessionDelegate sd = (SessionDelegate)mi.getTargetObject();
-      
-      boolean res = true;
-
-      // if XA and there is no transaction enlisted on XA we will act as AutoAcknowledge
-      // However if it's a MDB (if there is a DistinguishedListener) we should behaved as transacted
-      if (ackMode == Session.AUTO_ACKNOWLEDGE || isXAAndConsideredNonTransacted(state))
+      synchronized (state)
       {
-         // It is possible that session.recover() is called inside a message listener onMessage
-         // method - i.e. between the invocations of preDeliver and postDeliver. In this case we
-         // don't want to acknowledge the last delivered messages - since it will be redelivered.
-         if (!state.isRecoverCalled())
-         {
-            DeliveryInfo delivery = state.getAutoAckInfo();
-            
-            if (delivery == null)
-            {
-               throw new IllegalStateException("Cannot find delivery to AUTO_ACKNOWLEDGE");
-            }
-                                 
-            if (trace) { log.trace(this + " auto acknowledging delivery " + delivery); }
-              
-            // We clear the state in a finally so then we don't get a knock on
-            // exception on the next ack since we haven't cleared the state. See
-            // http://jira.jboss.org/jira/browse/JBMESSAGING-852
-
-            //This is ok since the message is acked after delivery, then the client
-            //could get duplicates anyway
-            
-            try
-            {
-               res = ackDelivery(sd, delivery);
-            }
-            finally
-            {
-               state.setAutoAckInfo(null);               
-            }
-         }         
-         else
-         {
-            if (trace) { log.trace(this + " recover called, so NOT acknowledging"); }
-
-            state.setRecoverCalled(false);
-         }
-      }
-      else if (ackMode == Session.DUPS_OK_ACKNOWLEDGE)
-      {
-         List acks = state.getClientAckList();
          
-         if (!state.isRecoverCalled())
+         int ackMode = state.getAcknowledgeMode();
+         
+         SessionDelegate sd = (SessionDelegate)mi.getTargetObject();
+         
+         boolean res = true;
+   
+         // if XA and there is no transaction enlisted on XA we will act as AutoAcknowledge
+         // However if it's a MDB (if there is a DistinguishedListener) we should behaved as transacted
+         if (ackMode == Session.AUTO_ACKNOWLEDGE || isXAAndConsideredNonTransacted(state))
          {
-            if (acks.size() >= state.getDupsOKBatchSize())
+            // It is possible that session.recover() is called inside a message listener onMessage
+            // method - i.e. between the invocations of preDeliver and postDeliver. In this case we
+            // don't want to acknowledge the last delivered messages - since it will be redelivered.
+            if (!state.isRecoverCalled())
             {
-               // We clear the state in a finally
+               DeliveryInfo delivery = state.getAutoAckInfo();
+               
+               if (delivery == null)
+               {
+                  throw new IllegalStateException("Cannot find delivery to AUTO_ACKNOWLEDGE");
+               }
+                                    
+               if (trace) { log.trace(this + " auto acknowledging delivery " + delivery); }
+                 
+               // We clear the state in a finally so then we don't get a knock on
+               // exception on the next ack since we haven't cleared the state. See
                // http://jira.jboss.org/jira/browse/JBMESSAGING-852
-         
+   
+               //This is ok since the message is acked after delivery, then the client
+               //could get duplicates anyway
+               
                try
                {
-                  acknowledgeDeliveries(sd, acks);
+                  res = ackDelivery(sd, delivery);
                }
                finally
-               {                  
-                  acks.clear();
-                  state.setAutoAckInfo(null);
+               {
+                  state.setAutoAckInfo(null);               
                }
-            }    
+            }         
+            else
+            {
+               if (trace) { log.trace(this + " recover called, so NOT acknowledging"); }
+   
+               state.setRecoverCalled(false);
+            }
          }
-         else
+         else if (ackMode == Session.DUPS_OK_ACKNOWLEDGE)
          {
-            if (trace) { log.trace(this + " recover called, so NOT acknowledging"); }
-
-            state.setRecoverCalled(false);
+            List acks = state.getClientAckList();
+            
+            if (!state.isRecoverCalled())
+            {
+               if (acks.size() >= state.getDupsOKBatchSize())
+               {
+                  // We clear the state in a finally
+                  // http://jira.jboss.org/jira/browse/JBMESSAGING-852
+            
+                  try
+                  {
+                     acknowledgeDeliveries(sd, acks);
+                  }
+                  finally
+                  {                  
+                     acks.clear();
+                     state.setAutoAckInfo(null);
+                  }
+               }    
+            }
+            else
+            {
+               if (trace) { log.trace(this + " recover called, so NOT acknowledging"); }
+   
+               state.setRecoverCalled(false);
+            }
+            state.setAutoAckInfo(null);                  
          }
-         state.setAutoAckInfo(null);                  
+   
+         return Boolean.valueOf(res);
       }
-
-      return Boolean.valueOf(res);
    }
    
    /**
@@ -390,18 +398,22 @@
    {    
       MethodInvocation mi = (MethodInvocation)invocation;
       SessionState state = getState(invocation);
-      SessionDelegate del = (SessionDelegate)mi.getTargetObject();            
-    
-      if (!state.getClientAckList().isEmpty())
-      {                 
-         //CLIENT_ACKNOWLEDGE can't be used with a MDB so it is safe to always acknowledge all
-         //on this session (rather than the connection consumer session)
-         acknowledgeDeliveries(del, state.getClientAckList());
       
-         state.getClientAckList().clear();
-      }      
-        
-      return null;
+      synchronized (state)
+      {      
+         SessionDelegate del = (SessionDelegate)mi.getTargetObject();            
+       
+         if (!state.getClientAckList().isEmpty())
+         {                 
+            //CLIENT_ACKNOWLEDGE can't be used with a MDB so it is safe to always acknowledge all
+            //on this session (rather than the connection consumer session)
+            acknowledgeDeliveries(del, state.getClientAckList());
+         
+            state.getClientAckList().clear();
+         }      
+           
+         return null;
+      }
    }
                        
    /*
@@ -415,50 +427,54 @@
             
       SessionState state = getState(invocation);
       
-      if (state.isTransacted() && !isXAAndConsideredNonTransacted(state))
+      synchronized (state)
       {
-         throw new IllegalStateException("Cannot recover a transacted session");
-      }
-      
-      if (trace) { log.trace("recovering the session"); }
-       
-      //Call redeliver
-      SessionDelegate del = (SessionDelegate)mi.getTargetObject();
-      
-      int ackMode = state.getAcknowledgeMode();
-      
-      if (ackMode == Session.CLIENT_ACKNOWLEDGE)
-      {
-         List dels = state.getClientAckList();
          
-         state.setClientAckList(new ArrayList());
+         if (state.isTransacted() && !isXAAndConsideredNonTransacted(state))
+         {
+            throw new IllegalStateException("Cannot recover a transacted session");
+         }
          
-         del.redeliver(dels);
-
-         state.setRecoverCalled(true);
-      }
-      else if (ackMode == Session.AUTO_ACKNOWLEDGE || ackMode == Session.DUPS_OK_ACKNOWLEDGE || isXAAndConsideredNonTransacted(state))
-      {
-         DeliveryInfo info = state.getAutoAckInfo();
+         if (trace) { log.trace("recovering the session"); }
+          
+         //Call redeliver
+         SessionDelegate del = (SessionDelegate)mi.getTargetObject();
          
-         //Don't recover if it's already to cancel
+         int ackMode = state.getAcknowledgeMode();
          
-         if (info != null)
+         if (ackMode == Session.CLIENT_ACKNOWLEDGE)
          {
-            List redels = new ArrayList();
+            List dels = state.getClientAckList();
             
-            redels.add(info);
+            state.setClientAckList(new ArrayList());
             
-            del.redeliver(redels);
-            
-            state.setAutoAckInfo(null);            
-
+            del.redeliver(dels);
+   
             state.setRecoverCalled(true);
          }
-      }   
-        
-
-      return null;  
+         else if (ackMode == Session.AUTO_ACKNOWLEDGE || ackMode == Session.DUPS_OK_ACKNOWLEDGE || isXAAndConsideredNonTransacted(state))
+         {
+            DeliveryInfo info = state.getAutoAckInfo();
+            
+            //Don't recover if it's already to cancel
+            
+            if (info != null)
+            {
+               List redels = new ArrayList();
+               
+               redels.add(info);
+               
+               del.redeliver(redels);
+               
+               state.setAutoAckInfo(null);            
+   
+               state.setRecoverCalled(true);
+            }
+         }   
+           
+   
+         return null;  
+      }
    }
    
    /**
@@ -538,65 +554,71 @@
    public Object handleCommit(Invocation invocation) throws Throwable
    {
       SessionState state = getState(invocation);
-
-      if (!state.isTransacted())
-      {
-         throw new IllegalStateException("Cannot commit a non-transacted session");
+      
+      synchronized (state)
+      {   
+         if (!state.isTransacted())
+         {
+            throw new IllegalStateException("Cannot commit a non-transacted session");
+         }
+   
+         if (state.isXA())
+         {
+            throw new TransactionInProgressException("Cannot call commit on an XA session");
+         }
+   
+         ConnectionState connState = (ConnectionState)state.getParent();
+         ConnectionDelegate conn = (ConnectionDelegate)connState.getDelegate();
+     
+         try
+         {
+            connState.getResourceManager().commitLocal((LocalTx)state.getCurrentTxId(), conn);
+         }
+         finally
+         {
+            //Start new local tx
+            Object xid = connState.getResourceManager().createLocalTx();
+   
+            state.setCurrentTxId(xid);
+         }
+         
+         //TODO on commit we don't want to ACK any messages that have exceeded the max delivery count OR
+   
+         return null;
       }
-
-      if (state.isXA())
-      {
-         throw new TransactionInProgressException("Cannot call commit on an XA session");
-      }
-
-      ConnectionState connState = (ConnectionState)state.getParent();
-      ConnectionDelegate conn = (ConnectionDelegate)connState.getDelegate();
-  
-      try
-      {
-         connState.getResourceManager().commitLocal((LocalTx)state.getCurrentTxId(), conn);
-      }
-      finally
-      {
-         //Start new local tx
-         Object xid = connState.getResourceManager().createLocalTx();
-
-         state.setCurrentTxId(xid);
-      }
-      
-      //TODO on commit we don't want to ACK any messages that have exceeded the max delivery count OR
-
-      return null;
    }
 
    public Object handleRollback(Invocation invocation) throws Throwable
    {
       SessionState state = getState(invocation);
-
-      if (!state.isTransacted())
-      {
-         throw new IllegalStateException("Cannot rollback a non-transacted session");
-      }
-
-      if (state.isXA())
-      {
-         throw new TransactionInProgressException("Cannot call rollback on an XA session");
-      }
       
-      ConnectionState connState = (ConnectionState)state.getParent();
-      ResourceManager rm = connState.getResourceManager();
-      try
+      synchronized (state)
       {
-         rm.rollbackLocal((LocalTx)state.getCurrentTxId());
+         if (!state.isTransacted())
+         {
+            throw new IllegalStateException("Cannot rollback a non-transacted session");
+         }
+   
+         if (state.isXA())
+         {
+            throw new TransactionInProgressException("Cannot call rollback on an XA session");
+         }
+         
+         ConnectionState connState = (ConnectionState)state.getParent();
+         ResourceManager rm = connState.getResourceManager();
+         try
+         {
+            rm.rollbackLocal((LocalTx)state.getCurrentTxId());
+         }
+         finally
+         {
+            // start new local tx
+            Object xid = rm.createLocalTx();
+            state.setCurrentTxId(xid);
+         }
+   
+         return null;
       }
-      finally
-      {
-         // start new local tx
-         Object xid = rm.createLocalTx();
-         state.setCurrentTxId(xid);
-      }
-
-      return null;
    }
    
    public Object handleSend(Invocation invocation) throws Throwable




More information about the jboss-cvs-commits mailing list