[Jboss-cvs] JBoss Messaging SVN: r1229 - in branches/Branch_1_0: src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/remoting src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/tx src/main/org/jboss/messaging/core tests/src/org/jboss/test/messaging/jms

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Aug 22 17:07:13 EDT 2006


Author: ovidiu.feodorov at jboss.com
Date: 2006-08-22 17:07:07 -0400 (Tue, 22 Aug 2006)
New Revision: 1229

Modified:
   branches/Branch_1_0/src/main/org/jboss/jms/client/container/ConsumerAspect.java
   branches/Branch_1_0/src/main/org/jboss/jms/client/container/SessionAspect.java
   branches/Branch_1_0/src/main/org/jboss/jms/client/container/TransactionAspect.java
   branches/Branch_1_0/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
   branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   branches/Branch_1_0/src/main/org/jboss/jms/tx/AckInfo.java
   branches/Branch_1_0/src/main/org/jboss/jms/tx/ResourceManager.java
   branches/Branch_1_0/src/main/org/jboss/messaging/core/ChannelSupport.java
   branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/TransactedSessionTest.java
Log:
minor reformatting

Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/container/ConsumerAspect.java	2006-08-22 18:43:19 UTC (rev 1228)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/container/ConsumerAspect.java	2006-08-22 21:07:07 UTC (rev 1229)
@@ -101,18 +101,17 @@
    
    public Object handleClosing(Invocation invocation) throws Throwable
    {      
-      //First we make sure closing is called on the ServerConsumerEndpoint
-      //This ensures that any in transit messages are flushed out to the client side
+      // First we make sure closing is called on the ServerConsumerEndpoint. This ensures that any
+      // in-transit messages are flushed out to the client side.
+
       Object res = invocation.invokeNext();
       
       ConsumerState consumerState = getState(invocation);
-      
       SessionState sessionState = (SessionState)consumerState.getParent();
-      
       ConnectionState connectionState = (ConnectionState)sessionState.getParent();
             
-      //Then we call close on the messagecallbackhandler which waits for onMessage invocations
-      //to complete and then cancels anything in the client buffer
+      // Then we call close on the messagecallbackhandler which waits for onMessage invocations
+      // to complete and then cancels anything in the client buffer.
       consumerState.getMessageCallbackHandler().close();
       
       sessionState.removeCallbackHandler(consumerState.getMessageCallbackHandler());

Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/container/SessionAspect.java	2006-08-22 18:43:19 UTC (rev 1228)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/container/SessionAspect.java	2006-08-22 21:07:07 UTC (rev 1229)
@@ -201,29 +201,33 @@
       return null;  
    }
    
