[jboss-cvs] JBoss Messaging SVN: r5898 - in trunk: src/main/org/jboss/messaging/core/client/impl and 9 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Feb 18 23:08:33 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-02-18 23:08:32 -0500 (Wed, 18 Feb 2009)
New Revision: 5898

Added:
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/RollbackMessage.java
Modified:
   trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/MessageReference.java
   trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java
   trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
   trunk/src/main/org/jboss/messaging/jms/client/JMSMessageListenerWrapper.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/TransactedSessionTest.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/message/JMSXDeliveryCountTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/consumer/ConsumerTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/consumer/RedeliveryConsumerTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/consumer/TransactionDurabilityTest.java
   trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
Log:
JBMESSAGING-1339 and JBMESSAGING-1294 - Fix on Delivery Counter + implement rollback(lastMessageAsDelivered)

Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSession.java	2009-02-18 17:31:32 UTC (rev 5897)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSession.java	2009-02-19 04:08:32 UTC (rev 5898)
@@ -161,6 +161,12 @@
 
    void rollback() throws MessagingException;
 
+   /**
+    * @param isLastMessageAsDelived if true, the first message in the delivering buffer is treated as delivered (acknowledged) instead of being rolled back
+    * @throws MessagingException
+    */
+   void rollback(boolean isLastMessageAsDelived) throws MessagingException;
+
    void close() throws MessagingException;
 
    boolean isClosed();

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2009-02-18 17:31:32 UTC (rev 5897)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2009-02-19 04:08:32 UTC (rev 5898)
@@ -51,6 +51,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.RollbackMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
@@ -534,6 +535,11 @@
 
    public void rollback() throws MessagingException
    {
+      rollback(false);
+   }
+
+   public void rollback(final boolean isLastMessageAsDelived) throws MessagingException
+   {
       checkClosed();
 
       flushAcks();
@@ -556,8 +562,8 @@
          consumer.clear();
       }
 
