[jboss-cvs] JBoss Messaging SVN: r5188 - in branches/Branch_Chunk_CRS2: src/main/org/jboss/messaging/core/server and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Oct 27 18:45:47 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-10-27 18:45:47 -0400 (Mon, 27 Oct 2008)
New Revision: 5188

Modified:
   branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java
   branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/Queue.java
   branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
   branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/stress/chunk/MessageChunkStressTest.java
Log:
Fix on scheduled delivery

Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java	2008-10-27 22:25:55 UTC (rev 5187)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java	2008-10-27 22:45:47 UTC (rev 5188)
@@ -90,6 +90,7 @@
    @Override
    public synchronized void encodeBody(final MessagingBuffer bufferOut, final int start, final int size)
    {
+      new Exception ("Encode body");
       validateFile();
 
       try

Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-10-27 22:25:55 UTC (rev 5187)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-10-27 22:45:47 UTC (rev 5188)
@@ -509,17 +509,14 @@
 
                Queue queue = queues.get(encoding.queueID);
 
-                  if (queue == null)
-                  {
-                     throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
-                  }
-                  //remove the reference and then add it back in with the scheduled time set.
-                  MessageReference removed = queue.removeReferenceWithID(messageID);
+               if (queue == null)
+               {
+                  throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
+               }
 
-                  removed.setScheduledDeliveryTime(encoding.scheduledDeliveryTime);
+               //remove the reference and then add it back in with the scheduled time set.
+               queue.rescheduleDelivery(messageID, encoding.scheduledDeliveryTime);
 
-                  queue.addLast(removed);
-
                break;
             }
             default:

Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/Queue.java	2008-10-27 22:25:55 UTC (rev 5187)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/Queue.java	2008-10-27 22:45:47 UTC (rev 5188)
@@ -108,6 +108,9 @@
   
    MessageReference removeReferenceWithID(long id);
    
+   /** Remove message from queue, add it to the scheduled delivery list without affect reference counting */
+   void rescheduleDelivery(long id, long scheduledDeliveryTime);
+   
    MessageReference getReference(long id);
    
    void deleteAllReferences(StorageManager storageManager) throws Exception;

Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-10-27 22:25:55 UTC (rev 5187)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-10-27 22:45:47 UTC (rev 5188)
@@ -290,7 +290,29 @@
 
       return removed;
    }
