[hornetq-commits] JBoss hornetq SVN: r10968 - in branches/HORNETQ-720_Replication: tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution and 1 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Jul 12 03:40:24 EDT 2011
Author: ataylor
Date: 2011-07-12 03:40:24 -0400 (Tue, 12 Jul 2011)
New Revision: 10968
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedNettyAsynchronousFailoverTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedPagingFailoverTest.java
Log:
Fixed the replication tests and changed the reconnect attempts to unlimited (-1) for the backup server connecting to the live server.
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-12 06:24:28 UTC (rev 10967)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-12 07:40:24 UTC (rev 10968)
@@ -41,6 +41,7 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.client.impl.ServerLocatorInternal;
@@ -525,6 +526,8 @@
private final class SharedNothingBackupActivation implements Activation
{
+ private ServerLocatorInternal serverLocator;
+
public void run()
{
try
@@ -540,11 +543,11 @@
throw new IllegalArgumentException("Cannot have a replicated backup without configuring its live-server!");
}
final TransportConfiguration config = configuration.getConnectorConfigurations().get(liveConnectorName);
- final ServerLocatorInternal serverLocator =
+ serverLocator =
(ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(config);
- // XXX Need to retry the connection a couple of times
- // sit in loop and try and connect, if server is not live then it will return NOT_LIVE
+ serverLocator.setReconnectAttempts(-1);
+
final ClientSessionFactory liveServerSessionFactory = serverLocator.connect();
if (liveServerSessionFactory == null)
@@ -580,9 +583,16 @@
}
public void close(final boolean permanently) throws Exception
- {
+ {
+ if(serverLocator != null)
+ {
+ serverLocator.close();
+ serverLocator = null;
+ }
+
if (configuration.isBackup())
{
+
long timeout = 30000;
long start = System.currentTimeMillis();
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-07-12 06:24:28 UTC (rev 10967)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-07-12 07:40:24 UTC (rev 10968)
@@ -1376,6 +1376,53 @@
}
+ protected void setupLiveServer(final int node,
+ final boolean fileStorage,
+ final boolean sharedStorage,
+ final boolean netty,
+ NodeManager nodeManager)
+ {
+ if (servers[node] != null)
+ {
+ throw new IllegalArgumentException("Already a server at node " + node);
+ }
+
+ Configuration configuration = createBasicConfig();
+
+ configuration.setSecurityEnabled(false);
+ configuration.setJournalMinFiles(2);
+ configuration.setJournalMaxIO_AIO(1000);
+ configuration.setJournalFileSize(100 * 1024);
+ configuration.setJournalType(getDefaultJournalType());
+ configuration.setSharedStore(sharedStorage);
+ if (sharedStorage)
+ {
+ // Shared storage will share the node between the backup and live node
+ configuration.setBindingsDirectory(getBindingsDir(node, false));
+ configuration.setJournalDirectory(getJournalDir(node, false));
+ configuration.setPagingDirectory(getPageDir(node, false));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
+ }
+ else
+ {
+ configuration.setBindingsDirectory(getBindingsDir(node, true));
+ configuration.setJournalDirectory(getJournalDir(node, true));
+ configuration.setPagingDirectory(getPageDir(node, true));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
+ }
+ configuration.setClustered(true);
+ configuration.setJournalCompactMinFiles(0);
+
+ configuration.getAcceptorConfigurations().clear();
+ configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, generateParams(node, netty)));
+
+ HornetQServer server;
+
+ server = createInVMFailoverServer(fileStorage, configuration, nodeManager);
+
+ servers[node] = server;
+ }
+
protected void setupBackupServer(final int node,
final int liveNode,
final boolean fileStorage,
@@ -1423,11 +1470,12 @@
TransportConfiguration backupConfig = createTransportConfiguration(netty, false, generateParams(node, netty));
configuration.getConnectorConfigurations().put(backupConfig.getName(), backupConfig);
+ configuration.setLiveConnectorName(liveConfig.getName());
HornetQServer server;
if (fileStorage)
{
- if (sharedStorage)
+ if (sharedStorage )
{
server = createInVMFailoverServer(true, configuration, nodeManagers[liveNode]);
}
@@ -1450,6 +1498,62 @@
servers[node] = server;
}
+ protected void setupBackupServer(final int node,
+ final int liveNode,
+ final boolean fileStorage,
+ final boolean sharedStorage,
+ final boolean netty,
+ NodeManager nodeManager)
+ {
+ if (servers[node] != null)
+ {
+ throw new IllegalArgumentException("Already a server at node " + node);
+ }
+
+ Configuration configuration = createBasicConfig();
+
+ configuration.setSecurityEnabled(false);
+ configuration.setJournalMinFiles(2);
+ configuration.setJournalMaxIO_AIO(1000);
+ configuration.setJournalFileSize(100 * 1024);
+ configuration.setJournalType(getDefaultJournalType());
+ configuration.setSharedStore(sharedStorage);
+ if (sharedStorage)
+ {
+ // Shared storage will share the node between the backup and live node
+ configuration.setBindingsDirectory(getBindingsDir(liveNode, false));
+ configuration.setJournalDirectory(getJournalDir(liveNode, false));
+ configuration.setPagingDirectory(getPageDir(liveNode, false));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(liveNode, false));
+ }
+ else
+ {
+ configuration.setBindingsDirectory(getBindingsDir(node, true));
+ configuration.setJournalDirectory(getJournalDir(node, true));
+ configuration.setPagingDirectory(getPageDir(node, true));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
+ }
+ configuration.setClustered(true);
+ configuration.setJournalCompactMinFiles(0);
+ configuration.setBackup(true);
+
+ configuration.getAcceptorConfigurations().clear();
+ TransportConfiguration acceptorConfig = createTransportConfiguration(netty, true, generateParams(node, netty));
+ configuration.getAcceptorConfigurations().add(acceptorConfig);
+ //add backup connector
+ TransportConfiguration liveConfig = createTransportConfiguration(netty, false, generateParams(liveNode, netty));
+ configuration.getConnectorConfigurations().put(liveConfig.getName(), liveConfig);
+ TransportConfiguration backupConfig = createTransportConfiguration(netty, false, generateParams(node, netty));
+ configuration.getConnectorConfigurations().put(backupConfig.getName(), backupConfig);
+
+ configuration.setLiveConnectorName(liveConfig.getName());
+ HornetQServer server;
+
+ server = createInVMFailoverServer(fileStorage, configuration, nodeManager);
+
+ servers[node] = server;
+ }
+
protected void setupLiveServerWithDiscovery(final int node,
final String groupAddress,
final int port,
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2011-07-12 06:24:28 UTC (rev 10967)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2011-07-12 07:40:24 UTC (rev 10968)
@@ -24,9 +24,11 @@
import org.hornetq.core.client.impl.Topology;
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.NodeManager;
import org.hornetq.core.server.cluster.MessageFlowRecord;
import org.hornetq.core.server.cluster.impl.ClusterConnectionImpl;
import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
+import org.hornetq.core.server.impl.InVMNodeManager;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.tests.integration.cluster.distribution.ClusterTestBase;
@@ -39,9 +41,10 @@
public void testGroupingLocalHandlerFails() throws Exception
{
- setupBackupServer(2, 0, isFileStorage(), isSharedServer(), isNetty());
+ NodeManager nodeManager = new InVMNodeManager();
+ setupBackupServer(2, 0, isFileStorage(), isSharedServer(), isNetty(), nodeManager);
- setupLiveServer(0, isFileStorage(), isSharedServer(), isNetty());
+ setupLiveServer(0, isFileStorage(), isSharedServer(), isNetty(), nodeManager);
setupLiveServer(1, isFileStorage(), isSharedServer(), isNetty());
@@ -117,10 +120,12 @@
public void testGroupingLocalHandlerFailsMultipleGroups() throws Exception
{
- setupBackupServer(2, 0, isFileStorage(), isSharedServer(), isNetty());
+ NodeManager nodeManager = new InVMNodeManager();
- setupLiveServer(0, isFileStorage(), isSharedServer(), isNetty());
+ setupBackupServer(2, 0, isFileStorage(), isSharedServer(), isNetty(), nodeManager);
+ setupLiveServer(0, isFileStorage(), isSharedServer(), isNetty(), nodeManager);
+
setupLiveServer(1, isFileStorage(), isSharedServer(), isNetty());
setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1);
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java 2011-07-12 06:24:28 UTC (rev 10967)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java 2011-07-12 07:40:24 UTC (rev 10968)
@@ -41,45 +41,11 @@
// Protected -----------------------------------------------------
- @Override
- protected TestableServer createLiveServer()
- {
- return new SameProcessHornetQServer(createServer(true, liveConfig));
- }
@Override
- protected TestableServer createBackupServer()
- {
- return new SameProcessHornetQServer(createServer(true, backupConfig));
- }
-
- @Override
protected void createConfigs() throws Exception
{
- Configuration config1 = super.createDefaultConfig();
- config1.setBindingsDirectory(config1.getBindingsDirectory() + "_backup");
- config1.setJournalDirectory(config1.getJournalDirectory() + "_backup");
- config1.getAcceptorConfigurations().clear();
- config1.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(false));
- config1.setSecurityEnabled(false);
- config1.setSharedStore(false);
- config1.setBackup(true);
- backupConfig = config1;
- backupServer = createBackupServer();
-
- Configuration config0 = super.createDefaultConfig();
- config0.getAcceptorConfigurations().clear();
- config0.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true));
-
- /*liveConfig.getConnectorConfigurations().put("toBackup", getConnectorTransportConfiguration(false));
- liveConfig.setBackupConnectorName("toBackup");*/
- config0.setSecurityEnabled(false);
- config0.setSharedStore(false);
- liveConfig = config0;
- liveServer = createLiveServer();
-
- backupServer.start();
- liveServer.start();
+ createReplicatedConfigs();
}
// Private -------------------------------------------------------
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedNettyAsynchronousFailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedNettyAsynchronousFailoverTest.java 2011-07-12 06:24:28 UTC (rev 10967)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedNettyAsynchronousFailoverTest.java 2011-07-12 07:40:24 UTC (rev 10968)
@@ -41,43 +41,11 @@
// Protected -----------------------------------------------------
- @Override
- protected TestableServer createLiveServer()
- {
- return new SameProcessHornetQServer(createServer(true, liveConfig));
- }
@Override
- protected TestableServer createBackupServer()
- {
- return new SameProcessHornetQServer(createServer(true, backupConfig));
- }
-
- @Override
protected void createConfigs() throws Exception
{
- backupConfig = super.createDefaultConfig();
- backupConfig.setBindingsDirectory(backupConfig.getBindingsDirectory() + "_backup");
- backupConfig.setJournalDirectory(backupConfig.getJournalDirectory() + "_backup");
- backupConfig.getAcceptorConfigurations().clear();
- backupConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(false));
- backupConfig.setSecurityEnabled(false);
- backupConfig.setSharedStore(false);
- backupConfig.setBackup(true);
- backupServer = createBackupServer();
-
- liveConfig = super.createDefaultConfig();
- liveConfig.getAcceptorConfigurations().clear();
- liveConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true));
-
- //liveConfig.getConnectorConfigurations().put("toBackup", getConnectorTransportConfiguration(false));
- //liveConfig.setBackupConnectorName("toBackup");
- liveConfig.setSecurityEnabled(false);
- liveConfig.setSharedStore(false);
- liveServer = createLiveServer();
-
- backupServer.start();
- liveServer.start();
+ createReplicatedConfigs();
}
// Private -------------------------------------------------------
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedPagingFailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedPagingFailoverTest.java 2011-07-12 06:24:28 UTC (rev 10967)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedPagingFailoverTest.java 2011-07-12 07:40:24 UTC (rev 10968)
@@ -41,46 +41,11 @@
// Protected -----------------------------------------------------
- @Override
- protected TestableServer createBackupServer()
- {
- return new SameProcessHornetQServer(createServer(true, backupConfig));
- }
-
@Override
- protected TestableServer createLiveServer()
- {
- return new SameProcessHornetQServer(createServer(true, liveConfig));
-
- }
-
- @Override
protected void createConfigs() throws Exception
{
- backupConfig = super.createDefaultConfig();
- backupConfig.setBindingsDirectory(backupConfig.getBindingsDirectory() + "_backup");
- backupConfig.setJournalDirectory(backupConfig.getJournalDirectory() + "_backup");
- backupConfig.setPagingDirectory(backupConfig.getPagingDirectory() + "_backup");
- backupConfig.getAcceptorConfigurations().clear();
- backupConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(false));
- backupConfig.setSecurityEnabled(false);
- backupConfig.setSharedStore(false);
- backupConfig.setBackup(true);
- backupServer = createBackupServer();
-
- liveConfig = super.createDefaultConfig();
- liveConfig.getAcceptorConfigurations().clear();
- liveConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true));
-
- /*liveConfig.getConnectorConfigurations().put("toBackup", getConnectorTransportConfiguration(false));
- liveConfig.setBackupConnectorName("toBackup");*/
- liveConfig.setSecurityEnabled(false);
- liveConfig.setSharedStore(false);
- liveServer = createLiveServer();
-
- backupServer.start();
- liveServer.start();
+ createReplicatedConfigs();
}
// Private -------------------------------------------------------
More information about the hornetq-commits
mailing list