-      channel.sendBlocking(new PacketImpl(PacketImpl.SESS_ROLLBACK));
-
+      channel.sendBlocking(new RollbackMessage(isLastMessageAsDelived));
+                           
       if (wasStarted)
       {
          start();

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-02-18 17:31:32 UTC (rev 5897)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-02-19 04:08:32 UTC (rev 5898)
@@ -100,6 +100,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.ReplicateCreateSessionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.RollbackMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
@@ -701,7 +702,7 @@
          }
          case SESS_ROLLBACK:
          {
-            packet = new PacketImpl(PacketImpl.SESS_ROLLBACK);
+            packet = new RollbackMessage(); 
             break;
          }
          case SESS_QUEUEQUERY:

Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/RollbackMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/RollbackMessage.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/RollbackMessage.java	2009-02-19 04:08:32 UTC (rev 5898)
@@ -0,0 +1,111 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl.wireformat;
+
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.util.DataConstants;
+
+/**
+ * A SESS_ROLLBACK packet whose body is a single boolean: the isLastMessageAsDelived flag.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * 
+ * Created Feb 18, 2009 2:11:17 PM
+ *
+ *
+ */
+public class RollbackMessage extends PacketImpl
+{
+
+   /**
+    * Creates the packet with the flag left at its default value (false).
+    */
+   public RollbackMessage()
+   {
+      super(SESS_ROLLBACK);
+   }
+
+   public RollbackMessage(final boolean isLastMessageAsDelived)
+   {
+      super(SESS_ROLLBACK);
+      this.isLastMessageAsDelived = isLastMessageAsDelived;
+   }
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private boolean isLastMessageAsDelived;
+
+   /**
+    * @return the current value of the isLastMessageAsDelived flag
+    */
+   public boolean isLastMessageAsDelived()
+   {
+      return isLastMessageAsDelived;
+   }
+
+   /**
+    * @param isLastMessageAsDelived the new value of the flag
+    */
+   public void setLastMessageAsDelived(final boolean isLastMessageAsDelived)
+   {
+      this.isLastMessageAsDelived = isLastMessageAsDelived;
+   }
+
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl#getRequiredBufferSize()
+    */
+   @Override
+   public int getRequiredBufferSize()
+   {
+      return BASIC_PACKET_SIZE + DataConstants.SIZE_BOOLEAN; // basic packet size plus the one boolean written by encodeBody
+   }
+
+   @Override
+   public void encodeBody(final MessagingBuffer buffer)
+   {
+      buffer.putBoolean(isLastMessageAsDelived); // the body consists of this flag only
+   }
+
+   @Override
+   public void decodeBody(final MessagingBuffer buffer)
+   {
+      this.isLastMessageAsDelived = buffer.getBoolean(); // mirrors encodeBody: reads the single flag back
+   }
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: trunk/src/main/org/jboss/messaging/core/server/MessageReference.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/MessageReference.java	2009-02-18 17:31:32 UTC (rev 5897)
+++ trunk/src/main/org/jboss/messaging/core/server/MessageReference.java	2009-02-19 04:08:32 UTC (rev 5898)
@@ -56,6 +56,8 @@
    void setDeliveryCount(int deliveryCount);
 
    void incrementDeliveryCount();
+   
+   void decrementDeliveryCount();
 
    Queue getQueue();
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java	2009-02-18 17:31:32 UTC (rev 5897)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java	2009-02-19 04:08:32 UTC (rev 5898)
@@ -42,7 +42,7 @@
 	
 	void close() throws Exception;
 
-	List<MessageReference> cancelRefs() throws Exception;
+	List<MessageReference> cancelRefs(boolean lastConsumedAsDelivered, Transaction tx) throws Exception;
 	
 	void setStarted(boolean started);
 	

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2009-02-18 17:31:32 UTC (rev 5897)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2009-02-19 04:08:32 UTC (rev 5898)
@@ -25,6 +25,7 @@
 import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.wireformat.RollbackMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
@@ -81,7 +82,7 @@
 
    void handleExpired(final SessionExpiredMessage packet);
 
-   void handleRollback(Packet packet);
+   void handleRollback(RollbackMessage packet);
 
    void handleCommit(Packet packet);
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java	2009-02-18 17:31:32 UTC (rev 5897)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java	2009-02-19 04:08:32 UTC (rev 5898)
@@ -103,6 +103,11 @@
    {
       deliveryCount++;
    }
+   
+   public void decrementDeliveryCount()
+   {
+      deliveryCount--;
+   }
 
    public long getScheduledDeliveryTime()
    {

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-02-18 17:31:32 UTC (rev 5897)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-02-19 04:08:32 UTC (rev 5898)
@@ -120,7 +120,7 @@
     * if we are a browse only consumer we don't need to worry about acknowledgemenets or being started/stopeed by the session.
     */
    private final boolean browseOnly;
-   
+
    private final boolean updateDeliveries;
 
    private final StorageManager storageManager;
@@ -184,7 +184,7 @@
       binding.getQueue().addConsumer(this);
 
       minLargeMessageSize = session.getMinLargeMessageSize();
-      
+
       this.updateDeliveries = updateDeliveries;
    }
 
@@ -276,7 +276,7 @@
 
       session.removeConsumer(this);
 
-      LinkedList<MessageReference> refs = cancelRefs();
+      LinkedList<MessageReference> refs = cancelRefs(false, null);
 
       Iterator<MessageReference> iter = refs.iterator();
 
@@ -300,11 +300,10 @@
          props.putStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
 
          props.putStringProperty(ManagementHelper.HDR_CLUSTER_NAME, binding.getClusterName());
-         
+
          props.putStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName());
 
-         props.putStringProperty(ManagementHelper.HDR_FILTERSTRING,
-                                 filter == null ? null : filter.getFilterString());
+         props.putStringProperty(ManagementHelper.HDR_FILTERSTRING, filter == null ? null : filter.getFilterString());
 
          props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
 
@@ -314,15 +313,27 @@
       }
    }
 
