Author: clebert.suconic(a)jboss.com
Date: 2011-12-21 23:23:07 -0500 (Wed, 21 Dec 2011)
New Revision: 11926
Added:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/LargeServerMessage.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
https://issues.jboss.org/browse/JBPAPP-7809 - fixing large message copy on DLQ / Expiry
Queue
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-12-21
15:53:48 UTC (rev 11925)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-12-22
04:23:07 UTC (rev 11926)
@@ -1746,25 +1746,6 @@
messageEncoding.decode(buff);
- if (largeMessage.containsProperty(Message.HDR_ORIG_MESSAGE_ID))
- {
- long originalMessageID =
largeMessage.getLongProperty(Message.HDR_ORIG_MESSAGE_ID);
-
- LargeServerMessage originalMessage =
(LargeServerMessage)messages.get(originalMessageID);
-
- if (originalMessage == null)
- {
- // this could happen if the message was deleted but the file still exists as
the file still being used
- originalMessage = createLargeMessage();
- originalMessage.setDurable(true);
- originalMessage.setMessageID(originalMessageID);
- messages.put(originalMessageID, originalMessage);
- }
-
- originalMessage.incrementDelayDeletionCount();
-
- largeMessage.setLinkedMessage(originalMessage);
- }
return largeMessage;
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-12-21
15:53:48 UTC (rev 11925)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-12-22
04:23:07 UTC (rev 11926)
@@ -48,8 +48,6 @@
// Attributes ----------------------------------------------------
private final JournalStorageManager storageManager;
-
- private LargeServerMessage linkMessage;
private long pendingRecordID = -1;
@@ -80,7 +78,6 @@
private LargeServerMessageImpl(final LargeServerMessageImpl copy, TypedProperties
properties, final SequentialFile fileCopy, final long newID)
{
super(copy, properties);
- linkMessage = copy;
storageManager = copy.storageManager;
file = fileCopy;
bodySize = copy.bodySize;
@@ -191,7 +188,7 @@
checkDelete();
}
}
-
+
@Override
public BodyEncoder getBodyEncoder() throws HornetQException
{
@@ -203,27 +200,19 @@
{
if (getRefCount() <= 0)
{
- if (linkMessage != null)
+ if (LargeServerMessageImpl.isTrace)
{
- // This file is linked to another message, deleting the reference where it
belongs on this case
- linkMessage.decrementDelayDeletionCount();
+ LargeServerMessageImpl.log.trace("Deleting file " + file + "
as the usage was complete");
}
- else
- {
- if (LargeServerMessageImpl.isTrace)
- {
- LargeServerMessageImpl.log.trace("Deleting file " + file +
" as the usage was complete");
- }
- try
- {
- deleteFile();
- }
- catch (Exception e)
- {
- LargeServerMessageImpl.log.error(e.getMessage(), e);
- }
+ try
+ {
+ deleteFile();
}
+ catch (Exception e)
+ {
+ LargeServerMessageImpl.log.error(e.getMessage(), e);
+ }
}
}
@@ -319,15 +308,9 @@
{
long idToUse = messageID;
- if (linkMessage != null)
- {
- idToUse = linkMessage.getMessageID();
- }
-
SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse,
durable);
- ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
- :
(LargeServerMessageImpl)linkMessage,
+ ServerMessage newMessage = new LargeServerMessageImpl(this,
properties,
newfile,
messageID);
@@ -338,60 +321,34 @@
@Override
public synchronized ServerMessage copy(final long newID)
{
- if (!paged)
+ try
{
- incrementDelayDeletionCount();
-
- long idToUse = messageID;
-
- if (linkMessage != null)
- {
- idToUse = linkMessage.getMessageID();
- }
-
- SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse,
durable);
-
- ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ?
this
- :
(LargeServerMessageImpl)linkMessage,
- properties,
- newfile,
- newID);
+ validateFile();
+
+ SequentialFile file = this.file;
+
+ SequentialFile newFile = storageManager.createFileForLargeMessage(newID,
durable);
+
+ file.copyTo(newFile);
+
+ LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, properties,
newFile, newID);
+
return newMessage;
}
- else
+ catch (Exception e)
{
- try
- {
- validateFile();
-
- SequentialFile file = this.file;
-
- SequentialFile newFile = storageManager.createFileForLargeMessage(newID,
durable);
-
- file.copyTo(newFile);
-
- LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this,
properties, newFile, newID);
-
- newMessage.linkMessage = null;
-
- newMessage.setPaged();
-
- return newMessage;
- }
- catch (Exception e)
- {
- log.warn("Error on copying large message this for DLA or Expiry",
e);
- return null;
- }
- finally
- {
- releaseResources();
- }
+ log.warn("Error on copying large message " + this + " for DLA or
Expiry", e);
+ return null;
}
+ finally
+ {
+ releaseResources();
+ }
}
- public SequentialFile getFile()
+ public SequentialFile getFile() throws Exception
{
+ validateFile();
return file;
}
@@ -463,32 +420,6 @@
}
}
- /* (non-Javadoc)
- * @see
org.hornetq.core.server.LargeServerMessage#setLinkedMessage(org.hornetq.core.server.LargeServerMessage)
- */
- public void setLinkedMessage(final LargeServerMessage message)
- {
- if (file != null)
- {
- // Sanity check.. it shouldn't happen
- throw new IllegalStateException("LargeMessage file was already set");
- }
-
- linkMessage = message;
-
- file = storageManager.createFileForLargeMessage(message.getMessageID(), durable);
- try
- {
- openFile();
- bodySize = file.size();
- closeFile();
- }
- catch (Exception e)
- {
- throw new RuntimeException("could not setup linked file", e);
- }
- }
-
// Inner classes -------------------------------------------------
class DecodingContext implements BodyEncoder
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/LargeServerMessage.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/LargeServerMessage.java 2011-12-21
15:53:48 UTC (rev 11925)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/LargeServerMessage.java 2011-12-22
04:23:07 UTC (rev 11926)
@@ -13,6 +13,7 @@
package org.hornetq.core.server;
+
/**
* A LargeMessage
*
@@ -26,9 +27,6 @@
{
void addBytes(byte[] bytes) throws Exception;
- /** When a large message is copied (e.g. ExpiryQueue) instead of copying the file, we
specify a link between the messages */
- void setLinkedMessage(LargeServerMessage message);
-
void setPendingRecordID(long pendingRecordID);
long getPendingRecordID();
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-12-21
15:53:48 UTC (rev 11925)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-12-22
04:23:07 UTC (rev 11926)
@@ -691,6 +691,9 @@
serverLocator.setBlockOnDurableSend(!useDuplicateDetection);
serverLocator.setBlockOnNonDurableSend(!useDuplicateDetection);
serverLocator.setCallTimeout(callTimeout);
+
+ // No producer flow control on the bridges, as we don't want to lock the
queues
+ serverLocator.setProducerWindowSize(-1);
if (retryInterval > 0)
{
@@ -930,6 +933,9 @@
targetLocator.setRetryIntervalMultiplier(retryIntervalMultiplier);
targetLocator.setMinLargeMessageSize(minLargeMessageSize);
+ // No producer flow control on the bridges, as we don't want to lock the
queues
+ targetLocator.setProducerWindowSize(-1);
+
targetLocator.setAfterConnectionInternalListener(this);
targetLocator.setNodeID(nodeId);
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-12-21
15:53:48 UTC (rev 11925)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-12-22
04:23:07 UTC (rev 11926)
@@ -930,7 +930,7 @@
int localChunkLen = 0;
localChunkLen = (int)Math.min(sizePendingLargeMessage -
positionPendingLargeMessage, minLargeMessageSize);
-
+
HornetQBuffer bodyBuffer = HornetQBuffers.fixedBuffer(localChunkLen);
context.encode(bodyBuffer, localChunkLen);
Added:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java
(rev 0)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java 2011-12-22
04:23:07 UTC (rev 11926)
@@ -0,0 +1,273 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * This test will send large messages in page-mode, DLQ then, expiry then, and they
should be received fine
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class ExpiryLargeMessageTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+ final SimpleString EXPIRY = new SimpleString("my-expiry");
+
+ final SimpleString DLQ = new SimpleString("my-DLQ");
+
+ final SimpleString MY_QUEUE = new SimpleString("MY-QUEUE");
+
+ final int messageSize = 10 * 1024;
+
+ // it has to be an even number
+ final int numberOfMessages = 50;
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testExpiryMessagesThenDLQ() throws Exception
+ {
+ HornetQServer server = createServer(true);
+
+ server.getConfiguration().setMessageExpiryScanPeriod(600000);
+
+ AddressSettings setting = new AddressSettings();
+ setting.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ setting.setMaxDeliveryAttempts(5);
+ setting.setMaxSizeBytes(50 * 1024);
+ setting.setPageSizeBytes(10 * 1024);
+ setting.setExpiryAddress(EXPIRY);
+ setting.setDeadLetterAddress(DLQ);
+ server.getAddressSettingsRepository().addMatch(MY_QUEUE.toString(), setting);
+
+ setting.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ setting.setMaxDeliveryAttempts(5);
+ setting.setMaxSizeBytes(50 * 1024);
+ setting.setPageSizeBytes(10 * 1024);
+ setting.setDeadLetterAddress(DLQ);
+ server.getAddressSettingsRepository().addMatch(EXPIRY.toString(), setting);
+
+ server.start();
+
+ try
+ {
+
+ server.createQueue(EXPIRY, EXPIRY, null, true, false);
+
+ server.createQueue(DLQ, DLQ, null, true, false);
+
+ server.createQueue(MY_QUEUE, MY_QUEUE, null, true, false);
+
+ ServerLocator locator = createInVMNonHALocator();
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(true, true, 0);
+
+ byte bufferSample[] = new byte[messageSize];
+
+ for (int i = 0; i < bufferSample.length; i++)
+ {
+ bufferSample[i] = getSamplebyte(i);
+ }
+
+ ClientProducer producer = session.createProducer(MY_QUEUE);
+
+ long timeToExpiry = System.currentTimeMillis() + 1000;
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage message = session.createMessage(true);
+
+ message.putIntProperty("count", i);
+
+ // Send a few regular messages first, then all is just large messages
+ if (i % 2 == 0)
+ {
+ message.putBooleanProperty("tst-large", false);
+ message.getBodyBuffer().writeBytes(bufferSample);
+ }
+ else
+ {
+ message.putBooleanProperty("tst-large", true);
+ message.setBodyInputStream(createFakeLargeStream(messageSize));
+ }
+
+ message.setExpiration(timeToExpiry);
+
+ producer.send(message);
+ }
+
+ server.stop();
+ server.start();
+
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(true, true, 0);
+
+ Thread.sleep(1500);
+
+ ClientConsumer cons = session.createConsumer(MY_QUEUE);
+ assertNull(cons.receive(1000));
+
+ session.close();
+
+ session = sf.createSession(false, false);
+
+ cons = session.createConsumer(EXPIRY);
+ session.start();
+
+
+ // Consume half of the messages to make sure all the messages are paging (on the
second try)
+ for (int i = 0 ; i < numberOfMessages / 2; i++)
+ {
+ ClientMessage msg = cons.receive(5000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ }
+
+ session.commit();
+
+ cons.close();
+
+ for (int rep = 0; rep < 6; rep++)
+ {
+ cons = session.createConsumer(EXPIRY);
+ session.start();
+
+ System.out.println("Trying " + rep);
+ for (int i = 0; i < numberOfMessages / 2; i++)
+ {
+ ClientMessage message = cons.receive(5000);
+ assertNotNull(message);
+
+ if (i % 10 == 0)
+ {
+ System.out.println("Received " + i);
+ }
+
+ for (int location = 0; location < messageSize; location++)
+ {
+ assertEquals(getSamplebyte((long)location),
message.getBodyBuffer().readByte());
+ }
+ message.acknowledge();
+ }
+
+ session.rollback();
+
+ cons.close();
+
+ session.close();
+ sf.close();
+
+ if (rep == 0)
+ {
+ // restart the server at the first try
+ server.stop();
+ server.start();
+ }
+
+ sf = locator.createSessionFactory();
+ session = sf.createSession(false, false);
+ session.start();
+ }
+
+ cons = session.createConsumer(EXPIRY);
+ session.start();
+ assertNull(cons.receiveImmediate());
+
+ cons.close();
+
+ session.close();
+ sf.close();
+
+ for (int rep = 0; rep < 2; rep++)
+ {
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, false);
+
+ cons = session.createConsumer(DLQ);
+
+ session.start();
+
+ for (int i = 0; i < numberOfMessages / 2; i++)
+ {
+ ClientMessage message = cons.receive(5000);
+ assertNotNull(message);
+
+ if (i % 10 == 0)
+ {
+ System.out.println("Received " + i);
+ }
+
+ for (int location = 0; location < messageSize; location++)
+ {
+ assertEquals(getSamplebyte((long)location),
message.getBodyBuffer().readByte());
+ }
+ message.acknowledge();
+ }
+ if (rep == 0)
+ {
+ session.rollback();
+ session.close();
+ sf.close();
+ server.stop();
+ server.start();
+ }
+ }
+
+ session.commit();
+
+ assertNull(cons.receiveImmediate());
+
+ session.close();
+ sf.close();
+ locator.close();
+
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ server.stop();
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-12-21
15:53:48 UTC (rev 11925)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-12-22
04:23:07 UTC (rev 11926)
@@ -96,9 +96,6 @@
}
locators.clear();
super.tearDown();
-// checkFreePort(5445);
-// checkFreePort(5446);
-// checkFreePort(5447);
if (InVMRegistry.instance.size() > 0)
{
fail("InVMREgistry size > 0");