[hornetq-commits] JBoss hornetq SVN: r11928 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/client and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Dec 22 08:32:54 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-12-22 08:32:53 -0500 (Thu, 22 Dec 2011)
New Revision: 11928

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java
Log:
JBPAPP-7809 - adding compatibility with old format

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-12-22 11:22:01 UTC (rev 11927)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-12-22 13:32:53 UTC (rev 11928)
@@ -1746,6 +1746,26 @@
 
       messageEncoding.decode(buff);
 
+      if (largeMessage.containsProperty(Message.HDR_ORIG_MESSAGE_ID))
+      {
+         // for compatibility: cope with the old behaviour by copying the old file, to avoid message loss
+         long originalMessageID = largeMessage.getLongProperty(Message.HDR_ORIG_MESSAGE_ID);
+         
+         SequentialFile currentFile = createFileForLargeMessage(largeMessage.getMessageID(), true);
+         
+         if (!currentFile.exists())
+         {
+            SequentialFile linkedFile = createFileForLargeMessage(originalMessageID, true);
+            if (linkedFile.exists())
+            {
+               linkedFile.copyTo(currentFile);
+               linkedFile.close();
+            }
+         }
+         
+         currentFile.close();
+      }
+
       return largeMessage;
    }
 

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java	2011-12-22 11:22:01 UTC (rev 11927)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java	2011-12-22 13:32:53 UTC (rev 11928)
@@ -13,6 +13,9 @@
 
 package org.hornetq.tests.integration.client;
 
+import java.io.File;
+
+import org.hornetq.api.core.Message;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.client.ClientConsumer;
 import org.hornetq.api.core.client.ClientMessage;
@@ -136,6 +139,7 @@
 
          Thread.sleep(1500);
 
+         // just to try expiring
          ClientConsumer cons = session.createConsumer(MY_QUEUE);
          assertNull(cons.receive(1000));
 
@@ -145,18 +149,17 @@
 
          cons = session.createConsumer(EXPIRY);
          session.start();
-         
-         
+
          // Consume half of the messages to make sure all the messages are paging (on the second try)
-         for (int i = 0 ; i < numberOfMessages / 2; i++)
+         for (int i = 0; i < numberOfMessages / 2; i++)
          {
             ClientMessage msg = cons.receive(5000);
             assertNotNull(msg);
             msg.acknowledge();
          }
-         
+
          session.commit();
-         
+
          cons.close();
 
          for (int rep = 0; rep < 6; rep++)
@@ -174,7 +177,7 @@
                {
                   System.out.println("Received " + i);
                }
-               
+
                for (int location = 0; location < messageSize; location++)
                {
                   assertEquals(getSamplebyte((long)location), message.getBodyBuffer().readByte());
@@ -200,7 +203,7 @@
             session = sf.createSession(false, false);
             session.start();
          }
-         
+
          cons = session.createConsumer(EXPIRY);
          session.start();
          assertNull(cons.receiveImmediate());
@@ -262,6 +265,148 @@
       }
    }
 
+   /**
+    * Tests that the server still copes with old-format data, where a large message's file exists only under the ID held in HDR_ORIG_MESSAGE_ID
+    * @throws Exception on any unexpected failure while running the scenario
+    */
+   public void testCompatilityWithLinks() throws Exception
+   {
+      HornetQServer server = createServer(true);
+
+      server.getConfiguration().setMessageExpiryScanPeriod(600000);
+
+      AddressSettings setting = new AddressSettings();
+      setting.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+      setting.setMaxDeliveryAttempts(5);
+      setting.setMaxSizeBytes(-1);
+      setting.setPageSizeBytes(10 * 1024);
+      setting.setExpiryAddress(EXPIRY);
+      setting.setDeadLetterAddress(DLQ);
+      server.getAddressSettingsRepository().addMatch(MY_QUEUE.toString(), setting);
+
+      setting.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+      setting.setMaxDeliveryAttempts(5);
+      setting.setMaxSizeBytes(-1);
+      setting.setPageSizeBytes(10 * 1024);
+      setting.setDeadLetterAddress(DLQ);
+      server.getAddressSettingsRepository().addMatch(EXPIRY.toString(), setting);
+
+      server.start();
+
+      try
+      {
+
+         server.createQueue(EXPIRY, EXPIRY, null, true, false);
+
+         server.createQueue(DLQ, DLQ, null, true, false);
+
+         server.createQueue(MY_QUEUE, MY_QUEUE, null, true, false);
+
+         ServerLocator locator = createInVMNonHALocator();
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         ClientSession session = sf.createSession(true, true, 0);
+
+         byte bufferSample[] = new byte[messageSize];
+
+         for (int i = 0; i < bufferSample.length; i++)
+         {
+            bufferSample[i] = getSamplebyte(i);
+         }
+
+         ClientProducer producer = session.createProducer(MY_QUEUE);
+
+         long timeToExpiry = System.currentTimeMillis() + 1000;
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            ClientMessage message = session.createMessage(true);
+
+            message.putIntProperty("count", i);
+
+            // Everything is going to be a large message
+            message.putBooleanProperty("tst-large", true);
+            message.setBodyInputStream(createFakeLargeStream(messageSize));
+
+            message.setExpiration(timeToExpiry);
+
+            producer.send(message);
+         }
+
+         server.stop();
+         server.start();
+
+         sf = locator.createSessionFactory();
+
+         session = sf.createSession(true, true, 0);
+         session.start();
+         // wait for timeToExpiry (send time + 1000 ms) to pass
+         Thread.sleep(1500);
+         // just to try expiring
+         ClientConsumer cons = session.createConsumer(MY_QUEUE);
+         assertNull(cons.receive(1000));
+
+         session.close();
+
+         session = sf.createSession(false, false);
+
+         cons = session.createConsumer(EXPIRY);
+         session.start();
+         // read one message from EXPIRY only to learn its IDs; the rollback below undoes the acknowledge
+         ClientMessage msg = cons.receive(5000);
+         assertNotNull(msg);
+         msg.acknowledge();
+         session.rollback();
+
+         server.stop();
+
+         // rename the file to the original message ID, simulating the old (linked) storage format
+         long messageID = msg.getMessageID();
+         long oldID = msg.getLongProperty(Message.HDR_ORIG_MESSAGE_ID);
+
+         File largeMessagesFileDir = new File(getLargeMessagesDir());
+         File oldFile = new File(largeMessagesFileDir, oldID + ".msg");
+         File currentFile = new File(largeMessagesFileDir, messageID + ".msg");
+         assertTrue("could not rename " + currentFile + " to " + oldFile, currentFile.renameTo(oldFile));
+
+         server.start();
+
+         sf = locator.createSessionFactory();
+
+         session = sf.createSession(true, true, 0);
+         session.start();
+
+         cons = session.createConsumer(EXPIRY);
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            ClientMessage message = cons.receive(5000);
+            assertNotNull(message);
+
+            if (i % 10 == 0)
+            {
+               System.out.println("Received " + i);
+            }
+
+            for (int location = 0; location < messageSize; location++)
+            {
+               assertEquals(getSamplebyte((long)location), message.getBodyBuffer().readByte());
+            }
+            message.acknowledge();
+         }
+
+         session.commit();
+
+         session.close();
+         sf.close();
+         locator.close();
+      }
+      finally
+      {
+         server.stop();
+      }
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------



More information about the hornetq-commits mailing list