[jboss-cvs] JBoss Messaging SVN: r5495 - in trunk: tests/src/org/jboss/messaging/tests/integration/chunkmessage and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Dec 9 18:36:04 EST 2008
Author: clebert.suconic at jboss.com
Date: 2008-12-09 18:36:04 -0500 (Tue, 09 Dec 2008)
New Revision: 5495
Modified:
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
Log:
Fixing ACK on PreCommit & LargeMessages
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-12-09 21:39:37 UTC (rev 5494)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-12-09 23:36:04 UTC (rev 5495)
@@ -41,9 +41,9 @@
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReplicateDeliveryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.core.server.HandleStatus;
import org.jboss.messaging.core.server.MessageReference;
@@ -156,7 +156,7 @@
messageQueue.addConsumer(this);
- this.minLargeMessageSize = session.getMinLargeMessageSize();
+ minLargeMessageSize = session.getMinLargeMessageSize();
}
// ServerConsumer implementation
@@ -548,7 +548,6 @@
}
}
-
return HandleStatus.HANDLED;
}
finally
@@ -598,15 +597,18 @@
* This Inner class was created to avoid a bunch of loose properties about the current LargeMessage being sent*/
private class LargeMessageSender
{
- private long sizePendingLargeMessage;
+ private final long sizePendingLargeMessage;
/** The current message being processed */
- private ServerLargeMessage pendingLargeMessage;
+ private final ServerLargeMessage pendingLargeMessage;
private final MessageReference ref;
private boolean sentFirstMessage = false;
+ // To avoid ACK to be called twice, as sendLargeMessage could be waiting another call because of flowControl
+ private boolean acked = false;
+
/** The current position on the message being processed */
private long positionPendingLargeMessage;
@@ -614,7 +616,7 @@
public LargeMessageSender(final ServerLargeMessage message, final MessageReference ref)
{
- pendingLargeMessage = (ServerLargeMessage)message;
+ pendingLargeMessage = message;
sizePendingLargeMessage = pendingLargeMessage.getBodySize();
@@ -643,7 +645,7 @@
sentFirstMessage = true;
MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.allocate(pendingLargeMessage.getPropertiesEncodeSize()));
-
+
pendingLargeMessage.encodeProperties(headerBuffer);
SessionReceiveMessage initialMessage = new SessionReceiveMessage(id,
@@ -702,10 +704,9 @@
pendingLargeMessage.releaseResources();
- ServerConsumerImpl.this.largeMessageSender = null;
-
- if (preAcknowledge)
+ if (preAcknowledge && !acked)
{
+ acked = true;
try
{
doAck(ref);
@@ -716,6 +717,8 @@
}
}
+ largeMessageSender = null;
+
return true;
}
finally
@@ -732,11 +735,14 @@
localChunkLen = (int)Math.min(sizePendingLargeMessage - positionPendingLargeMessage, minLargeMessageSize);
- MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate((int)localChunkLen));
+ MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(localChunkLen));
pendingLargeMessage.encodeBody(bodyBuffer, positionPendingLargeMessage, localChunkLen);
- chunk = new SessionReceiveContinuationMessage(id, bodyBuffer.array(), positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage, false);
+ chunk = new SessionReceiveContinuationMessage(id,
+ bodyBuffer.array(),
+ positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage,
+ false);
return chunk;
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2008-12-09 21:39:37 UTC (rev 5494)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2008-12-09 23:36:04 UTC (rev 5495)
@@ -66,6 +66,8 @@
// Constants -----------------------------------------------------
+ final static int RECEIVE_WAIT_TIME = 10000;
+
// Attributes ----------------------------------------------------
static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
@@ -206,33 +208,33 @@
public void testMessageChunkFilePersistence() throws Exception
{
- testChunks(true, false, false, 100, 262144, false, 1000, 0);
+ testChunks(true, false, false, 100, 262144, false, RECEIVE_WAIT_TIME, 0);
}
public void testMessageChunkFilePersistenceBlocked() throws Exception
{
- testChunks(true, false, false, 100, 262144, true, 1000, 0);
+ testChunks(true, false, false, 100, 262144, true, RECEIVE_WAIT_TIME, 0);
}
public void testMessageChunkFilePersistenceBlockedPreCommit() throws Exception
{
- testChunks(true, false, true, 100, 262144, true, 1000, 0);
+ testChunks(true, false, true, 100, 262144, true, RECEIVE_WAIT_TIME, 0);
}
public void testMessageChunkFilePersistenceDelayed() throws Exception
{
- testChunks(true, false, false, 1, 50000, false, 1000, 2000);
+ testChunks(true, false, false, 1, 50000, false, RECEIVE_WAIT_TIME, 2000);
}
public void testMessageChunkNullPersistence() throws Exception
{
- testChunks(false, false, false, 1, 50000, false, 1000, 0);
+ testChunks(false, false, false, 1, 50000, false, RECEIVE_WAIT_TIME, 0);
}
public void testMessageChunkNullPersistenceDelayed() throws Exception
{
- testChunks(false, false, false, 100, 50000, false, 10000, 100);
+ testChunks(false, false, false, 100, 50000, false, RECEIVE_WAIT_TIME, 100);
}
public void testPageOnLargeMessage() throws Exception
@@ -249,44 +251,44 @@
public void testSendfileMessage() throws Exception
{
- testChunks(true, true, false, 100, 50000, false, 1000, 0);
+ testChunks(true, true, false, 100, 50000, false, RECEIVE_WAIT_TIME, 0);
}
public void testSendfileMessageOnNullPersistence() throws Exception
{
- testChunks(false, true, false, 100, 50000, false, 1000, 0);
+ testChunks(false, true, false, 100, 50000, false, RECEIVE_WAIT_TIME, 0);
}
public void testSendfileMessageOnNullPersistenceSmallMessage() throws Exception
{
- testChunks(false, true, false, 100, 100, true, 1000, 0);
+ testChunks(false, true, false, 100, 100, true, RECEIVE_WAIT_TIME, 0);
}
public void testSendfileMessageSmallMessage() throws Exception
{
- testChunks(true, true, false, 100, 4, false, 1000, 0);
+ testChunks(true, true, false, 100, 4, false, RECEIVE_WAIT_TIME, 0);
}
public void testSendRegularMessageNullPersistence() throws Exception
{
- testChunks(false, false, false, 100, 100, false, 1000, 0);
+ testChunks(false, false, false, 100, 100, false, RECEIVE_WAIT_TIME, 0);
}
public void testSendRegularMessageNullPersistenceDelayed() throws Exception
{
- testChunks(false, false, false, 100, 100, false, 1000, 1000);
+ testChunks(false, false, false, 100, 100, false, RECEIVE_WAIT_TIME, 1000);
}
public void testSendRegularMessagePersistence() throws Exception
{
- testChunks(true, false, false, 100, 100, false, 1000, 0);
+ testChunks(true, false, false, 100, 100, false, RECEIVE_WAIT_TIME, 0);
}
public void testSendRegularMessagePersistenceDelayed() throws Exception
{
- testChunks(true, false, false, 100, 100, false, 1000, 1000);
+ testChunks(true, false, false, 100, 100, false, RECEIVE_WAIT_TIME, 1000);
}
public void testTwoBindingsTwoStartedConsumers() throws Exception
@@ -330,7 +332,7 @@
ClientConsumer consumer = session.createFileConsumer(new File(getClientLargeMessagesDir()), queue[1]);
- ClientMessage msg = consumer.receive(5000);
+ ClientMessage msg = consumer.receive(RECEIVE_WAIT_TIME);
assertNull(consumer.receive(1000));
assertNotNull(msg);
@@ -346,7 +348,7 @@
session.start();
- msg = consumer1.receive(5000);
+ msg = consumer1.receive(RECEIVE_WAIT_TIME);
assertNotNull(msg);
msg.acknowledge();
consumer1.close();
@@ -564,7 +566,7 @@
for (int i = 0; i < 100; i++)
{
- ClientMessage message2 = consumer.receive(10000);
+ ClientMessage message2 = consumer.receive(RECEIVE_WAIT_TIME);
assertNotNull(message2);
More information about the jboss-cvs-commits
mailing list