[jboss-cvs] JBoss Messaging SVN: r5188 - in branches/Branch_Chunk_CRS2: src/main/org/jboss/messaging/core/server and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Oct 27 18:45:47 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-10-27 18:45:47 -0400 (Mon, 27 Oct 2008)
New Revision: 5188
Modified:
branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java
branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/Queue.java
branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/stress/chunk/MessageChunkStressTest.java
Log:
Fix on scheduled delivery
Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java 2008-10-27 22:25:55 UTC (rev 5187)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java 2008-10-27 22:45:47 UTC (rev 5188)
@@ -90,6 +90,7 @@
@Override
public synchronized void encodeBody(final MessagingBuffer bufferOut, final int start, final int size)
{
+ new Exception ("Encode body");
validateFile();
try
Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-10-27 22:25:55 UTC (rev 5187)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-10-27 22:45:47 UTC (rev 5188)
@@ -509,17 +509,14 @@
Queue queue = queues.get(encoding.queueID);
- if (queue == null)
- {
- throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
- }
- //remove the reference and then add it back in with the scheduled time set.
- MessageReference removed = queue.removeReferenceWithID(messageID);
+ if (queue == null)
+ {
+ throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
+ }
- removed.setScheduledDeliveryTime(encoding.scheduledDeliveryTime);
+ //remove the reference and then add it back in with the scheduled time set.
+ queue.rescheduleDelivery(messageID, encoding.scheduledDeliveryTime);
- queue.addLast(removed);
-
break;
}
default:
Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/Queue.java 2008-10-27 22:25:55 UTC (rev 5187)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/Queue.java 2008-10-27 22:45:47 UTC (rev 5188)
@@ -108,6 +108,9 @@
MessageReference removeReferenceWithID(long id);
+ /** Removes the message from the queue and adds it to the scheduled delivery list without affecting reference counting. */
+ void rescheduleDelivery(long id, long scheduledDeliveryTime);
+
MessageReference getReference(long id);
void deleteAllReferences(StorageManager storageManager) throws Exception;
Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-10-27 22:25:55 UTC (rev 5187)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-10-27 22:45:47 UTC (rev 5188)
@@ -290,7 +290,29 @@
return removed;
}
+
+ // Remove the message from the queue and add it to the scheduled delivery list without affecting reference counting
+ public void rescheduleDelivery(final long id, final long scheduledDeliveryTime)
+ {
+ Iterator<MessageReference> iterator = messageReferences.iterator();
+ while (iterator.hasNext())
+ {
+ MessageReference ref = iterator.next();
+ if (ref.getMessage().getMessageID() == id)
+ {
+ iterator.remove();
+
+ ref.setScheduledDeliveryTime(scheduledDeliveryTime);
+
+ checkAndSchedule(ref);
+
+ break;
+ }
+ }
+ }
+
+
public synchronized MessageReference getReference(final long id)
{
Iterator<MessageReference> iterator = messageReferences.iterator();
Modified: branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
===================================================================
--- branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2008-10-27 22:25:55 UTC (rev 5187)
+++ branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2008-10-27 22:45:47 UTC (rev 5188)
@@ -225,69 +225,69 @@
public void testMessageChunkNullPersistence() throws Exception
{
- testInternal(false, false, 1, 5000, false, 0);
+ testInternal(false, false, 1, 5000, false, 1000, 0);
}
public void testMessageChunkNullPersistenceDelayed() throws Exception
{
- testInternal(false, false, 100, 5000, false, 100);
+ testInternal(false, false, 100, 5000, false, 1000, 100);
}
public void testMessageChunkFilePersistence() throws Exception
{
- testInternal(true, false, 100, 262144, false, 0);
+ testInternal(true, false, 100, 262144, false, 1000, 0);
}
public void testMessageChunkFilePersistence100M() throws Exception
{
- testInternal(true, true, 1, 26214400, false, 0);
+ testInternal(true, true, 1, 26214400, false, 1000, 0);
}
public void testMessageChunkFilePersistenceDelayed() throws Exception
{
- testInternal(true, false, 1, 50000, false, 1000);
+ testInternal(true, false, 1, 50000, false, 1000, 2000);
}
public void testSendfileMessage() throws Exception
{
- testInternal(true, true, 100, 50000, false, 0);
+ testInternal(true, true, 100, 50000, false, 1000, 0);
}
public void testSendfileMessageOnNullPersistence() throws Exception
{
- testInternal(false, true, 100, 50000, false, 0);
+ testInternal(false, true, 100, 50000, false, 1000, 0);
}
public void testSendfileMessageSmallMessage() throws Exception
{
- testInternal(true, true, 100, 4, false, 0);
+ testInternal(true, true, 100, 4, false, 1000, 0);
}
public void testSendfileMessageOnNullPersistenceSmallMessage() throws Exception
{
- testInternal(false, true, 100, 100, false, 0);
+ testInternal(false, true, 100, 100, false, 1000, 0);
}
public void testSendRegularMessageNullPersistence() throws Exception
{
- testInternal(false, false, 100, 100, false, 0);
+ testInternal(false, false, 100, 100, false, 1000, 0);
}
public void testSendRegularMessageNullPersistenceDelayed() throws Exception
{
- testInternal(false, false, 100, 100, false, 1000);
+ testInternal(false, false, 100, 100, false, 1000, 1000);
}
public void testSendRegularMessagePersistence() throws Exception
{
- testInternal(true, false, 100, 100, false, 0);
+ testInternal(true, false, 100, 100, false, 1000, 0);
}
public void testSendRegularMessagePersistenceDelayed() throws Exception
{
- testInternal(true, false, 100, 100, false, 1000);
+ testInternal(true, false, 100, 100, false, 1000, 1000);
}
public void testTwoBindingsOneAckAndrestart() throws Exception
@@ -534,6 +534,7 @@
final int numberOfMessages,
final int numberOfIntegers,
final boolean sendingBlocking,
+ final int waitOnConsumer,
final long delayDelivery) throws Exception
{
@@ -628,7 +629,7 @@
for (int i = 0; i < numberOfMessages; i++)
{
- ClientMessage message = consumer.receive(60000 + delayDelivery);
+ ClientMessage message = consumer.receive(waitOnConsumer + delayDelivery);
assertNotNull(message);
@@ -668,6 +669,9 @@
}
session.close();
+
+ File largemsgsFile = new File(this.largeMessagesDir);
+ assertEquals(0, largemsgsFile.list().length);
}
finally
{
Modified: branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/stress/chunk/MessageChunkStressTest.java
===================================================================
--- branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/stress/chunk/MessageChunkStressTest.java 2008-10-27 22:25:55 UTC (rev 5187)
+++ branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/stress/chunk/MessageChunkStressTest.java 2008-10-27 22:45:47 UTC (rev 5188)
@@ -48,13 +48,13 @@
public void testMessageChunkFilePersistence1G() throws Exception
{
- testInternal(true, true, 2, 268435456, false, 0);
+ testInternal(true, true, 2, 268435456, false, 120000, 0);
}
@Override
public void testMessageChunkFilePersistence100M() throws Exception
{
- testInternal(true, true, 10, 26214400, false, 0);
+ testInternal(true, true, 10, 26214400, false, 120000, 0);
}
// Package protected ---------------------------------------------
More information about the jboss-cvs-commits
mailing list