-   /*
+   /**
     * Redelivery occurs in two situations:
+    *
     * 1) When session.recover() is called (JMS1.1 4.4.11)
+    *
     * "A session's recover method is used to stop a session and restart it with its first
-    * unacknowledged message. In effect, the session's series of delivered messages
-    * is reset to the point after its last acknowledged message."
-    * An important note here is that session recovery is LOCAL to the session.
-    * Session recovery DOES NOT result in delivered messages being cancelled back
-    * to the channel where they can be redelivered - since that may result in them being
-    * picked up by another session, which would break the semantics of recovery as described
-    * in the spec.
-    * 2) When session rollback occurs (JMS1.1 4.4.7)
-    * On rollback of a session the spec is clear that session recovery occurs:
-    * "If a transaction rollback is done, its produced messages
-    * are destroyed and its consumed messages are automatically recovered. For
-    * more information on session recovery, see Section 4.4.11 'Message
-    * Acknowledgment.'"
-    * So on rollback we do session recovery (local redelivery) in the same as if 
-    * session.recover() was called.
+    * unacknowledged message. In effect, the session's series of delivered messages is reset to the
+    * point after its last acknowledged message."
+    *
+    * An important note here is that session recovery is LOCAL to the session. Session recovery DOES
+    * NOT result in delivered messages being cancelled back to the channel where they can be
+    * redelivered - since that may result in them being picked up by another session, which would
+    * break the semantics of recovery as described in the spec.
+    *
+    * 2) When session rollback occurs (JMS1.1 4.4.7). On rollback of a session the spec is clear
+    * that session recovery occurs:
+    *
+    * "If a transaction rollback is done, its produced messages are destroyed and its consumed
+    * messages are automatically recovered. For more information on session recovery, see Section
+    * 4.4.11 'Message Acknowledgment.'"
+    *
+    * So on rollback we do session recovery (local redelivery) in the same way as if
+    * session.recover() was called.
     * 
-    * There is a conflict here though. It seems a CTS test requires messages to be available to OTHER
-    * sessions on rollback - see CTSMiscellaneousTest.testContestedQueueOnRollback()
-    * Which seems in direct contradiction to the spec.
+    * There is a conflict here though. It seems a CTS test requires messages to be available to
+    * OTHER sessions on rollback - see CTSMiscellaneousTest.testContestedQueueOnRollback(), which
+    * seems in direct contradiction to the spec.
     * 
     * In order to satisfy the test, on session recovery, if there are no local consumers available
     * to consume the message, we cancel the message back to the channel.
@@ -233,39 +237,32 @@
       if (trace) { log.trace("redeliver called"); }
       
       MethodInvocation mi = (MethodInvocation)invocation;
-            
       SessionState state = getState(invocation);
             
-      //We put the messages back in the front of their appropriate consumer buffers and
-      //set JMSRedelivered to true
+      // We put the messages back in the front of their appropriate consumer buffers and set
+      // JMSRedelivered to true.
       
       List toRedeliver = (List)mi.getArguments()[0];
-       
       LinkedList toCancel = new LinkedList();
       
-      //Need to be recovered in reverse order
+      // Need to be recovered in reverse order.
       for (int i = toRedeliver.size() - 1; i >= 0; i--)
       {
          AckInfo info = (AckInfo)toRedeliver.get(i);
-         
          MessageProxy proxy = info.getMessage();
-         
          proxy.setJMSRedelivered(true);
          
-         //TODO delivery count although optional should be global
-         //so we need to send it back to the server
-         //but this has performance hit so perhaps we just don't support it?
+         //TODO delivery count, although optional, should be global so we need to send it back to
+         //     the server, but this has a performance hit so perhaps we just don't support it?
          proxy.incDeliveryCount();
          
          MessageCallbackHandler handler = state.getCallbackHandler(info.getConsumerID());
               
          if (handler == null)
          {
-            // This is ok.
-
-            // The original consumer has closed, this message wil get cancelled back to the channel.
-            
-            toCancel.addFirst(info);            
+            // This is ok. The original consumer has closed, this message will get cancelled back
+            // to the channel.
+            toCancel.addFirst(info);
          }
          else
          {
@@ -275,10 +272,9 @@
       
       if (!toCancel.isEmpty())
       {
-         //Cancel the messages that can't be redelivered locally
+         // Cancel the messages that can't be redelivered locally
          
          SessionDelegate del = (SessionDelegate)mi.getTargetObject();
-         
          del.cancelDeliveries(toCancel);
       }
             

Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/container/TransactionAspect.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/container/TransactionAspect.java	2006-08-22 18:43:19 UTC (rev 1228)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/container/TransactionAspect.java	2006-08-22 21:07:07 UTC (rev 1229)
@@ -36,6 +36,7 @@
 import org.jboss.jms.tx.AckInfo;
 import org.jboss.jms.tx.LocalTx;
 import org.jboss.jms.tx.TxState;
+import org.jboss.jms.tx.ResourceManager;
 
 /**
  * This aspect handles transaction related logic
@@ -49,183 +50,183 @@
 public class TransactionAspect
 {
    // Constants -----------------------------------------------------
-   
+
    // Attributes ----------------------------------------------------
-       
+
    // Static --------------------------------------------------------
-   
+
    // Constructors --------------------------------------------------
-   
+
    // Public --------------------------------------------------------
-   
+
    public Object handleClose(Invocation invocation) throws Throwable
    {
       Object res = invocation.invokeNext();
-      
+
       SessionState state = (SessionState)getState(invocation);
-      
+
       ConnectionState connState = (ConnectionState)state.getParent();
-            
+
       Object xid = state.getCurrentTxId();
-      
+
       if (xid != null)
       {
          //Remove transaction from the resource manager
          connState.getResourceManager().removeTx(xid);
-      }           
-      
+      }
+
       return res;
    }
-   
+
    public Object handleCommit(Invocation invocation) throws Throwable
    {
       SessionState state = (SessionState)getState(invocation);
-      
+
       if (!state.isTransacted())
       {
          throw new IllegalStateException("Cannot commit a non-transacted session");
       }
-      
+
       if (state.isXA())
       {
          throw new TransactionInProgressException("Cannot call commit on an XA session");
       }
-      
+
       ConnectionState connState = (ConnectionState)state.getParent();
       ConnectionDelegate conn = (ConnectionDelegate)connState.getDelegate();
-      
+
       try
-      {            
+      {
          connState.getResourceManager().commitLocal((LocalTx)state.getCurrentTxId(), conn);
       }
       finally
       {
          //Start new local tx
          Object xid = connState.getResourceManager().createLocalTx();
-         
+
          state.setCurrentTxId(xid);
       }
-      
+
       return null;
    }
-   
+
    public Object handleRollback(Invocation invocation) throws Throwable
    {
       SessionState state = (SessionState)getState(invocation);
-           
+
       if (!state.isTransacted())
       {
          throw new IllegalStateException("Cannot rollback a non-transacted session");
       }
-      
+
       if (state.isXA())
       {
          throw new TransactionInProgressException("Cannot call rollback on an XA session");
       }
-      
+
       ConnectionState connState = (ConnectionState)state.getParent();
+      ResourceManager rm = connState.getResourceManager();
       ConnectionDelegate conn = (ConnectionDelegate)connState.getDelegate();
-      
-      TxState tx = connState.getResourceManager().getTx(state.getCurrentTxId());
-      
+
+      TxState tx = rm.getTx(state.getCurrentTxId());
+
       if (tx == null)
       {
          throw new IllegalStateException("Cannot find tx:" + state.getCurrentTxId());
       }
-        
+
       try
       {
-         connState.getResourceManager().rollbackLocal((LocalTx)state.getCurrentTxId(), conn);
+         rm.rollbackLocal((LocalTx)state.getCurrentTxId(), conn);
       }
       finally
       {
-         //Start new local tx
-         Object xid = connState.getResourceManager().createLocalTx();
-         
+         // start new local tx
+         Object xid = rm.createLocalTx();
          state.setCurrentTxId(xid);
-      }                 
-           
-      return null;            
+      }
+
+      return null;
    }
-   
+
    public Object handleSend(Invocation invocation) throws Throwable
    {
       SessionState sessionState = (SessionState)getState(invocation);
-                        
+
       if (sessionState.isTransacted())
       {
          //Session is transacted - so we add message to tx instead of sending now
-         
+
          Object txID = sessionState.getCurrentTxId();
-         
+
          if (txID == null)
-         {            
+         {
             throw new IllegalStateException("Attempt to send message in tx, but txId is null, XA?" + sessionState.isXA());
          }
-         
+
          ConnectionState connState = (ConnectionState)sessionState.getParent();
-         
+
          MethodInvocation mi = (MethodInvocation)invocation;
-         
-         Message m = (Message)mi.getArguments()[0];         
-         
+
+         Message m = (Message)mi.getArguments()[0];
+
          connState.getResourceManager().addMessage(txID, m);
-         
+
          // ... and we don't invoke any further interceptors in the stack
-         return null;               
+         return null;
       }
       else
-      {      
+      {
          return invocation.invokeNext();
       }
    }
-   
+
    public Object handlePreDeliver(Invocation invocation) throws Throwable
    {
       SessionState state = (SessionState)getState(invocation);
-       
+
       if (state.isTransacted())
       {
          MethodInvocation mi = (MethodInvocation)invocation;
-         
+
          MessageProxy proxy = (MessageProxy)mi.getArguments()[0];
-         
+
          //long messageID = proxy.getMessage().getMessageID();
-         
+
          int consumerID = ((Integer)mi.getArguments()[1]).intValue();
-         
+
          AckInfo info = new AckInfo(proxy, consumerID);
-         
+
          Object txID = state.getCurrentTxId();
-         
+
          if (txID == null)
          {
             throw new IllegalStateException("Attempt to send message in tx, but txId is null, XA?" + state.isXA());
          }
-         
+
          ConnectionState connState = (ConnectionState)state.getParent();
-         
+
          //Add the acknowledgement to the transaction
 
-         connState.getResourceManager().addAck(txID, info);                  
+         connState.getResourceManager().addAck(txID, info);
       }
 
       return null;
    }
 
    // Protected ------------------------------------------------------
-   
+
    // Package Private ------------------------------------------------
-   
+
    // Private --------------------------------------------------------
-   
+
    private HierarchicalState getState(Invocation inv)
    {
       return ((DelegateSupport)inv.getTargetObject()).getState();
    }
-   
+
    // Inner Classes --------------------------------------------------
-   
+
 }
 
 

Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2006-08-22 18:43:19 UTC (rev 1228)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2006-08-22 21:07:07 UTC (rev 1229)
@@ -311,29 +311,25 @@
          
       waitForOnMessageToComplete();
       
-      //Now we cancel anything left in the buffer
-      //The reason we do this now is that otherwise the deliveries wouldn't get cancelled
-      //until session close (since we don't cancel consumer's deliveries until then)
-      //which is too late - since we need to preserve the order of messages delivered in a session.
+      // Now we cancel anything left in the buffer. The reason we do this now is that otherwise the
+      // deliveries wouldn't get cancelled until session close (since we don't cancel consumer's
+      // deliveries until then), which is too late - since we need to preserve the order of messages
+      // delivered in a session.
       
       if (!buffer.isEmpty())
       {            
-         //Now we cancel any deliveries that might be waiting in our buffer
-         //This is because, otherwise the messages wouldn't get cancelled until
-         //the corresponding session died.
-         //So if another consumer in another session tried to consume from the channel
-         //before that session died it wouldn't receive those messages
-         Iterator iter = buffer.iterator();
-         
+         // Now we cancel any deliveries that might be waiting in our buffer. This is because
+         // otherwise the messages wouldn't get cancelled until the corresponding session died.
+         // So if another consumer in another session tried to consume from the channel before that
+         // session died it wouldn't receive those messages.
+
          List ackInfos = new ArrayList();
-         while (iter.hasNext())
-         {                        
-            MessageProxy mp = (MessageProxy)iter.next();
-            
+
+         for(Iterator i = buffer.iterator(); i.hasNext();)
+         {
+            MessageProxy mp = (MessageProxy)i.next();
             AckInfo ack = new AckInfo(mp, consumerID);
-            
             ackInfos.add(ack);
-            
          }
                
          sessionDelegate.cancelDeliveries(ackInfos);
@@ -346,7 +342,7 @@
    
    private void waitForOnMessageToComplete()
    {
-      //Wait for any on message executions to complete
+      // Wait for any on message executions to complete
       
       Future result = new Future();
       

Modified: branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2006-08-22 18:43:19 UTC (rev 1228)
+++ branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2006-08-22 21:07:07 UTC (rev 1229)
@@ -457,7 +457,7 @@
    {
       try
       {
-         //Deliveries must be cancelled in reverse order
+         // deliveries must be cancelled in reverse order
           
          Set consumers = new HashSet();
          
@@ -465,8 +465,9 @@
          {
             AckInfo ack = (AckInfo)ackInfos.get(i);
             
-            //We look in the global map since the message might have come from connection consumer
-            ServerConsumerEndpoint consumer = this.connectionEndpoint.getConsumerEndpoint(ack.getConsumerID());
+            // We look in the global map since the message might have come from connection consumer
+            ServerConsumerEndpoint consumer =
+               this.connectionEndpoint.getConsumerEndpoint(ack.getConsumerID());
    
             if (consumer == null)
             {
@@ -474,18 +475,14 @@
             }
             
             consumer.cancelDelivery(new Long(ack.getMessageID()));
-            
             consumers.add(consumer);
          }
          
-         //Need to prompt delivery for all consumers
+         // need to prompt delivery for all consumers
          
-         Iterator iter = consumers.iterator();
-         
-         while (iter.hasNext())
+         for(Iterator i = consumers.iterator(); i.hasNext(); )
          {
-            ServerConsumerEndpoint consumer = (ServerConsumerEndpoint)iter.next();
-            
+            ServerConsumerEndpoint consumer = (ServerConsumerEndpoint)i.next();
             consumer.promptDelivery();
          }
       }

Modified: branches/Branch_1_0/src/main/org/jboss/jms/tx/AckInfo.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/tx/AckInfo.java	2006-08-22 18:43:19 UTC (rev 1228)
+++ branches/Branch_1_0/src/main/org/jboss/jms/tx/AckInfo.java	2006-08-22 21:07:07 UTC (rev 1229)
@@ -48,7 +48,7 @@
    
    protected int consumerID;
    
-   //The actual proxy must not get serialized
+   // The actual proxy must not get serialized
    protected transient MessageProxy msg;
    
    // Static --------------------------------------------------------

Modified: branches/Branch_1_0/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/tx/ResourceManager.java	2006-08-22 18:43:19 UTC (rev 1228)
+++ branches/Branch_1_0/src/main/org/jboss/jms/tx/ResourceManager.java	2006-08-22 21:07:07 UTC (rev 1229)
@@ -166,19 +166,19 @@
    {
       if (trace) { log.trace("rolling back local xid " + xid); }
       
-      TxState tx = removeTx(xid);
+      TxState ts = removeTx(xid);
       
-      if (tx == null)
+      if (ts == null)
       {      
          throw new IllegalStateException("Cannot find transaction with xid:" + xid);         
       }
       
-      //Don't need messages for rollback
-      tx.clearMessages();
+      // don't need messages for rollback
+      ts.clearMessages();
       
-      //For one phase rollback there is nothing to do on the server
+      // for one phase rollback there is nothing to do on the server
       
-      redeliverMessages(tx);
+      redeliverMessages(ts);
    }
    
    public void commit(Xid xid, boolean onePhase, ConnectionDelegate connection) throws XAException
@@ -275,59 +275,45 @@
    
    /*
     * Rollback has occurred so we need to redeliver any unacked messages corresponding to the acks
-    * is in the transaction
+    * in the transaction.
     */
