Author: clebert.suconic(a)jboss.com
Date: 2011-12-22 08:37:04 -0500 (Thu, 22 Dec 2011)
New Revision: 11929
Added:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/LargeServerMessage.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
Log:
backporting JBPAPP-7809 into JBPAPP-7710
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-12-22
13:32:53 UTC (rev 11928)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-12-22
13:37:04 UTC (rev 11929)
@@ -1680,23 +1680,25 @@
if (largeMessage.containsProperty(Message.HDR_ORIG_MESSAGE_ID))
{
+ // for compatibility: couple with old behaviour, copying the old file to avoid
message loss
long originalMessageID =
largeMessage.getLongProperty(Message.HDR_ORIG_MESSAGE_ID);
-
- LargeServerMessage originalMessage =
(LargeServerMessage)messages.get(originalMessageID);
-
- if (originalMessage == null)
+
+ SequentialFile currentFile =
createFileForLargeMessage(largeMessage.getMessageID(), true);
+
+ if (!currentFile.exists())
{
- // this could happen if the message was deleted but the file still exists as
the file still being used
- originalMessage = createLargeMessage();
- originalMessage.setDurable(true);
- originalMessage.setMessageID(originalMessageID);
- messages.put(originalMessageID, originalMessage);
+ SequentialFile linkedFile = createFileForLargeMessage(originalMessageID,
true);
+ if (linkedFile.exists())
+ {
+ linkedFile.copyTo(currentFile);
+ linkedFile.close();
+ }
}
+
+ currentFile.close();
+ }
- originalMessage.incrementDelayDeletionCount();
- largeMessage.setLinkedMessage(originalMessage);
- }
return largeMessage;
}
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-12-22
13:32:53 UTC (rev 11928)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-12-22
13:37:04 UTC (rev 11929)
@@ -48,8 +48,6 @@
// Attributes ----------------------------------------------------
private final JournalStorageManager storageManager;
-
- private LargeServerMessage linkMessage;
private boolean paged;
@@ -78,7 +76,6 @@
private LargeServerMessageImpl(final LargeServerMessageImpl copy, TypedProperties
properties, final SequentialFile fileCopy, final long newID)
{
super(copy, properties);
- linkMessage = copy;
storageManager = copy.storageManager;
file = fileCopy;
bodySize = copy.bodySize;
@@ -155,18 +152,28 @@
public synchronized void incrementDelayDeletionCount()
{
delayDeletionCount.incrementAndGet();
+ try
+ {
+ incrementRefCount();
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
}
public synchronized void decrementDelayDeletionCount() throws Exception
{
int count = delayDeletionCount.decrementAndGet();
+
+ decrementRefCount();
if (count == 0)
{
checkDelete();
}
}
-
+
@Override
public BodyEncoder getBodyEncoder() throws HornetQException
{
@@ -178,27 +185,19 @@
{
if (getRefCount() <= 0)
{
- if (linkMessage != null)
+ if (LargeServerMessageImpl.isTrace)
{
- // This file is linked to another message, deleting the reference where it
belongs on this case
- linkMessage.decrementDelayDeletionCount();
+ LargeServerMessageImpl.log.trace("Deleting file " + file + "
as the usage was complete");
}
- else
- {
- if (LargeServerMessageImpl.isTrace)
- {
- LargeServerMessageImpl.log.trace("Deleting file " + file +
" as the usage was complete");
- }
- try
- {
- deleteFile();
- }
- catch (Exception e)
- {
- LargeServerMessageImpl.log.error(e.getMessage(), e);
- }
+ try
+ {
+ deleteFile();
}
+ catch (Exception e)
+ {
+ LargeServerMessageImpl.log.error(e.getMessage(), e);
+ }
}
}
@@ -289,15 +288,9 @@
{
long idToUse = messageID;
- if (linkMessage != null)
- {
- idToUse = linkMessage.getMessageID();
- }
-
SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse,
durable);
- ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
- :
(LargeServerMessageImpl)linkMessage,
+ ServerMessage newMessage = new LargeServerMessageImpl(this,
properties,
newfile,
messageID);
@@ -308,65 +301,43 @@
@Override
public synchronized ServerMessage copy(final long newID)
{
- if (!paged)
+ try
{
- incrementDelayDeletionCount();
-
- long idToUse = messageID;
-
- if (linkMessage != null)
- {
- idToUse = linkMessage.getMessageID();
- }
-
- SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse,
durable);
-
- ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ?
this
- :
(LargeServerMessageImpl)linkMessage,
- properties,
- newfile,
- newID);
+ validateFile();
+
+ SequentialFile file = this.file;
+
+ SequentialFile newFile = storageManager.createFileForLargeMessage(newID,
durable);
+
+ file.copyTo(newFile);
+
+ LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, properties,
newFile, newID);
+
return newMessage;
}
- else
+ catch (Exception e)
{
- try
- {
- validateFile();
-
- SequentialFile file = this.file;
-
- SequentialFile newFile = storageManager.createFileForLargeMessage(newID,
durable);
-
- file.copyTo(newFile);
-
- LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this,
properties, newFile, newID);
-
- newMessage.linkMessage = null;
-
- newMessage.setPaged();
-
- return newMessage;
- }
- catch (Exception e)
- {
- log.warn("Error on copying large message this for DLA or Expiry",
e);
- return null;
- }
+ log.warn("Error on copying large message " + this + " for DLA or
Expiry", e);
+ return null;
}
+ finally
+ {
+ releaseResources();
+ }
}
- public SequentialFile getFile()
+ public SequentialFile getFile() throws Exception
{
+ validateFile();
return file;
}
@Override
public String toString()
{
- return "ServerMessage[messageID=" + messageID + ",priority=" +
this.getPriority() +
+ return "LargeServerMessage[messageID=" + messageID +
",priority=" + this.getPriority() +
",expiration=[" + (this.getExpiration() != 0 ? new
java.util.Date(this.getExpiration()) : "null") + "]" +
- ", durable=" + durable + ", address=" + getAddress() +
",properties=" + properties.toString() + "]";
+ ", durable=" + durable + ", address=" + getAddress() +
",properties=" + properties.toString() + "]@" +
System.identityHashCode(this);
}
@@ -396,7 +367,7 @@
file = storageManager.createFileForLargeMessage(getMessageID(), durable);
- file.open();
+ openFile();
bodySize = file.size();
}
@@ -407,32 +378,27 @@
throw new HornetQException(HornetQException.INTERNAL_ERROR, e.getMessage(), e);
}
}
-
- /* (non-Javadoc)
- * @see
org.hornetq.core.server.LargeServerMessage#setLinkedMessage(org.hornetq.core.server.LargeServerMessage)
- */
- public void setLinkedMessage(final LargeServerMessage message)
+
+ protected void openFile() throws Exception
{
- if (file != null)
+ if (file == null)
+ {
+ validateFile();
+ }
+ else
+ if (!file.isOpen())
{
- // Sanity check.. it shouldn't happen
- throw new IllegalStateException("LargeMessage file was already set");
- }
-
- linkMessage = message;
-
- file = storageManager.createFileForLargeMessage(message.getMessageID(), durable);
- try
- {
file.open();
- bodySize = file.size();
- file.close();
}
- catch (Exception e)
- {
- throw new RuntimeException("could not setup linked file", e);
- }
}
+
+ protected void closeFile() throws Exception
+ {
+ if (file != null && file.isOpen())
+ {
+ file.close();
+ }
+ }
// Inner classes -------------------------------------------------
@@ -444,6 +410,10 @@
{
try
{
+ if (cFile != null && cFile.isOpen())
+ {
+ cFile.close();
+ }
cFile = file.copy();
cFile.open();
}
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/LargeServerMessage.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/LargeServerMessage.java 2011-12-22
13:32:53 UTC (rev 11928)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/LargeServerMessage.java 2011-12-22
13:37:04 UTC (rev 11929)
@@ -13,6 +13,7 @@
package org.hornetq.core.server;
+
/**
* A LargeMessage
*
@@ -26,9 +27,6 @@
{
void addBytes(byte[] bytes) throws Exception;
- /** When a large message is copied (e.g. ExpiryQueue) instead of copying the file, we
specify a link between the messages */
- void setLinkedMessage(LargeServerMessage message);
-
boolean isFileExists() throws Exception;
/**
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-12-22
13:32:53 UTC (rev 11928)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-12-22
13:37:04 UTC (rev 11929)
@@ -930,7 +930,7 @@
int localChunkLen = 0;
localChunkLen = (int)Math.min(sizePendingLargeMessage -
positionPendingLargeMessage, minLargeMessageSize);
-
+
HornetQBuffer bodyBuffer = HornetQBuffers.fixedBuffer(localChunkLen);
context.encode(bodyBuffer, localChunkLen);
Added:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java
(rev 0)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java 2011-12-22
13:37:04 UTC (rev 11929)
@@ -0,0 +1,415 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import java.io.File;
+
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * This test will send large messages in page-mode, DLQ then, expiry then, and they
should be received fine
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class ExpiryLargeMessageTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+ final SimpleString EXPIRY = new SimpleString("my-expiry");
+
+ final SimpleString DLQ = new SimpleString("my-DLQ");
+
+ final SimpleString MY_QUEUE = new SimpleString("MY-QUEUE");
+
+ final int messageSize = 10 * 1024;
+
+ // it has to be an even number
+ final int numberOfMessages = 50;
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testExpiryMessagesThenDLQ() throws Exception
+ {
+ HornetQServer server = createServer(true);
+
+ server.getConfiguration().setMessageExpiryScanPeriod(600000);
+
+ AddressSettings setting = new AddressSettings();
+ setting.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ setting.setMaxDeliveryAttempts(5);
+ setting.setMaxSizeBytes(50 * 1024);
+ setting.setPageSizeBytes(10 * 1024);
+ setting.setExpiryAddress(EXPIRY);
+ setting.setDeadLetterAddress(DLQ);
+ server.getAddressSettingsRepository().addMatch(MY_QUEUE.toString(), setting);
+
+ setting.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ setting.setMaxDeliveryAttempts(5);
+ setting.setMaxSizeBytes(50 * 1024);
+ setting.setPageSizeBytes(10 * 1024);
+ setting.setDeadLetterAddress(DLQ);
+ server.getAddressSettingsRepository().addMatch(EXPIRY.toString(), setting);
+
+ server.start();
+
+ try
+ {
+
+ server.createQueue(EXPIRY, EXPIRY, null, true, false);
+
+ server.createQueue(DLQ, DLQ, null, true, false);
+
+ server.createQueue(MY_QUEUE, MY_QUEUE, null, true, false);
+
+ ServerLocator locator = createInVMNonHALocator();
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(true, true, 0);
+
+ byte bufferSample[] = new byte[messageSize];
+
+ for (int i = 0; i < bufferSample.length; i++)
+ {
+ bufferSample[i] = getSamplebyte(i);
+ }
+
+ ClientProducer producer = session.createProducer(MY_QUEUE);
+
+ long timeToExpiry = System.currentTimeMillis() + 1000;
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage message = session.createMessage(true);
+
+ message.putIntProperty("count", i);
+
+ // Send a few regular messages first, then all is just large messages
+ if (i % 2 == 0)
+ {
+ message.putBooleanProperty("tst-large", false);
+ message.getBodyBuffer().writeBytes(bufferSample);
+ }
+ else
+ {
+ message.putBooleanProperty("tst-large", true);
+ message.setBodyInputStream(createFakeLargeStream(messageSize));
+ }
+
+ message.setExpiration(timeToExpiry);
+
+ producer.send(message);
+ }
+
+ server.stop();
+ server.start();
+
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(true, true, 0);
+
+ Thread.sleep(1500);
+
+ ClientConsumer cons = session.createConsumer(MY_QUEUE);
+ assertNull(cons.receive(1000));
+
+ session.close();
+
+ session = sf.createSession(false, false);
+
+ cons = session.createConsumer(EXPIRY);
+ session.start();
+
+ // Consume half of the messages to make sure all the messages are paging (on the
second try)
+ for (int i = 0; i < numberOfMessages / 2; i++)
+ {
+ ClientMessage msg = cons.receive(5000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ }
+
+ session.commit();
+
+ cons.close();
+
+ for (int rep = 0; rep < 6; rep++)
+ {
+ cons = session.createConsumer(EXPIRY);
+ session.start();
+
+ System.out.println("Trying " + rep);
+ for (int i = 0; i < numberOfMessages / 2; i++)
+ {
+ ClientMessage message = cons.receive(5000);
+ assertNotNull(message);
+
+ if (i % 10 == 0)
+ {
+ System.out.println("Received " + i);
+ }
+
+ for (int location = 0; location < messageSize; location++)
+ {
+ assertEquals(getSamplebyte((long)location),
message.getBodyBuffer().readByte());
+ }
+ message.acknowledge();
+ }
+
+ session.rollback();
+
+ cons.close();
+
+ session.close();
+ sf.close();
+
+ if (rep == 0)
+ {
+ // restart the server at the first try
+ server.stop();
+ server.start();
+ }
+
+ sf = locator.createSessionFactory();
+ session = sf.createSession(false, false);
+ session.start();
+ }
+
+ cons = session.createConsumer(EXPIRY);
+ session.start();
+ assertNull(cons.receiveImmediate());
+
+ cons.close();
+
+ session.close();
+ sf.close();
+
+ for (int rep = 0; rep < 2; rep++)
+ {
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, false);
+
+ cons = session.createConsumer(DLQ);
+
+ session.start();
+
+ for (int i = 0; i < numberOfMessages / 2; i++)
+ {
+ ClientMessage message = cons.receive(5000);
+ assertNotNull(message);
+
+ if (i % 10 == 0)
+ {
+ System.out.println("Received " + i);
+ }
+
+ for (int location = 0; location < messageSize; location++)
+ {
+ assertEquals(getSamplebyte((long)location),
message.getBodyBuffer().readByte());
+ }
+ message.acknowledge();
+ }
+ if (rep == 0)
+ {
+ session.rollback();
+ session.close();
+ sf.close();
+ server.stop();
+ server.start();
+ }
+ }
+
+ session.commit();
+
+ assertNull(cons.receiveImmediate());
+
+ session.close();
+ sf.close();
+ locator.close();
+ }
+ finally
+ {
+ server.stop();
+ }
+ }
+
+ /**
+ * Tests if the system would still couple with old data where the LargeMessage was
linked to its previous copy
+ * @throws Exception
+ */
+ public void testCompatilityWithLinks() throws Exception
+ {
+ HornetQServer server = createServer(true);
+
+ server.getConfiguration().setMessageExpiryScanPeriod(600000);
+
+ AddressSettings setting = new AddressSettings();
+ setting.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ setting.setMaxDeliveryAttempts(5);
+ setting.setMaxSizeBytes(-1);
+ setting.setPageSizeBytes(10 * 1024);
+ setting.setExpiryAddress(EXPIRY);
+ setting.setDeadLetterAddress(DLQ);
+ server.getAddressSettingsRepository().addMatch(MY_QUEUE.toString(), setting);
+
+ setting.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ setting.setMaxDeliveryAttempts(5);
+ setting.setMaxSizeBytes(-1);
+ setting.setPageSizeBytes(10 * 1024);
+ setting.setDeadLetterAddress(DLQ);
+ server.getAddressSettingsRepository().addMatch(EXPIRY.toString(), setting);
+
+ server.start();
+
+ try
+ {
+
+ server.createQueue(EXPIRY, EXPIRY, null, true, false);
+
+ server.createQueue(DLQ, DLQ, null, true, false);
+
+ server.createQueue(MY_QUEUE, MY_QUEUE, null, true, false);
+
+ ServerLocator locator = createInVMNonHALocator();
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(true, true, 0);
+
+ byte bufferSample[] = new byte[messageSize];
+
+ for (int i = 0; i < bufferSample.length; i++)
+ {
+ bufferSample[i] = getSamplebyte(i);
+ }
+
+ ClientProducer producer = session.createProducer(MY_QUEUE);
+
+ long timeToExpiry = System.currentTimeMillis() + 1000;
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage message = session.createMessage(true);
+
+ message.putIntProperty("count", i);
+
+ // Everything is going to be a large message
+ message.putBooleanProperty("tst-large", true);
+ message.setBodyInputStream(createFakeLargeStream(messageSize));
+
+ message.setExpiration(timeToExpiry);
+
+ producer.send(message);
+ }
+
+ server.stop();
+ server.start();
+
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(true, true, 0);
+ session.start();
+
+ Thread.sleep(1500);
+
+ ClientConsumer cons = session.createConsumer(MY_QUEUE);
+ assertNull(cons.receive(1000));
+
+ session.close();
+
+ session = sf.createSession(false, false);
+
+ cons = session.createConsumer(EXPIRY);
+ session.start();
+
+ ClientMessage msg = cons.receive(5000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ session.rollback();
+
+ server.stop();
+
+ // rename the file, simulating old behaviour
+ long messageID = msg.getMessageID();
+ long oldID = msg.getLongProperty(Message.HDR_ORIG_MESSAGE_ID);
+
+ File largeMessagesFileDir = new File(getLargeMessagesDir());
+ File oldFile = new File(largeMessagesFileDir, oldID + ".msg");
+ File currentFile = new File(largeMessagesFileDir, messageID +
".msg");
+ currentFile.renameTo(oldFile);
+
+ server.start();
+
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(true, true, 0);
+ session.start();
+
+ cons = session.createConsumer(EXPIRY);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage message = cons.receive(5000);
+ assertNotNull(message);
+
+ if (i % 10 == 0)
+ {
+ System.out.println("Received " + i);
+ }
+
+ for (int location = 0; location < messageSize; location++)
+ {
+ assertEquals(getSamplebyte((long)location),
message.getBodyBuffer().readByte());
+ }
+ message.acknowledge();
+ }
+
+ session.commit();
+
+ session.close();
+ sf.close();
+ locator.close();
+ }
+ finally
+ {
+ server.stop();
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}