[jboss-cvs] JBoss Messaging SVN: r5495 - in trunk: tests/src/org/jboss/messaging/tests/integration/chunkmessage and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Dec 9 18:36:04 EST 2008


Author: clebert.suconic at jboss.com
Date: 2008-12-09 18:36:04 -0500 (Tue, 09 Dec 2008)
New Revision: 5495

Modified:
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
Log:
Fixing ACK on PreCommit & LargeMessages

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-12-09 21:39:37 UTC (rev 5494)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-12-09 23:36:04 UTC (rev 5495)
@@ -41,9 +41,9 @@
 import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
 import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionReplicateDeliveryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.core.server.HandleStatus;
 import org.jboss.messaging.core.server.MessageReference;
@@ -156,7 +156,7 @@
 
       messageQueue.addConsumer(this);
 
-      this.minLargeMessageSize = session.getMinLargeMessageSize();
+      minLargeMessageSize = session.getMinLargeMessageSize();
    }
 
    // ServerConsumer implementation
@@ -548,7 +548,6 @@
             }
          }
 
-
          return HandleStatus.HANDLED;
       }
       finally
@@ -598,15 +597,18 @@
     *  This Inner class was created to avoid a bunch of loose properties about the current LargeMessage being sent*/
    private class LargeMessageSender
    {
-      private long sizePendingLargeMessage;
+      private final long sizePendingLargeMessage;
 
       /** The current message being processed */
-      private ServerLargeMessage pendingLargeMessage;
+      private final ServerLargeMessage pendingLargeMessage;
 
       private final MessageReference ref;
 
       private boolean sentFirstMessage = false;
 
+      // To avoid ACK to be called twice, as sendLargeMessage could be waiting another call because of flowControl
+      private boolean acked = false;
+
       /** The current position on the message being processed */
       private long positionPendingLargeMessage;
 
@@ -614,7 +616,7 @@
 
       public LargeMessageSender(final ServerLargeMessage message, final MessageReference ref)
       {
-         pendingLargeMessage = (ServerLargeMessage)message;
+         pendingLargeMessage = message;
 
          sizePendingLargeMessage = pendingLargeMessage.getBodySize();
 
@@ -643,7 +645,7 @@
                sentFirstMessage = true;
 
                MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.allocate(pendingLargeMessage.getPropertiesEncodeSize()));
-               
+
                pendingLargeMessage.encodeProperties(headerBuffer);
 
                SessionReceiveMessage initialMessage = new SessionReceiveMessage(id,
@@ -702,10 +704,9 @@
 
             pendingLargeMessage.releaseResources();
 
-            ServerConsumerImpl.this.largeMessageSender = null;
-            
-            if (preAcknowledge)
+            if (preAcknowledge && !acked)
             {
+               acked = true;
                try
                {
                   doAck(ref);
@@ -716,6 +717,8 @@
                }
             }
 
+            largeMessageSender = null;
+
             return true;
          }
          finally
@@ -732,11 +735,14 @@
 
          localChunkLen = (int)Math.min(sizePendingLargeMessage - positionPendingLargeMessage, minLargeMessageSize);
 
-         MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate((int)localChunkLen));
+         MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(localChunkLen));
 
          pendingLargeMessage.encodeBody(bodyBuffer, positionPendingLargeMessage, localChunkLen);
 
-         chunk = new SessionReceiveContinuationMessage(id, bodyBuffer.array(), positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage, false); 
+         chunk = new SessionReceiveContinuationMessage(id,
+                                                       bodyBuffer.array(),
+                                                       positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage,
+                                                       false);
 
          return chunk;
       }

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java	2008-12-09 21:39:37 UTC (rev 5494)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java	2008-12-09 23:36:04 UTC (rev 5495)
@@ -66,6 +66,8 @@
 
    // Constants -----------------------------------------------------
 
+   final static int RECEIVE_WAIT_TIME = 10000;
+   
    // Attributes ----------------------------------------------------
 
    static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
