Author: clebert.suconic(a)jboss.com
Date: 2011-09-01 18:35:21 -0400 (Thu, 01 Sep 2011)
New Revision: 11280
Modified:
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
Log:
Formatting only (as it was formatted on the original branch), to make it easier to merge
Modified:
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
---
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-01
22:27:06 UTC (rev 11279)
+++
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-01
22:35:21 UTC (rev 11280)
@@ -88,12 +88,12 @@
{
return 1024;
}
-
+
protected boolean isLargeMessage()
{
return false;
}
-
+
@Override
protected void setUp() throws Exception
{
@@ -148,13 +148,11 @@
consumers = new ConsumerHolder[ClusterTestBase.MAX_CONSUMERS];
-
-
nodeManagers = null;
super.tearDown();
- // ServerLocatorImpl.shutdown();
+ // ServerLocatorImpl.shutdown();
}
// Private
-------------------------------------------------------------------------------------------------------
@@ -199,7 +197,7 @@
{
return consumers[node].consumer;
}
-
+
protected void waitForMessages(final int node, final String address, final int count)
throws Exception
{
HornetQServer server = servers[node];
@@ -458,7 +456,7 @@
if (holder != null)
{
holder.consumer.close();
- // holder.session.close();
+ // holder.session.close();
consumers[i] = null;
}
@@ -532,7 +530,7 @@
for (int i = msgStart; i < msgEnd; i++)
{
ClientMessage message = session.createMessage(durable);
-
+
if (isLargeMessage())
{
message.setBodyInputStream(createFakeLargeStream(getLargeMessageSize()));
@@ -545,15 +543,14 @@
message.putIntProperty(ClusterTestBase.COUNT_PROP, i);
-
producer.send(message);
-
+
if (i % 100 == 0)
{
session.commit();
}
}
-
+
session.commit();
}
finally
@@ -596,7 +593,7 @@
for (int i = msgStart; i < msgEnd; i++)
{
ClientMessage message = session.createMessage(durable);
-
+
if (isLargeMessage())
{
message.setBodyInputStream(createFakeLargeStream(getLargeMessageSize()));
@@ -781,10 +778,9 @@
for (int j = msgStart; j < msgEnd; j++)
{
-
+
ClientMessage message = holder.consumer.receive(WAIT_TIMEOUT);
-
-
+
if (message == null)
{
ClusterTestBase.log.info("*** dumping consumers:");
@@ -796,13 +792,12 @@
if (isLargeMessage())
{
- for (int posMsg = 0 ; posMsg < getLargeMessageSize(); posMsg++)
+ for (int posMsg = 0; posMsg < getLargeMessageSize(); posMsg++)
{
assertEquals(getSamplebyte(posMsg),
message.getBodyBuffer().readByte());
}
}
-
if (ack)
{
message.acknowledge();
@@ -838,7 +833,7 @@
}
}
}
-
+
protected String clusterDescription(HornetQServer server)
{
String br = "-------------------------\n";
@@ -1196,7 +1191,7 @@
{
res[j++] = i;
}
-
+
if (ack)
{
// just to flush acks
@@ -1262,7 +1257,6 @@
sfs[node] = sf;
}
-
protected void setupSessionFactory(final int node, final boolean netty, int
reconnectAttempts) throws Exception
{
if (sfs[node] != null)
@@ -1313,7 +1307,6 @@
serverTotc = new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY,
params);
}
-
locators[node] = HornetQClient.createServerLocatorWithHA(serverTotc);
locators[node].setRetryInterval(100);
locators[node].setRetryIntervalMultiplier(1d);
@@ -1349,75 +1342,75 @@
final boolean fileStorage,
final boolean sharedStorage,
final boolean netty)
+ {
+ if (servers[node] != null)
{
- if (servers[node] != null)
- {
- throw new IllegalArgumentException("Already a server at node " +
node);
- }
+ throw new IllegalArgumentException("Already a server at node " +
node);
+ }
- Configuration configuration = createBasicConfig();
+ Configuration configuration = createBasicConfig();
- configuration.setSecurityEnabled(false);
- configuration.setJournalMinFiles(2);
- configuration.setJournalMaxIO_AIO(1000);
- configuration.setJournalFileSize(100 * 1024);
- configuration.setJournalType(getDefaultJournalType());
- configuration.setSharedStore(sharedStorage);
- configuration.setThreadPoolMaxSize(10);
+ configuration.setSecurityEnabled(false);
+ configuration.setJournalMinFiles(2);
+ configuration.setJournalMaxIO_AIO(1000);
+ configuration.setJournalFileSize(100 * 1024);
+ configuration.setJournalType(getDefaultJournalType());
+ configuration.setSharedStore(sharedStorage);
+ configuration.setThreadPoolMaxSize(10);
+ if (sharedStorage)
+ {
+ // Shared storage will share the node between the backup and live node
+ configuration.setBindingsDirectory(getBindingsDir(node, false));
+ configuration.setJournalDirectory(getJournalDir(node, false));
+ configuration.setPagingDirectory(getPageDir(node, false));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
+ }
+ else
+ {
+ configuration.setBindingsDirectory(getBindingsDir(node, true));
+ configuration.setJournalDirectory(getJournalDir(node, true));
+ configuration.setPagingDirectory(getPageDir(node, true));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
+ }
+ configuration.setClustered(true);
+ configuration.setJournalCompactMinFiles(0);
+
+ configuration.getAcceptorConfigurations().clear();
+ configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty,
true, generateParams(node,
+
netty)));
+
+ HornetQServer server;
+
+ if (fileStorage)
+ {
if (sharedStorage)
{
- // Shared storage will share the node between the backup and live node
- configuration.setBindingsDirectory(getBindingsDir(node, false));
- configuration.setJournalDirectory(getJournalDir(node, false));
- configuration.setPagingDirectory(getPageDir(node, false));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
+ server = createInVMFailoverServer(true, configuration, nodeManagers[node]);
}
else
{
- configuration.setBindingsDirectory(getBindingsDir(node, true));
- configuration.setJournalDirectory(getJournalDir(node, true));
- configuration.setPagingDirectory(getPageDir(node, true));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
+ server = HornetQServers.newHornetQServer(configuration);
}
- configuration.setClustered(true);
- configuration.setJournalCompactMinFiles(0);
-
- configuration.getAcceptorConfigurations().clear();
-
configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true,
generateParams(node, netty)));
-
- HornetQServer server;
-
- if (fileStorage)
+ }
+ else
+ {
+ if (sharedStorage)
{
- if (sharedStorage)
- {
- server = createInVMFailoverServer(true, configuration,
nodeManagers[node]);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration);
- }
+ server = createInVMFailoverServer(false, configuration, nodeManagers[node]);
}
else
{
- if (sharedStorage)
- {
- server = createInVMFailoverServer(false, configuration,
nodeManagers[node]);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration, false);
- }
+ server = HornetQServers.newHornetQServer(configuration, false);
}
- servers[node] = server;
}
+ servers[node] = server;
+ }
-
- protected void setupBackupServer(final int node,
- final int liveNode,
- final boolean fileStorage,
- final boolean sharedStorage,
- final boolean netty)
+ protected void setupBackupServer(final int node,
+ final int liveNode,
+ final boolean fileStorage,
+ final boolean sharedStorage,
+ final boolean netty)
{
if (servers[node] != null)
{
@@ -1454,7 +1447,7 @@
configuration.getAcceptorConfigurations().clear();
TransportConfiguration acceptorConfig = createTransportConfiguration(netty, true,
generateParams(node, netty));
configuration.getAcceptorConfigurations().add(acceptorConfig);
- //add backup connector
+ // add backup connector
TransportConfiguration liveConfig = createTransportConfiguration(netty, false,
generateParams(liveNode, netty));
configuration.getConnectorConfigurations().put(liveConfig.getName(), liveConfig);
TransportConfiguration backupConfig = createTransportConfiguration(netty, false,
generateParams(node, netty));
@@ -1488,171 +1481,180 @@
}
protected void setupLiveServerWithDiscovery(final int node,
- final String groupAddress,
- final int port,
- final boolean fileStorage,
- final boolean netty,
- final boolean sharedStorage)
- {
- if (servers[node] != null)
- {
- throw new IllegalArgumentException("Already a server at node " +
node);
- }
+ final String groupAddress,
+ final int port,
+ final boolean fileStorage,
+ final boolean netty,
+ final boolean sharedStorage)
+ {
+ if (servers[node] != null)
+ {
+ throw new IllegalArgumentException("Already a server at node " +
node);
+ }
- Configuration configuration = createBasicConfig();
+ Configuration configuration = createBasicConfig();
- configuration.setSecurityEnabled(false);
- configuration.setBindingsDirectory(getBindingsDir(node, false));
- configuration.setJournalMinFiles(2);
- configuration.setJournalDirectory(getJournalDir(node, false));
- configuration.setJournalFileSize(100 * 1024);
- configuration.setJournalType(getDefaultJournalType());
- configuration.setJournalMaxIO_AIO(1000);
- configuration.setPagingDirectory(getPageDir(node, false));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
- configuration.setClustered(true);
- configuration.setBackup(false);
+ configuration.setSecurityEnabled(false);
+ configuration.setBindingsDirectory(getBindingsDir(node, false));
+ configuration.setJournalMinFiles(2);
+ configuration.setJournalDirectory(getJournalDir(node, false));
+ configuration.setJournalFileSize(100 * 1024);
+ configuration.setJournalType(getDefaultJournalType());
+ configuration.setJournalMaxIO_AIO(1000);
+ configuration.setPagingDirectory(getPageDir(node, false));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
+ configuration.setClustered(true);
+ configuration.setBackup(false);
- configuration.getAcceptorConfigurations().clear();
+ configuration.getAcceptorConfigurations().clear();
- Map<String, Object> params = generateParams(node, netty);
+ Map<String, Object> params = generateParams(node, netty);
- configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty,
true, params));
+ configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty,
true, params));
- TransportConfiguration connector = createTransportConfiguration(netty, false,
params);
- configuration.getConnectorConfigurations().put(connector.getName(), connector);
+ TransportConfiguration connector = createTransportConfiguration(netty, false,
params);
+ configuration.getConnectorConfigurations().put(connector.getName(), connector);
- List<String> connectorPairs = new ArrayList<String>();
- connectorPairs.add(connector.getName());
+ List<String> connectorPairs = new ArrayList<String>();
+ connectorPairs.add(connector.getName());
- BroadcastGroupConfiguration bcConfig = new
BroadcastGroupConfiguration("bg1",
- null,
- -1,
-
groupAddress,
- port,
- 1000,
-
connectorPairs);
+ BroadcastGroupConfiguration bcConfig = new
BroadcastGroupConfiguration("bg1",
+ null,
+ -1,
+
groupAddress,
+ port,
+ 1000,
+
connectorPairs);
- configuration.getBroadcastGroupConfigurations().add(bcConfig);
+ configuration.getBroadcastGroupConfigurations().add(bcConfig);
- DiscoveryGroupConfiguration dcConfig = new
DiscoveryGroupConfiguration("dg1", null, groupAddress, port, 5000, 5000);
+ DiscoveryGroupConfiguration dcConfig = new
DiscoveryGroupConfiguration("dg1",
+ null,
+
groupAddress,
+ port,
+ 5000,
+ 5000);
- configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(),
dcConfig);
+ configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
- HornetQServer server;
- if (fileStorage)
- {
- if (sharedStorage)
- {
- server = createInVMFailoverServer(true, configuration,
nodeManagers[node]);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration);
- }
- }
- else
- {
- if (sharedStorage)
- {
- server = createInVMFailoverServer(false, configuration,
nodeManagers[node]);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration, false);
- }
- }
- servers[node] = server;
- }
+ HornetQServer server;
+ if (fileStorage)
+ {
+ if (sharedStorage)
+ {
+ server = createInVMFailoverServer(true, configuration, nodeManagers[node]);
+ }
+ else
+ {
+ server = HornetQServers.newHornetQServer(configuration);
+ }
+ }
+ else
+ {
+ if (sharedStorage)
+ {
+ server = createInVMFailoverServer(false, configuration, nodeManagers[node]);
+ }
+ else
+ {
+ server = HornetQServers.newHornetQServer(configuration, false);
+ }
+ }
+ servers[node] = server;
+ }
protected void setupBackupServerWithDiscovery(final int node,
- final int liveNode,
- final String groupAddress,
- final int port,
- final boolean fileStorage,
- final boolean netty,
- final boolean sharedStorage)
- {
- if (servers[node] != null)
- {
- throw new IllegalArgumentException("Already a server at node " +
node);
- }
+ final int liveNode,
+ final String groupAddress,
+ final int port,
+ final boolean fileStorage,
+ final boolean netty,
+ final boolean sharedStorage)
+ {
+ if (servers[node] != null)
+ {
+ throw new IllegalArgumentException("Already a server at node " +
node);
+ }
- Configuration configuration = createBasicConfig();
+ Configuration configuration = createBasicConfig();
- configuration.setSecurityEnabled(false);
- configuration.setSharedStore(sharedStorage);
- if (sharedStorage)
- {
- // Shared storage will share the node between the backup and live node
- configuration.setBindingsDirectory(getBindingsDir(liveNode, false));
- configuration.setJournalDirectory(getJournalDir(liveNode, false));
- configuration.setPagingDirectory(getPageDir(liveNode, false));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(liveNode,
false));
- }
- else
- {
- configuration.setBindingsDirectory(getBindingsDir(node, true));
- configuration.setJournalDirectory(getJournalDir(node, true));
- configuration.setPagingDirectory(getPageDir(node, true));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
- }
- configuration.setClustered(true);
- configuration.setBackup(true);
+ configuration.setSecurityEnabled(false);
+ configuration.setSharedStore(sharedStorage);
+ if (sharedStorage)
+ {
+ // Shared storage will share the node between the backup and live node
+ configuration.setBindingsDirectory(getBindingsDir(liveNode, false));
+ configuration.setJournalDirectory(getJournalDir(liveNode, false));
+ configuration.setPagingDirectory(getPageDir(liveNode, false));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(liveNode, false));
+ }
+ else
+ {
+ configuration.setBindingsDirectory(getBindingsDir(node, true));
+ configuration.setJournalDirectory(getJournalDir(node, true));
+ configuration.setPagingDirectory(getPageDir(node, true));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
+ }
+ configuration.setClustered(true);
+ configuration.setBackup(true);
- configuration.getAcceptorConfigurations().clear();
+ configuration.getAcceptorConfigurations().clear();
- Map<String, Object> params = generateParams(node, netty);
+ Map<String, Object> params = generateParams(node, netty);
- configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty,
true, params));
+ configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty,
true, params));
- TransportConfiguration connector = createTransportConfiguration(netty, false,
params);
- configuration.getConnectorConfigurations().put(connector.getName(), connector);
+ TransportConfiguration connector = createTransportConfiguration(netty, false,
params);
+ configuration.getConnectorConfigurations().put(connector.getName(), connector);
- List<String> connectorPairs = new ArrayList<String>();
- connectorPairs.add(connector.getName());
+ List<String> connectorPairs = new ArrayList<String>();
+ connectorPairs.add(connector.getName());
- BroadcastGroupConfiguration bcConfig = new
BroadcastGroupConfiguration("bg1",
- null,
- -1,
-
groupAddress,
- port,
- 1000,
-
connectorPairs);
+ BroadcastGroupConfiguration bcConfig = new
BroadcastGroupConfiguration("bg1",
+ null,
+ -1,
+
groupAddress,
+ port,
+ 1000,
+
connectorPairs);
- configuration.getBroadcastGroupConfigurations().add(bcConfig);
+ configuration.getBroadcastGroupConfigurations().add(bcConfig);
- DiscoveryGroupConfiguration dcConfig = new
DiscoveryGroupConfiguration("dg1", null, groupAddress, port, 5000, 5000);
+ DiscoveryGroupConfiguration dcConfig = new
DiscoveryGroupConfiguration("dg1",
+ null,
+
groupAddress,
+ port,
+ 5000,
+ 5000);
- configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(),
dcConfig);
+ configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
- HornetQServer server;
- if (fileStorage)
- {
- if (sharedStorage)
- {
- server = createInVMFailoverServer(true, configuration,
nodeManagers[liveNode]);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration);
- }
- }
- else
- {
- if (sharedStorage)
- {
- server = createInVMFailoverServer(false, configuration,
nodeManagers[liveNode]);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration, false);
- }
- }
- servers[node] = server;
- }
+ HornetQServer server;
+ if (fileStorage)
+ {
+ if (sharedStorage)
+ {
+ server = createInVMFailoverServer(true, configuration,
nodeManagers[liveNode]);
+ }
+ else
+ {
+ server = HornetQServers.newHornetQServer(configuration);
+ }
+ }
+ else
+ {
+ if (sharedStorage)
+ {
+ server = createInVMFailoverServer(false, configuration,
nodeManagers[liveNode]);
+ }
+ else
+ {
+ server = HornetQServers.newHornetQServer(configuration, false);
+ }
+ }
+ servers[node] = server;
+ }
-
protected void clearServer(final int... nodes)
{
for (int i = 0; i < nodes.length; i++)
@@ -1689,12 +1691,12 @@
{
throw new IllegalStateException("No server at node " + nodeFrom);
}
-
+
TransportConfiguration connectorFrom = createTransportConfiguration(netty, false,
generateParams(nodeFrom, netty));
serverFrom.getConfiguration().getConnectorConfigurations().put(name,
connectorFrom);
List<String> pairs = null;
-
+
if (nodeTo != -1)
{
TransportConfiguration serverTotc = createTransportConfiguration(netty, false,
generateParams(nodeTo, netty));
@@ -1711,11 +1713,11 @@
forwardWhenNoConsumers,
maxHops,
1024,
-
pairs, allowDirectConnectionsOnly);
+
pairs,
+
allowDirectConnectionsOnly);
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
}
-
protected void setupClusterConnection(final String name,
final String address,
final boolean forwardWhenNoConsumers,
@@ -1733,7 +1735,7 @@
TransportConfiguration connectorFrom = createTransportConfiguration(netty, false,
generateParams(nodeFrom, netty));
serverFrom.getConfiguration().getConnectorConfigurations().put(connectorFrom.getName(),
connectorFrom);
-
+
List<String> pairs = new ArrayList<String>();
for (int element : nodesTo)
{
@@ -1750,7 +1752,8 @@
forwardWhenNoConsumers,
maxHops,
1024,
-
pairs, false);
+
pairs,
+
false);
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
}
@@ -1789,7 +1792,8 @@
forwardWhenNoConsumers,
maxHops,
1024,
-
pairs, false);
+
pairs,
+
false);
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
}
@@ -1811,7 +1815,7 @@
TransportConfiguration connectorConfig = createTransportConfiguration(netty, false,
generateParams(node, netty));
server.getConfiguration().getConnectorConfigurations().put(name, connectorConfig);
-
+
ClusterConnectionConfiguration clusterConf = new
ClusterConnectionConfiguration(name,
address,
name,
@@ -1844,23 +1848,22 @@
}
for (int node : nodes)
{
- //wait for each server to start, it may be a backup and started in a separate
thread
+ // wait for each server to start, it may be a backup and started in a separate
thread
waitForServer(servers[node]);
}
}
- private void waitForServer(HornetQServer server)
- throws InterruptedException
+ private void waitForServer(HornetQServer server) throws InterruptedException
{
- long timetowait =System.currentTimeMillis() + 5000;
- while(!server.isStarted())
+ long timetowait = System.currentTimeMillis() + 5000;
+ while (!server.isStarted())
{
Thread.sleep(100);
- if(server.isStarted())
+ if (server.isStarted())
{
break;
}
- else if(System.currentTimeMillis() > timetowait)
+ else if (System.currentTimeMillis() > timetowait)
{
fail("server didnt start");
}