-   private void redeliverMessages(TxState tx) throws JMSException
+   private void redeliverMessages(TxState ts) throws JMSException
    {
-      Iterator iter = tx.getAcks().iterator();
-      
-      //Sort them into lists - one for each session
-        
-      //We use a LinkedHashMap since we need to preserve the order of the sessions
+      // Sort messages into lists, one for each session. We use a LinkedHashMap since we need to
+      // preserve the order of the sessions.
+
       Map toAck = new LinkedHashMap();
-      
-      while (iter.hasNext())
+
+      for(Iterator i = ts.getAcks().iterator(); i.hasNext(); )
       {
-         AckInfo ack = (AckInfo)iter.next();
-         
+         AckInfo ack = (AckInfo)i.next();
          SessionDelegate del = ack.msg.getSessionDelegate();
          
          List acks = (List)toAck.get(del);
-         
          if (acks == null)
          {
             acks = new ArrayList();
-            
             toAck.put(del, acks);
          }
-         
          acks.add(ack);
       }
       
-      //Now tell each session to redeliver
+      // Now tell each session to redeliver.
       
       LinkedList l = new LinkedList();
       
-      iter = toAck.entrySet().iterator();
-                  
-      //need to reverse the order
-      while (iter.hasNext())
+      for(Iterator i = toAck.entrySet().iterator(); i.hasNext();)
       {
-         Object entry = iter.next();
-         
-         l.addFirst(entry);         
+         // need to reverse the order
+         Object entry = i.next();
+         l.addFirst(entry);
       }
       
-      iter = l.iterator();
-      
-      while (iter.hasNext())
+      for(Iterator i = l.iterator(); i.hasNext();)
       {
-         Map.Entry entry = (Map.Entry)iter.next();
-         
+         Map.Entry entry = (Map.Entry)i.next();
          SessionDelegate sess = (SessionDelegate)entry.getKey();
-         
          List acks = (List)entry.getValue();
-         
          sess.redeliver(acks);
       }  
    }

