JBoss hornetq SVN: r11663 - in trunk: tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2011-11-04 10:38:08 -0400 (Fri, 04 Nov 2011)
New Revision: 11663
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
Log:
HORNETQ-720 Close the replicating backup if failed to connect to live.
Disable test (if sharedStore==false) that tried to
(1) stop the backup,
(2) crash the live,
(3) restart the backup.
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-11-04 10:34:47 UTC (rev 11662)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-11-04 14:38:08 UTC (rev 11663)
@@ -2021,6 +2021,7 @@
private final class SharedNothingBackupActivation implements Activation
{
private ServerLocatorInternal serverLocator;
+ private volatile boolean failedConnection;
public void run()
{
@@ -2068,7 +2069,21 @@
}
catch (Exception e)
{
- log.warn("Unable to announce backup for replication.", e);
+ log.warn("Unable to announce backup for replication. Trying to stop the server.", e);
+ failedConnection = true;
+ try
+ {
+ synchronized (quorumManager)
+ {
+ quorumManager.notify();
+ }
+ HornetQServerImpl.this.stop();
+ return;
+ }
+ catch (Exception e1)
+ {
+ throw new RuntimeException(e1);
+ }
}
}
});
@@ -2081,11 +2096,12 @@
// we must remember to close stuff we don't need any more
synchronized (quorumManager)
{
- while (true)
- {
+ if (failedConnection)
+ return;
+ while (true)
+ {
quorumManager.wait();
- // nodeManager.awaitLiveNode();
- break;
+ break;
// if (!started || quorumManager.isNodeDown())
// {
// break;
@@ -2096,6 +2112,8 @@
serverLocator.close();
replicationEndpoint.stop();
+ if (failedConnection)
+ return;
if (!isRemoteBackupUpToDate())
{
/*
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-11-04 10:34:47 UTC (rev 11662)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-11-04 14:38:08 UTC (rev 11663)
@@ -1580,6 +1580,9 @@
public void testBackupServerNotRemoved() throws Exception
{
+ // HORNETQ-720 Disabling test for replicating backups.
+ if (!backupServer.getServer().getConfiguration().isSharedStore())
+ return;
locator.setFailoverOnInitialConnection(true);
createSessionFactory();
13 years, 2 months
JBoss hornetq SVN: r11662 - in trunk: tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2011-11-04 06:34:47 -0400 (Fri, 04 Nov 2011)
New Revision: 11662
Modified:
trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java
Log:
Fix NPE in test setUp when using RemoteProcessHornetQServer
Modified: trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java 2011-11-04 10:09:55 UTC (rev 11661)
+++ trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java 2011-11-04 10:34:47 UTC (rev 11662)
@@ -306,6 +306,8 @@
protected void waitForServer(HornetQServer server) throws InterruptedException
{
+ if (server == null)
+ return;
long timetowait = System.currentTimeMillis() + 5000;
while (!server.isStarted() && System.currentTimeMillis() < timetowait)
{
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java 2011-11-04 10:09:55 UTC (rev 11661)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java 2011-11-04 10:34:47 UTC (rev 11662)
@@ -63,8 +63,7 @@
}
catch (Exception e)
{
- e.printStackTrace();
- return false;
+ throw new RuntimeException(e);
}
return false;
}
@@ -118,7 +117,7 @@
{
crash(true, sessions);
}
-
+
public void crash(final boolean waitFailure, ClientSession... sessions) throws Exception
{
final CountDownLatch latch = new CountDownLatch(sessions.length);
@@ -149,7 +148,7 @@
{
// Wait to be informed of failure
boolean ok = latch.await(10000, TimeUnit.MILLISECONDS);
-
+
Assert.assertTrue(ok);
}
}
13 years, 2 months
JBoss hornetq SVN: r11661 - in trunk: tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution and 1 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-11-04 06:09:55 -0400 (Fri, 04 Nov 2011)
New Revision: 11661
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
Log:
HORNETQ-720 Do not try to announce a remote backup (as it does not have the live's nodeID at this point).
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-11-04 10:09:32 UTC (rev 11660)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-11-04 10:09:55 UTC (rev 11661)
@@ -206,7 +206,7 @@
for (ClusterConnection conn : clusterConnections.values())
{
conn.start();
- if (backup)
+ if (backup && configuration.isSharedStore())
{
conn.informTopology();
conn.announceBackup();
@@ -378,8 +378,7 @@
if (connector == null)
{
- log.warn("No connector with name '" + config.getConnectorName() +
- "'. backup cannot be announced.");
+ log.warn("No connector with name '" + config.getConnectorName() + "'. backup cannot be announced.");
return;
}
liveChannel.send(new BackupRegistrationMessage(nodeUUID.toString(), connector));
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-11-04 10:09:32 UTC (rev 11660)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-11-04 10:09:55 UTC (rev 11661)
@@ -815,7 +815,7 @@
}
SimpleString id = (SimpleString)message.getObjectProperty(Message.HDR_GROUP_ID);
- System.out.println("received " + id + " on consumer " + consumerIDs[i]);
+
if (groupIdsReceived.get(id) == null)
{
groupIdsReceived.put(id, i);
@@ -932,9 +932,8 @@
message.getObjectProperty(ClusterTestBase.COUNT_PROP);
}
outOfOrder = true;
- System.out.println("Message j=" + j +
- " was received out of order = " +
- message.getObjectProperty(ClusterTestBase.COUNT_PROP));
+ System.out.println("Message j=" + j + " was received out of order = " +
+ message.getObjectProperty(ClusterTestBase.COUNT_PROP));
log.info("Message j=" + j +
" was received out of order = " +
message.getObjectProperty(ClusterTestBase.COUNT_PROP));
@@ -1991,13 +1990,10 @@
}
timeStarts[node] = System.currentTimeMillis();
- servers[node].setIdentity("server " + node);
log.info("starting server " + servers[node]);
servers[node].start();
log.info("started server " + servers[node]);
-
- log.info("started server " + node);
}
for (int node : nodes)
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2011-11-04 10:09:32 UTC (rev 11660)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2011-11-04 10:09:55 UTC (rev 11661)
@@ -44,7 +44,7 @@
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 2);
- startServers(2, 0, 1);
+ startServers(0, 1, 2);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -109,7 +109,7 @@
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 2);
- startServers(2, 0, 1);
+ startServers(0, 1, 2);
setupSessionFactory(0, isNetty());
@@ -146,7 +146,7 @@
closeSessionFactory(0);
Thread.sleep(1000);
-
+
servers[0].stop(true);
waitForServerRestart(2);
13 years, 2 months
JBoss hornetq SVN: r11660 - in trunk/hornetq-core/src/main/java/org/hornetq/core/server: impl and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2011-11-04 06:09:32 -0400 (Fri, 04 Nov 2011)
New Revision: 11660
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
Remove unused method
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java 2011-11-04 09:13:17 UTC (rev 11659)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java 2011-11-04 10:09:32 UTC (rev 11660)
@@ -17,8 +17,6 @@
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
-import javax.management.MBeanServer;
-
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
@@ -80,8 +78,6 @@
HornetQSecurityManager getSecurityManager();
- MBeanServer getMBeanServer();
-
Version getVersion();
NodeManager getNodeManager();
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-11-04 09:13:17 UTC (rev 11659)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-11-04 10:09:32 UTC (rev 11660)
@@ -705,11 +705,6 @@
return configuration;
}
- public MBeanServer getMBeanServer()
- {
- return mbeanServer;
- }
-
public PagingManager getPagingManager()
{
return pagingManager;
13 years, 2 months
JBoss hornetq SVN: r11659 - trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: borges
Date: 2011-11-04 05:13:17 -0400 (Fri, 04 Nov 2011)
New Revision: 11659
Removed:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FileStorageStaticClusterWithBackupFailoverTest.java
Log:
Delete duplicate test class (it does not add anything to StaticClusterWithBackupFailoverTest)
Deleted: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FileStorageStaticClusterWithBackupFailoverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FileStorageStaticClusterWithBackupFailoverTest.java 2011-11-04 09:13:03 UTC (rev 11658)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FileStorageStaticClusterWithBackupFailoverTest.java 2011-11-04 09:13:17 UTC (rev 11659)
@@ -1,34 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.hornetq.tests.integration.cluster.failover;
-
-/**
- * A FileStorageClusterWithBackupFailoverTest
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
- *
- */
-public class FileStorageStaticClusterWithBackupFailoverTest extends StaticClusterWithBackupFailoverTest
-{
-}
13 years, 2 months
JBoss hornetq SVN: r11658 - trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: borges
Date: 2011-11-04 05:13:03 -0400 (Fri, 04 Nov 2011)
New Revision: 11658
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
Log:
Delete unused inner classes
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java 2011-11-04 09:12:46 UTC (rev 11657)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java 2011-11-04 09:13:03 UTC (rev 11658)
@@ -15,7 +15,6 @@
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.CountDownLatch;
import junit.framework.Assert;
@@ -28,7 +27,6 @@
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ServerLocator;
-import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.client.impl.DelegatingSession;
@@ -56,20 +54,6 @@
private final Object lockFail = new Object();
- class MyListener implements SessionFailureListener
- {
- CountDownLatch latch = new CountDownLatch(1);
-
- public void connectionFailed(final HornetQException me, boolean failedOver)
- {
- latch.countDown();
- }
-
- public void beforeReconnect(final HornetQException me)
- {
- }
- }
-
public void testNonTransactional() throws Throwable
{
runTest(new TestRunner()
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-11-04 09:12:46 UTC (rev 11657)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-11-04 09:13:03 UTC (rev 11658)
@@ -25,7 +25,6 @@
import junit.framework.Assert;
import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
@@ -34,7 +33,6 @@
import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
-import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.config.Configuration;
@@ -379,13 +377,6 @@
// Inner classes -------------------------------------------------
- abstract class BaseListener implements SessionFailureListener
- {
- public void beforeReconnect(final HornetQException me)
- {
- }
- }
-
class LatchClusterTopologyListener implements ClusterTopologyListener
{
final CountDownLatch latch;
13 years, 2 months
JBoss hornetq SVN: r11657 - trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: borges
Date: 2011-11-04 05:12:46 -0400 (Fri, 04 Nov 2011)
New Revision: 11657
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
Log:
Server identity is being set at setUp()
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-11-04 09:12:29 UTC (rev 11656)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-11-04 09:12:46 UTC (rev 11657)
@@ -112,8 +112,6 @@
clearData();
createConfigs();
-
-
liveServer.setIdentity(this.getClass().getSimpleName() + "/liveServer");
liveServer.start();
@@ -193,7 +191,6 @@
ReplicatedBackupUtils.createClusterConnectionConf(backupConfig, backupConnector.getName(),
liveConnector.getName());
backupServer = createBackupServer();
- backupServer.getServer().setIdentity("bkpIdentityServer");
liveConfig = super.createDefaultConfig();
liveConfig.getAcceptorConfigurations().clear();
@@ -226,14 +223,11 @@
backupConfig.setSecurityEnabled(false);
backupServer = createBackupServer();
- backupServer.getServer().setIdentity("idBackup");
-
liveConfig.getAcceptorConfigurations().clear();
liveConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true));
liveServer = createLiveServer();
- liveServer.getServer().setIdentity("idLive");
}
@Override
@@ -320,16 +314,16 @@
{
fail("backup server never started (" + backupServer.isStarted() + "), or never finished synchronizing (" +
actualServer.isRemoteBackupUpToDate() + ")");
- }
+ }
try
{
Thread.sleep(100);
}
catch (InterruptedException e)
{
- //ignore
+ fail(e.getMessage());
}
- }
+ }
}
protected TransportConfiguration getNettyAcceptorTransportConfiguration(final boolean live)
@@ -338,15 +332,13 @@
{
return new TransportConfiguration(NETTY_ACCEPTOR_FACTORY);
}
- else
- {
- Map<String, Object> server1Params = new HashMap<String, Object>();
- server1Params.put(org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME,
- org.hornetq.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + 1);
+ Map<String, Object> server1Params = new HashMap<String, Object>();
- return new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, server1Params);
- }
+ server1Params.put(org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME,
+ org.hornetq.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + 1);
+
+ return new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, server1Params);
}
protected TransportConfiguration getNettyConnectorTransportConfiguration(final boolean live)
@@ -355,15 +347,12 @@
{
return new TransportConfiguration(NETTY_CONNECTOR_FACTORY);
}
- else
- {
- Map<String, Object> server1Params = new HashMap<String, Object>();
- server1Params.put(org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME,
- org.hornetq.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + 1);
+ Map<String, Object> server1Params = new HashMap<String, Object>();
- return new TransportConfiguration(NETTY_CONNECTOR_FACTORY, server1Params);
- }
+ server1Params.put(org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME,
+ org.hornetq.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + 1);
+ return new TransportConfiguration(NETTY_CONNECTOR_FACTORY, server1Params);
}
protected abstract TransportConfiguration getAcceptorTransportConfiguration(boolean live);
13 years, 2 months
JBoss hornetq SVN: r11656 - in trunk: tests/integration-tests/src/test/java/org/hornetq/tests/integration/client and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2011-11-04 05:12:29 -0400 (Fri, 04 Nov 2011)
New Revision: 11656
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java
Log:
clean up
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-11-03 13:42:42 UTC (rev 11655)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-11-04 09:12:29 UTC (rev 11656)
@@ -1783,9 +1783,9 @@
// Inner classes
// --------------------------------------------------------------------------------
- class FailbackChecker implements Runnable
+ private class FailbackChecker implements Runnable
{
- boolean restarting = false;
+ private boolean restarting = false;
public void run()
{
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java 2011-11-03 13:42:42 UTC (rev 11655)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java 2011-11-04 09:12:29 UTC (rev 11656)
@@ -79,6 +79,7 @@
public class PagingTest extends ServiceTestBase
{
private ServerLocator locator;
+ static final int MESSAGE_SIZE = 1024; // 1k
public PagingTest(final String name)
{
@@ -119,8 +120,7 @@
@Override
protected void tearDown() throws Exception
{
- locator.close();
-
+ closeServerLocator(locator);
super.tearDown();
}
@@ -140,8 +140,6 @@
server.start();
- final int messageSize = 1024;
-
final int numberOfMessages = 5000;
final int numberOfTX = 10;
@@ -166,11 +164,11 @@
ClientMessage message = null;
- byte[] body = new byte[messageSize];
+ byte[] body = new byte[MESSAGE_SIZE];
ByteBuffer bb = ByteBuffer.wrap(body);
- for (int j = 1; j <= messageSize; j++)
+ for (int j = 1; j <= MESSAGE_SIZE; j++)
{
bb.put(getSamplebyte(j));
}
@@ -373,8 +371,8 @@
server.start();
- final int messageSize = 1024;
+
final int numberOfMessages = 1000;
try
@@ -395,11 +393,11 @@
ClientMessage message = null;
- byte[] body = new byte[messageSize];
+ byte[] body = new byte[MESSAGE_SIZE];
ByteBuffer bb = ByteBuffer.wrap(body);
- for (int j = 1; j <= messageSize; j++)
+ for (int j = 1; j <= MESSAGE_SIZE; j++)
{
bb.put(getSamplebyte(j));
}
@@ -520,8 +518,8 @@
server.start();
- final int messageSize = 1024;
+
final int numberOfMessages = 1000;
try
@@ -542,11 +540,11 @@
ClientMessage message = null;
- byte[] body = new byte[messageSize];
+ byte[] body = new byte[MESSAGE_SIZE];
ByteBuffer bb = ByteBuffer.wrap(body);
- for (int j = 1; j <= messageSize; j++)
+ for (int j = 1; j <= MESSAGE_SIZE; j++)
{
bb.put(getSamplebyte(j));
}
@@ -591,16 +589,12 @@
assertEquals(numberOfMessages, queue.getMessageCount());
- LinkedList<Xid> xids = new LinkedList<Xid>();
-
- int msgReceived = 0;
ClientSession sessionConsumer = sf.createSession(false, false, false);
sessionConsumer.start();
ClientConsumer consumer = sessionConsumer.createConsumer(PagingTest.ADDRESS);
for (int msgCount = 0; msgCount < numberOfMessages; msgCount++)
{
log.info("Received " + msgCount);
- msgReceived++;
ClientMessage msg = consumer.receiveImmediate();
if (msg == null)
{
@@ -697,16 +691,12 @@
// assertEquals(numberOfMessages, queue.getMessageCount());
- xids = new LinkedList<Xid>();
-
- msgReceived = 0;
sessionConsumer = sf.createSession(false, false, false);
sessionConsumer.start();
consumer = sessionConsumer.createConsumer(PagingTest.ADDRESS);
for (int msgCount = 0; msgCount < numberOfMessages; msgCount++)
{
log.info("Received " + msgCount);
- msgReceived++;
ClientMessage msg = consumer.receiveImmediate();
if (msg == null)
{
@@ -758,8 +748,8 @@
server.start();
- final int messageSize = 1024;
+
final int numberOfMessages = 5000;
final int numberOfTX = 10;
@@ -786,11 +776,11 @@
ClientMessage message = null;
- byte[] body = new byte[messageSize];
+ byte[] body = new byte[MESSAGE_SIZE];
ByteBuffer bb = ByteBuffer.wrap(body);
- for (int j = 1; j <= messageSize; j++)
+ for (int j = 1; j <= MESSAGE_SIZE; j++)
{
bb.put(getSamplebyte(j));
}
@@ -940,8 +930,8 @@
server.start();
- final int messageSize = 1024;
+
final int numberOfMessages = 6;
final int numberOfTX = 2;
@@ -970,11 +960,11 @@
ClientMessage message = null;
- byte[] body = new byte[messageSize];
+ byte[] body = new byte[MESSAGE_SIZE];
ByteBuffer bb = ByteBuffer.wrap(body);
- for (int j = 1; j <= messageSize; j++)
+ for (int j = 1; j <= MESSAGE_SIZE; j++)
{
bb.put(getSamplebyte(j));
}
@@ -1104,8 +1094,8 @@
server.start();
- final int messageSize = 1024;
+
final int numberOfMessages = 1000;
try
@@ -1130,7 +1120,7 @@
ClientMessage message = null;
- byte[] body = new byte[messageSize];
+ byte[] body = new byte[MESSAGE_SIZE];
for (int i = 0; i < numberOfMessages; i++)
{
@@ -1264,15 +1254,15 @@
server.start();
- final int messageSize = 1024;
+
final int numberOfMessages = 3000;
- final byte[] body = new byte[messageSize];
+ final byte[] body = new byte[MESSAGE_SIZE];
ByteBuffer bb = ByteBuffer.wrap(body);
- for (int j = 1; j <= messageSize; j++)
+ for (int j = 1; j <= MESSAGE_SIZE; j++)
{
bb.put(getSamplebyte(j));
}
@@ -1540,15 +1530,15 @@
server.start();
- final int messageSize = 1024;
+
final int numberOfMessages = 3000;
- final byte[] body = new byte[messageSize];
+ final byte[] body = new byte[MESSAGE_SIZE];
ByteBuffer bb = ByteBuffer.wrap(body);
- for (int j = 1; j <= messageSize; j++)
+ for (int j = 1; j <= MESSAGE_SIZE; j++)
{
bb.put(getSamplebyte(j));
}
@@ -1873,8 +1863,6 @@
server.start();
- final int messageSize = 1024; // 1k
-
try
{
ServerLocator locator = createInVMNonHALocator();
@@ -1890,7 +1878,7 @@
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
- byte[] body = new byte[messageSize];
+ byte[] body = new byte[MESSAGE_SIZE];
// HornetQBuffer bodyLocal = HornetQChannelBuffers.buffer(DataConstants.SIZE_INT * numberOfIntegers);
ClientMessage message = null;
@@ -2017,8 +2005,8 @@
server.start();
- final int messageSize = 1024; // 1k
+
try
{
ServerLocator locator = createInVMNonHALocator();
@@ -2028,7 +2016,7 @@
ClientSessionFactory sf = locator.createSessionFactory();
- byte[] body = new byte[messageSize];
+ byte[] body = new byte[MESSAGE_SIZE];
ClientSession sessionTransacted = sf.createSession(null, null, false, false, false, false, 0);
ClientProducer producerTransacted = sessionTransacted.createProducer(PagingTest.ADDRESS);
@@ -2167,8 +2155,8 @@
server.start();
- final int messageSize = 1024; // 1k
+
try
{
ServerLocator locator = createInVMNonHALocator();
@@ -2178,7 +2166,7 @@
ClientSessionFactory sf = locator.createSessionFactory();
- byte[] body = new byte[messageSize];
+ byte[] body = new byte[MESSAGE_SIZE];
ClientSession sessionTransacted = sf.createSession(null, null, false, false, false, false, 0);
ClientProducer producerTransacted = sessionTransacted.createProducer(PagingTest.ADDRESS);
@@ -2302,7 +2290,7 @@
final AtomicInteger errors = new AtomicInteger(0);
- final int messageSize = 1024; // 1k
+
final int numberOfMessages = 10000;
ServerLocator locator = createInVMNonHALocator();
@@ -2316,7 +2304,7 @@
final ClientSessionFactory sf = locator.createSessionFactory();
- final byte[] body = new byte[messageSize];
+ final byte[] body = new byte[MESSAGE_SIZE];
Thread producerThread = new Thread()
{
@@ -2433,7 +2421,7 @@
final AtomicInteger errors = new AtomicInteger(0);
- final int messageSize = 1024;
+
final int numberOfMessages = 2000;
try
@@ -2445,7 +2433,7 @@
final CountDownLatch ready = new CountDownLatch(1);
- final byte[] body = new byte[messageSize];
+ final byte[] body = new byte[MESSAGE_SIZE];
Thread producerThread = new Thread()
{
@@ -3945,8 +3933,8 @@
server.start();
- final int messageSize = 1024;
+
final int numberOfMessages = 200;
try
@@ -3980,7 +3968,7 @@
ClientMessage message = null;
- byte[] body = new byte[messageSize];
+ byte[] body = new byte[MESSAGE_SIZE];
for (int i = 0; i < numberOfMessages; i++)
{
@@ -4070,8 +4058,8 @@
server.start();
- final int messageSize = 1024;
+
final int numberOfMessages = 1000;
try
@@ -4097,7 +4085,7 @@
ClientMessage message = null;
- byte[] body = new byte[messageSize];
+ byte[] body = new byte[MESSAGE_SIZE];
for (int i = 0; i < numberOfMessages; i++)
{
@@ -4193,8 +4181,8 @@
server.start();
- final int messageSize = 1024;
+
ServerLocator locator = null;
ClientSessionFactory sf = null;
ClientSession session = null;
@@ -4227,7 +4215,7 @@
message.putStringProperty("id", "str" + i);
- message.setBodyInputStream(createFakeLargeStream(messageSize));
+ message.setBodyInputStream(createFakeLargeStream(MESSAGE_SIZE));
producer.send(message);
@@ -4255,7 +4243,7 @@
assertEquals("str" + msgNr, msg.getStringProperty("id"));
- for (int j = 0; j < messageSize; j++)
+ for (int j = 0; j < MESSAGE_SIZE; j++)
{
assertEquals(getSamplebyte(j), msg.getBodyBuffer().readByte());
}
@@ -4370,7 +4358,7 @@
assertEquals("str" + msgNr, msg.getStringProperty("id"));
- for (int i = 0; i < messageSize; i++)
+ for (int i = 0; i < MESSAGE_SIZE; i++)
{
assertEquals(getSamplebyte(i), msg.getBodyBuffer().readByte());
}
13 years, 2 months
JBoss hornetq SVN: r11655 - in trunk/hornetq-core/src/main/java/org/hornetq/core: replication/impl and 1 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-11-03 09:42:42 -0400 (Thu, 03 Nov 2011)
New Revision: 11655
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
Log:
HORNETQ-720 Use ClusterTopologyListener to monitor the live's status in the remote replication case
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java 2011-11-03 13:42:27 UTC (rev 11654)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java 2011-11-03 13:42:42 UTC (rev 11655)
@@ -19,6 +19,7 @@
import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.ChannelHandler;
import org.hornetq.core.server.HornetQComponent;
+import org.hornetq.core.server.impl.QuorumManager;
/**
* A ReplicationEndpoint
@@ -38,4 +39,11 @@
void registerJournal(final byte id, final Journal journal);
+ /**
+ * Sets the quorumManager used by the server in the replicationEndpoint. It is used to inform the
+ * backup server of the live's nodeID.
+ * @param quorumManager
+ */
+ void setQuorumManager(QuorumManager quorumManager);
+
}
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-11-03 13:42:27 UTC (rev 11654)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-11-03 13:42:42 UTC (rev 11655)
@@ -68,6 +68,7 @@
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.HornetQServerImpl;
+import org.hornetq.core.server.impl.QuorumManager;
/**
*
@@ -119,6 +120,8 @@
private boolean deletePages = true;
private boolean started;
+ private QuorumManager quorumManager;
+
// Constructors --------------------------------------------------
public ReplicationEndpointImpl(final HornetQServerImpl server, IOCriticalErrorListener criticalErrorListener)
{
@@ -416,7 +419,7 @@
// Private -------------------------------------------------------
- private void finishSynchronization(String nodeID) throws Exception
+ private void finishSynchronization(String liveID) throws Exception
{
for (JournalContent jc : EnumSet.allOf(JournalContent.class))
{
@@ -477,7 +480,8 @@
}
}
journalsHolder = null;
- server.setRemoteBackupUpToDate(nodeID);
+ quorumManager.setLiveID(liveID);
+ server.setRemoteBackupUpToDate(liveID);
log.info("Backup server " + server + " is synchronized with live-server.");
return;
}
@@ -903,4 +907,10 @@
return "JournalSyncFile(file=" + file.getAbsolutePath() + ")";
}
}
+
+ @Override
+ public void setQuorumManager(QuorumManager quorumManager)
+ {
+ this.quorumManager = quorumManager;
+ }
}
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-11-03 13:42:27 UTC (rev 11654)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-11-03 13:42:42 UTC (rev 11655)
@@ -1959,11 +1959,8 @@
while (backupActivationThread.isAlive() && System.currentTimeMillis() - start < timeout)
{
nodeManager.interrupt();
-
backupActivationThread.interrupt();
-
backupActivationThread.join(1000);
-
}
if (System.currentTimeMillis() - start >= timeout)
@@ -2049,6 +2046,7 @@
final TransportConfiguration config = configuration.getConnectorConfigurations().get(liveConnectorName);
serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(config);
final QuorumManager quorumManager = new QuorumManager(serverLocator);
+ replicationEndpoint.setQuorumManager(quorumManager);
serverLocator.setReconnectAttempts(-1);
@@ -2086,14 +2084,18 @@
// Server node (i.e. Life node) is not running, now the backup takes over.
// we must remember to close stuff we don't need any more
+ synchronized (quorumManager)
+ {
while (true)
{
- nodeManager.awaitLiveNode();
+ quorumManager.wait();
+ // nodeManager.awaitLiveNode();
break;
// if (!started || quorumManager.isNodeDown())
// {
// break;
// }
+ }
}
serverLocator.close();
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java 2011-11-03 13:42:27 UTC (rev 11654)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java 2011-11-03 13:42:42 UTC (rev 11655)
@@ -25,14 +25,14 @@
* quorum will help a remote backup in deciding whether to replace its 'live' server or to wait for
* it.
*/
-final class QuorumManager implements ClusterTopologyListener
+public final class QuorumManager implements ClusterTopologyListener
{
// private static final Logger LOG = Logger.getLogger(QuorumManager.class);
// volatile boolean started;
private final ServerLocator locator;
- private final String targetServerName = "";
+ private String targetServerID = "";
private final Map<String, Pair<TransportConfiguration, TransportConfiguration>> nodes =
new ConcurrentHashMap<String, Pair<TransportConfiguration, TransportConfiguration>>();
private static final long DISCOVERY_TIMEOUT = 3;
@@ -47,7 +47,7 @@
public void nodeUP(long eventUID, String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair,
boolean last)
{
- if (targetServerName.equals(nodeID))
+ if (targetServerID.equals(nodeID))
{
return;
}
@@ -57,20 +57,25 @@
@Override
public void nodeDown(long eventUID, String nodeID)
{
- if (targetServerName.equals(nodeID))
+ if (targetServerID.equals(nodeID))
{
- // targetReturned = false;
- // trigger action
-
- // decide to wake backup
- locator.removeClusterTopologyListener(this);
+ if (!targetServerID.isEmpty())
+ synchronized (this)
+ {
+ notify();
+ }
}
nodes.remove(nodeID);
}
+ public void setLiveID(String liveID)
+ {
+ targetServerID = liveID;
+ }
+
public boolean isNodeDown()
{
- boolean liveShutdownCleanly = !nodes.containsKey(targetServerName);
+ boolean liveShutdownCleanly = !nodes.containsKey(targetServerID);
boolean noOtherServersAround = nodes.size() == 0;
if (liveShutdownCleanly || noOtherServersAround)
return true;
@@ -85,7 +90,7 @@
{
for (Entry<String, Pair<TransportConfiguration, TransportConfiguration>> pair : nodes.entrySet())
{
- if (targetServerName.equals(pair.getKey()))
+ if (targetServerID.equals(pair.getKey()))
continue;
TransportConfiguration serverTC = pair.getValue().getA();
ServerLocatorImpl locator = (ServerLocatorImpl)HornetQClient.createServerLocatorWithoutHA(serverTC);
13 years, 2 months
JBoss hornetq SVN: r11654 - trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: borges
Date: 2011-11-03 09:42:27 -0400 (Thu, 03 Nov 2011)
New Revision: 11654
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java
Log:
Don't cluter std.out.
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java 2011-11-03 13:42:07 UTC (rev 11653)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java 2011-11-03 13:42:27 UTC (rev 11654)
@@ -71,7 +71,7 @@
* A PagingTest
*
* @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
+ *
* Created Dec 5, 2008 8:25:58 PM
*
*
@@ -262,7 +262,7 @@
PagingTest.PAGE_MAX,
new HashMap<String, AddressSettings>());
server.start();
-
+
waitForServer(server);
queue = server.locateQueue(ADDRESS);
@@ -281,7 +281,7 @@
ClientMessage msg = consumer.receive(5000);
if (msg != null)
{
- System.out.println("Msg " + msg.getIntProperty("id"));
+ // System.out.println("Msg " + msg.getIntProperty("id"));
while (true)
{
@@ -301,7 +301,7 @@
Xid xid = xids.get(i);
session.rollback(xid);
}
- System.out.println("msgCount = " + queue.getMessageCount());
+ // System.out.println("msgCount = " + queue.getMessageCount());
xids.clear();
@@ -646,8 +646,8 @@
PagingTest.PAGE_MAX,
new HashMap<String, AddressSettings>());
server.start();
-
-
+
+
locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
@@ -661,7 +661,7 @@
session = sf.createSession(false, false, false);
producer = session.createProducer(PagingTest.ADDRESS);
-
+
for (int i = 0; i < numberOfMessages; i++)
{
message = session.createMessage(true);
@@ -678,9 +678,9 @@
session.commit();
}
}
-
+
session.commit();
-
+
server.stop();
server = createServer(true,
@@ -1276,13 +1276,13 @@
{
bb.put(getSamplebyte(j));
}
-
+
final AtomicBoolean running = new AtomicBoolean(true);
-
+
class TCount extends Thread
{
Queue queue;
-
+
TCount(Queue queue)
{
this.queue = queue;
@@ -1307,11 +1307,11 @@
}
}
};
-
+
TCount tcount1 = null;
TCount tcount2 = null;
-
+
try
{
{
@@ -1337,8 +1337,8 @@
session.createQueue(PagingTest.ADDRESS.toString(), PagingTest.ADDRESS + "-2", null, true);
}
-
-
+
+
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
ClientMessage message = null;
@@ -1376,21 +1376,21 @@
PagingTest.PAGE_MAX,
new HashMap<String, AddressSettings>());
server.start();
-
+
Queue queue1 = server.locateQueue(PagingTest.ADDRESS.concat("-1"));
-
+
Queue queue2 = server.locateQueue(PagingTest.ADDRESS.concat("-2"));
-
+
assertNotNull(queue1);
-
+
assertNotNull(queue2);
-
+
assertNotSame(queue1, queue2);
tcount1 = new TCount(queue1);
-
+
tcount2 = new TCount(queue2);
-
+
tcount1.start();
tcount2.start();
@@ -1500,19 +1500,19 @@
finally
{
running.set(false);
-
+
if (tcount1 != null)
{
tcount1.interrupt();
tcount1.join();
}
-
+
if (tcount2 != null)
{
tcount2.interrupt();
tcount2.join();
}
-
+
try
{
server.stop();
@@ -1708,7 +1708,7 @@
private void internaltestSendReceivePaging(final boolean persistentMessages) throws Exception
{
- System.out.println("PageDir:" + getPageDir());
+ // System.out.println("PageDir:" + getPageDir());
clearData();
Configuration config = createDefaultConfig();
@@ -1857,7 +1857,7 @@
* - Consume the entire destination (not in page mode any more)
* - Add stuff to a transaction again
* - Check order
- *
+ *
*/
public void testDepageDuringTransaction() throws Exception
{
@@ -1998,9 +1998,9 @@
* - Consume the entire destination (not in page mode any more)
* - Add stuff to a transaction again
* - Check order
- *
+ *
* Test under discussion at : http://community.jboss.org/thread/154061?tstart=0
- *
+ *
*/
public void testDepageDuringTransaction2() throws Exception
{
@@ -2091,9 +2091,9 @@
if (msgReceived != null)
{
- System.out.println("new = " + msgReceived.getBooleanProperty("new") +
- " id = " +
- msgReceived.getIntProperty("id"));
+// System.out.println("new = " + msgReceived.getBooleanProperty("new") +
+// " id = " +
+// msgReceived.getIntProperty("id"));
}
Assert.assertNull(msgReceived);
@@ -3838,13 +3838,13 @@
for (int i = 0; i < numberOfMessages; i++)
{
ClientMessage msg = cons.receive(PagingTest.RECEIVE_TIMEOUT);
- System.out.println("Message " + i + " consumed");
+ // System.out.println("Message " + i + " consumed");
assertNotNull(msg);
msg.acknowledge();
if (i % 20 == 0)
{
- System.out.println("Commit consumer");
+ // System.out.println("Commit consumer");
sessionConsumer.commit();
}
}
@@ -3881,7 +3881,7 @@
{
message = session.createMessage(persistentMessages);
- System.out.println("Message " + i + " sent");
+ // System.out.println("Message " + i + " sent");
HornetQBuffer bodyLocal = message.getBodyBuffer();
@@ -4132,7 +4132,7 @@
// assertEquals(msg, message.getIntProperty("propTest").intValue());
- System.out.println("i = " + i + " msg = " + message.getIntProperty("propTest"));
+ // System.out.println("i = " + i + " msg = " + message.getIntProperty("propTest"));
}
session.commit();
13 years, 2 months