-   public LinkedList<MessageReference> cancelRefs() throws Exception
+   public LinkedList<MessageReference> cancelRefs(final boolean lastConsumedAsDelivered, final Transaction tx) throws Exception
    {
+
+      boolean performACK = lastConsumedAsDelivered;
+
       LinkedList<MessageReference> refs = new LinkedList<MessageReference>();
 
       if (!deliveringRefs.isEmpty())
       {
          for (MessageReference ref : deliveringRefs)
          {
-            refs.add(ref);
+            if (performACK)
+            {
+               acknowledge(false, tx, ref.getMessage().getMessageID());
+               performACK = false;
+            }
+            else
+            {
+               ref.decrementDeliveryCount();
+               refs.add(ref);
+            }
          }
 
          deliveringRefs.clear();
@@ -586,7 +597,7 @@
       }
 
       lock.lock();
-      
+
       try
       {
 
@@ -629,18 +640,32 @@
             }
 
             ref.getQueue().referenceHandled();
-         }
 
-         if (preAcknowledge)
-         {
-            if (message.isLargeMessage())
+         
+            ref.incrementDeliveryCount();
+
+            // If updateDeliveries = false (set by strict-update),
+            // the updateDeliveryCount would still be updated after cancel
+            if (updateDeliveries)
             {
-               // we must hold one reference, or the file will be deleted before it could be delivered
-               message.incrementRefCount();
+               if (ref.getMessage().isDurable() && ref.getQueue().isDurable())
+               {
+                  storageManager.updateDeliveryCount(ref);
+               }
             }
 
-            // With pre-ack, we ack *before* sending to the client
-            ref.getQueue().acknowledge(ref);
+            if (preAcknowledge)
+            {
+               if (message.isLargeMessage())
+               {
+                  // we must hold one reference, or the file will be deleted before it could be delivered
+                  message.incrementRefCount();
+               }
+
+               // With pre-ack, we ack *before* sending to the client
+               ref.getQueue().acknowledge(ref);
+            }
+
          }
 
          if (message.isLargeMessage())
@@ -652,18 +677,6 @@
             deliverStandardMessage(ref, message);
          }
 
-         ref.incrementDeliveryCount();
-         
-         // If updateDeliveries = false (set by strict-update),
-         // the updateDeliveryCount would still be updated after cancel
-         if (updateDeliveries)
-         {
-            if (ref.getMessage().isDurable() && ref.getQueue().isDurable())
-            {
-               storageManager.updateDeliveryCount(ref);
-            }
-         }
-         
          return HandleStatus.HANDLED;
       }
       finally
@@ -724,7 +737,7 @@
          availableCredits.addAndGet(-message.getEncodeSize());
       }
 
-      final SessionReceiveMessage packet = new SessionReceiveMessage(id, message, ref.getDeliveryCount() + 1);
+      final SessionReceiveMessage packet = new SessionReceiveMessage(id, message, ref.getDeliveryCount());
 
       DelayedResult result = channel.replicatePacket(new SessionReplicateDeliveryMessage(id, message.getMessageID()));
 
@@ -824,7 +837,7 @@
 
                pendingLargeMessage.encodeProperties(headerBuffer);
 
-               initialMessage = new SessionReceiveMessage(id, headerBuffer.array(), ref.getDeliveryCount() + 1);
+               initialMessage = new SessionReceiveMessage(id, headerBuffer.array(), ref.getDeliveryCount());
             }
 
             int precalculateAvailableCredits;

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-02-18 17:31:32 UTC (rev 5897)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-02-19 04:08:32 UTC (rev 5898)
@@ -50,6 +50,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+import org.jboss.messaging.core.remoting.impl.wireformat.RollbackMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
@@ -305,7 +306,7 @@
       {
          // We only rollback local txs on close, not XA tx branches
 
-         rollback();
+         rollback(false);
       }
 
       Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());
@@ -556,7 +557,7 @@
       }
    }
 
-   public void handleRollback(final Packet packet)
+   public void handleRollback(final RollbackMessage packet)
    {
       DelayedResult result = channel.replicatePacket(packet);
 
@@ -1806,13 +1807,13 @@
       channel.send(response);
    }
 
-   private void doHandleRollback(final Packet packet)
+   private void doHandleRollback(final RollbackMessage packet)
    {
       Packet response = null;
 
       try
       {
-         rollback();
+         rollback(packet.isLastMessageAsDelived());
 
          response = new NullResponseMessage();
       }
@@ -2134,7 +2135,7 @@
                }
                else
                {
-                  doRollback(theTx);
+                  doRollback(false, theTx);
 
                   response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
                }