Modified: branches/Branch_1_0/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/messaging/core/ChannelSupport.java	2006-08-22 18:43:19 UTC (rev 1228)
+++ branches/Branch_1_0/src/main/org/jboss/messaging/core/ChannelSupport.java	2006-08-22 21:07:07 UTC (rev 1229)
@@ -939,49 +939,33 @@
 
       if (!removed)
       {
-         // This is ok
-         // This can happen if the message is cancelled before the result of
-         // ServerConsumerDelegate.handle
-         // has returned, in which case we won't have a record of the delivery
-         // in the Set
+         // This is OK. This can happen if the message is cancelled before the result of
+         // ServerConsumerDelegate.handle has returned, in which case we won't have a record of the
+         // delivery in the Set. In this case we don't want to add the message reference back into
+         // the state since it was never removed in the first place.
 
-         // In this case we don't want to add the message reference back into
-         // the state
-         // since it was never removed in the first place
-
-         if (trace)
-         {
-            log.trace(this + " can't find delivery " + del
-                     + " in state so not replacing messsage ref");
-         }
-
+         if (trace) { log.trace(this + " can't find delivery " + del + " in state so not replacing messsage ref"); }
       }
       else
       {
+         MessageReference ref;
          synchronized (refLock)
          {
-            messageRefs.addFirst(del.getReference(), del.getReference()
-                     .getPriority());
+            ref = del.getReference();
+            messageRefs.addFirst(ref, ref.getPriority());
 
             if (paging)
             {
-               // if paging we need to evict the end reference to storage to
-               // preserve the number of refs in the queue
+               // If paging we need to evict the end reference to storage to preserve the number of
+               // references in the queue.
 
-               MessageReference ref = (MessageReference) messageRefs
-                        .removeLast();
-
-               addToDownCache(ref);
-
+               MessageReference last = (MessageReference)messageRefs.removeLast();
+               addToDownCache(last);
                refsInStorage++;
             }
          }
 
-         if (trace)
-         {
-            log.trace(this + " added " + del.getReference()
-                     + " back into state");
-         }
+         if (trace) { log.trace(this + " added " + ref + " back into state"); }
       }
    }
 

