Author: clebert.suconic(a)jboss.com
Date: 2011-09-01 18:27:06 -0400 (Thu, 01 Sep 2011)
New Revision: 11279
Added:
branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/LargeMessageRedistributionTest.java
Modified:
branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/message/impl/MessageImpl.java
branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
Log:
back porting JBPAPP-7115
Modified:
branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/message/impl/MessageImpl.java 2011-09-01 21:58:56 UTC (rev 11278)
+++ branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/message/impl/MessageImpl.java 2011-09-01 22:27:06 UTC (rev 11279)
@@ -143,6 +143,14 @@
*/
protected MessageImpl(final MessageImpl other)
{
+ this(other, other.getProperties());
+ }
+
+ /*
+ * Copy constructor
+ */
+ protected MessageImpl(final MessageImpl other, TypedProperties properties)
+ {
messageID = other.getMessageID();
userID = other.getUserID();
address = other.getAddress();
@@ -151,7 +159,7 @@
expiration = other.getExpiration();
timestamp = other.getTimestamp();
priority = other.getPriority();
- properties = new TypedProperties(other.getProperties());
+ this.properties = new TypedProperties(properties);
// This MUST be synchronized using the monitor on the other message to prevent it running concurrently
// with getEncodedBuffer(), otherwise can introduce race condition when delivering concurrently to
Modified:
branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
--- branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-09-01 21:58:56 UTC (rev 11278)
+++ branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-09-01 22:27:06 UTC (rev 11279)
@@ -26,6 +26,7 @@
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.utils.DataConstants;
+import org.hornetq.utils.TypedProperties;
/**
* A LargeServerMessageImpl
@@ -70,12 +71,13 @@
/**
* Copy constructor
+ * @param properties
* @param copy
* @param fileCopy
*/
- private LargeServerMessageImpl(final LargeServerMessageImpl copy, final SequentialFile
fileCopy, final long newID)
+ private LargeServerMessageImpl(final LargeServerMessageImpl copy, TypedProperties
properties, final SequentialFile fileCopy, final long newID)
{
- super(copy);
+ super(copy, properties);
linkMessage = copy;
storageManager = copy.storageManager;
file = fileCopy;
@@ -281,8 +283,30 @@
this.removeProperty(Message.HDR_ORIG_MESSAGE_ID);
}
}
+
+ @Override
+ public synchronized ServerMessage copy()
+ {
+ incrementDelayDeletionCount();
+ long idToUse = messageID;
+ if (linkMessage != null)
+ {
+ idToUse = linkMessage.getMessageID();
+ }
+
+ SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse,
durable);
+
+ ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
+ :
(LargeServerMessageImpl)linkMessage,
+ properties,
+ newfile,
+ messageID);
+ return newMessage;
+ }
+
+
@Override
public synchronized ServerMessage copy(final long newID)
{
@@ -301,6 +325,7 @@
ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ?
this
:
(LargeServerMessageImpl)linkMessage,
+ properties,
newfile,
newID);
return newMessage;
@@ -317,7 +342,7 @@
file.copyTo(newFile);
- LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, newFile,
newID);
+ LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this,
properties, newFile, newID);
newMessage.linkMessage = null;
Modified:
branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-09-01 21:58:56 UTC (rev 11278)
+++ branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-09-01 22:27:06 UTC (rev 11279)
@@ -326,7 +326,7 @@
// Consumer implementation ---------------------------------------
/* Hook for processing message before forwarding */
- protected ServerMessage beforeForward(ServerMessage message)
+ protected ServerMessage beforeForward(final ServerMessage message)
{
if (useDuplicateDetection)
{
@@ -338,10 +338,20 @@
if (transformer != null)
{
- message = transformer.transform(message);
+ final ServerMessage transformedMessage = transformer.transform(message);
+ if (transformedMessage != message)
+ {
+ if (log.isDebugEnabled())
+ {
+ log.debug("The transformer " + transformer + " made a copy
of the message " + message + " as transformedMessage");
+ }
+ }
+ return transformedMessage;
}
-
- return message;
+ else
+ {
+ return message;
+ }
}
/**
Modified:
branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-09-01 21:58:56 UTC (rev 11278)
+++ branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-09-01 22:27:06 UTC (rev 11279)
@@ -113,34 +113,50 @@
}
@Override
- protected ServerMessage beforeForward(ServerMessage message)
+ protected ServerMessage beforeForward(final ServerMessage message)
{
// We make a copy of the message, then we strip out the unwanted routing id headers and leave
// only
// the one pertinent for the address node - this is important since different queues on different
// nodes could have same queue ids
// Note we must copy since same message may get routed to other nodes which require different headers
- message = message.copy();
+ ServerMessage messageCopy = message.copy();
+
+ if (log.isTraceEnabled())
+ {
+ log.trace("Clustered bridge copied message " + message + " as
" + messageCopy + " before delivery");
+ }
// TODO - we can optimise this
- Set<SimpleString> propNames = new
HashSet<SimpleString>(message.getPropertyNames());
+ Set<SimpleString> propNames = new
HashSet<SimpleString>(messageCopy.getPropertyNames());
byte[] queueIds = message.getBytesProperty(idsHeaderName);
+
+ if (queueIds == null)
+ {
+ // Sanity check only
+ log.warn("no queue IDs defined!, originalMessage = " + message +
+ ", copiedMessage = " +
+ messageCopy +
+ ", props=" +
+ idsHeaderName);
+ throw new IllegalStateException("no queueIDs defined");
+ }
for (SimpleString propName : propNames)
{
if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS))
{
- message.removeProperty(propName);
+ messageCopy.removeProperty(propName);
}
}
- message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
+ messageCopy.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
- message = super.beforeForward(message);
-
- return message;
+ messageCopy = super.beforeForward(messageCopy);
+
+ return messageCopy;
}
private void setupNotificationConsumer() throws Exception
Modified:
branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2011-09-01 21:58:56 UTC (rev 11278)
+++ branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2011-09-01 22:27:06 UTC (rev 11279)
@@ -25,6 +25,7 @@
import org.hornetq.core.server.ServerMessage;
import org.hornetq.utils.DataConstants;
import org.hornetq.utils.MemorySize;
+import org.hornetq.utils.TypedProperties;
/**
*
@@ -89,6 +90,14 @@
super(other);
}
+ /*
+ * Copy constructor
+ */
+ protected ServerMessageImpl(final ServerMessageImpl other, TypedProperties
properties)
+ {
+ super(other, properties);
+ }
+
public boolean isServerMessage()
{
return true;
@@ -193,6 +202,7 @@
public ServerMessage copy()
{
+ // This is a simple copy, used only to avoid changing original properties
return new ServerMessageImpl(this);
}
Modified:
branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-01 21:58:56 UTC (rev 11278)
+++ branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-01 22:27:06 UTC (rev 11279)
@@ -71,6 +71,16 @@
{
private static final Logger log = Logger.getLogger(ClusterTestBase.class);
+ public ClusterTestBase()
+ {
+ super();
+ }
+
+ public ClusterTestBase(String name)
+ {
+ super(name);
+ }
+
private static final int[] PORTS = { TransportConstants.DEFAULT_PORT,
TransportConstants.DEFAULT_PORT + 1,
TransportConstants.DEFAULT_PORT + 2,
@@ -88,12 +98,12 @@
{
return 1024;
}
-
+
protected boolean isLargeMessage()
{
return false;
}
-
+
@Override
protected void setUp() throws Exception
{
@@ -148,13 +158,11 @@
consumers = new ConsumerHolder[ClusterTestBase.MAX_CONSUMERS];
-
-
nodeManagers = null;
super.tearDown();
- // ServerLocatorImpl.shutdown();
+ // ServerLocatorImpl.shutdown();
}
// Private -------------------------------------------------------------------------------------------------------
@@ -199,7 +207,7 @@
{
return consumers[node].consumer;
}
-
+
protected void waitForMessages(final int node, final String address, final int count)
throws Exception
{
HornetQServer server = servers[node];
@@ -458,7 +466,7 @@
if (holder != null)
{
holder.consumer.close();
- // holder.session.close();
+ // holder.session.close();
consumers[i] = null;
}
@@ -532,7 +540,7 @@
for (int i = msgStart; i < msgEnd; i++)
{
ClientMessage message = session.createMessage(durable);
-
+
if (isLargeMessage())
{
message.setBodyInputStream(createFakeLargeStream(getLargeMessageSize()));
@@ -545,7 +553,6 @@
message.putIntProperty(ClusterTestBase.COUNT_PROP, i);
-
producer.send(message);
}
}
@@ -589,7 +596,7 @@
for (int i = msgStart; i < msgEnd; i++)
{
ClientMessage message = session.createMessage(durable);
-
+
if (isLargeMessage())
{
message.setBodyInputStream(createFakeLargeStream(getLargeMessageSize()));
@@ -774,10 +781,9 @@
for (int j = msgStart; j < msgEnd; j++)
{
-
+
ClientMessage message = holder.consumer.receive(WAIT_TIMEOUT);
-
-
+
if (message == null)
{
ClusterTestBase.log.info("*** dumping consumers:");
@@ -789,13 +795,12 @@
if (isLargeMessage())
{
- for (int posMsg = 0 ; posMsg < getLargeMessageSize(); posMsg++)
+ for (int posMsg = 0; posMsg < getLargeMessageSize(); posMsg++)
{
assertEquals(getSamplebyte(posMsg),
message.getBodyBuffer().readByte());
}
}
-
if (ack)
{
message.acknowledge();
@@ -831,7 +836,7 @@
}
}
}
-
+
protected String clusterDescription(HornetQServer server)
{
String br = "-------------------------\n";
@@ -1076,6 +1081,14 @@
{
int count =
(Integer)message.getObjectProperty(ClusterTestBase.COUNT_PROP);
+ if (isLargeMessage())
+ {
+ for (int posMsg = 0; posMsg < getLargeMessageSize(); posMsg++)
+ {
+ assertEquals(getSamplebyte(posMsg),
message.getBodyBuffer().readByte());
+ }
+ }
+
// log.info("consumer " + consumerIDs[i] + " received message " + count);
Assert.assertFalse(counts.contains(count));
@@ -1169,6 +1182,15 @@
message = consumer.consumer.receive(500);
if (message != null)
{
+
+ if (isLargeMessage())
+ {
+ for (int posMsg = 0; posMsg < getLargeMessageSize(); posMsg++)
+ {
+ assertEquals(getSamplebyte(posMsg),
message.getBodyBuffer().readByte());
+ }
+ }
+
if (ack)
{
message.acknowledge();
@@ -1189,7 +1211,7 @@
{
res[j++] = i;
}
-
+
if (ack)
{
// just to flush acks
@@ -1255,7 +1277,6 @@
sfs[node] = sf;
}
-
protected void setupSessionFactory(final int node, final boolean netty, int
reconnectAttempts) throws Exception
{
if (sfs[node] != null)
@@ -1306,7 +1327,6 @@
serverTotc = new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY,
params);
}
-
locators[node] = HornetQClient.createServerLocatorWithHA(serverTotc);
locators[node].setRetryInterval(100);
locators[node].setRetryIntervalMultiplier(1d);
@@ -1342,74 +1362,74 @@
final boolean fileStorage,
final boolean sharedStorage,
final boolean netty)
+ {
+ if (servers[node] != null)
{
- if (servers[node] != null)
- {
- throw new IllegalArgumentException("Already a server at node " +
node);
- }
+ throw new IllegalArgumentException("Already a server at node " +
node);
+ }
- Configuration configuration = createBasicConfig();
+ Configuration configuration = createBasicConfig();
- configuration.setSecurityEnabled(false);
- configuration.setJournalMinFiles(2);
- configuration.setJournalMaxIO_AIO(1000);
- configuration.setJournalFileSize(100 * 1024);
- configuration.setJournalType(getDefaultJournalType());
- configuration.setSharedStore(sharedStorage);
+ configuration.setSecurityEnabled(false);
+ configuration.setJournalMinFiles(2);
+ configuration.setJournalMaxIO_AIO(1000);
+ configuration.setJournalFileSize(100 * 1024);
+ configuration.setJournalType(getDefaultJournalType());
+ configuration.setSharedStore(sharedStorage);
+ if (sharedStorage)
+ {
+ // Shared storage will share the node between the backup and live node
+ configuration.setBindingsDirectory(getBindingsDir(node, false));
+ configuration.setJournalDirectory(getJournalDir(node, false));
+ configuration.setPagingDirectory(getPageDir(node, false));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
+ }
+ else
+ {
+ configuration.setBindingsDirectory(getBindingsDir(node, true));
+ configuration.setJournalDirectory(getJournalDir(node, true));
+ configuration.setPagingDirectory(getPageDir(node, true));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
+ }
+ configuration.setClustered(true);
+ configuration.setJournalCompactMinFiles(0);
+
+ configuration.getAcceptorConfigurations().clear();
+ configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty,
true, generateParams(node,
+
netty)));
+
+ HornetQServer server;
+
+ if (fileStorage)
+ {
if (sharedStorage)
{
- // Shared storage will share the node between the backup and live node
- configuration.setBindingsDirectory(getBindingsDir(node, false));
- configuration.setJournalDirectory(getJournalDir(node, false));
- configuration.setPagingDirectory(getPageDir(node, false));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
+ server = createInVMFailoverServer(true, configuration, nodeManagers[node]);
}
else
{
- configuration.setBindingsDirectory(getBindingsDir(node, true));
- configuration.setJournalDirectory(getJournalDir(node, true));
- configuration.setPagingDirectory(getPageDir(node, true));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
+ server = HornetQServers.newHornetQServer(configuration);
}
- configuration.setClustered(true);
- configuration.setJournalCompactMinFiles(0);
-
- configuration.getAcceptorConfigurations().clear();
-
configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true,
generateParams(node, netty)));
-
- HornetQServer server;
-
- if (fileStorage)
+ }
+ else
+ {
+ if (sharedStorage)
{
- if (sharedStorage)
- {
- server = createInVMFailoverServer(true, configuration,
nodeManagers[node]);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration);
- }
+ server = createInVMFailoverServer(false, configuration, nodeManagers[node]);
}
else
{
- if (sharedStorage)
- {
- server = createInVMFailoverServer(false, configuration,
nodeManagers[node]);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration, false);
- }
+ server = HornetQServers.newHornetQServer(configuration, false);
}
- servers[node] = server;
}
+ servers[node] = server;
+ }
-
- protected void setupBackupServer(final int node,
- final int liveNode,
- final boolean fileStorage,
- final boolean sharedStorage,
- final boolean netty)
+ protected void setupBackupServer(final int node,
+ final int liveNode,
+ final boolean fileStorage,
+ final boolean sharedStorage,
+ final boolean netty)
{
if (servers[node] != null)
{
@@ -1446,7 +1466,7 @@
configuration.getAcceptorConfigurations().clear();
TransportConfiguration acceptorConfig = createTransportConfiguration(netty, true,
generateParams(node, netty));
configuration.getAcceptorConfigurations().add(acceptorConfig);
- //add backup connector
+ // add backup connector
TransportConfiguration liveConfig = createTransportConfiguration(netty, false,
generateParams(liveNode, netty));
configuration.getConnectorConfigurations().put(liveConfig.getName(), liveConfig);
TransportConfiguration backupConfig = createTransportConfiguration(netty, false,
generateParams(node, netty));
@@ -1480,171 +1500,180 @@
}
protected void setupLiveServerWithDiscovery(final int node,
- final String groupAddress,
- final int port,
- final boolean fileStorage,
- final boolean netty,
- final boolean sharedStorage)
- {
- if (servers[node] != null)
- {
- throw new IllegalArgumentException("Already a server at node " +
node);
- }
+ final String groupAddress,
+ final int port,
+ final boolean fileStorage,
+ final boolean netty,
+ final boolean sharedStorage)
+ {
+ if (servers[node] != null)
+ {
+ throw new IllegalArgumentException("Already a server at node " +
node);
+ }
- Configuration configuration = createBasicConfig();
+ Configuration configuration = createBasicConfig();
- configuration.setSecurityEnabled(false);
- configuration.setBindingsDirectory(getBindingsDir(node, false));
- configuration.setJournalMinFiles(2);
- configuration.setJournalDirectory(getJournalDir(node, false));
- configuration.setJournalFileSize(100 * 1024);
- configuration.setJournalType(getDefaultJournalType());
- configuration.setJournalMaxIO_AIO(1000);
- configuration.setPagingDirectory(getPageDir(node, false));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
- configuration.setClustered(true);
- configuration.setBackup(false);
+ configuration.setSecurityEnabled(false);
+ configuration.setBindingsDirectory(getBindingsDir(node, false));
+ configuration.setJournalMinFiles(2);
+ configuration.setJournalDirectory(getJournalDir(node, false));
+ configuration.setJournalFileSize(100 * 1024);
+ configuration.setJournalType(getDefaultJournalType());
+ configuration.setJournalMaxIO_AIO(1000);
+ configuration.setPagingDirectory(getPageDir(node, false));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
+ configuration.setClustered(true);
+ configuration.setBackup(false);
- configuration.getAcceptorConfigurations().clear();
+ configuration.getAcceptorConfigurations().clear();
- Map<String, Object> params = generateParams(node, netty);
+ Map<String, Object> params = generateParams(node, netty);
- configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty,
true, params));
+ configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty,
true, params));
- TransportConfiguration connector = createTransportConfiguration(netty, false,
params);
- configuration.getConnectorConfigurations().put(connector.getName(), connector);
+ TransportConfiguration connector = createTransportConfiguration(netty, false,
params);
+ configuration.getConnectorConfigurations().put(connector.getName(), connector);
- List<String> connectorPairs = new ArrayList<String>();
- connectorPairs.add(connector.getName());
+ List<String> connectorPairs = new ArrayList<String>();
+ connectorPairs.add(connector.getName());
- BroadcastGroupConfiguration bcConfig = new
BroadcastGroupConfiguration("bg1",
- null,
- -1,
-
groupAddress,
- port,
- 1000,
-
connectorPairs);
+ BroadcastGroupConfiguration bcConfig = new
BroadcastGroupConfiguration("bg1",
+ null,
+ -1,
+
groupAddress,
+ port,
+ 1000,
+
connectorPairs);
- configuration.getBroadcastGroupConfigurations().add(bcConfig);
+ configuration.getBroadcastGroupConfigurations().add(bcConfig);
- DiscoveryGroupConfiguration dcConfig = new
DiscoveryGroupConfiguration("dg1", null, groupAddress, port, 5000, 5000);
+ DiscoveryGroupConfiguration dcConfig = new
DiscoveryGroupConfiguration("dg1",
+ null,
+
groupAddress,
+ port,
+ 5000,
+ 5000);
- configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(),
dcConfig);
+ configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
- HornetQServer server;
- if (fileStorage)
- {
- if (sharedStorage)
- {
- server = createInVMFailoverServer(true, configuration,
nodeManagers[node]);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration);
- }
- }
- else
- {
- if (sharedStorage)
- {
- server = createInVMFailoverServer(false, configuration,
nodeManagers[node]);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration, false);
- }
- }
- servers[node] = server;
- }
+ HornetQServer server;
+ if (fileStorage)
+ {
+ if (sharedStorage)
+ {
+ server = createInVMFailoverServer(true, configuration, nodeManagers[node]);
+ }
+ else
+ {
+ server = HornetQServers.newHornetQServer(configuration);
+ }
+ }
+ else
+ {
+ if (sharedStorage)
+ {
+ server = createInVMFailoverServer(false, configuration, nodeManagers[node]);
+ }
+ else
+ {
+ server = HornetQServers.newHornetQServer(configuration, false);
+ }
+ }
+ servers[node] = server;
+ }
protected void setupBackupServerWithDiscovery(final int node,
- final int liveNode,
- final String groupAddress,
- final int port,
- final boolean fileStorage,
- final boolean netty,
- final boolean sharedStorage)
- {
- if (servers[node] != null)
- {
- throw new IllegalArgumentException("Already a server at node " +
node);
- }
+ final int liveNode,
+ final String groupAddress,
+ final int port,
+ final boolean fileStorage,
+ final boolean netty,
+ final boolean sharedStorage)
+ {
+ if (servers[node] != null)
+ {
+ throw new IllegalArgumentException("Already a server at node " +
node);
+ }
- Configuration configuration = createBasicConfig();
+ Configuration configuration = createBasicConfig();
- configuration.setSecurityEnabled(false);
- configuration.setSharedStore(sharedStorage);
- if (sharedStorage)
- {
- // Shared storage will share the node between the backup and live node
- configuration.setBindingsDirectory(getBindingsDir(liveNode, false));
- configuration.setJournalDirectory(getJournalDir(liveNode, false));
- configuration.setPagingDirectory(getPageDir(liveNode, false));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(liveNode,
false));
- }
- else
- {
- configuration.setBindingsDirectory(getBindingsDir(node, true));
- configuration.setJournalDirectory(getJournalDir(node, true));
- configuration.setPagingDirectory(getPageDir(node, true));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
- }
- configuration.setClustered(true);
- configuration.setBackup(true);
+ configuration.setSecurityEnabled(false);
+ configuration.setSharedStore(sharedStorage);
+ if (sharedStorage)
+ {
+ // Shared storage will share the node between the backup and live node
+ configuration.setBindingsDirectory(getBindingsDir(liveNode, false));
+ configuration.setJournalDirectory(getJournalDir(liveNode, false));
+ configuration.setPagingDirectory(getPageDir(liveNode, false));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(liveNode, false));
+ }
+ else
+ {
+ configuration.setBindingsDirectory(getBindingsDir(node, true));
+ configuration.setJournalDirectory(getJournalDir(node, true));
+ configuration.setPagingDirectory(getPageDir(node, true));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
+ }
+ configuration.setClustered(true);
+ configuration.setBackup(true);
- configuration.getAcceptorConfigurations().clear();
+ configuration.getAcceptorConfigurations().clear();
- Map<String, Object> params = generateParams(node, netty);
+ Map<String, Object> params = generateParams(node, netty);
- configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty,
true, params));
+ configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty,
true, params));
- TransportConfiguration connector = createTransportConfiguration(netty, false,
params);
- configuration.getConnectorConfigurations().put(connector.getName(), connector);
+ TransportConfiguration connector = createTransportConfiguration(netty, false,
params);
+ configuration.getConnectorConfigurations().put(connector.getName(), connector);
- List<String> connectorPairs = new ArrayList<String>();
- connectorPairs.add(connector.getName());
+ List<String> connectorPairs = new ArrayList<String>();
+ connectorPairs.add(connector.getName());
- BroadcastGroupConfiguration bcConfig = new
BroadcastGroupConfiguration("bg1",
- null,
- -1,
-
groupAddress,
- port,
- 1000,
-
connectorPairs);
+ BroadcastGroupConfiguration bcConfig = new
BroadcastGroupConfiguration("bg1",
+ null,
+ -1,
+
groupAddress,
+ port,
+ 1000,
+
connectorPairs);
- configuration.getBroadcastGroupConfigurations().add(bcConfig);
+ configuration.getBroadcastGroupConfigurations().add(bcConfig);
- DiscoveryGroupConfiguration dcConfig = new
DiscoveryGroupConfiguration("dg1", null, groupAddress, port, 5000, 5000);
+ DiscoveryGroupConfiguration dcConfig = new
DiscoveryGroupConfiguration("dg1",
+ null,
+
groupAddress,
+ port,
+ 5000,
+ 5000);
- configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(),
dcConfig);
+ configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
- HornetQServer server;
- if (fileStorage)
- {
- if (sharedStorage)
- {
- server = createInVMFailoverServer(true, configuration,
nodeManagers[liveNode]);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration);
- }
- }
- else
- {
- if (sharedStorage)
- {
- server = createInVMFailoverServer(false, configuration,
nodeManagers[liveNode]);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration, false);
- }
- }
- servers[node] = server;
- }
+ HornetQServer server;
+ if (fileStorage)
+ {
+ if (sharedStorage)
+ {
+ server = createInVMFailoverServer(true, configuration,
nodeManagers[liveNode]);
+ }
+ else
+ {
+ server = HornetQServers.newHornetQServer(configuration);
+ }
+ }
+ else
+ {
+ if (sharedStorage)
+ {
+ server = createInVMFailoverServer(false, configuration,
nodeManagers[liveNode]);
+ }
+ else
+ {
+ server = HornetQServers.newHornetQServer(configuration, false);
+ }
+ }
+ servers[node] = server;
+ }
-
protected void clearServer(final int... nodes)
{
for (int i = 0; i < nodes.length; i++)
@@ -1681,12 +1710,12 @@
{
throw new IllegalStateException("No server at node " + nodeFrom);
}
-
+
TransportConfiguration connectorFrom = createTransportConfiguration(netty, false,
generateParams(nodeFrom, netty));
serverFrom.getConfiguration().getConnectorConfigurations().put(name,
connectorFrom);
List<String> pairs = null;
-
+
if (nodeTo != -1)
{
TransportConfiguration serverTotc = createTransportConfiguration(netty, false,
generateParams(nodeTo, netty));
@@ -1703,11 +1732,11 @@
forwardWhenNoConsumers,
maxHops,
1024,
-
pairs, allowDirectConnectionsOnly);
+
pairs,
+
allowDirectConnectionsOnly);
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
}
-
protected void setupClusterConnection(final String name,
final String address,
final boolean forwardWhenNoConsumers,
@@ -1725,7 +1754,7 @@
TransportConfiguration connectorFrom = createTransportConfiguration(netty, false,
generateParams(nodeFrom, netty));
serverFrom.getConfiguration().getConnectorConfigurations().put(connectorFrom.getName(),
connectorFrom);
-
+
List<String> pairs = new ArrayList<String>();
for (int element : nodesTo)
{
@@ -1742,7 +1771,8 @@
forwardWhenNoConsumers,
maxHops,
1024,
-
pairs, false);
+
pairs,
+
false);
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
}
@@ -1781,7 +1811,8 @@
forwardWhenNoConsumers,
maxHops,
1024,
-
pairs, false);
+
pairs,
+
false);
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
}
@@ -1803,7 +1834,7 @@
TransportConfiguration connectorConfig = createTransportConfiguration(netty, false,
generateParams(node, netty));
server.getConfiguration().getConnectorConfigurations().put(name, connectorConfig);
-
+
ClusterConnectionConfiguration clusterConf = new
ClusterConnectionConfiguration(name,
address,
name,
@@ -1836,23 +1867,22 @@
}
for (int node : nodes)
{
- //wait for each server to start, it may be a backup and started in a separate thread
+ // wait for each server to start, it may be a backup and started in a separate thread
waitForServer(servers[node]);
}
}
- private void waitForServer(HornetQServer server)
- throws InterruptedException
+ private void waitForServer(HornetQServer server) throws InterruptedException
{
- long timetowait =System.currentTimeMillis() + 5000;
- while(!server.isStarted())
+ long timetowait = System.currentTimeMillis() + 5000;
+ while (!server.isStarted())
{
Thread.sleep(100);
- if(server.isStarted())
+ if (server.isStarted())
{
break;
}
- else if(System.currentTimeMillis() > timetowait)
+ else if (System.currentTimeMillis() > timetowait)
{
fail("server didnt start");
}
Added:
branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/LargeMessageRedistributionTest.java
===================================================================
--- branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/LargeMessageRedistributionTest.java (rev 0)
+++ branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/LargeMessageRedistributionTest.java 2011-09-01 22:27:06 UTC (rev 11279)
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.distribution;
+
+
+/**
+ * A LargeMessageRedistributionTest
+ *
+ * @author clebert
+ *
+ *
+ */
+public class LargeMessageRedistributionTest extends MessageRedistributionTest
+{
+
+ public LargeMessageRedistributionTest()
+ {
+ super();
+ }
+
+ public LargeMessageRedistributionTest(String name)
+ {
+ super(name);
+ }
+
+ protected boolean isLargeMessage()
+ {
+ return true;
+ }
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified:
branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
===================================================================
--- branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2011-09-01 21:58:56 UTC (rev 11278)
+++ branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2011-09-01 22:27:06 UTC (rev 11279)
@@ -25,9 +25,6 @@
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageImpl;
-import org.hornetq.core.server.cluster.ClusterConnection;
-import org.hornetq.core.server.cluster.MessageFlowRecord;
-import org.hornetq.core.server.cluster.impl.ClusterConnectionImpl;
import org.hornetq.core.server.impl.QueueImpl;
import org.hornetq.core.settings.impl.AddressSettings;
@@ -44,6 +41,19 @@
{
private static final Logger log = Logger.getLogger(MessageRedistributionTest.class);
+ public MessageRedistributionTest()
+ {
+ super();
+ }
+
+ /**
+ * @param name
+ */
+ public MessageRedistributionTest(String name)
+ {
+ super(name);
+ }
+
@Override
protected void setUp() throws Exception
{
@@ -118,96 +128,6 @@
MessageRedistributionTest.log.info("Test done");
}
- // https://issues.jboss.org/browse/HORNETQ-654
- public void testRedistributionWhenConsumerIsClosedAndRestart() throws Exception
- {
- setupCluster(false);
-
- MessageRedistributionTest.log.info("Doing test");
-
- startServers(0, 1, 2);
-
- setupSessionFactory(0, isNetty());
- setupSessionFactory(1, isNetty());
- setupSessionFactory(2, isNetty());
-
- createQueue(0, "queues.testaddress", "queue0", null, true);
- createQueue(1, "queues.testaddress", "queue0", null, true);
- createQueue(2, "queues.testaddress", "queue0", null, true);
-
- addConsumer(0, 0, "queue0", null);
- addConsumer(1, 1, "queue0", null);
- addConsumer(2, 2, "queue0", null);
-
- waitForBindings(0, "queues.testaddress", 1, 1, true);
- waitForBindings(1, "queues.testaddress", 1, 1, true);
- waitForBindings(2, "queues.testaddress", 1, 1, true);
-
- waitForBindings(0, "queues.testaddress", 2, 2, false);
- waitForBindings(1, "queues.testaddress", 2, 2, false);
- waitForBindings(2, "queues.testaddress", 2, 2, false);
-
- send(0, "queues.testaddress", 20, true, null);
-
- getReceivedOrder(0, true);
- int[] ids1 = getReceivedOrder(1, false);
- getReceivedOrder(2, true);
-
- for (ClusterConnection conn : servers[1].getClusterManager().getClusterConnections())
- {
- ClusterConnectionImpl impl = (ClusterConnectionImpl)conn;
- for (MessageFlowRecord record : impl.getRecords().values())
- {
- if (record.getBridge() != null)
- {
- System.out.println("stop record bridge");
- record.getBridge().stop();
- }
- }
- }
-
- removeConsumer(1);
-
- // Need to wait some time as we need to handle all redistributions before we stop the servers
- Thread.sleep(5000);
-
- for (int i = 0; i <= 2; i++)
- {
- servers[i].stop();
- servers[i] = null;
- }
-
- setupServers();
-
- setupCluster(false);
-
- startServers(0, 1, 2);
-
- for (int i = 0 ; i <= 2; i++)
- {
- consumers[i] = null;
- sfs[i] = null;
- }
-
- setupSessionFactory(0, isNetty());
- setupSessionFactory(1, isNetty());
- setupSessionFactory(2, isNetty());
-
- addConsumer(0, 0, "queue0", null);
- addConsumer(2, 2, "queue0", null);
-
- waitForBindings(0, "queues.testaddress", 1, 1, true);
- waitForBindings(2, "queues.testaddress", 1, 1, true);
-
- waitForBindings(0, "queues.testaddress", 2, 1, false);
- waitForBindings(1, "queues.testaddress", 2, 2, false);
- waitForBindings(2, "queues.testaddress", 2, 1, false);
-
- verifyReceiveRoundRobinInSomeOrderWithCounts(false, ids1, 0, 2);
-
- MessageRedistributionTest.log.info("Test done");
- }
-
public void testRedistributionWhenConsumerIsClosedNotConsumersOnAllNodes() throws Exception
{
setupCluster(false);