@@ -206,33 +208,33 @@
 
    public void testMessageChunkFilePersistence() throws Exception
    {
-      testChunks(true, false, false, 100, 262144, false, 1000, 0);
+      testChunks(true, false, false, 100, 262144, false, RECEIVE_WAIT_TIME, 0);
    }
 
    public void testMessageChunkFilePersistenceBlocked() throws Exception
    {
-      testChunks(true, false, false, 100, 262144, true, 1000, 0);
+      testChunks(true, false, false, 100, 262144, true, RECEIVE_WAIT_TIME, 0);
    }
 
 
    public void testMessageChunkFilePersistenceBlockedPreCommit() throws Exception
    {
-      testChunks(true, false, true, 100, 262144, true, 1000, 0);
+      testChunks(true, false, true, 100, 262144, true, RECEIVE_WAIT_TIME, 0);
    }
 
    public void testMessageChunkFilePersistenceDelayed() throws Exception
    {
-      testChunks(true, false, false, 1, 50000, false, 1000, 2000);
+      testChunks(true, false, false, 1, 50000, false, RECEIVE_WAIT_TIME, 2000);
    }
 
    public void testMessageChunkNullPersistence() throws Exception
    {
-      testChunks(false, false, false, 1, 50000, false, 1000, 0);
+      testChunks(false, false, false, 1, 50000, false, RECEIVE_WAIT_TIME, 0);
    }
 
    public void testMessageChunkNullPersistenceDelayed() throws Exception
    {
-      testChunks(false, false, false, 100, 50000, false, 10000, 100);
+      testChunks(false, false, false, 100, 50000, false, RECEIVE_WAIT_TIME, 100);
    }
 
    public void testPageOnLargeMessage() throws Exception
@@ -249,44 +251,44 @@
 
    public void testSendfileMessage() throws Exception
    {
-      testChunks(true, true, false, 100, 50000, false, 1000, 0);
+      testChunks(true, true, false, 100, 50000, false, RECEIVE_WAIT_TIME, 0);
 
    }
 
    public void testSendfileMessageOnNullPersistence() throws Exception
    {
-      testChunks(false, true, false, 100, 50000, false, 1000, 0);
+      testChunks(false, true, false, 100, 50000, false, RECEIVE_WAIT_TIME, 0);
    }
 
    public void testSendfileMessageOnNullPersistenceSmallMessage() throws Exception
    {
-      testChunks(false, true, false, 100, 100, true, 1000, 0);
+      testChunks(false, true, false, 100, 100, true, RECEIVE_WAIT_TIME, 0);
    }
 
    public void testSendfileMessageSmallMessage() throws Exception
    {
-      testChunks(true, true, false, 100, 4, false, 1000, 0);
+      testChunks(true, true, false, 100, 4, false, RECEIVE_WAIT_TIME, 0);
 
    }
 
    public void testSendRegularMessageNullPersistence() throws Exception
    {
-      testChunks(false, false, false, 100, 100, false, 1000, 0);
+      testChunks(false, false, false, 100, 100, false, RECEIVE_WAIT_TIME, 0);
    }
 
    public void testSendRegularMessageNullPersistenceDelayed() throws Exception
    {
-      testChunks(false, false, false, 100, 100, false, 1000, 1000);
+      testChunks(false, false, false, 100, 100, false, RECEIVE_WAIT_TIME, 1000);
    }
 
    public void testSendRegularMessagePersistence() throws Exception
    {
-      testChunks(true, false, false, 100, 100, false, 1000, 0);
+      testChunks(true, false, false, 100, 100, false, RECEIVE_WAIT_TIME, 0);
    }
 
    public void testSendRegularMessagePersistenceDelayed() throws Exception
    {
-      testChunks(true, false, false, 100, 100, false, 1000, 1000);
+      testChunks(true, false, false, 100, 100, false, RECEIVE_WAIT_TIME, 1000);
    }
 
    public void testTwoBindingsTwoStartedConsumers() throws Exception
@@ -330,7 +332,7 @@
 
          
          ClientConsumer consumer = session.createFileConsumer(new File(getClientLargeMessagesDir()), queue[1]);
-         ClientMessage msg = consumer.receive(5000);
+         ClientMessage msg = consumer.receive(RECEIVE_WAIT_TIME);
          assertNull(consumer.receive(1000)); 
          assertNotNull(msg);
          
@@ -346,7 +348,7 @@
          session.start();
          
 
-         msg = consumer1.receive(5000);
+         msg = consumer1.receive(RECEIVE_WAIT_TIME);
          assertNotNull(msg);
          msg.acknowledge();
          consumer1.close();
@@ -564,7 +566,7 @@
 
          for (int i = 0; i < 100; i++)
          {
-            ClientMessage message2 = consumer.receive(10000);
+            ClientMessage message2 = consumer.receive(RECEIVE_WAIT_TIME);
 
             assertNotNull(message2);
 




More information about the jboss-cvs-commits mailing list