Author: clebert.suconic(a)jboss.com
Date: 2011-09-09 17:41:06 -0400 (Fri, 09 Sep 2011)
New Revision: 11315
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/message/impl/MessageImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
Log:
HORNETQ-753 and JBPAPP-7115
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/message/impl/MessageImpl.java 2011-09-09
20:31:28 UTC (rev 11314)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/message/impl/MessageImpl.java 2011-09-09
21:41:06 UTC (rev 11315)
@@ -143,6 +143,14 @@
*/
protected MessageImpl(final MessageImpl other)
{
+ this(other, other.getProperties());
+ }
+
+ /*
+ * Copy constructor
+ */
+ protected MessageImpl(final MessageImpl other, TypedProperties properties)
+ {
messageID = other.getMessageID();
userID = other.getUserID();
address = other.getAddress();
@@ -151,7 +159,7 @@
expiration = other.getExpiration();
timestamp = other.getTimestamp();
priority = other.getPriority();
- properties = new TypedProperties(other.getProperties());
+ this.properties = new TypedProperties(properties);
// This MUST be synchronized using the monitor on the other message to prevent it
running concurrently
// with getEncodedBuffer(), otherwise can introduce race condition when delivering
concurrently to
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-09-09
20:31:28 UTC (rev 11314)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-09-09
21:41:06 UTC (rev 11315)
@@ -26,6 +26,7 @@
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.utils.DataConstants;
+import org.hornetq.utils.TypedProperties;
/**
* A LargeServerMessageImpl
@@ -70,12 +71,13 @@
/**
* Copy constructor
+ * @param properties
* @param copy
* @param fileCopy
*/
- private LargeServerMessageImpl(final LargeServerMessageImpl copy, final SequentialFile
fileCopy, final long newID)
+ private LargeServerMessageImpl(final LargeServerMessageImpl copy, TypedProperties
properties, final SequentialFile fileCopy, final long newID)
{
- super(copy);
+ super(copy, properties);
linkMessage = copy;
storageManager = copy.storageManager;
file = fileCopy;
@@ -281,8 +283,28 @@
this.removeProperty(Message.HDR_ORIG_MESSAGE_ID);
}
}
+
+ @Override
+ public synchronized ServerMessage copy()
+ {
+ long idToUse = messageID;
+ if (linkMessage != null)
+ {
+ idToUse = linkMessage.getMessageID();
+ }
+ SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse,
durable);
+
+ ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
+ :
(LargeServerMessageImpl)linkMessage,
+ properties,
+ newfile,
+ messageID);
+ return newMessage;
+ }
+
+
@Override
public synchronized ServerMessage copy(final long newID)
{
@@ -301,6 +323,7 @@
ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ?
this
:
(LargeServerMessageImpl)linkMessage,
+ properties,
newfile,
newID);
return newMessage;
@@ -317,7 +340,7 @@
file.copyTo(newFile);
- LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, newFile,
newID);
+ LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this,
properties, newFile, newID);
newMessage.linkMessage = null;
@@ -341,9 +364,9 @@
@Override
public String toString()
{
- return "ServerMessage[messageID=" + messageID + ",priority=" +
this.getPriority() +
+ return "LargeServerMessage[messageID=" + messageID +
",priority=" + this.getPriority() +
",expiration=[" + (this.getExpiration() != 0 ? new
java.util.Date(this.getExpiration()) : "null") + "]" +
- ", durable=" + durable + ", address=" + getAddress() +
",properties=" + properties.toString() + "]";
+ ", durable=" + durable + ", address=" + getAddress() +
",properties=" + properties.toString() + "]@" +
System.identityHashCode(this);
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java 2011-09-09
20:31:28 UTC (rev 11314)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java 2011-09-09
21:41:06 UTC (rev 11315)
@@ -61,6 +61,7 @@
{
super(PacketImpl.SESS_SEND_CONTINUATION, body, continues);
this.requiresResponse = requiresResponse;
+ this.message = message;
}
/**
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-09-09
20:31:28 UTC (rev 11314)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-09-09
21:41:06 UTC (rev 11315)
@@ -455,7 +455,7 @@
// Consumer implementation ---------------------------------------
/* Hook for processing message before forwarding */
- protected ServerMessage beforeForward(ServerMessage message)
+ protected ServerMessage beforeForward(final ServerMessage message)
{
if (useDuplicateDetection)
{
@@ -467,10 +467,20 @@
if (transformer != null)
{
- message = transformer.transform(message);
+ final ServerMessage transformedMessage = transformer.transform(message);
+ if (transformedMessage != message)
+ {
+ if (log.isDebugEnabled())
+ {
+ log.debug("The transformer " + transformer + " made a copy
of the message " + message + " as transformedMessage");
+ }
+ }
+ return transformedMessage;
}
-
- return message;
+ else
+ {
+ return message;
+ }
}
/**
@@ -535,6 +545,12 @@
// that this will throw a disconnect, we need to remove the message
// from the acks so it will get resent, duplicate detection will cope
// with any messages resent
+
+ if (log.isTraceEnabled())
+ {
+ log.trace("XXX going to send message " + message);
+ }
+
try
{
producer.send(dest, message);
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-09-09
20:31:28 UTC (rev 11314)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-09-09
21:41:06 UTC (rev 11315)
@@ -169,34 +169,50 @@
}
@Override
- protected ServerMessage beforeForward(ServerMessage message)
+ protected ServerMessage beforeForward(final ServerMessage message)
{
// We make a copy of the message, then we strip out the unwanted routing id headers
and leave
// only
// the one pertinent for the address node - this is important since different
queues on different
// nodes could have same queue ids
// Note we must copy since same message may get routed to other nodes which require
different headers
- message = message.copy();
+ ServerMessage messageCopy = message.copy();
+
+ if (log.isTraceEnabled())
+ {
+ log.trace("Clustered bridge copied message " + message + " as
" + messageCopy + " before delivery");
+ }
// TODO - we can optimise this
- Set<SimpleString> propNames = new
HashSet<SimpleString>(message.getPropertyNames());
+ Set<SimpleString> propNames = new
HashSet<SimpleString>(messageCopy.getPropertyNames());
byte[] queueIds = message.getBytesProperty(idsHeaderName);
+
+ if (queueIds == null)
+ {
+ // Sanity check only
+ log.warn("no queue IDs defined!, originalMessage = " + message +
+ ", copiedMessage = " +
+ messageCopy +
+ ", props=" +
+ idsHeaderName);
+ throw new IllegalStateException("no queueIDs defined");
+ }
for (SimpleString propName : propNames)
{
if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS))
{
- message.removeProperty(propName);
+ messageCopy.removeProperty(propName);
}
}
- message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
+ messageCopy.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
- message = super.beforeForward(message);
-
- return message;
+ messageCopy = super.beforeForward(messageCopy);
+
+ return messageCopy;
}
private void setupNotificationConsumer() throws Exception
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2011-09-09
20:31:28 UTC (rev 11314)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2011-09-09
21:41:06 UTC (rev 11315)
@@ -25,6 +25,7 @@
import org.hornetq.core.server.ServerMessage;
import org.hornetq.utils.DataConstants;
import org.hornetq.utils.MemorySize;
+import org.hornetq.utils.TypedProperties;
/**
*
@@ -89,6 +90,14 @@
super(other);
}
+ /*
+ * Copy constructor
+ */
+ protected ServerMessageImpl(final ServerMessageImpl other, TypedProperties
properties)
+ {
+ super(other, properties);
+ }
+
public boolean isServerMessage()
{
return true;
@@ -193,6 +202,7 @@
public ServerMessage copy()
{
+ // This is a simple copy, used only to avoid changing original properties
return new ServerMessageImpl(this);
}
@@ -275,7 +285,7 @@
{
return "ServerMessage[messageID=" + messageID + ",priority=" +
this.getPriority() +
",expiration=" + (this.getExpiration() != 0 ? new
java.util.Date(this.getExpiration()) : 0) +
- ", durable=" + durable + ", address=" + getAddress() +
",properties=" + properties.toString() + "]";
+ ", durable=" + durable + ", address=" + getAddress() +
",properties=" + properties.toString() + "]@" +
System.identityHashCode(this);
}
// FIXME - this is stuff that is only used in large messages
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-09-09
20:31:28 UTC (rev 11314)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-09-09
21:41:06 UTC (rev 11315)
@@ -1073,6 +1073,11 @@
LargeServerMessage largeMsg = storageManager.createLargeMessage(id, message);
+ if (log.isTraceEnabled())
+ {
+ log.trace("sendLarge::" + largeMsg);
+ }
+
if (currentLargeMessage != null)
{
ServerSessionImpl.log.warn("Replacing incomplete LargeMessage with
ID=" + currentLargeMessage.getMessageID());
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-09
20:31:28 UTC (rev 11314)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-09
21:41:06 UTC (rev 11315)
@@ -74,7 +74,17 @@
public abstract class ClusterTestBase extends ServiceTestBase
{
private final Logger log = Logger.getLogger(this.getClass());
+ public ClusterTestBase()
+ {
+ super();
+ }
+ public ClusterTestBase(String name)
+ {
+ super(name);
+ }
+
+
private static final int[] PORTS = { TransportConstants.DEFAULT_PORT,
TransportConstants.DEFAULT_PORT + 1,
TransportConstants.DEFAULT_PORT + 2,
@@ -87,7 +97,18 @@
TransportConstants.DEFAULT_PORT + 9, };
private static final long WAIT_TIMEOUT = 10000;
+
+ protected int getLargeMessageSize()
+ {
+ return 500;
+ }
+ protected boolean isLargeMessage()
+ {
+ return false;
+ }
+
+
private static final long TIMEOUT_START_SERVER = 10;
@Override
@@ -635,6 +656,11 @@
}
message.putIntProperty(ClusterTestBase.COUNT_PROP, i);
+
+ if (isLargeMessage())
+ {
+ message.setBodyInputStream(createFakeLargeStream(getLargeMessageSize()));
+ }
producer.send(message);
@@ -686,9 +712,15 @@
for (int i = msgStart; i < msgEnd; i++)
{
ClientMessage message = session.createMessage(durable);
+
+ if (isLargeMessage())
+ {
+ message.setBodyInputStream(createFakeLargeStream(getLargeMessageSize()));
+ }
message.putStringProperty(key, val);
message.putIntProperty(ClusterTestBase.COUNT_PROP, i);
+
producer.send(message);
}
}
@@ -881,6 +913,12 @@
log.info("msg on ClusterTestBase = " + message);
+
+ if (isLargeMessage())
+ {
+ checkMessageBody(message);
+ }
+
if (ack)
{
message.acknowledge();
@@ -1180,7 +1218,10 @@
if (message != null)
{
int count =
(Integer)message.getObjectProperty(ClusterTestBase.COUNT_PROP);
+
+ checkMessageBody(message);
+
// log.info("consumer " + consumerIDs[i] + " received
message " + count);
Assert.assertFalse(counts.contains(count));
@@ -1241,6 +1282,20 @@
}
+ /**
+ * @param message
+ */
+ private void checkMessageBody(ClientMessage message)
+ {
+ if (isLargeMessage())
+ {
+ for (int posMsg = 0 ; posMsg < getLargeMessageSize(); posMsg++)
+ {
+ assertEquals(getSamplebyte(posMsg), message.getBodyBuffer().readByte());
+ }
+ }
+ }
+
protected void verifyReceiveRoundRobinInSomeOrderNoAck(final int numMessages, final
int... consumerIDs) throws Exception
{
if (numMessages < consumerIDs.length)
@@ -1274,6 +1329,12 @@
message = consumer.consumer.receive(500);
if (message != null)
{
+
+ if (isLargeMessage())
+ {
+ checkMessageBody(message);
+ }
+
if (ack)
{
message.acknowledge();
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2011-09-09
20:31:28 UTC (rev 11314)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2011-09-09
21:41:06 UTC (rev 11315)
@@ -25,9 +25,6 @@
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageImpl;
-import org.hornetq.core.server.cluster.ClusterConnection;
-import org.hornetq.core.server.cluster.MessageFlowRecord;
-import org.hornetq.core.server.cluster.impl.ClusterConnectionImpl;
import org.hornetq.core.server.impl.QueueImpl;
import org.hornetq.core.settings.impl.AddressSettings;
@@ -44,6 +41,19 @@
{
private static final Logger log = Logger.getLogger(MessageRedistributionTest.class);
+ public MessageRedistributionTest()
+ {
+ super();
+ }
+
+ /**
+ * @param name
+ */
+ public MessageRedistributionTest(String name)
+ {
+ super(name);
+ }
+
@Override
protected void setUp() throws Exception
{
@@ -113,101 +123,11 @@
removeConsumer(1);
- verifyReceiveRoundRobinInSomeOrderWithCounts(false, ids1, 0, 2);
+ verifyReceiveRoundRobinInSomeOrderWithCounts(false, ids1, 0, 2);
MessageRedistributionTest.log.info("Test done");
}
- //
https://issues.jboss.org/browse/HORNETQ-654
- public void testRedistributionWhenConsumerIsClosedAndRestart() throws Exception
- {
- setupCluster(false);
-
- MessageRedistributionTest.log.info("Doing test");
-
- startServers(0, 1, 2);
-
- setupSessionFactory(0, isNetty());
- setupSessionFactory(1, isNetty());
- setupSessionFactory(2, isNetty());
-
- createQueue(0, "queues.testaddress", "queue0", null, true);
- createQueue(1, "queues.testaddress", "queue0", null, true);
- createQueue(2, "queues.testaddress", "queue0", null, true);
-
- addConsumer(0, 0, "queue0", null);
- addConsumer(1, 1, "queue0", null);
- addConsumer(2, 2, "queue0", null);
-
- waitForBindings(0, "queues.testaddress", 1, 1, true);
- waitForBindings(1, "queues.testaddress", 1, 1, true);
- waitForBindings(2, "queues.testaddress", 1, 1, true);
-
- waitForBindings(0, "queues.testaddress", 2, 2, false);
- waitForBindings(1, "queues.testaddress", 2, 2, false);
- waitForBindings(2, "queues.testaddress", 2, 2, false);
-
- send(0, "queues.testaddress", 20, true, null);
-
- getReceivedOrder(0, true);
- int[] ids1 = getReceivedOrder(1, false);
- getReceivedOrder(2, true);
-
- for (ClusterConnection conn :
servers[1].getClusterManager().getClusterConnections())
- {
- ClusterConnectionImpl impl = (ClusterConnectionImpl)conn;
- for (MessageFlowRecord record : impl.getRecords().values())
- {
- if (record.getBridge() != null)
- {
- System.out.println("stop record bridge");
- record.getBridge().stop();
- }
- }
- }
-
- removeConsumer(1);
-
- // Need to wait some time as we need to handle all redistributions before we stop
the servers
- Thread.sleep(1000);
-
- for (int i = 0; i <= 2; i++)
- {
- servers[i].stop();
- servers[i] = null;
- }
-
- setupServers();
-
- setupCluster(false);
-
- startServers(0, 1, 2);
-
- for (int i = 0 ; i <= 2; i++)
- {
- consumers[i] = null;
- sfs[i] = null;
- }
-
- setupSessionFactory(0, isNetty());
- setupSessionFactory(1, isNetty());
- setupSessionFactory(2, isNetty());
-
- addConsumer(0, 0, "queue0", null);
- addConsumer(2, 2, "queue0", null);
-
- waitForBindings(0, "queues.testaddress", 1, 1, true);
- waitForBindings(2, "queues.testaddress", 1, 1, true);
-
- waitForBindings(0, "queues.testaddress", 2, 1, false);
- waitForBindings(1, "queues.testaddress", 2, 2, false);
- waitForBindings(2, "queues.testaddress", 2, 1, false);
-
- verifyReceiveRoundRobinInSomeOrderWithCounts(false, ids1, 0, 2);
-
- MessageRedistributionTest.log.info("Test done");
- }
-
public void testRedistributionWhenConsumerIsClosedNotConsumersOnAllNodes() throws
Exception
{
setupCluster(false);