+   
+   // Remove message from queue, add it to the scheduled delivery list without affect reference counting
+   public void rescheduleDelivery(final long id, final long scheduledDeliveryTime)
+   {
+      Iterator<MessageReference> iterator = messageReferences.iterator();
+      while (iterator.hasNext())
+      {
+         MessageReference ref = iterator.next();
 
+         if (ref.getMessage().getMessageID() == id)
+         {
+            iterator.remove();
+            
+            ref.setScheduledDeliveryTime(scheduledDeliveryTime);
+            
+            checkAndSchedule(ref);
+
+            break;
+         }
+      }
+   }
+   
+
    public synchronized MessageReference getReference(final long id)
    {
       Iterator<MessageReference> iterator = messageReferences.iterator();

Modified: branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
===================================================================
--- branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java	2008-10-27 22:25:55 UTC (rev 5187)
+++ branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java	2008-10-27 22:45:47 UTC (rev 5188)
@@ -225,69 +225,69 @@
 
    public void testMessageChunkNullPersistence() throws Exception
    {
-      testInternal(false, false, 1, 5000, false, 0);
+      testInternal(false, false, 1, 5000, false, 1000, 0);
    }
 
    public void testMessageChunkNullPersistenceDelayed() throws Exception
    {
-      testInternal(false, false, 100, 5000, false, 100);
+      testInternal(false, false, 100, 5000, false, 1000, 100);
    }
 
    public void testMessageChunkFilePersistence() throws Exception
    {
-      testInternal(true, false, 100, 262144, false, 0);
+      testInternal(true, false, 100, 262144, false, 1000, 0);
    }
 
    public void testMessageChunkFilePersistence100M() throws Exception
    {
-      testInternal(true, true, 1, 26214400, false, 0);
+      testInternal(true, true, 1, 26214400, false, 1000, 0);
    }
 
    public void testMessageChunkFilePersistenceDelayed() throws Exception
    {
-      testInternal(true, false, 1, 50000, false, 1000);
+      testInternal(true, false, 1, 50000, false, 1000, 2000);
    }
 
    public void testSendfileMessage() throws Exception
    {
-      testInternal(true, true, 100, 50000, false, 0);
+      testInternal(true, true, 100, 50000, false, 1000, 0);
 
    }
 
    public void testSendfileMessageOnNullPersistence() throws Exception
    {
-      testInternal(false, true, 100, 50000, false, 0);
+      testInternal(false, true, 100, 50000, false, 1000, 0);
    }
 
    public void testSendfileMessageSmallMessage() throws Exception
    {
-      testInternal(true, true, 100, 4, false, 0);
+      testInternal(true, true, 100, 4, false, 1000, 0);
 
    }
 
    public void testSendfileMessageOnNullPersistenceSmallMessage() throws Exception
    {
-      testInternal(false, true, 100, 100, false, 0);
+      testInternal(false, true, 100, 100, false, 1000, 0);
    }
 
    public void testSendRegularMessageNullPersistence() throws Exception
    {
-      testInternal(false, false, 100, 100, false, 0);
+      testInternal(false, false, 100, 100, false, 1000, 0);
    }
 
    public void testSendRegularMessageNullPersistenceDelayed() throws Exception
    {
-      testInternal(false, false, 100, 100, false, 1000);
+      testInternal(false, false, 100, 100, false, 1000, 1000);
    }
 
    public void testSendRegularMessagePersistence() throws Exception
    {
-      testInternal(true, false, 100, 100, false, 0);
+      testInternal(true, false, 100, 100, false, 1000, 0);
    }
 
    public void testSendRegularMessagePersistenceDelayed() throws Exception
    {
-      testInternal(true, false, 100, 100, false, 1000);
+      testInternal(true, false, 100, 100, false, 1000, 1000);
    }
 
    public void testTwoBindingsOneAckAndrestart() throws Exception
@@ -534,6 +534,7 @@
                             final int numberOfMessages,
                             final int numberOfIntegers,
                             final boolean sendingBlocking,
+                            final int waitOnConsumer,
                             final long delayDelivery) throws Exception
    {
 
@@ -628,7 +629,7 @@
 
          for (int i = 0; i < numberOfMessages; i++)
          {
-            ClientMessage message = consumer.receive(60000 + delayDelivery);
+            ClientMessage message = consumer.receive(waitOnConsumer + delayDelivery);
             
             assertNotNull(message);
             
@@ -668,6 +669,9 @@
          }
 
          session.close();
+         
+         File largemsgsFile = new File(this.largeMessagesDir);
+         assertEquals(0, largemsgsFile.list().length);
       }
       finally
       {

Modified: branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/stress/chunk/MessageChunkStressTest.java
===================================================================
--- branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/stress/chunk/MessageChunkStressTest.java	2008-10-27 22:25:55 UTC (rev 5187)
+++ branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/stress/chunk/MessageChunkStressTest.java	2008-10-27 22:45:47 UTC (rev 5188)
@@ -48,13 +48,13 @@
 
    public void testMessageChunkFilePersistence1G() throws Exception
    {
-      testInternal(true, true, 2, 268435456, false, 0);
+      testInternal(true, true, 2, 268435456, false, 120000, 0);
    }
 
    @Override
    public void testMessageChunkFilePersistence100M() throws Exception
    {
-      testInternal(true, true, 10, 26214400, false, 0);
+      testInternal(true, true, 10, 26214400, false, 120000, 0);
    }
 
    // Package protected ---------------------------------------------




More information about the jboss-cvs-commits mailing list