Author: clebert.suconic(a)jboss.com
Date: 2010-11-23 22:36:30 -0500 (Tue, 23 Nov 2010)
New Revision: 9924
Modified:
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/client/PagingTest.java
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java
Log:
test fixes
Modified:
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-11-23
20:12:05 UTC (rev 9923)
+++
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-11-24
03:36:30 UTC (rev 9924)
@@ -361,7 +361,7 @@
started = true;
- log.info("HornetQ Backup Server version " +
getVersion().getFullVersion() + " [" + nodeManager.getNodeId() + "]
started");
+ log.info("HornetQ Backup Server version " +
getVersion().getFullVersion() + " [" + nodeManager.getNodeId() + "]
started, waiting live to fail before it gets active");
nodeManager.awaitLiveNode();
@@ -1373,12 +1373,12 @@
{
// Load the journal and populate queues, transactions and caches in memory
+ pagingManager.reloadStores();
+
JournalLoadInformation[] journalInfo = loadJournals();
compareJournals(journalInfo);
- pagingManager.resumeDepages();
-
final ServerInfo dumper = new ServerInfo(this, pagingManager);
long dumpInfoInterval = configuration.getServerDumpInterval();
Modified:
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
---
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-11-23
20:12:05 UTC (rev 9923)
+++
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-11-24
03:36:30 UTC (rev 9924)
@@ -1651,7 +1651,28 @@
}
session.commit();
+
+ session.close();
+
+ locator.close();
+
+ locator = createInVMNonHALocator();
+
+ server.stop();
+
+
+ server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(null, null, false, false, false, false, 0);
+
ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
session.start();
Modified:
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
---
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2010-11-23
20:12:05 UTC (rev 9923)
+++
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2010-11-24
03:36:30 UTC (rev 9924)
@@ -68,7 +68,8 @@
protected Configuration backupConfig;
protected Configuration liveConfig;
- private NodeManager nodeManager;
+
+ protected NodeManager nodeManager;
// Static --------------------------------------------------------
@@ -287,6 +288,8 @@
}
System.out.println("FailoverTestBase.waitForNewLive");
}
+
+
protected TransportConfiguration getInVMConnectorTransportConfiguration(final boolean
live)
{
Modified:
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
---
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2010-11-23
20:12:05 UTC (rev 9923)
+++
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2010-11-24
03:36:30 UTC (rev 9924)
@@ -22,11 +22,14 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.replication.impl.ReplicationEndpointImpl;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.spi.core.protocol.RemotingConnection;
@@ -35,6 +38,8 @@
/**
* A PagingFailoverTest
+ *
+ * TODO: validate replication failover also
*
* @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
*
@@ -52,6 +57,7 @@
static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
// Attributes ----------------------------------------------------
+ private ServerLocator locator;
// Static --------------------------------------------------------
@@ -59,6 +65,26 @@
// Public --------------------------------------------------------
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ locator = getServerLocator();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ locator.close();
+ super.tearDown();
+ }
+
+ public void testPageFailBeforeConsume() throws Exception
+ {
+ internalTestPage(false, true);
+ }
+
+
public void testPage() throws Exception
{
internalTestPage(false, false);
@@ -76,17 +102,11 @@
public void internalTestPage(final boolean transacted, final boolean
failBeforeConsume) throws Exception
{
- throw new Exception("must change the test to reflect the new replication
code");
-
- /*
- ServerLocator locator = getServerLocator();
-
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
+ locator.setReconnectAttempts(-1);
- //waitForTopology(locator, 1, 1);
-
- ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)
locator.createSessionFactory();
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator,
2);
ClientSession session = sf.createSession(!transacted, !transacted, 0);
try
@@ -94,23 +114,6 @@
session.createQueue(PagingFailoverTest.ADDRESS, PagingFailoverTest.ADDRESS,
true);
- final CountDownLatch latch = new CountDownLatch(1);
-
- class MyListener implements SessionFailureListener
- {
- public void connectionFailed(final HornetQException me)
- {
- latch.countDown();
- }
-
- public void beforeReconnect(final HornetQException exception)
- {
- }
-
- }
-
- session.addFailureListener(new MyListener());
-
ClientProducer prod = session.createProducer(PagingFailoverTest.ADDRESS);
final int TOTAL_MESSAGES = 2000;
@@ -128,12 +131,15 @@
session.commit();
- ReplicationEndpointImpl endpoint = null;
-
if (failBeforeConsume)
{
- failSession(session, latch);
+ crash(session);
}
+
+
+ session.close();
+
+ session = sf.createSession(!transacted, !transacted, 0);
session.start();
@@ -143,6 +149,7 @@
for (int i = 0; i < MIDDLE; i++)
{
+ System.out.println("msg " + i);
ClientMessage msg = cons.receive(20000);
Assert.assertNotNull(msg);
msg.acknowledge();
@@ -154,19 +161,19 @@
}
session.commit();
+
+ cons.close();
+
+ Thread.sleep(1000);
- if (endpoint != null)
- {
- endpoint.setDeletePages(true);
- }
-
if (!failBeforeConsume)
{
- failSession(session, latch);
+ crash(session);
+ // failSession(session, latch);
}
session.close();
-
+
session = sf.createSession(true, true, 0);
cons = session.createConsumer(PagingFailoverTest.ADDRESS);
@@ -193,7 +200,6 @@
{
}
}
- */
}
/**
@@ -217,18 +223,12 @@
// Protected -----------------------------------------------------
- /* (non-Javadoc)
- * @see
org.hornetq.tests.integration.cluster.failover.FailoverTestBase#getAcceptorTransportConfiguration(boolean)
- */
@Override
protected TransportConfiguration getAcceptorTransportConfiguration(final boolean
live)
{
return getInVMTransportAcceptorConfiguration(live);
}
- /* (non-Javadoc)
- * @see
org.hornetq.tests.integration.cluster.failover.FailoverTestBase#getConnectorTransportConfiguration(boolean)
- */
@Override
protected TransportConfiguration getConnectorTransportConfiguration(final boolean
live)
{
@@ -238,11 +238,12 @@
@Override
protected HornetQServer createServer(final boolean realFiles, final Configuration
configuration)
{
- return createServer(realFiles,
- configuration,
- PagingFailoverTest.PAGE_SIZE,
- PagingFailoverTest.PAGE_MAX,
- new HashMap<String, AddressSettings>());
+ return createInVMFailoverServer(true,
+ configuration,
+ PagingFailoverTest.PAGE_SIZE,
+ PagingFailoverTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>(),
+ nodeManager);
}
@Override
@@ -250,36 +251,13 @@
{
return new SameProcessHornetQServer(createServer(true, backupConfig));
}
-
+
@Override
protected TestableServer createLiveServer()
{
return new SameProcessHornetQServer(createServer(true, liveConfig));
}
-
- /**
- * @throws Exception
- */
- @Override
- protected void createConfigs() throws Exception
- {
- Configuration config1 = super.createDefaultConfig();
- config1.getAcceptorConfigurations().clear();
- config1.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(false));
- config1.setSecurityEnabled(false);
- config1.setSharedStore(true);
- config1.setBackup(true);
- backupServer = createBackupServer();
-
- Configuration config0 = super.createDefaultConfig();
- config0.getAcceptorConfigurations().clear();
- config0.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true));
- config0.setSecurityEnabled(false);
- config0.setSharedStore(true);
- liveServer = createLiveServer();
- }
-
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
Modified:
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java
===================================================================
---
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java 2010-11-23
20:12:05 UTC (rev 9923)
+++
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java 2010-11-24
03:36:30 UTC (rev 9924)
@@ -13,7 +13,6 @@
package org.hornetq.tests.integration.cluster.util;
-import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -24,7 +23,6 @@
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.cluster.impl.ClusterManagerImpl;
-import org.hornetq.spi.core.protocol.RemotingConnection;
/**
* A SameProcessHornetQServer