Author: clebert.suconic(a)jboss.com
Date: 2011-09-01 18:59:45 -0400 (Thu, 01 Sep 2011)
New Revision: 11282
Added:
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/LargeMessageRedistributionTest.java
Modified:
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/message/impl/MessageImpl.java
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
Log:
back porting JBPAPP-7115
Modified:
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/message/impl/MessageImpl.java 2011-09-01 22:50:36 UTC (rev 11281)
+++ branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/message/impl/MessageImpl.java 2011-09-01 22:59:45 UTC (rev 11282)
@@ -143,6 +143,14 @@
*/
protected MessageImpl(final MessageImpl other)
{
+ this(other, other.getProperties());
+ }
+
+ /*
+ * Copy constructor
+ */
+ protected MessageImpl(final MessageImpl other, TypedProperties properties)
+ {
messageID = other.getMessageID();
userID = other.getUserID();
address = other.getAddress();
@@ -151,7 +159,7 @@
expiration = other.getExpiration();
timestamp = other.getTimestamp();
priority = other.getPriority();
- properties = new TypedProperties(other.getProperties());
+ this.properties = new TypedProperties(properties);
// This MUST be synchronized using the monitor on the other message to prevent it running concurrently
// with getEncodedBuffer(), otherwise can introduce race condition when delivering concurrently to
Modified:
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
--- branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-09-01 22:50:36 UTC (rev 11281)
+++ branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-09-01 22:59:45 UTC (rev 11282)
@@ -26,6 +26,7 @@
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.utils.DataConstants;
+import org.hornetq.utils.TypedProperties;
/**
* A LargeServerMessageImpl
@@ -70,12 +71,13 @@
/**
* Copy constructor
+ * @param properties
* @param copy
* @param fileCopy
*/
- private LargeServerMessageImpl(final LargeServerMessageImpl copy, final SequentialFile fileCopy, final long newID)
+ private LargeServerMessageImpl(final LargeServerMessageImpl copy, TypedProperties properties, final SequentialFile fileCopy, final long newID)
{
- super(copy);
+ super(copy, properties);
linkMessage = copy;
storageManager = copy.storageManager;
file = fileCopy;
@@ -281,8 +283,30 @@
this.removeProperty(Message.HDR_ORIG_MESSAGE_ID);
}
}
+
+ @Override
+ public synchronized ServerMessage copy()
+ {
+ incrementDelayDeletionCount();
+ long idToUse = messageID;
+ if (linkMessage != null)
+ {
+ idToUse = linkMessage.getMessageID();
+ }
+
+ SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse, durable);
+
+ ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
+ : (LargeServerMessageImpl)linkMessage,
+ properties,
+ newfile,
+ messageID);
+ return newMessage;
+ }
+
+
@Override
public synchronized ServerMessage copy(final long newID)
{
@@ -301,6 +325,7 @@
ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
: (LargeServerMessageImpl)linkMessage,
+ properties,
newfile,
newID);
return newMessage;
@@ -317,7 +342,7 @@
file.copyTo(newFile);
- LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, newFile, newID);
+ LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, properties, newFile, newID);
newMessage.linkMessage = null;
Modified:
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-09-01 22:50:36 UTC (rev 11281)
+++ branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-09-01 22:59:45 UTC (rev 11282)
@@ -325,7 +325,7 @@
// Consumer implementation ---------------------------------------
/* Hook for processing message before forwarding */
- protected ServerMessage beforeForward(ServerMessage message)
+ protected ServerMessage beforeForward(final ServerMessage message)
{
if (useDuplicateDetection)
{
@@ -337,10 +337,20 @@
if (transformer != null)
{
- message = transformer.transform(message);
+ final ServerMessage transformedMessage = transformer.transform(message);
+ if (transformedMessage != message)
+ {
+ if (log.isDebugEnabled())
+ {
+ log.debug("The transformer " + transformer + " made a copy of the message " + message + " as transformedMessage");
+ }
+ }
+ return transformedMessage;
}
-
- return message;
+ else
+ {
+ return message;
+ }
}
/**
Modified:
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-09-01 22:50:36 UTC (rev 11281)
+++ branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-09-01 22:59:45 UTC (rev 11282)
@@ -113,34 +113,50 @@
}
@Override
- protected ServerMessage beforeForward(ServerMessage message)
+ protected ServerMessage beforeForward(final ServerMessage message)
{
// We make a copy of the message, then we strip out the unwanted routing id headers and leave
// only
// the one pertinent for the address node - this is important since different queues on different
// nodes could have same queue ids
// Note we must copy since same message may get routed to other nodes which require different headers
- message = message.copy();
+ ServerMessage messageCopy = message.copy();
+
+ if (log.isTraceEnabled())
+ {
+ log.trace("Clustered bridge copied message " + message + " as " + messageCopy + " before delivery");
+ }
// TODO - we can optimise this
- Set<SimpleString> propNames = new HashSet<SimpleString>(message.getPropertyNames());
+ Set<SimpleString> propNames = new HashSet<SimpleString>(messageCopy.getPropertyNames());
byte[] queueIds = message.getBytesProperty(idsHeaderName);
+
+ if (queueIds == null)
+ {
+ // Sanity check only
+ log.warn("no queue IDs defined!, originalMessage = " + message +
+ ", copiedMessage = " +
+ messageCopy +
+ ", props=" +
+ idsHeaderName);
+ throw new IllegalStateException("no queueIDs defined");
+ }
for (SimpleString propName : propNames)
{
if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS))
{
- message.removeProperty(propName);
+ messageCopy.removeProperty(propName);
}
}
- message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
+ messageCopy.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
- message = super.beforeForward(message);
-
- return message;
+ messageCopy = super.beforeForward(messageCopy);
+
+ return messageCopy;
}
private void setupNotificationConsumer() throws Exception
Modified:
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2011-09-01 22:50:36 UTC (rev 11281)
+++ branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2011-09-01 22:59:45 UTC (rev 11282)
@@ -25,6 +25,7 @@
import org.hornetq.core.server.ServerMessage;
import org.hornetq.utils.DataConstants;
import org.hornetq.utils.MemorySize;
+import org.hornetq.utils.TypedProperties;
/**
*
@@ -89,6 +90,14 @@
super(other);
}
+ /*
+ * Copy constructor
+ */
+ protected ServerMessageImpl(final ServerMessageImpl other, TypedProperties properties)
+ {
+ super(other, properties);
+ }
+
public boolean isServerMessage()
{
return true;
@@ -193,6 +202,7 @@
public ServerMessage copy()
{
+ // This is a simple copy, used only to avoid changing original properties
return new ServerMessageImpl(this);
}
Modified:
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-01 22:50:36 UTC (rev 11281)
+++ branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-01 22:59:45 UTC (rev 11282)
@@ -38,7 +38,6 @@
import org.hornetq.core.config.BroadcastGroupConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
@@ -71,6 +70,16 @@
{
private static final Logger log = Logger.getLogger(ClusterTestBase.class);
+ public ClusterTestBase()
+ {
+ super();
+ }
+
+ public ClusterTestBase(String name)
+ {
+ super(name);
+ }
+
private static final int[] PORTS = { TransportConstants.DEFAULT_PORT,
TransportConstants.DEFAULT_PORT + 1,
TransportConstants.DEFAULT_PORT + 2,
@@ -690,6 +699,11 @@
Assert.assertNotNull("consumer " + consumerIDs[i] + " did not receive message " + j, message);
}
+ if (isLargeMessage())
+ {
+ validateLargeMessage(message);
+ }
+
if (ack)
{
message.acknowledge();
@@ -748,6 +762,11 @@
Assert.fail("consumer " + i + " did not receive all messages");
}
+ if (isLargeMessage())
+ {
+ validateLargeMessage(message);
+ }
+
if (ack)
{
message.acknowledge();
@@ -792,10 +811,7 @@
if (isLargeMessage())
{
- for (int posMsg = 0; posMsg < getLargeMessageSize(); posMsg++)
- {
- assertEquals(getSamplebyte(posMsg), message.getBodyBuffer().readByte());
- }
+ validateLargeMessage(message);
}
if (ack)
@@ -891,6 +907,11 @@
if (message != null)
{
+ if (isLargeMessage())
+ {
+ validateLargeMessage(message);
+ }
+
ClusterTestBase.log.info("check receive Consumer " + consumerID +
" received message " +
message.getObjectProperty(ClusterTestBase.COUNT_PROP));
@@ -925,6 +946,10 @@
Assert.assertEquals("consumer " + consumerIDs[count] + " message " + i,
i,
message.getObjectProperty(ClusterTestBase.COUNT_PROP));
+ if (isLargeMessage())
+ {
+ validateLargeMessage(message);
+ }
count++;
@@ -981,6 +1006,10 @@
ClientMessage msg = holder.consumer.receive(10000);
Assert.assertNotNull(msg);
+ if (isLargeMessage())
+ {
+ validateLargeMessage(msg);
+ }
int count = msg.getIntProperty(ClusterTestBase.COUNT_PROP);
@@ -1024,6 +1053,10 @@
ClientMessage msg = holder.consumer.consumer.receive(10000);
Assert.assertNotNull(msg);
+ if (isLargeMessage())
+ {
+ validateLargeMessage(msg);
+ }
int p = msg.getIntProperty(ClusterTestBase.COUNT_PROP);
@@ -1078,6 +1111,11 @@
{
int count = (Integer)message.getObjectProperty(ClusterTestBase.COUNT_PROP);
+ if (isLargeMessage())
+ {
+ validateLargeMessage(message);
+ }
+
// log.info("consumer " + consumerIDs[i] + " received message " + count);
Assert.assertFalse(counts.contains(count));
@@ -1171,6 +1209,12 @@
message = consumer.consumer.receive(500);
if (message != null)
{
+
+ if (isLargeMessage())
+ {
+ validateLargeMessage(message);
+ }
+
if (ack)
{
message.acknowledge();
@@ -1356,7 +1400,6 @@
configuration.setJournalFileSize(100 * 1024);
configuration.setJournalType(getDefaultJournalType());
configuration.setSharedStore(sharedStorage);
- configuration.setThreadPoolMaxSize(10);
if (sharedStorage)
{
// Shared storage will share the node between the backup and live node
@@ -1904,6 +1947,17 @@
}
}
+ /**
+ * @param message
+ */
+ private void validateLargeMessage(ClientMessage message)
+ {
+ for (int posMsg = 0; posMsg < getLargeMessageSize(); posMsg++)
+ {
+ assertEquals(getSamplebyte(posMsg), message.getBodyBuffer().readByte());
+ }
+ }
+
protected boolean isFileStorage()
{
return true;
Added:
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/LargeMessageRedistributionTest.java
===================================================================
--- branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/LargeMessageRedistributionTest.java (rev 0)
+++ branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/LargeMessageRedistributionTest.java 2011-09-01 22:59:45 UTC (rev 11282)
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.distribution;
+
+
+/**
+ * A LargeMessageRedistributionTest
+ *
+ * @author clebert
+ *
+ *
+ */
+public class LargeMessageRedistributionTest extends MessageRedistributionTest
+{
+
+ public LargeMessageRedistributionTest()
+ {
+ super();
+ }
+
+ public LargeMessageRedistributionTest(String name)
+ {
+ super(name);
+ }
+
+ protected boolean isLargeMessage()
+ {
+ return true;
+ }
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
\ No newline at end of file
Modified:
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
===================================================================
--- branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2011-09-01 22:50:36 UTC (rev 11281)
+++ branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2011-09-01 22:59:45 UTC (rev 11282)
@@ -25,9 +25,6 @@
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageImpl;
-import org.hornetq.core.server.cluster.ClusterConnection;
-import org.hornetq.core.server.cluster.MessageFlowRecord;
-import org.hornetq.core.server.cluster.impl.ClusterConnectionImpl;
import org.hornetq.core.server.impl.QueueImpl;
import org.hornetq.core.settings.impl.AddressSettings;
@@ -44,6 +41,19 @@
{
private static final Logger log = Logger.getLogger(MessageRedistributionTest.class);
+ public MessageRedistributionTest()
+ {
+ super();
+ }
+
+ /**
+ * @param name
+ */
+ public MessageRedistributionTest(String name)
+ {
+ super(name);
+ }
+
@Override
protected void setUp() throws Exception
{
@@ -118,96 +128,6 @@
MessageRedistributionTest.log.info("Test done");
}
- // https://issues.jboss.org/browse/HORNETQ-654
- public void testRedistributionWhenConsumerIsClosedAndRestart() throws Exception
- {
- setupCluster(false);
-
- MessageRedistributionTest.log.info("Doing test");
-
- startServers(0, 1, 2);
-
- setupSessionFactory(0, isNetty());
- setupSessionFactory(1, isNetty());
- setupSessionFactory(2, isNetty());
-
- createQueue(0, "queues.testaddress", "queue0", null, true);
- createQueue(1, "queues.testaddress", "queue0", null, true);
- createQueue(2, "queues.testaddress", "queue0", null, true);
-
- addConsumer(0, 0, "queue0", null);
- addConsumer(1, 1, "queue0", null);
- addConsumer(2, 2, "queue0", null);
-
- waitForBindings(0, "queues.testaddress", 1, 1, true);
- waitForBindings(1, "queues.testaddress", 1, 1, true);
- waitForBindings(2, "queues.testaddress", 1, 1, true);
-
- waitForBindings(0, "queues.testaddress", 2, 2, false);
- waitForBindings(1, "queues.testaddress", 2, 2, false);
- waitForBindings(2, "queues.testaddress", 2, 2, false);
-
- send(0, "queues.testaddress", 20, true, null);
-
- getReceivedOrder(0, true);
- int[] ids1 = getReceivedOrder(1, false);
- getReceivedOrder(2, true);
-
- for (ClusterConnection conn : servers[1].getClusterManager().getClusterConnections())
- {
- ClusterConnectionImpl impl = (ClusterConnectionImpl)conn;
- for (MessageFlowRecord record : impl.getRecords().values())
- {
- if (record.getBridge() != null)
- {
- System.out.println("stop record bridge");
- record.getBridge().stop();
- }
- }
- }
-
- removeConsumer(1);
-
- // Need to wait some time as we need to handle all redistributions before we stop the servers
- Thread.sleep(5000);
-
- for (int i = 0; i <= 2; i++)
- {
- servers[i].stop();
- servers[i] = null;
- }
-
- setupServers();
-
- setupCluster(false);
-
- startServers(0, 1, 2);
-
- for (int i = 0 ; i <= 2; i++)
- {
- consumers[i] = null;
- sfs[i] = null;
- }
-
- setupSessionFactory(0, isNetty());
- setupSessionFactory(1, isNetty());
- setupSessionFactory(2, isNetty());
-
- addConsumer(0, 0, "queue0", null);
- addConsumer(2, 2, "queue0", null);
-
- waitForBindings(0, "queues.testaddress", 1, 1, true);
- waitForBindings(2, "queues.testaddress", 1, 1, true);
-
- waitForBindings(0, "queues.testaddress", 2, 1, false);
- waitForBindings(1, "queues.testaddress", 2, 2, false);
- waitForBindings(2, "queues.testaddress", 2, 1, false);
-
- verifyReceiveRoundRobinInSomeOrderWithCounts(false, ids1, 0, 2);
-
- MessageRedistributionTest.log.info("Test done");
- }
-
public void testRedistributionWhenConsumerIsClosedNotConsumersOnAllNodes() throws Exception
{
setupCluster(false);