Author: clebert.suconic(a)jboss.com
Date: 2011-12-22 08:32:53 -0500 (Thu, 22 Dec 2011)
New Revision: 11928
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java
Log:
JBPAPP-7809 - adding compatibility with old format
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-12-22
11:22:01 UTC (rev 11927)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-12-22
13:32:53 UTC (rev 11928)
@@ -1746,6 +1746,26 @@
messageEncoding.decode(buff);
+ if (largeMessage.containsProperty(Message.HDR_ORIG_MESSAGE_ID))
+ {
+ // for compatibility: couple with old behaviour, copying the old file to avoid
message loss
+ long originalMessageID =
largeMessage.getLongProperty(Message.HDR_ORIG_MESSAGE_ID);
+
+ SequentialFile currentFile =
createFileForLargeMessage(largeMessage.getMessageID(), true);
+
+ if (!currentFile.exists())
+ {
+ SequentialFile linkedFile = createFileForLargeMessage(originalMessageID,
true);
+ if (linkedFile.exists())
+ {
+ linkedFile.copyTo(currentFile);
+ linkedFile.close();
+ }
+ }
+
+ currentFile.close();
+ }
+
return largeMessage;
}
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java 2011-12-22
11:22:01 UTC (rev 11927)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java 2011-12-22
13:32:53 UTC (rev 11928)
@@ -13,6 +13,9 @@
package org.hornetq.tests.integration.client;
+import java.io.File;
+
+import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientConsumer;
import org.hornetq.api.core.client.ClientMessage;
@@ -136,6 +139,7 @@
Thread.sleep(1500);
+ // just to try expiring
ClientConsumer cons = session.createConsumer(MY_QUEUE);
assertNull(cons.receive(1000));
@@ -145,18 +149,17 @@
cons = session.createConsumer(EXPIRY);
session.start();
-
-
+
// Consume half of the messages to make sure all the messages are paging (on the
second try)
- for (int i = 0 ; i < numberOfMessages / 2; i++)
+ for (int i = 0; i < numberOfMessages / 2; i++)
{
ClientMessage msg = cons.receive(5000);
assertNotNull(msg);
msg.acknowledge();
}
-
+
session.commit();
-
+
cons.close();
for (int rep = 0; rep < 6; rep++)
@@ -174,7 +177,7 @@
{
System.out.println("Received " + i);
}
-
+
for (int location = 0; location < messageSize; location++)
{
assertEquals(getSamplebyte((long)location),
message.getBodyBuffer().readByte());
@@ -200,7 +203,7 @@
session = sf.createSession(false, false);
session.start();
}
-
+
cons = session.createConsumer(EXPIRY);
session.start();
assertNull(cons.receiveImmediate());
@@ -262,6 +265,148 @@
}
}
+ /**
+ * Tests if the system would still couple with old data where the LargeMessage was
linked to its previous copy
+ * @throws Exception
+ */
+ public void testCompatilityWithLinks() throws Exception
+ {
+ HornetQServer server = createServer(true);
+
+ server.getConfiguration().setMessageExpiryScanPeriod(600000);
+
+ AddressSettings setting = new AddressSettings();
+ setting.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ setting.setMaxDeliveryAttempts(5);
+ setting.setMaxSizeBytes(-1);
+ setting.setPageSizeBytes(10 * 1024);
+ setting.setExpiryAddress(EXPIRY);
+ setting.setDeadLetterAddress(DLQ);
+ server.getAddressSettingsRepository().addMatch(MY_QUEUE.toString(), setting);
+
+ setting.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ setting.setMaxDeliveryAttempts(5);
+ setting.setMaxSizeBytes(-1);
+ setting.setPageSizeBytes(10 * 1024);
+ setting.setDeadLetterAddress(DLQ);
+ server.getAddressSettingsRepository().addMatch(EXPIRY.toString(), setting);
+
+ server.start();
+
+ try
+ {
+
+ server.createQueue(EXPIRY, EXPIRY, null, true, false);
+
+ server.createQueue(DLQ, DLQ, null, true, false);
+
+ server.createQueue(MY_QUEUE, MY_QUEUE, null, true, false);
+
+ ServerLocator locator = createInVMNonHALocator();
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(true, true, 0);
+
+ byte bufferSample[] = new byte[messageSize];
+
+ for (int i = 0; i < bufferSample.length; i++)
+ {
+ bufferSample[i] = getSamplebyte(i);
+ }
+
+ ClientProducer producer = session.createProducer(MY_QUEUE);
+
+ long timeToExpiry = System.currentTimeMillis() + 1000;
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage message = session.createMessage(true);
+
+ message.putIntProperty("count", i);
+
+ // Everything is going to be a large message
+ message.putBooleanProperty("tst-large", true);
+ message.setBodyInputStream(createFakeLargeStream(messageSize));
+
+ message.setExpiration(timeToExpiry);
+
+ producer.send(message);
+ }
+
+ server.stop();
+ server.start();
+
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(true, true, 0);
+ session.start();
+
+ Thread.sleep(1500);
+
+ ClientConsumer cons = session.createConsumer(MY_QUEUE);
+ assertNull(cons.receive(1000));
+
+ session.close();
+
+ session = sf.createSession(false, false);
+
+ cons = session.createConsumer(EXPIRY);
+ session.start();
+
+ ClientMessage msg = cons.receive(5000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ session.rollback();
+
+ server.stop();
+
+ // rename the file, simulating old behaviour
+ long messageID = msg.getMessageID();
+ long oldID = msg.getLongProperty(Message.HDR_ORIG_MESSAGE_ID);
+
+ File largeMessagesFileDir = new File(getLargeMessagesDir());
+ File oldFile = new File(largeMessagesFileDir, oldID + ".msg");
+ File currentFile = new File(largeMessagesFileDir, messageID +
".msg");
+ currentFile.renameTo(oldFile);
+
+ server.start();
+
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(true, true, 0);
+ session.start();
+
+ cons = session.createConsumer(EXPIRY);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage message = cons.receive(5000);
+ assertNotNull(message);
+
+ if (i % 10 == 0)
+ {
+ System.out.println("Received " + i);
+ }
+
+ for (int location = 0; location < messageSize; location++)
+ {
+ assertEquals(getSamplebyte((long)location),
message.getBodyBuffer().readByte());
+ }
+ message.acknowledge();
+ }
+
+ session.commit();
+
+ session.close();
+ sf.close();
+ locator.close();
+ }
+ finally
+ {
+ server.stop();
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------