Author: clebert.suconic(a)jboss.com
Date: 2011-09-21 15:40:01 -0400 (Wed, 21 Sep 2011)
New Revision: 11390
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/message/impl/MessageImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
JBPAPP-7242 - back porting work from JBPAPP-7115 and JBPAPP-7116
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2011-09-21
19:07:57 UTC (rev 11389)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2011-09-21
19:40:01 UTC (rev 11390)
@@ -409,7 +409,8 @@
lastChunk = pos >= bodySize;
- final SessionSendContinuationMessage chunk = new
SessionSendContinuationMessage(bodyBuffer.toByteBuffer()
+ final SessionSendContinuationMessage chunk = new
SessionSendContinuationMessage(msgI,
+
bodyBuffer.toByteBuffer()
.array(),
!lastChunk,
lastChunk && sendBlocking);
@@ -529,11 +530,11 @@
buff = buff2;
- chunk = new SessionSendContinuationMessage(buff, false, sendBlocking,
messageSize.get());
+ chunk = new SessionSendContinuationMessage(msgI, buff, false, sendBlocking,
messageSize.get());
}
else
{
- chunk = new SessionSendContinuationMessage(buff, true, false);
+ chunk = new SessionSendContinuationMessage(msgI, buff, true, false);
}
if (sendBlocking && lastPacket)
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-09-21
19:07:57 UTC (rev 11389)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-09-21
19:40:01 UTC (rev 11390)
@@ -63,6 +63,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveMessage;
import
org.hornetq.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXACommitMessage;
@@ -108,8 +109,6 @@
private static final Logger log = Logger.getLogger(ClientSessionImpl.class);
- private final boolean trace = ClientSessionImpl.log.isTraceEnabled();
-
// Attributes
----------------------------------------------------------------------------
private Map<String, String> metadata = new HashMap<String, String>();
@@ -1198,6 +1197,15 @@
sendAckHandler.sendAcknowledged(ssm.getMessage());
}
+ else
+ if (packet.getType() == PacketImpl.SESS_SEND_CONTINUATION)
+ {
+ SessionSendContinuationMessage scm = (SessionSendContinuationMessage)packet;
+ if (!scm.isContinues())
+ {
+ sendAckHandler.sendAcknowledged(scm.getMessage());
+ }
+ }
}
// XAResource implementation
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/message/impl/MessageImpl.java 2011-09-21
19:07:57 UTC (rev 11389)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/message/impl/MessageImpl.java 2011-09-21
19:40:01 UTC (rev 11390)
@@ -143,6 +143,14 @@
*/
protected MessageImpl(final MessageImpl other)
{
+ this(other, other.getProperties());
+ }
+
+ /*
+ * Copy constructor
+ */
+ protected MessageImpl(final MessageImpl other, TypedProperties properties)
+ {
messageID = other.getMessageID();
userID = other.getUserID();
address = other.getAddress();
@@ -151,7 +159,7 @@
expiration = other.getExpiration();
timestamp = other.getTimestamp();
priority = other.getPriority();
- properties = new TypedProperties(other.getProperties());
+ this.properties = new TypedProperties(properties);
// This MUST be synchronized using the monitor on the other message to prevent it
running concurrently
// with getEncodedBuffer(), otherwise can introduce race condition when delivering
concurrently to
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-21
19:07:57 UTC (rev 11389)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-21
19:40:01 UTC (rev 11390)
@@ -2344,9 +2344,9 @@
}
- private static class QueueEncoding implements EncodingSupport
+ public static class QueueEncoding implements EncodingSupport
{
- long queueID;
+ public long queueID;
public QueueEncoding(final long queueID)
{
@@ -2398,7 +2398,7 @@
}
}
- private static class RefEncoding extends QueueEncoding
+ public static class RefEncoding extends QueueEncoding
{
public RefEncoding()
{
@@ -2848,7 +2848,7 @@
// Encoding functions for binding Journal
- private static Object newObjectEncoding(RecordInfo info)
+ public static Object newObjectEncoding(RecordInfo info)
{
HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(info.data);
long id = info.id;
@@ -2990,9 +2990,9 @@
}
}
- private static class ReferenceDescribe
+ public static class ReferenceDescribe
{
- RefEncoding refEncoding;
+ public RefEncoding refEncoding;
public ReferenceDescribe(RefEncoding refEncoding)
{
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-09-21
19:07:57 UTC (rev 11389)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-09-21
19:40:01 UTC (rev 11390)
@@ -26,6 +26,7 @@
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.utils.DataConstants;
+import org.hornetq.utils.TypedProperties;
/**
* A LargeServerMessageImpl
@@ -70,12 +71,13 @@
/**
* Copy constructor
+ * @param properties
* @param copy
* @param fileCopy
*/
- private LargeServerMessageImpl(final LargeServerMessageImpl copy, final SequentialFile
fileCopy, final long newID)
+ private LargeServerMessageImpl(final LargeServerMessageImpl copy, TypedProperties
properties, final SequentialFile fileCopy, final long newID)
{
- super(copy);
+ super(copy, properties);
linkMessage = copy;
storageManager = copy.storageManager;
file = fileCopy;
@@ -281,8 +283,28 @@
this.removeProperty(Message.HDR_ORIG_MESSAGE_ID);
}
}
+
+ @Override
+ public synchronized ServerMessage copy()
+ {
+ long idToUse = messageID;
+ if (linkMessage != null)
+ {
+ idToUse = linkMessage.getMessageID();
+ }
+ SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse,
durable);
+
+ ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
+ :
(LargeServerMessageImpl)linkMessage,
+ properties,
+ newfile,
+ messageID);
+ return newMessage;
+ }
+
+
@Override
public synchronized ServerMessage copy(final long newID)
{
@@ -301,6 +323,7 @@
ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ?
this
:
(LargeServerMessageImpl)linkMessage,
+ properties,
newfile,
newID);
return newMessage;
@@ -317,7 +340,7 @@
file.copyTo(newFile);
- LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, newFile,
newID);
+ LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this,
properties, newFile, newID);
newMessage.linkMessage = null;
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-09-21
19:07:57 UTC (rev 11389)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-09-21
19:40:01 UTC (rev 11390)
@@ -18,6 +18,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -812,18 +813,32 @@
*/
protected void cleanupInternalPropertiesBeforeRouting(final ServerMessage message)
{
+ LinkedList<SimpleString> valuesToRemove = null;
+
+
for (SimpleString name : message.getPropertyNames())
{
// We use properties to establish routing context on clustering.
// However if the client resends the message after receiving, it needs to be
removed
if (name.startsWith(MessageImpl.HDR_ROUTE_TO_IDS) &&
!name.equals(MessageImpl.HDR_ROUTE_TO_IDS))
{
- message.removeProperty(name);
+ if (valuesToRemove == null)
+ {
+ valuesToRemove = new LinkedList<SimpleString>();
+ }
+ valuesToRemove.add(name);
}
}
+
+ if (valuesToRemove != null)
+ {
+ for (SimpleString removal : valuesToRemove)
+ {
+ message.removeProperty(removal);
+ }
+ }
}
-
private void setPagingStore(final ServerMessage message) throws Exception
{
PagingStore store = pagingManager.getPageStore(message.getAddress());
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java 2011-09-21
19:07:57 UTC (rev 11389)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java 2011-09-21
19:40:01 UTC (rev 11390)
@@ -14,6 +14,7 @@
package org.hornetq.core.protocol.core.impl.wireformat;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.protocol.core.impl.PacketImpl;
/**
@@ -34,6 +35,9 @@
private boolean requiresResponse;
+ // Used on confirmation handling
+ private MessageInternal message;
+
/**
* to be sent on the last package
*/
@@ -53,10 +57,11 @@
* @param continues
* @param requiresResponse
*/
- public SessionSendContinuationMessage(final byte[] body, final boolean continues,
final boolean requiresResponse)
+ public SessionSendContinuationMessage(final MessageInternal message, final byte[]
body, final boolean continues, final boolean requiresResponse)
{
super(PacketImpl.SESS_SEND_CONTINUATION, body, continues);
this.requiresResponse = requiresResponse;
+ this.message = message;
}
/**
@@ -64,9 +69,9 @@
* @param continues
* @param requiresResponse
*/
- public SessionSendContinuationMessage(final byte[] body, final boolean continues,
final boolean requiresResponse, final long messageBodySize)
+ public SessionSendContinuationMessage(final MessageInternal message, final byte[]
body, final boolean continues, final boolean requiresResponse, final long
messageBodySize)
{
- this(body, continues, requiresResponse);
+ this(message, body, continues, requiresResponse);
this.messageBodySize = messageBodySize;
}
@@ -84,7 +89,16 @@
{
return messageBodySize;
}
+
+ /**
+ * @return the message
+ */
+ public MessageInternal getMessage()
+ {
+ return message;
+ }
+
@Override
public void encodeRest(final HornetQBuffer buffer)
{
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-09-21
19:07:57 UTC (rev 11389)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-09-21
19:40:01 UTC (rev 11390)
@@ -330,7 +330,7 @@
// Consumer implementation ---------------------------------------
/* Hook for processing message before forwarding */
- protected ServerMessage beforeForward(ServerMessage message)
+ protected ServerMessage beforeForward(final ServerMessage message)
{
if (useDuplicateDetection)
{
@@ -342,10 +342,20 @@
if (transformer != null)
{
- message = transformer.transform(message);
+ final ServerMessage transformedMessage = transformer.transform(message);
+ if (transformedMessage != message)
+ {
+ if (log.isDebugEnabled())
+ {
+ log.debug("The transformer " + transformer + " made a copy
of the message " + message + " as transformedMessage");
+ }
+ }
+ return transformedMessage;
}
-
- return message;
+ else
+ {
+ return message;
+ }
}
/**
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-09-21
19:07:57 UTC (rev 11389)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-09-21
19:40:01 UTC (rev 11390)
@@ -113,34 +113,50 @@
}
@Override
- protected ServerMessage beforeForward(ServerMessage message)
+ protected ServerMessage beforeForward(final ServerMessage message)
{
// We make a copy of the message, then we strip out the unwanted routing id headers
and leave
// only
// the one pertinent for the address node - this is important since different
queues on different
// nodes could have same queue ids
// Note we must copy since same message may get routed to other nodes which require
different headers
- message = message.copy();
+ ServerMessage messageCopy = message.copy();
+
+ if (log.isTraceEnabled())
+ {
+ log.trace("Clustered bridge copied message " + message + " as
" + messageCopy + " before delivery");
+ }
// TODO - we can optimise this
- Set<SimpleString> propNames = new
HashSet<SimpleString>(message.getPropertyNames());
+ Set<SimpleString> propNames = new
HashSet<SimpleString>(messageCopy.getPropertyNames());
byte[] queueIds = message.getBytesProperty(idsHeaderName);
+
+ if (queueIds == null)
+ {
+ // Sanity check only
+ log.warn("no queue IDs defined!, originalMessage = " + message +
+ ", copiedMessage = " +
+ messageCopy +
+ ", props=" +
+ idsHeaderName);
+ throw new IllegalStateException("no queueIDs defined");
+ }
for (SimpleString propName : propNames)
{
if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS))
{
- message.removeProperty(propName);
+ messageCopy.removeProperty(propName);
}
}
- message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
+ messageCopy.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
- message = super.beforeForward(message);
-
- return message;
+ messageCopy = super.beforeForward(messageCopy);
+
+ return messageCopy;
}
private void setupNotificationConsumer() throws Exception
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2011-09-21
19:07:57 UTC (rev 11389)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2011-09-21
19:40:01 UTC (rev 11390)
@@ -25,6 +25,7 @@
import org.hornetq.core.server.ServerMessage;
import org.hornetq.utils.DataConstants;
import org.hornetq.utils.MemorySize;
+import org.hornetq.utils.TypedProperties;
/**
*
@@ -89,6 +90,14 @@
super(other);
}
+ /*
+ * Copy constructor
+ */
+ protected ServerMessageImpl(final ServerMessageImpl other, TypedProperties
properties)
+ {
+ super(other, properties);
+ }
+
public boolean isServerMessage()
{
return true;
@@ -193,6 +202,7 @@
public ServerMessage copy()
{
+ // This is a simple copy, used only to avoid changing original properties
return new ServerMessageImpl(this);
}
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-09-21
19:07:57 UTC (rev 11389)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-09-21
19:40:01 UTC (rev 11390)
@@ -23,6 +23,7 @@
import junit.framework.Assert;
+import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientConsumer;
@@ -195,11 +196,13 @@
{
message.setBodyInputStream(UnitTestCase.createFakeLargeStream(1024 *
1024));
}
+ else
+ {
+ message.getBodyBuffer().writeBytes(bytes);
+ }
message.putIntProperty(propKey, i);
- message.getBodyBuffer().writeBytes(bytes);
-
producer0.send(message);
}
@@ -255,6 +258,173 @@
}
+ public void testBridgeWithLargeMessage() throws Exception
+ {
+ HornetQServer server0 = null;
+ HornetQServer server1 = null;
+
+ final int PAGE_MAX = 1024 * 1024 * 1024;
+
+ final int PAGE_SIZE = 10 * 1024;
+ ServerLocator locator = null;
+ try
+ {
+
+ Map<String, Object> server0Params = new HashMap<String, Object>();
+ server0 = createClusteredServerWithParams(isNetty(), 0, true, PAGE_SIZE,
PAGE_MAX, server0Params);
+
+ Map<String, Object> server1Params = new HashMap<String, Object>();
+ addTargetParameters(server1Params);
+ server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
+
+ final String testAddress = "testAddress";
+ final String queueName0 = "queue0";
+ final String forwardAddress = "forwardAddress";
+ final String queueName1 = "queue1";
+
+ Map<String, TransportConfiguration> connectors = new HashMap<String,
TransportConfiguration>();
+ TransportConfiguration server0tc = new TransportConfiguration(getConnector(),
server0Params);
+
+ TransportConfiguration server1tc = new TransportConfiguration(getConnector(),
server1Params);
+ connectors.put(server1tc.getName(), server1tc);
+
+ server0.getConfiguration().setConnectorConfigurations(connectors);
+
+ ArrayList<String> staticConnectors = new ArrayList<String>();
+ staticConnectors.add(server1tc.getName());
+
+ BridgeConfiguration bridgeConfiguration = new
BridgeConfiguration("bridge1",
+ queueName0,
+
forwardAddress,
+ null,
+ null,
+ 1000,
+ 1d,
+ -1,
+ false,
+ 0,
+
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+
staticConnectors,
+ false,
+
ConfigurationImpl.DEFAULT_CLUSTER_USER,
+
ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
+
+ List<BridgeConfiguration> bridgeConfigs = new
ArrayList<BridgeConfiguration>();
+ bridgeConfigs.add(bridgeConfiguration);
+ server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+ CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress,
queueName0, null, true);
+ List<CoreQueueConfiguration> queueConfigs0 = new
ArrayList<CoreQueueConfiguration>();
+ queueConfigs0.add(queueConfig0);
+ server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+
+ CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress,
queueName1, null, true);
+ List<CoreQueueConfiguration> queueConfigs1 = new
ArrayList<CoreQueueConfiguration>();
+ queueConfigs1.add(queueConfig1);
+ server1.getConfiguration().setQueueConfigurations(queueConfigs1);
+
+ server1.start();
+ server0.start();
+
+ locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
+ locator.setMinLargeMessageSize(1024);
+ ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
+
+ ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
+
+ ClientSession session0 = sf0.createSession(false, true, true);
+
+ ClientSession session1 = sf1.createSession(false, true, true);
+
+ ClientProducer producer0 = session0.createProducer(new
SimpleString(testAddress));
+
+ ClientConsumer consumer1 = session1.createConsumer(queueName1);
+
+ session1.start();
+
+ final int numMessages = 50;
+
+ final SimpleString propKey = new SimpleString("testkey");
+
+ final int LARGE_MESSAGE_SIZE = 10 * 1024;
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createMessage(true);
+ message.setBodyInputStream(createFakeLargeStream(LARGE_MESSAGE_SIZE));
+
+ message.putIntProperty(propKey, i);
+
+ producer0.send(message);
+ }
+
+ session0.commit();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(5000);
+
+ Assert.assertNotNull(message);
+
+ Assert.assertEquals(i, message.getObjectProperty(propKey));
+
+ HornetQBuffer buff = message.getBodyBuffer();
+
+ for (int posMsg = 0 ; posMsg < LARGE_MESSAGE_SIZE; posMsg++)
+ {
+ assertEquals(getSamplebyte(posMsg), buff.readByte());
+ }
+
+ message.acknowledge();
+ }
+
+ session1.commit();
+
+ Assert.assertNull(consumer1.receiveImmediate());
+
+ session0.close();
+
+ session1.close();
+
+ sf0.close();
+
+ sf1.close();
+
+
+ }
+ finally
+ {
+ if (locator != null)
+ {
+ locator.close();
+ }
+ try
+ {
+ server0.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ server1.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ Map<Long, AtomicInteger> maps = loadQueues(server0);
+
+ for (Map.Entry<Long, AtomicInteger> value : maps.entrySet())
+ {
+ System.out.println("queue " + value.getKey() + "=" +
value.getValue());
+
+ assertEquals(0, value.getValue().intValue());
+ }
+ }
+
+
/**
* @param server1Params
*/
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-21
19:07:57 UTC (rev 11389)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-21
19:40:01 UTC (rev 11390)
@@ -38,7 +38,6 @@
import org.hornetq.core.config.BroadcastGroupConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
@@ -71,6 +70,16 @@
{
private static final Logger log = Logger.getLogger(ClusterTestBase.class);
+ public ClusterTestBase()
+ {
+ super();
+ }
+
+ public ClusterTestBase(String name)
+ {
+ super(name);
+ }
+
private static final int[] PORTS = { TransportConstants.DEFAULT_PORT,
TransportConstants.DEFAULT_PORT + 1,
TransportConstants.DEFAULT_PORT + 2,
@@ -84,6 +93,16 @@
private static final long WAIT_TIMEOUT = 5000;
+ protected int getLargeMessageSize()
+ {
+ return 1024;
+ }
+
+ protected boolean isLargeMessage()
+ {
+ return false;
+ }
+
@Override
protected void setUp() throws Exception
{
@@ -138,13 +157,11 @@
consumers = new ConsumerHolder[ClusterTestBase.MAX_CONSUMERS];
-
-
nodeManagers = null;
super.tearDown();
- // ServerLocatorImpl.shutdown();
+ // ServerLocatorImpl.shutdown();
}
// Private
-------------------------------------------------------------------------------------------------------
@@ -189,7 +206,7 @@
{
return consumers[node].consumer;
}
-
+
protected void waitForMessages(final int node, final String address, final int count)
throws Exception
{
HornetQServer server = servers[node];
@@ -448,7 +465,7 @@
if (holder != null)
{
holder.consumer.close();
- // holder.session.close();
+ // holder.session.close();
consumers[i] = null;
}
@@ -523,6 +540,11 @@
{
ClientMessage message = session.createMessage(durable);
+ if (isLargeMessage())
+ {
+ message.setBodyInputStream(createFakeLargeStream(getLargeMessageSize()));
+ }
+
if (filterVal != null)
{
message.putStringProperty(ClusterTestBase.FILTER_PROP, new
SimpleString(filterVal));
@@ -531,13 +553,13 @@
message.putIntProperty(ClusterTestBase.COUNT_PROP, i);
producer.send(message);
-
+
if (i % 100 == 0)
{
session.commit();
}
}
-
+
session.commit();
}
finally
@@ -581,6 +603,11 @@
{
ClientMessage message = session.createMessage(durable);
+ if (isLargeMessage())
+ {
+ message.setBodyInputStream(createFakeLargeStream(getLargeMessageSize()));
+ }
+
message.putStringProperty(key, val);
message.putIntProperty(ClusterTestBase.COUNT_PROP, i);
producer.send(message);
@@ -672,6 +699,11 @@
Assert.assertNotNull("consumer " + consumerIDs[i] + " did
not receive message " + j, message);
}
+ if (isLargeMessage())
+ {
+ validateLargeMessage(message);
+ }
+
if (ack)
{
message.acknowledge();
@@ -730,6 +762,11 @@
Assert.fail("consumer " + i + " did not receive all
messages");
}
+ if (isLargeMessage())
+ {
+ validateLargeMessage(message);
+ }
+
if (ack)
{
message.acknowledge();
@@ -760,10 +797,9 @@
for (int j = msgStart; j < msgEnd; j++)
{
-
+
ClientMessage message = holder.consumer.receive(WAIT_TIMEOUT);
-
-
+
if (message == null)
{
ClusterTestBase.log.info("*** dumping consumers:");
@@ -773,8 +809,11 @@
Assert.assertNotNull("consumer " + consumerID + " did not
receive message " + j, message);
}
+ if (isLargeMessage())
+ {
+ validateLargeMessage(message);
+ }
-
if (ack)
{
message.acknowledge();
@@ -810,7 +849,7 @@
}
}
}
-
+
protected String clusterDescription(HornetQServer server)
{
String br = "-------------------------\n";
@@ -868,6 +907,11 @@
if (message != null)
{
+ if (isLargeMessage())
+ {
+ validateLargeMessage(message);
+ }
+
ClusterTestBase.log.info("check receive Consumer " + consumerID
+
" received message " +
message.getObjectProperty(ClusterTestBase.COUNT_PROP));
@@ -902,6 +946,10 @@
Assert.assertEquals("consumer " + consumerIDs[count] + " message
" + i,
i,
message.getObjectProperty(ClusterTestBase.COUNT_PROP));
+ if (isLargeMessage())
+ {
+ validateLargeMessage(message);
+ }
count++;
@@ -958,6 +1006,10 @@
ClientMessage msg = holder.consumer.receive(10000);
Assert.assertNotNull(msg);
+ if (isLargeMessage())
+ {
+ validateLargeMessage(msg);
+ }
int count = msg.getIntProperty(ClusterTestBase.COUNT_PROP);
@@ -1001,6 +1053,10 @@
ClientMessage msg = holder.consumer.consumer.receive(10000);
Assert.assertNotNull(msg);
+ if (isLargeMessage())
+ {
+ validateLargeMessage(msg);
+ }
int p = msg.getIntProperty(ClusterTestBase.COUNT_PROP);
@@ -1055,6 +1111,11 @@
{
int count =
(Integer)message.getObjectProperty(ClusterTestBase.COUNT_PROP);
+ if (isLargeMessage())
+ {
+ validateLargeMessage(message);
+ }
+
// log.info("consumer " + consumerIDs[i] + " received
message " + count);
Assert.assertFalse(counts.contains(count));
@@ -1148,6 +1209,12 @@
message = consumer.consumer.receive(500);
if (message != null)
{
+
+ if (isLargeMessage())
+ {
+ validateLargeMessage(message);
+ }
+
if (ack)
{
message.acknowledge();
@@ -1168,7 +1235,7 @@
{
res[j++] = i;
}
-
+
if (ack)
{
// just to flush acks
@@ -1234,7 +1301,6 @@
sfs[node] = sf;
}
-
protected void setupSessionFactory(final int node, final boolean netty, int
reconnectAttempts) throws Exception
{
if (sfs[node] != null)
@@ -1285,7 +1351,6 @@
serverTotc = new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY,
params);
}
-
locators[node] = HornetQClient.createServerLocatorWithHA(serverTotc);
locators[node].setRetryInterval(100);
locators[node].setRetryIntervalMultiplier(1d);
@@ -1321,75 +1386,74 @@
final boolean fileStorage,
final boolean sharedStorage,
final boolean netty)
+ {
+ if (servers[node] != null)
{
- if (servers[node] != null)
- {
- throw new IllegalArgumentException("Already a server at node " +
node);
- }
+ throw new IllegalArgumentException("Already a server at node " +
node);
+ }
- Configuration configuration = createBasicConfig();
+ Configuration configuration = createBasicConfig();
- configuration.setSecurityEnabled(false);
- configuration.setJournalMinFiles(2);
- configuration.setJournalMaxIO_AIO(1000);
- configuration.setJournalFileSize(100 * 1024);
- configuration.setJournalType(getDefaultJournalType());
- configuration.setSharedStore(sharedStorage);
- configuration.setThreadPoolMaxSize(10);
+ configuration.setSecurityEnabled(false);
+ configuration.setJournalMinFiles(2);
+ configuration.setJournalMaxIO_AIO(1000);
+ configuration.setJournalFileSize(100 * 1024);
+ configuration.setJournalType(getDefaultJournalType());
+ configuration.setSharedStore(sharedStorage);
+ if (sharedStorage)
+ {
+ // Shared storage will share the node between the backup and live node
+ configuration.setBindingsDirectory(getBindingsDir(node, false));
+ configuration.setJournalDirectory(getJournalDir(node, false));
+ configuration.setPagingDirectory(getPageDir(node, false));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
+ }
+ else
+ {
+ configuration.setBindingsDirectory(getBindingsDir(node, true));
+ configuration.setJournalDirectory(getJournalDir(node, true));
+ configuration.setPagingDirectory(getPageDir(node, true));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
+ }
+ configuration.setClustered(true);
+ configuration.setJournalCompactMinFiles(0);
+
+ configuration.getAcceptorConfigurations().clear();
+ configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty,
true, generateParams(node,
+
netty)));
+
+ HornetQServer server;
+
+ if (fileStorage)
+ {
if (sharedStorage)
{
- // Shared storage will share the node between the backup and live node
- configuration.setBindingsDirectory(getBindingsDir(node, false));
- configuration.setJournalDirectory(getJournalDir(node, false));
- configuration.setPagingDirectory(getPageDir(node, false));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
+ server = createInVMFailoverServer(true, configuration, nodeManagers[node]);
}
else
{
- configuration.setBindingsDirectory(getBindingsDir(node, true));
- configuration.setJournalDirectory(getJournalDir(node, true));
- configuration.setPagingDirectory(getPageDir(node, true));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
+ server = HornetQServers.newHornetQServer(configuration);
}
- configuration.setClustered(true);
- configuration.setJournalCompactMinFiles(0);
-
- configuration.getAcceptorConfigurations().clear();
-
configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true,
generateParams(node, netty)));
-
- HornetQServer server;
-
- if (fileStorage)
+ }
+ else
+ {
+ if (sharedStorage)
{
- if (sharedStorage)
- {
- server = createInVMFailoverServer(true, configuration,
nodeManagers[node]);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration);
- }
+ server = createInVMFailoverServer(false, configuration, nodeManagers[node]);
}
else
{
- if (sharedStorage)
- {
- server = createInVMFailoverServer(false, configuration,
nodeManagers[node]);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration, false);
- }
+ server = HornetQServers.newHornetQServer(configuration, false);
}
- servers[node] = server;
}
+ servers[node] = server;
+ }
-
- protected void setupBackupServer(final int node,
- final int liveNode,
- final boolean fileStorage,
- final boolean sharedStorage,
- final boolean netty)
+ protected void setupBackupServer(final int node,
+ final int liveNode,
+ final boolean fileStorage,
+ final boolean sharedStorage,
+ final boolean netty)
{
if (servers[node] != null)
{
@@ -1426,7 +1490,7 @@
configuration.getAcceptorConfigurations().clear();
TransportConfiguration acceptorConfig = createTransportConfiguration(netty, true,
generateParams(node, netty));
configuration.getAcceptorConfigurations().add(acceptorConfig);
- //add backup connector
+ // add backup connector
TransportConfiguration liveConfig = createTransportConfiguration(netty, false,
generateParams(liveNode, netty));
configuration.getConnectorConfigurations().put(liveConfig.getName(), liveConfig);
TransportConfiguration backupConfig = createTransportConfiguration(netty, false,
generateParams(node, netty));
@@ -1460,171 +1524,180 @@
}
protected void setupLiveServerWithDiscovery(final int node,
- final String groupAddress,
- final int port,
- final boolean fileStorage,
- final boolean netty,
- final boolean sharedStorage)
- {
- if (servers[node] != null)
- {
- throw new IllegalArgumentException("Already a server at node " +
node);
- }
+ final String groupAddress,
+ final int port,
+ final boolean fileStorage,
+ final boolean netty,
+ final boolean sharedStorage)
+ {
+ if (servers[node] != null)
+ {
+ throw new IllegalArgumentException("Already a server at node " +
node);
+ }
- Configuration configuration = createBasicConfig();
+ Configuration configuration = createBasicConfig();
- configuration.setSecurityEnabled(false);
- configuration.setBindingsDirectory(getBindingsDir(node, false));
- configuration.setJournalMinFiles(2);
- configuration.setJournalDirectory(getJournalDir(node, false));
- configuration.setJournalFileSize(100 * 1024);
- configuration.setJournalType(getDefaultJournalType());
- configuration.setJournalMaxIO_AIO(1000);
- configuration.setPagingDirectory(getPageDir(node, false));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
- configuration.setClustered(true);
- configuration.setBackup(false);
+ configuration.setSecurityEnabled(false);
+ configuration.setBindingsDirectory(getBindingsDir(node, false));
+ configuration.setJournalMinFiles(2);
+ configuration.setJournalDirectory(getJournalDir(node, false));
+ configuration.setJournalFileSize(100 * 1024);
+ configuration.setJournalType(getDefaultJournalType());
+ configuration.setJournalMaxIO_AIO(1000);
+ configuration.setPagingDirectory(getPageDir(node, false));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
+ configuration.setClustered(true);
+ configuration.setBackup(false);
- configuration.getAcceptorConfigurations().clear();
+ configuration.getAcceptorConfigurations().clear();
- Map<String, Object> params = generateParams(node, netty);
+ Map<String, Object> params = generateParams(node, netty);
- configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty,
true, params));
+ configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty,
true, params));
- TransportConfiguration connector = createTransportConfiguration(netty, false,
params);
- configuration.getConnectorConfigurations().put(connector.getName(), connector);
+ TransportConfiguration connector = createTransportConfiguration(netty, false,
params);
+ configuration.getConnectorConfigurations().put(connector.getName(), connector);
- List<String> connectorPairs = new ArrayList<String>();
- connectorPairs.add(connector.getName());
+ List<String> connectorPairs = new ArrayList<String>();
+ connectorPairs.add(connector.getName());
- BroadcastGroupConfiguration bcConfig = new
BroadcastGroupConfiguration("bg1",
- null,
- -1,
-
groupAddress,
- port,
- 1000,
-
connectorPairs);
+ BroadcastGroupConfiguration bcConfig = new
BroadcastGroupConfiguration("bg1",
+ null,
+ -1,
+
groupAddress,
+ port,
+ 1000,
+
connectorPairs);
- configuration.getBroadcastGroupConfigurations().add(bcConfig);
+ configuration.getBroadcastGroupConfigurations().add(bcConfig);
- DiscoveryGroupConfiguration dcConfig = new
DiscoveryGroupConfiguration("dg1", null, groupAddress, port, 5000, 5000);
+ DiscoveryGroupConfiguration dcConfig = new
DiscoveryGroupConfiguration("dg1",
+ null,
+
groupAddress,
+ port,
+ 5000,
+ 5000);
- configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(),
dcConfig);
+ configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
- HornetQServer server;
- if (fileStorage)
- {
- if (sharedStorage)
- {
- server = createInVMFailoverServer(true, configuration,
nodeManagers[node]);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration);
- }
- }
- else
- {
- if (sharedStorage)
- {
- server = createInVMFailoverServer(false, configuration,
nodeManagers[node]);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration, false);
- }
- }
- servers[node] = server;
- }
+ HornetQServer server;
+ if (fileStorage)
+ {
+ if (sharedStorage)
+ {
+ server = createInVMFailoverServer(true, configuration, nodeManagers[node]);
+ }
+ else
+ {
+ server = HornetQServers.newHornetQServer(configuration);
+ }
+ }
+ else
+ {
+ if (sharedStorage)
+ {
+ server = createInVMFailoverServer(false, configuration, nodeManagers[node]);
+ }
+ else
+ {
+ server = HornetQServers.newHornetQServer(configuration, false);
+ }
+ }
+ servers[node] = server;
+ }
protected void setupBackupServerWithDiscovery(final int node,
- final int liveNode,
- final String groupAddress,
- final int port,
- final boolean fileStorage,
- final boolean netty,
- final boolean sharedStorage)
- {
- if (servers[node] != null)
- {
- throw new IllegalArgumentException("Already a server at node " +
node);
- }
+ final int liveNode,
+ final String groupAddress,
+ final int port,
+ final boolean fileStorage,
+ final boolean netty,
+ final boolean sharedStorage)
+ {
+ if (servers[node] != null)
+ {
+ throw new IllegalArgumentException("Already a server at node " +
node);
+ }
- Configuration configuration = createBasicConfig();
+ Configuration configuration = createBasicConfig();
- configuration.setSecurityEnabled(false);
- configuration.setSharedStore(sharedStorage);
- if (sharedStorage)
- {
- // Shared storage will share the node between the backup and live node
- configuration.setBindingsDirectory(getBindingsDir(liveNode, false));
- configuration.setJournalDirectory(getJournalDir(liveNode, false));
- configuration.setPagingDirectory(getPageDir(liveNode, false));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(liveNode,
false));
- }
- else
- {
- configuration.setBindingsDirectory(getBindingsDir(node, true));
- configuration.setJournalDirectory(getJournalDir(node, true));
- configuration.setPagingDirectory(getPageDir(node, true));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
- }
- configuration.setClustered(true);
- configuration.setBackup(true);
+ configuration.setSecurityEnabled(false);
+ configuration.setSharedStore(sharedStorage);
+ if (sharedStorage)
+ {
+ // Shared storage will share the node between the backup and live node
+ configuration.setBindingsDirectory(getBindingsDir(liveNode, false));
+ configuration.setJournalDirectory(getJournalDir(liveNode, false));
+ configuration.setPagingDirectory(getPageDir(liveNode, false));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(liveNode, false));
+ }
+ else
+ {
+ configuration.setBindingsDirectory(getBindingsDir(node, true));
+ configuration.setJournalDirectory(getJournalDir(node, true));
+ configuration.setPagingDirectory(getPageDir(node, true));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
+ }
+ configuration.setClustered(true);
+ configuration.setBackup(true);
- configuration.getAcceptorConfigurations().clear();
+ configuration.getAcceptorConfigurations().clear();
- Map<String, Object> params = generateParams(node, netty);
+ Map<String, Object> params = generateParams(node, netty);
- configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty,
true, params));
+ configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty,
true, params));
- TransportConfiguration connector = createTransportConfiguration(netty, false,
params);
- configuration.getConnectorConfigurations().put(connector.getName(), connector);
+ TransportConfiguration connector = createTransportConfiguration(netty, false,
params);
+ configuration.getConnectorConfigurations().put(connector.getName(), connector);
- List<String> connectorPairs = new ArrayList<String>();
- connectorPairs.add(connector.getName());
+ List<String> connectorPairs = new ArrayList<String>();
+ connectorPairs.add(connector.getName());
- BroadcastGroupConfiguration bcConfig = new
BroadcastGroupConfiguration("bg1",
- null,
- -1,
-
groupAddress,
- port,
- 1000,
-
connectorPairs);
+ BroadcastGroupConfiguration bcConfig = new
BroadcastGroupConfiguration("bg1",
+ null,
+ -1,
+
groupAddress,
+ port,
+ 1000,
+
connectorPairs);
- configuration.getBroadcastGroupConfigurations().add(bcConfig);
+ configuration.getBroadcastGroupConfigurations().add(bcConfig);
- DiscoveryGroupConfiguration dcConfig = new
DiscoveryGroupConfiguration("dg1", null, groupAddress, port, 5000, 5000);
+ DiscoveryGroupConfiguration dcConfig = new
DiscoveryGroupConfiguration("dg1",
+ null,
+
groupAddress,
+ port,
+ 5000,
+ 5000);
- configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(),
dcConfig);
+ configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
- HornetQServer server;
- if (fileStorage)
- {
- if (sharedStorage)
- {
- server = createInVMFailoverServer(true, configuration,
nodeManagers[liveNode]);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration);
- }
- }
- else
- {
- if (sharedStorage)
- {
- server = createInVMFailoverServer(false, configuration,
nodeManagers[liveNode]);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration, false);
- }
- }
- servers[node] = server;
- }
+ HornetQServer server;
+ if (fileStorage)
+ {
+ if (sharedStorage)
+ {
+ server = createInVMFailoverServer(true, configuration,
nodeManagers[liveNode]);
+ }
+ else
+ {
+ server = HornetQServers.newHornetQServer(configuration);
+ }
+ }
+ else
+ {
+ if (sharedStorage)
+ {
+ server = createInVMFailoverServer(false, configuration,
nodeManagers[liveNode]);
+ }
+ else
+ {
+ server = HornetQServers.newHornetQServer(configuration, false);
+ }
+ }
+ servers[node] = server;
+ }
-
protected void clearServer(final int... nodes)
{
for (int i = 0; i < nodes.length; i++)
@@ -1661,12 +1734,12 @@
{
throw new IllegalStateException("No server at node " + nodeFrom);
}
-
+
TransportConfiguration connectorFrom = createTransportConfiguration(netty, false,
generateParams(nodeFrom, netty));
serverFrom.getConfiguration().getConnectorConfigurations().put(name,
connectorFrom);
List<String> pairs = null;
-
+
if (nodeTo != -1)
{
TransportConfiguration serverTotc = createTransportConfiguration(netty, false,
generateParams(nodeTo, netty));
@@ -1683,11 +1756,11 @@
forwardWhenNoConsumers,
maxHops,
1024,
-
pairs, allowDirectConnectionsOnly);
+
pairs,
+
allowDirectConnectionsOnly);
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
}
-
protected void setupClusterConnection(final String name,
final String address,
final boolean forwardWhenNoConsumers,
@@ -1705,7 +1778,7 @@
TransportConfiguration connectorFrom = createTransportConfiguration(netty, false,
generateParams(nodeFrom, netty));
serverFrom.getConfiguration().getConnectorConfigurations().put(connectorFrom.getName(),
connectorFrom);
-
+
List<String> pairs = new ArrayList<String>();
for (int element : nodesTo)
{
@@ -1722,7 +1795,8 @@
forwardWhenNoConsumers,
maxHops,
1024,
-
pairs, false);
+
pairs,
+
false);
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
}
@@ -1761,7 +1835,8 @@
forwardWhenNoConsumers,
maxHops,
1024,
-
pairs, false);
+
pairs,
+
false);
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
}
@@ -1783,7 +1858,7 @@
TransportConfiguration connectorConfig = createTransportConfiguration(netty, false,
generateParams(node, netty));
server.getConfiguration().getConnectorConfigurations().put(name, connectorConfig);
-
+
ClusterConnectionConfiguration clusterConf = new
ClusterConnectionConfiguration(name,
address,
name,
@@ -1816,23 +1891,22 @@
}
for (int node : nodes)
{
- //wait for each server to start, it may be a backup and started in a separate
thread
+ // wait for each server to start, it may be a backup and started in a separate
thread
waitForServer(servers[node]);
}
}
- private void waitForServer(HornetQServer server)
- throws InterruptedException
+ private void waitForServer(HornetQServer server) throws InterruptedException
{
- long timetowait =System.currentTimeMillis() + 5000;
- while(!server.isStarted())
+ long timetowait = System.currentTimeMillis() + 5000;
+ while (!server.isStarted())
{
Thread.sleep(100);
- if(server.isStarted())
+ if (server.isStarted())
{
break;
}
- else if(System.currentTimeMillis() > timetowait)
+ else if (System.currentTimeMillis() > timetowait)
{
fail("server didnt start");
}
@@ -1873,6 +1947,17 @@
}
}
+ /**
+ * @param message
+ */
+ private void validateLargeMessage(ClientMessage message)
+ {
+ for (int posMsg = 0; posMsg < getLargeMessageSize(); posMsg++)
+ {
+ assertEquals(getSamplebyte(posMsg), message.getBodyBuffer().readByte());
+ }
+ }
+
protected boolean isFileStorage()
{
return true;
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2011-09-21
19:07:57 UTC (rev 11389)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2011-09-21
19:40:01 UTC (rev 11390)
@@ -25,9 +25,6 @@
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageImpl;
-import org.hornetq.core.server.cluster.ClusterConnection;
-import org.hornetq.core.server.cluster.MessageFlowRecord;
-import org.hornetq.core.server.cluster.impl.ClusterConnectionImpl;
import org.hornetq.core.server.impl.QueueImpl;
import org.hornetq.core.settings.impl.AddressSettings;
@@ -44,6 +41,19 @@
{
private static final Logger log = Logger.getLogger(MessageRedistributionTest.class);
+ public MessageRedistributionTest()
+ {
+ super();
+ }
+
+ /**
+ * @param name
+ */
+ public MessageRedistributionTest(String name)
+ {
+ super(name);
+ }
+
@Override
protected void setUp() throws Exception
{
@@ -118,96 +128,6 @@
MessageRedistributionTest.log.info("Test done");
}
- //
https://issues.jboss.org/browse/HORNETQ-654
- public void testRedistributionWhenConsumerIsClosedAndRestart() throws Exception
- {
- setupCluster(false);
-
- MessageRedistributionTest.log.info("Doing test");
-
- startServers(0, 1, 2);
-
- setupSessionFactory(0, isNetty());
- setupSessionFactory(1, isNetty());
- setupSessionFactory(2, isNetty());
-
- createQueue(0, "queues.testaddress", "queue0", null, true);
- createQueue(1, "queues.testaddress", "queue0", null, true);
- createQueue(2, "queues.testaddress", "queue0", null, true);
-
- addConsumer(0, 0, "queue0", null);
- addConsumer(1, 1, "queue0", null);
- addConsumer(2, 2, "queue0", null);
-
- waitForBindings(0, "queues.testaddress", 1, 1, true);
- waitForBindings(1, "queues.testaddress", 1, 1, true);
- waitForBindings(2, "queues.testaddress", 1, 1, true);
-
- waitForBindings(0, "queues.testaddress", 2, 2, false);
- waitForBindings(1, "queues.testaddress", 2, 2, false);
- waitForBindings(2, "queues.testaddress", 2, 2, false);
-
- send(0, "queues.testaddress", 20, true, null);
-
- getReceivedOrder(0, true);
- int[] ids1 = getReceivedOrder(1, false);
- getReceivedOrder(2, true);
-
- for (ClusterConnection conn :
servers[1].getClusterManager().getClusterConnections())
- {
- ClusterConnectionImpl impl = (ClusterConnectionImpl)conn;
- for (MessageFlowRecord record : impl.getRecords().values())
- {
- if (record.getBridge() != null)
- {
- System.out.println("stop record bridge");
- record.getBridge().stop();
- }
- }
- }
-
- removeConsumer(1);
-
- // Need to wait some time as we need to handle all redistributions before we stop
the servers
- Thread.sleep(5000);
-
- for (int i = 0; i <= 2; i++)
- {
- servers[i].stop();
- servers[i] = null;
- }
-
- setupServers();
-
- setupCluster(false);
-
- startServers(0, 1, 2);
-
- for (int i = 0 ; i <= 2; i++)
- {
- consumers[i] = null;
- sfs[i] = null;
- }
-
- setupSessionFactory(0, isNetty());
- setupSessionFactory(1, isNetty());
- setupSessionFactory(2, isNetty());
-
- addConsumer(0, 0, "queue0", null);
- addConsumer(2, 2, "queue0", null);
-
- waitForBindings(0, "queues.testaddress", 1, 1, true);
- waitForBindings(2, "queues.testaddress", 1, 1, true);
-
- waitForBindings(0, "queues.testaddress", 2, 1, false);
- waitForBindings(1, "queues.testaddress", 2, 2, false);
- waitForBindings(2, "queues.testaddress", 2, 1, false);
-
- verifyReceiveRoundRobinInSomeOrderWithCounts(false, ids1, 0, 2);
-
- MessageRedistributionTest.log.info("Test done");
- }
-
public void testRedistributionWhenConsumerIsClosedNotConsumersOnAllNodes() throws
Exception
{
setupCluster(false);
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-09-21
19:07:57 UTC (rev 11389)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-09-21
19:40:01 UTC (rev 11390)
@@ -34,10 +34,12 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.naming.Context;
import javax.transaction.xa.XAException;
@@ -57,9 +59,16 @@
import org.hornetq.core.client.impl.ServerLocatorImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
+import
org.hornetq.core.persistence.impl.journal.JournalStorageManager.ReferenceDescribe;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
import org.hornetq.core.postoffice.PostOffice;
@@ -1231,7 +1240,63 @@
}
return bindingsFound;
}
+ /**
+ * It will inspect the journal directly and determine if there are queues on this
journal,
+ * @return a Map containing the reference counts per queue
+ * @param serverToInvestigate
+ * @throws Exception
+ */
+ protected Map<Long, AtomicInteger> loadQueues(HornetQServer serverToInvestigate)
throws Exception
+ {
+ SequentialFileFactory messagesFF = new
NIOSequentialFileFactory(serverToInvestigate.getConfiguration().getJournalDirectory());
+ JournalImpl messagesJournal = new
JournalImpl(serverToInvestigate.getConfiguration().getJournalFileSize(),
+
serverToInvestigate.getConfiguration().getJournalMinFiles(),
+ 0,
+ 0,
+ messagesFF,
+ "hornetq-data",
+ "hq",
+ 1);
+ List<RecordInfo> records = new LinkedList<RecordInfo>();
+
+ List<PreparedTransactionInfo> preparedTransactions = new
LinkedList<PreparedTransactionInfo>();
+
+ messagesJournal.start();
+ messagesJournal.load(records, preparedTransactions, null);
+
+ // These are more immutable integers
+ Map<Long, AtomicInteger> messageRefCounts = new HashMap<Long,
AtomicInteger>();
+
+
+ for (RecordInfo info : records)
+ {
+ Object o = JournalStorageManager.newObjectEncoding(info);
+ if (info.getUserRecordType() == JournalStorageManager.ADD_REF)
+ {
+ ReferenceDescribe ref = (ReferenceDescribe)o;
+ AtomicInteger count = messageRefCounts.get(ref.refEncoding.queueID);
+ if (count == null)
+ {
+ count = new AtomicInteger(1);
+ messageRefCounts.put(ref.refEncoding.queueID, count);
+ }
+ else
+ {
+ count.incrementAndGet();
+ }
+ }
+ }
+
+
+ messagesJournal.stop();
+
+
+ return messageRefCounts;
+
+ }
+
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------