@@ -2702,7 +2703,7 @@
       return largeMessage;
    }
 
-   private void doRollback(final Transaction theTx) throws Exception
+   private void doRollback(final boolean lastMessageAsDelived, final Transaction theTx) throws Exception
    {
       boolean wasStarted = started;
 
@@ -2715,7 +2716,7 @@
             consumer.setStarted(false);
          }
 
-         toCancel.addAll(consumer.cancelRefs());
+         toCancel.addAll(consumer.cancelRefs(lastMessageAsDelived, theTx));
       }
 
       for (MessageReference ref : toCancel)
@@ -2734,7 +2735,7 @@
       }
    }
 
-   private void rollback() throws Exception
+   private void rollback(boolean lastMessageAsDelived) throws Exception
    {
       if (tx == null)
       {
@@ -2743,7 +2744,7 @@
          tx = new TransactionImpl(storageManager);
       }
 
-      doRollback(tx);
+      doRollback(lastMessageAsDelived, tx);
 
       tx = new TransactionImpl(storageManager);
    }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2009-02-18 17:31:32 UTC (rev 5897)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2009-02-19 04:08:32 UTC (rev 5898)
@@ -49,6 +49,7 @@
 import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.remoting.ChannelHandler;
 import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.impl.wireformat.RollbackMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
@@ -159,7 +160,7 @@
             }
             case SESS_ROLLBACK:
             {
-               session.handleRollback(packet);
+               session.handleRollback((RollbackMessage)packet);
                break;
             }
             case SESS_XA_COMMIT:

Modified: trunk/src/main/org/jboss/messaging/jms/client/JMSMessageListenerWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JMSMessageListenerWrapper.java	2009-02-18 17:31:32 UTC (rev 5897)
+++ trunk/src/main/org/jboss/messaging/jms/client/JMSMessageListenerWrapper.java	2009-02-19 04:08:32 UTC (rev 5898)
@@ -106,7 +106,7 @@
          {            
             try
             {                              
-               session.getCoreSession().rollback();
+               session.getCoreSession().rollback(true);
                
                session.setRecoverCalled(true);
             }

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java	2009-02-18 17:31:32 UTC (rev 5897)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java	2009-02-19 04:08:32 UTC (rev 5898)
@@ -30,8 +30,6 @@
 import javax.jms.TopicSession;
 import javax.jms.TopicSubscriber;
 
-import org.jboss.messaging.jms.client.JBossTextMessage;
-
 /**
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a> <p/> $Id: AcknowledgementTest.java 3173 2007-10-05 12:48:16Z
  *         timfox $

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/TransactedSessionTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/TransactedSessionTest.java	2009-02-18 17:31:32 UTC (rev 5897)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/TransactedSessionTest.java	2009-02-19 04:08:32 UTC (rev 5898)
@@ -518,14 +518,11 @@
          assertEquals(1, tm.getIntProperty("JMSXDeliveryCount"));
 
          log.info("rolling back");
-         conn.stop();
          sess.rollback();
          
          log.info("closing");
          sess.close();
          log.info("Closed");
-         
-         conn.start();
 
          Session sess2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
 

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/message/JMSXDeliveryCountTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/message/JMSXDeliveryCountTest.java	2009-02-18 17:31:32 UTC (rev 5897)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/message/JMSXDeliveryCountTest.java	2009-02-19 04:08:32 UTC (rev 5898)
@@ -164,26 +164,26 @@
          tm = (TextMessage)c.receive(1000);
          
          assertEquals("message2", tm.getText());
-	      assertTrue(tm.getJMSRedelivered());
-         assertEquals(2, tm.getIntProperty("JMSXDeliveryCount"));
+	      assertFalse(tm.getJMSRedelivered());
+         assertEquals(1, tm.getIntProperty("JMSXDeliveryCount"));
          
          tm = (TextMessage)c.receive(1000);
          
          assertEquals("message3", tm.getText());
-	      assertTrue(tm.getJMSRedelivered());
-         assertEquals(2, tm.getIntProperty("JMSXDeliveryCount"));
+	      assertFalse(tm.getJMSRedelivered());
+         assertEquals(1, tm.getIntProperty("JMSXDeliveryCount"));
          
          tm = (TextMessage)c.receive(1000);
          
          assertEquals("message4", tm.getText());
-	      assertTrue(tm.getJMSRedelivered());
-         assertEquals(2, tm.getIntProperty("JMSXDeliveryCount"));
+	      assertFalse(tm.getJMSRedelivered());
+         assertEquals(1, tm.getIntProperty("JMSXDeliveryCount"));
          
          tm = (TextMessage)c.receive(1000);
          
          assertEquals("message5", tm.getText());
-	      assertTrue(tm.getJMSRedelivered());
-         assertEquals(2, tm.getIntProperty("JMSXDeliveryCount"));
+	      assertFalse(tm.getJMSRedelivered());
+         assertEquals(1, tm.getIntProperty("JMSXDeliveryCount"));
          
 	      tm.acknowledge();
       }
@@ -633,7 +633,7 @@
          
          assertEquals(tm.getText(), rm.getText());
          
-         assertEquals(5, rm.getIntProperty("JMSXDeliveryCount"));
+         assertEquals(4, rm.getIntProperty("JMSXDeliveryCount"));
          
          assertTrue(rm.getJMSRedelivered());
          

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/consumer/ConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/consumer/ConsumerTest.java	2009-02-18 17:31:32 UTC (rev 5897)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/consumer/ConsumerTest.java	2009-02-19 04:08:32 UTC (rev 5898)
@@ -78,7 +78,7 @@
    public void testSimpleConsumerBrowser() throws Exception
    {
       ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
-      
+
       sf.setBlockOnNonPersistentSend(true);
 
       ClientSession session = sf.createSession(false, true, true);
@@ -284,12 +284,22 @@
 
    }
 
-   public void testConsumerBrowserMessagesArentAcked() throws Exception
+   public void testConsumerBrowserMessages() throws Exception
    {
+      testConsumerBrowserMessagesArentAcked(false);
+   }
+   
+   public void testConsumerBrowserMessagesPreACK() throws Exception
+   {
+      testConsumerBrowserMessagesArentAcked(true); // pre-ack variant: must pass true, otherwise this duplicates testConsumerBrowserMessages
+   }
+   
 
+   private void testConsumerBrowserMessagesArentAcked(boolean preACK) throws Exception
+   {
       ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
 
-      ClientSession session = sf.createSession(false, true, true);
+      ClientSession session = sf.createSession(null, null, false, true, true, preACK, 0);
 
       session.createQueue(QUEUE, QUEUE, null, false, false);
 
@@ -312,8 +322,10 @@
          assertEquals("m" + i, message2.getBody().getString());
       }
       // assert that all the messages are there and none have been acked
-      assertEquals(((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount(), 0);
-      assertEquals(((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount(), 100);
+      assertEquals(0,
+                   ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
+      assertEquals(100,
+                   ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
 
       session.close();
    }
@@ -347,8 +359,10 @@
          assertEquals("m" + i, message2.getBody().getString());
       }
       // assert that all the messages are there and none have been acked
-      assertEquals(0, ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
-      assertEquals(100, ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
+      assertEquals(0,
+                   ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
+      assertEquals(100,
+                   ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
 
       session.close();
    }
@@ -441,8 +455,10 @@
          assertEquals("m" + i, message2.getBody().getString());
       }
       // assert that all the messages are there and none have been acked
-      assertEquals(0, ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
-      assertEquals(0, ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
+      assertEquals(0,
+                   ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
+      assertEquals(0,
+                   ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
 
       session.close();
    }
@@ -475,8 +491,10 @@
          assertEquals("m" + i, message2.getBody().getString());
       }
       // assert that all the messages are there and none have been acked
-      assertEquals(0, ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
-      assertEquals(0, ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
+      assertEquals(0,
+                   ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
+      assertEquals(0,
+                   ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
 
       session.close();
    }
@@ -513,8 +531,10 @@
          }
       }
       // assert that all the messages are there and none have been acked
-      assertEquals(0, ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
-      assertEquals(0, ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
+      assertEquals(0,
+                   ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
+      assertEquals(0,
+                   ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
 
       session.close();
    }
@@ -551,13 +571,17 @@
          }
       }
       // assert that all the messages are there and none have been acked
-      assertEquals(0, ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
-      assertEquals(0, ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
+      assertEquals(0,
+                   ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
+      assertEquals(0,
+                   ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
 
       session.close();
 
-      assertEquals(0, ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
-      assertEquals(0, ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
+      assertEquals(0,
+                   ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
+      assertEquals(0,
+                   ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
    }
 
    private ClientMessage createMessage(final ClientSession session, final String msg)

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/consumer/RedeliveryConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/consumer/RedeliveryConsumerTest.java	2009-02-18 17:31:32 UTC (rev 5897)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/consumer/RedeliveryConsumerTest.java	2009-02-19 04:08:32 UTC (rev 5898)
@@ -29,7 +29,6 @@
 import org.jboss.messaging.core.client.ClientSessionFactory;
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
 import org.jboss.messaging.core.server.MessagingService;
 import org.jboss.messaging.tests.util.ServiceTestBase;
 import org.jboss.messaging.util.SimpleString;
@@ -72,45 +71,153 @@
       testDedeliveryMessageOnPersistent(false);
    }
 
-   protected void testDedeliveryMessageOnPersistent(boolean strictUpdate) throws Exception
+   public void testDeliveryNonPersistent() throws Exception
    {
-      setUp(strictUpdate);
-      ClientSession session = factory.createSession(false, true, false);
+      testDelivery(false);
+   }
+
+   public void testDeliveryPersistent() throws Exception
+   {
+      testDelivery(true);
+   }
+
+   public void testDelivery(final boolean persistent) throws Exception
+   {
+      setUp(true);
+      ClientSession session = factory.createSession(false, false, false);
       ClientProducer prod = session.createProducer(ADDRESS);
-      prod.send(createTextMessage(session, "Hello"));
+
+      for (int i = 0; i < 10; i++)
+      {
+         prod.send(createTextMessage(session, Integer.toString(i), persistent));
+      }
+
       session.commit();
       session.close();
+
       
-      messagingService.stop();
-      messagingService.start();
+      session = factory.createSession(null, null, false, true, true, true, 0);
       
-      session = factory.createSession(false, true, false);
       session.start();
+      for (int loopAck = 0; loopAck < 5; loopAck++)
+      {
+         ClientConsumer browser = session.createConsumer(ADDRESS, null, true);
+         for (int i = 0; i < 10; i++)
+         {
+            ClientMessage msg = browser.receive(1000);
+            assertNotNull("element i=" + i + " loopAck = " + loopAck + " was expected", msg);
+            msg.acknowledge();
+            assertEquals(Integer.toString(i), getTextMessage(msg));
+   
+            // Browsing does not change the delivery count, so this should always be 0
+            assertEquals(0, msg.getDeliveryCount());
+         }
+         
+         session.commit();
+         browser.close();
+      }
+      
+      session.close();
+      
+      
+      
+      session = factory.createSession(false, false, false);
+      session.start();
+
       ClientConsumer consumer = session.createConsumer(ADDRESS);
-      
+
+      for (int loopAck = 0; loopAck < 5; loopAck++)
+      {
+         for (int i = 0; i < 10; i++)
+         {
+            ClientMessage msg = consumer.receive(1000);
+            assertNotNull(msg);
+            assertEquals(Integer.toString(i), getTextMessage(msg));
+
+            // No ACK is done before the rollback, so deliveryCount should always be 1
+            assertEquals(1, msg.getDeliveryCount());
+         }
+         session.rollback();
+      }
+
+      if (persistent)
+      {
+         session.close();
+         messagingService.stop();
+         messagingService.start();
+         session = factory.createSession(false, false, false);
+         session.start();
+         consumer = session.createConsumer(ADDRESS);
+      }
+
+      for (int loopAck = 1; loopAck <= 5; loopAck++)
+      {
+         for (int i = 0; i < 10; i++)
+         {
+            ClientMessage msg = consumer.receive(1000);
+            assertNotNull(msg);
+            msg.acknowledge();
+            assertEquals(Integer.toString(i), getTextMessage(msg));
+            assertEquals(loopAck, msg.getDeliveryCount());
+         }
+         if (loopAck < 5)
+         {
+            if (persistent)
+            {
+               session.close();
+               messagingService.stop();
+               messagingService.start();
+               session = factory.createSession(false, false, false);
+               session.start();
+               consumer = session.createConsumer(ADDRESS);
+            }
+            else
+            {
+               session.rollback();
+            }
+         }
+      }
+
+      session.close();
+   }
+
+   protected void testDedeliveryMessageOnPersistent(final boolean strictUpdate) throws Exception
+   {
+      setUp(strictUpdate);
+      ClientSession session = factory.createSession(false, false, false);
+      ClientProducer prod = session.createProducer(ADDRESS);
+      prod.send(createTextMessage(session, "Hello"));
+      session.commit();
+      session.close();
+
+      session = factory.createSession(false, false, false);
+      session.start();
+      ClientConsumer consumer = session.createConsumer(ADDRESS);
+
       ClientMessage msg = consumer.receive(1000);
       assertEquals(1, msg.getDeliveryCount());
-      assertNotNull(msg);
       session.stop();
-      
-      // if strictUpdate == true, this will simulating a crash, where the server is stopped without closing/rolling back the session
+
+      // if strictUpdate == true, this will simulate a crash, where the server is stopped without closing/rolling back
+      // the session
       if (!strictUpdate)
       {
          // If non Strict, at least rollback/cancel should still update the delivery-counts
-         session.rollback();
+         session.rollback(true);
          session.close();
       }
-      
+
       messagingService.stop();
-      
+
       messagingService.start();
-      
+
       session = factory.createSession(false, true, false);
       session.start();
       consumer = session.createConsumer(ADDRESS);
       msg = consumer.receive(1000);
       assertNotNull(msg);
       assertEquals(2, msg.getDeliveryCount());
+      session.close();
    }
 
    // Package protected ---------------------------------------------
@@ -128,7 +235,7 @@
     * @throws Exception
     * @throws MessagingException
     */
-   private void setUp(boolean strictUpdateDelivery) throws Exception, MessagingException
+   private void setUp(final boolean strictUpdateDelivery) throws Exception, MessagingException
    {
       Configuration config = createFileConfig();
       config.setJournalFileSize(10 * 1024);

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/consumer/TransactionDurabilityTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/consumer/TransactionDurabilityTest.java	2009-02-18 17:31:32 UTC (rev 5897)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/consumer/TransactionDurabilityTest.java	2009-02-19 04:08:32 UTC (rev 5898)
@@ -73,7 +73,7 @@
       
       final SimpleString queue2 = new SimpleString("queue2");
                    
-      MessagingService messagingService = Messaging.newMessagingService(conf);
+      MessagingService messagingService = createService(true, conf); 
       
       messagingService.start();
 
@@ -139,7 +139,7 @@
       
       consumer2 = session2.createConsumer(queue2);
       
-      m1 = consumer1.receive(1000);
+      m1 = consumer1.receive(100);
       
       assertNull(m1);
       
@@ -171,11 +171,11 @@
       
       consumer2 = session2.createConsumer(queue2);
       
-      m1 = consumer1.receive(1000);
+      m1 = consumer1.receive(100);
       
       assertNull(m1);
       
-      m2 = consumer2.receive(1000);
+      m2 = consumer2.receive(100);
       
       assertNull(m2);
       

Modified: trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java	2009-02-18 17:31:32 UTC (rev 5897)
+++ trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java	2009-02-19 04:08:32 UTC (rev 5898)
@@ -244,6 +244,12 @@
    {
       return createTextMessage(session, s, true);
    }
+   
+   public String getTextMessage(ClientMessage m)
+   {
+      m.getBody().rewind();
+      return m.getBody().getString();
+   }
 
    protected ClientMessage createTextMessage(final ClientSession session, final String s, final boolean durable)
    {




More information about the jboss-cvs-commits mailing list