Modified: branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/TransactedSessionTest.java
===================================================================
--- branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/TransactedSessionTest.java	2006-08-22 18:43:19 UTC (rev 1228)
+++ branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/TransactedSessionTest.java	2006-08-22 21:07:07 UTC (rev 1229)
@@ -506,7 +506,9 @@
    }
 
    /**
-    * Make sure redelivered flag is set on redelivery via rollback, different setup
+    * Make sure redelivered flag is set on redelivery via rollback, different setup: we close the
+    * rolled back session and we receive the message whose acknowledgment was cancelled on a new
+    * session.
     */
    public void testRedeliveredQueue2() throws Exception
    {
@@ -516,16 +518,17 @@
       {
          conn = cf.createConnection();
 
-         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         MessageProducer prod = sess.createProducer(queue);
-         prod.send(sess.createTextMessage("a message"));
+         Session sendSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
+         MessageProducer prod = sendSession.createProducer(queue);
+         prod.send(sendSession.createTextMessage("a message"));
+
          log.debug("Message was sent to the queue");
 
          conn.close();
 
          conn = cf.createConnection();
-         sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+         Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
 
          MessageConsumer cons = sess.createConsumer(queue);
 
@@ -539,16 +542,70 @@
          sess.rollback();
          sess.close();
 
-         sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+         Session sess2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
-         cons = sess.createConsumer(queue);
+         cons = sess2.createConsumer(queue);
 
          tm = (TextMessage)cons.receive();
 
          assertEquals("a message", tm.getText());
          assertTrue(tm.getJMSRedelivered());
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
 
-         sess.commit();
+   /**
+    * Make sure redelivered flag is set on redelivery via rollback, different setup: we don't close
+    * the rolled back session and we receive the message whose acknowledgment was cancelled on a new
+    * session.
+    *
+    * TODO: Is this test semantically correct?
+    */
+   public void testRedeliveredQueue3() throws Exception
+   {
+      Connection conn = null;
+
+      try
+      {
+         conn = cf.createConnection();
+
+         Session sendSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer prod = sendSession.createProducer(queue);
+         prod.send(sendSession.createTextMessage("a message"));
+
+         log.debug("Message was sent to the queue");
+
+         conn.close();
+
+         conn = cf.createConnection();
+         Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+
+         MessageConsumer cons = sess.createConsumer(queue);
+
+         conn.start();
+
+         TextMessage tm = (TextMessage)cons.receive();
+
+         assertEquals("a message", tm.getText());
+         assertFalse(tm.getJMSRedelivered());
+
+         sess.rollback();
+
+         Session sess2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         cons = sess2.createConsumer(queue);
+
+         tm = (TextMessage)cons.receive(3000);
+
+         assertEquals("a message", tm.getText());
+         assertTrue(tm.getJMSRedelivered());
       }
       finally
       {
@@ -559,6 +616,7 @@
       }
    }
 
+
    public void testReceivedRollbackQueue() throws Exception
    {
       Connection conn = cf.createConnection();




More information about the jboss-cvs-commits mailing list