[hornetq-commits] JBoss hornetq SVN: r9595 - in branches/2_2_0_HA_Improvements: src/main/org/hornetq/api/core and 7 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed Aug 25 04:39:02 EDT 2010
Author: ataylor
Date: 2010-08-25 04:39:01 -0400 (Wed, 25 Aug 2010)
New Revision: 9595
Modified:
branches/2_2_0_HA_Improvements/hornetq.ipr
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/TransportConfiguration.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/FakeLockFile.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/FakeLockHornetQServer.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
fixes for shared store backup
Modified: branches/2_2_0_HA_Improvements/hornetq.ipr
===================================================================
--- branches/2_2_0_HA_Improvements/hornetq.ipr 2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/hornetq.ipr 2010-08-25 08:39:01 UTC (rev 9595)
@@ -342,11 +342,8 @@
<option name="SKIP_IMPORT_STATEMENTS" value="false" />
</component>
<component name="EclipseCompilerSettings">
- <option name="DEBUGGING_INFO" value="true" />
<option name="GENERATE_NO_WARNINGS" value="true" />
<option name="DEPRECATION" value="false" />
- <option name="ADDITIONAL_OPTIONS_STRING" value="" />
- <option name="MAXIMUM_HEAP_SIZE" value="128" />
</component>
<component name="EclipseEmbeddedCompilerSettings">
<option name="DEBUGGING_INFO" value="true" />
@@ -423,14 +420,6 @@
<option name="LOCALE" />
<option name="OPEN_IN_BROWSER" value="true" />
</component>
- <component name="JikesSettings">
- <option name="JIKES_PATH" value="" />
- <option name="DEBUGGING_INFO" value="true" />
- <option name="DEPRECATION" value="true" />
- <option name="GENERATE_NO_WARNINGS" value="false" />
- <option name="IS_EMACS_ERRORS_MODE" value="true" />
- <option name="ADDITIONAL_OPTIONS_STRING" value="" />
- </component>
<component name="LogConsolePreferences">
<option name="FILTER_ERRORS" value="false" />
<option name="FILTER_WARNINGS" value="false" />
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/TransportConfiguration.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/TransportConfiguration.java 2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/TransportConfiguration.java 2010-08-25 08:39:01 UTC (rev 9595)
@@ -167,13 +167,13 @@
if (factoryClassName.equals(kother.factoryClassName))
{
- if (params == null)
+ if (params == null || params.isEmpty())
{
- return kother.params == null;
+ return kother.params == null || kother.params.isEmpty();
}
else
{
- if (kother.params == null)
+ if (kother.params == null || kother.params.isEmpty())
{
return false;
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java 2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java 2010-08-25 08:39:01 UTC (rev 9595)
@@ -657,4 +657,8 @@
void close();
boolean isHA();
+
+ void addClusterTopologyListener(ClusterTopologyListener listener);
+
+ void removeClusterTopologyListener(ClusterTopologyListener listener);
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-08-25 08:39:01 UTC (rev 9595)
@@ -39,13 +39,9 @@
String getNodeID();
void connect();
-
- void addClusterTopologyListener(ClusterTopologyListener listener);
-
- void removeClusterTopologyListener(ClusterTopologyListener listener);
-
+
void notifyNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance);
-
+
void notifyNodeDown(String nodeID);
void setClusterConnection(boolean clusterConnection);
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java 2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java 2010-08-25 08:39:01 UTC (rev 9595)
@@ -38,16 +38,57 @@
*/
private Map<String, TopologyMember> topology = new HashMap<String, TopologyMember>();
+ int nodes = 0;
+
public synchronized boolean addMember(String nodeId, TopologyMember member)
{
- boolean replaced = topology.containsKey(nodeId);
- topology.put(nodeId, member);
+ boolean replaced = false;
+ TopologyMember currentMember = topology.get(nodeId);
+ if(currentMember == null)
+ {
+ topology.put(nodeId, member);
+ replaced = true;
+ if(member.getConnector().a != null)
+ {
+ nodes++;
+ }
+ if(member.getConnector().b != null)
+ {
+ nodes++;
+ }
+ }
+ else
+ {
+ if(currentMember.getConnector().a == null && member.getConnector().a != null)
+ {
+ currentMember.getConnector().a = member.getConnector().a;
+ replaced = true;
+ nodes++;
+ }
+ if(currentMember.getConnector().b == null && member.getConnector().b != null)
+ {
+ currentMember.getConnector().b = member.getConnector().b;
+ replaced = true;
+ nodes++;
+ }
+ }
return replaced;
}
public synchronized boolean removeMember(String nodeId)
{
TopologyMember member = topology.remove(nodeId);
+ if(member != null)
+ {
+ if(member.getConnector().a != null)
+ {
+ nodes--;
+ }
+ if(member.getConnector().b != null)
+ {
+ nodes--;
+ }
+ }
return (member != null);
}
@@ -77,7 +118,7 @@
public int size()
{
- return topology.size();
+ return nodes;
}
public String describe()
@@ -94,5 +135,6 @@
public void clear()
{
topology.clear();
+ nodes = 0;
}
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java 2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java 2010-08-25 08:39:01 UTC (rev 9595)
@@ -113,7 +113,15 @@
buffer.writeString(nodeID);
if (!exit)
{
- pair.a.encode(buffer);
+ if (pair.a != null)
+ {
+ buffer.writeBoolean(true);
+ pair.a.encode(buffer);
+ }
+ else
+ {
+ buffer.writeBoolean(false);
+ }
if (pair.b != null)
{
buffer.writeBoolean(true);
@@ -134,9 +142,18 @@
exit = buffer.readBoolean();
nodeID = buffer.readString();
if (!exit)
- {
- TransportConfiguration a = new TransportConfiguration();
- a.decode(buffer);
+ {
+ boolean hasLive = buffer.readBoolean();
+ TransportConfiguration a;
+ if(hasLive)
+ {
+ a = new TransportConfiguration();
+ a.decode(buffer);
+ }
+ else
+ {
+ a = null;
+ }
boolean hasBackup = buffer.readBoolean();
TransportConfiguration b;
if (hasBackup)
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-08-25 08:39:01 UTC (rev 9595)
@@ -244,26 +244,22 @@
boolean last,
int distance)
{
- if (nodeID.equals(nodeUUID.toString()))
- {
- return;
- }
-
boolean updated = topology.addMember(nodeID, new TopologyMember(connectorPair, distance));
-
- if (distance >= topology.size() || updated)
+ if(!updated)
{
return;
}
-
for (ClusterTopologyListener listener : clientListeners)
{
listener.nodeUP(nodeID, connectorPair, last, distance);
}
- for (ClusterTopologyListener listener : clusterConnectionListeners)
+ if (distance < topology.size())
{
- listener.nodeUP(nodeID, connectorPair, last, distance);
+ for (ClusterTopologyListener listener : clusterConnectionListeners)
+ {
+ listener.nodeUP(nodeID, connectorPair, last, distance);
+ }
}
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/FakeLockFile.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/FakeLockFile.java 2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/FakeLockFile.java 2010-08-25 08:39:01 UTC (rev 9595)
@@ -13,9 +13,12 @@
package org.hornetq.core.server.cluster.impl;
+import java.io.File;
import java.io.IOException;
+import java.util.HashMap;
import java.util.Map;
import java.util.WeakHashMap;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -40,10 +43,9 @@
private final String directory;
- private static Map<String, Lock> locks = new WeakHashMap<String, Lock>();
-
- private Lock lock;
-
+ private final static Map<String, Semaphore> locks = new WeakHashMap<String, Semaphore>();
+
+ private Semaphore semaphore;
/**
* @param fileName
* @param directory
@@ -56,15 +58,32 @@
synchronized (locks)
{
- String key = directory + fileName;
+ String key = directory + "/" + fileName;
- lock = locks.get(key);
+ semaphore = locks.get(key);
- if (lock == null)
+ if (semaphore == null)
{
- lock = new ReentrantLock(true);
+ semaphore = new Semaphore(1, true);
- locks.put(key, lock);
+ locks.put(key, semaphore);
+
+ File f = new File(directory, fileName);
+
+ try
+ {
+ f.createNewFile();
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ throw new IllegalStateException(e);
+ }
+
+ if(!f.exists())
+ {
+ throw new IllegalStateException("unable to create " + directory + fileName);
+ }
}
}
}
@@ -81,13 +100,38 @@
public void lock() throws IOException
{
- lock.lock();
+ try
+ {
+ semaphore.acquire();
+ }
+ catch (InterruptedException e)
+ {
+ throw new IOException(e);
+ }
}
public boolean unlock() throws IOException
{
- lock.unlock();
+ semaphore.release();
return true;
}
+
+ public static void unlock(final String fileName, final String directory)
+ {
+ String key = directory + "/" + fileName;
+
+ Semaphore semaphore = locks.get(key);
+
+ semaphore.release();
+ }
+
+ public static void clearLocks()
+ {
+ for (Semaphore semaphore : locks.values())
+ {
+ semaphore.drainPermits();
+ }
+ locks.clear();
+ }
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-08-25 08:39:01 UTC (rev 9595)
@@ -557,7 +557,6 @@
initialisePart1();
//TODO TODO at this point the clustermanager needs to announce it's presence so the cluster can know about the backup
-
// We now look for the live.lock file - if it doesn't exist it means the live isn't started yet, so we wait
// for that
@@ -574,17 +573,20 @@
liveLock = createLockFile("live.lock", configuration.getJournalDirectory());
+
+ clusterManager.start();
+
log.info("Live server is up - waiting for failover");
liveLock.lock();
-
+ //todo check if we need this or not
// We need to test if the file exists again, since the live might have shutdown
- if (!liveLockFile.exists())
- {
- liveLock.unlock();
+ // if (!liveLockFile.exists())
+ // {
+ // liveLock.unlock();
- continue;
- }
+ // continue;
+ // }
log.info("Obtained live lock");
@@ -600,13 +602,13 @@
initialisePart2();
- log.info("Server is now live");
+ log.info("Back Up Server is now live");
backupLock.unlock();
}
catch (InterruptedException e)
{
- // This can occur when closing if the thread is blocked - it's ok
+ System.out.println("HornetQServerImpl$SharedStoreBackupActivation.run");
}
catch (Exception e)
{
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java 2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java 2010-08-25 08:39:01 UTC (rev 9595)
@@ -24,11 +24,7 @@
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientConsumer;
-import org.hornetq.api.core.client.ClientMessage;
-import org.hornetq.api.core.client.ClientProducer;
-import org.hornetq.api.core.client.ClientSession;
-import org.hornetq.api.core.client.SessionFailureListener;
+import org.hornetq.api.core.client.*;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.client.impl.DelegatingSession;
@@ -171,9 +167,9 @@
for (int i = 0; i < numIts; i++)
{
AsynchronousFailoverTest.log.info("Iteration " + i);
+ ServerLocator locator = getServerLocator();
+ sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
- sf = getSessionFactory();
-
sf.getServerLocator().setBlockOnNonDurableSend(true);
sf.getServerLocator().setBlockOnDurableSend(true);
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2010-08-25 08:39:01 UTC (rev 9595)
@@ -26,16 +26,13 @@
import junit.framework.Assert;
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.Interceptor;
-import org.hornetq.api.core.Message;
-import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.*;
import org.hornetq.api.core.client.*;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.impl.invm.TransportConstants;
+import org.hornetq.core.server.cluster.impl.FakeLockFile;
import org.hornetq.core.transaction.impl.XidImpl;
import org.hornetq.jms.client.HornetQTextMessage;
import org.hornetq.spi.core.protocol.RemotingConnection;
@@ -84,11 +81,17 @@
public void testNonTransacted() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ClientSessionFactoryInternal sf;
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
+ ServerLocator locator = getServerLocator();
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+
+ sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
ClientSession session = sf.createSession(true, true);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -146,6 +149,8 @@
session.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -153,11 +158,15 @@
public void testConsumeTransacted() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
ClientSession session = sf.createSession(false, false);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -242,6 +251,8 @@
session.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -251,11 +262,15 @@
* and the servers should be able to connect without any problems. */
public void testRestartServers() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
ClientSession session = sf.createSession(true, true);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -279,17 +294,14 @@
session.close();
+ server1Service.stop();
server0Service.stop();
- server1Service.stop();
-
+ FakeLockFile.clearLocks();
server1Service.start();
server0Service.start();
- sf = getSessionFactory();
+ sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
-
session = sf.createSession(true, true);
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
@@ -311,6 +323,8 @@
session.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -319,12 +333,16 @@
// https://jira.jboss.org/jira/browse/HORNETQ-285
public void testFailoverOnInitialConnection() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
- sf.getServerLocator().setFailoverOnInitialConnection(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnInitialConnection(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
// Stop live server
this.server0Service.stop();
@@ -367,6 +385,8 @@
session.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -377,28 +397,32 @@
* @param latch
* @throws InterruptedException
*/
- private void fail(final ClientSession session, final CountDownLatch latch) throws InterruptedException
+ private void fail(final ClientSession session, final CountDownLatch latch) throws Exception
{
- RemotingConnection conn = ((ClientSessionInternal)session).getConnection();
+ //RemotingConnection conn = ((ClientSessionInternal)session).getConnection();
// Simulate failure on connection
- conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
-
+ //conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
+ server0Service.stop();
// Wait to be informed of failure
- boolean ok = latch.await(1000, TimeUnit.MILLISECONDS);
+ boolean ok = latch.await(10000, TimeUnit.MILLISECONDS);
Assert.assertTrue(ok);
}
public void testTransactedMessagesSentSoRollback() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
ClientSession session = sf.createSession(false, false);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -456,6 +480,8 @@
session.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -467,11 +493,15 @@
*/
public void testTransactedMessagesSentSoRollbackAndContinueWork() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
ClientSession session = sf.createSession(false, false);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -541,6 +571,8 @@
session.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -548,11 +580,15 @@
public void testTransactedMessagesNotSentSoNoRollback() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
ClientSession session = sf.createSession(false, false);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -622,6 +658,8 @@
session.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -629,11 +667,15 @@
public void testTransactedMessagesWithConsumerStartedBeforeFailover() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
ClientSession session = sf.createSession(false, false);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -711,6 +753,8 @@
session.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -718,11 +762,15 @@
public void testTransactedMessagesConsumedSoRollback() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
ClientSession session1 = sf.createSession(false, false);
session1.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -794,6 +842,8 @@
session2.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -801,11 +851,15 @@
public void testTransactedMessagesNotConsumedSoNoRollback() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
ClientSession session1 = sf.createSession(false, false);
session1.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -889,6 +943,8 @@
session2.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -896,11 +952,15 @@
public void testXAMessagesSentSoRollbackOnEnd() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
ClientSession session = sf.createSession(true, false, false);
Xid xid = new XidImpl("uhuhuhu".getBytes(), 126512, "auhsduashd".getBytes());
@@ -959,6 +1019,8 @@
session.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -966,11 +1028,15 @@
public void testXAMessagesSentSoRollbackOnPrepare() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
ClientSession session = sf.createSession(true, false, false);
Xid xid = new XidImpl("uhuhuhu".getBytes(), 126512, "auhsduashd".getBytes());
@@ -1031,6 +1097,8 @@
session.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -1039,11 +1107,15 @@
// This might happen if 1PC optimisation kicks in
public void testXAMessagesSentSoRollbackOnCommit() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
ClientSession session = sf.createSession(true, false, false);
Xid xid = new XidImpl("uhuhuhu".getBytes(), 126512, "auhsduashd".getBytes());
@@ -1106,6 +1178,8 @@
session.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -1113,11 +1187,15 @@
public void testXAMessagesNotSentSoNoRollbackOnCommit() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
ClientSession session = sf.createSession(true, false, false);
Xid xid = new XidImpl("uhuhuhu".getBytes(), 126512, "auhsduashd".getBytes());
@@ -1195,6 +1273,8 @@
session.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -1202,11 +1282,15 @@
public void testXAMessagesConsumedSoRollbackOnEnd() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
ClientSession session1 = sf.createSession(false, false);
session1.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -1280,6 +1364,8 @@
session2.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -1287,11 +1373,15 @@
public void testXAMessagesConsumedSoRollbackOnPrepare() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
ClientSession session1 = sf.createSession(false, false);
session1.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -1376,6 +1466,8 @@
session2.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -1384,11 +1476,14 @@
// 1PC optimisation
public void testXAMessagesConsumedSoRollbackOnCommit() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
ClientSession session1 = sf.createSession(false, false);
session1.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -1466,6 +1561,8 @@
session2.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -1473,8 +1570,10 @@
public void testCreateNewFactoryAfterFailover() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
ClientSession session = sendAndConsume(sf, true);
final CountDownLatch latch = new CountDownLatch(1);
@@ -1502,13 +1601,14 @@
session.close();
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY));
sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
session = sendAndConsume(sf, false);
session.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -1516,11 +1616,15 @@
public void testFailoverMultipleSessionsWithConsumers() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
final int numSessions = 5;
final int numConsumersPerSession = 5;
@@ -1620,6 +1724,8 @@
sendSession.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -1630,11 +1736,14 @@
*/
public void testFailWithBrowser() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
ClientSession session = sf.createSession(true, true);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -1703,6 +1812,8 @@
session.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -1710,11 +1821,15 @@
public void testFailThenReceiveMoreMessagesAfterFailover() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
ClientSession session = sf.createSession(true, true);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -1785,6 +1900,8 @@
session.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -1792,11 +1909,12 @@
public void testFailThenReceiveMoreMessagesAfterFailover2() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
- sf.getServerLocator().setBlockOnAcknowledge(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
ClientSession session = sf.createSession(true, true, 0);
@@ -1878,6 +1996,8 @@
session.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -1905,11 +2025,14 @@
private void testSimpleSendAfterFailover(final boolean durable, final boolean temporary) throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
- sf.getServerLocator().setBlockOnAcknowledge(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
ClientSession session = sf.createSession(true, true, 0);
@@ -1970,6 +2093,8 @@
session.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -1977,8 +2102,10 @@
public void testForceBlockingReturn() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
+ ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
+
// Add an interceptor to delay the send method so we can get time to cause failover before it returns
server0Service.getRemotingService().addInterceptor(new DelayInterceptor());
@@ -1986,6 +2113,8 @@
sf.getServerLocator().setBlockOnNonDurableSend(true);
sf.getServerLocator().setBlockOnDurableSend(true);
sf.getServerLocator().setBlockOnAcknowledge(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
final ClientSession session = sf.createSession(true, true, 0);
@@ -2043,6 +2172,8 @@
session.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -2050,12 +2181,16 @@
public void testCommitOccurredUnblockedAndResendNoDuplicates() throws Exception
{
- final ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
- sf.getServerLocator().setBlockOnAcknowledge(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ locator.setBlockOnAcknowledge(true);
+ final ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
final ClientSession session = sf.createSession(false, false);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -2203,6 +2338,8 @@
session2.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -2210,11 +2347,14 @@
public void testCommitDidNotOccurUnblockedAndResend() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
- sf.getServerLocator().setBlockOnAcknowledge(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
final ClientSession session = sf.createSession(false, false);
@@ -2340,6 +2480,8 @@
session2.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -2452,4 +2594,6 @@
}
// Inner classes -------------------------------------------------
+
+
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2010-08-25 08:39:01 UTC (rev 9595)
@@ -13,24 +13,33 @@
package org.hornetq.tests.integration.cluster.failover;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
+import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.remoting.impl.invm.InVMConnector;
import org.hornetq.core.remoting.impl.invm.InVMRegistry;
import org.hornetq.core.remoting.impl.invm.TransportConstants;
import org.hornetq.core.server.ActivateCallback;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.cluster.impl.FakeLockFile;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.tests.util.UnitTestCase;
@@ -80,6 +89,7 @@
{
super.setUp();
clearData();
+ FakeLockFile.clearLocks();
createConfigs();
if (server1Service != null)
@@ -101,6 +111,16 @@
config1.setSecurityEnabled(false);
config1.setSharedStore(true);
config1.setBackup(true);
+ config1.setClustered(true);
+ TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
+ TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
+ List<String> staticConnectors = new ArrayList<String>();
+ staticConnectors.add(liveConnector.getName());
+ ClusterConnectionConfiguration ccc1 = new ClusterConnectionConfiguration("cluster1", "jms", backupConnector.getName(), -1, false, false, 1, 1,
+ staticConnectors);
+ config1.getClusterConfigurations().add(ccc1);
+ config1.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
+ config1.getConnectorConfigurations().put(backupConnector.getName(), backupConnector);
server1Service = createFakeLockServer(true, config1);
server1Service.registerActivateCallback(new ActivateCallback()
@@ -129,6 +149,12 @@
config0.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true));
config0.setSecurityEnabled(false);
config0.setSharedStore(true);
+ config0.setClustered(true);
+ List<String> pairs = null;
+ ClusterConnectionConfiguration ccc0 = new ClusterConnectionConfiguration("cluster1", "jms", liveConnector.getName(), -1, false, false, 1, 1,
+ pairs);
+ config0.getClusterConfigurations().add(ccc0);
+ config0.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
server0Service = createFakeLockServer(true, config0);
}
@@ -179,6 +205,20 @@
super.tearDown();
}
+ protected ClientSessionFactoryInternal createSessionFactoryAndWaitForTopology(ServerLocator locator, int topologyMembers)
+ throws Exception
+ {
+ ClientSessionFactoryInternal sf;
+ CountDownLatch countDownLatch = new CountDownLatch(topologyMembers);
+
+ locator.addClusterTopologyListener(new LatchClusterTopologyListener(countDownLatch));
+
+ sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
+
+ assertTrue(countDownLatch.await(5, TimeUnit.SECONDS));
+ return sf;
+ }
+
protected TransportConfiguration getInVMConnectorTransportConfiguration(final boolean live)
{
if (live)
@@ -251,14 +291,45 @@
protected abstract TransportConfiguration getConnectorTransportConfiguration(final boolean live);
- protected ClientSessionFactoryInternal getSessionFactory() throws Exception
+ protected ServerLocatorInternal getServerLocator() throws Exception
{
- ServerLocator locator = HornetQClient.createServerLocatorWithHA(getConnectorTransportConfiguration(true), getConnectorTransportConfiguration(false));
- return (ClientSessionFactoryInternal) locator.createSessionFactory();
+ ServerLocator locator = HornetQClient.createServerLocatorWithHA(getConnectorTransportConfiguration(true));
+ return (ServerLocatorInternal) locator;
}
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
+ class LatchClusterTopologyListener implements ClusterTopologyListener
+ {
+ final CountDownLatch latch;
+ int liveNodes = 0;
+ int backUpNodes = 0;
+ List<String> liveNode = new ArrayList<String>();
+ List<String> backupNode = new ArrayList<String>();
+ public LatchClusterTopologyListener(CountDownLatch latch)
+ {
+ this.latch = latch;
+ }
+
+ public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance)
+ {
+ if(connectorPair.a != null && !liveNode.contains(connectorPair.a.getName()))
+ {
+ liveNode.add(connectorPair.a.getName());
+ latch.countDown();
+ }
+ if(connectorPair.b != null && !backupNode.contains(connectorPair.b.getName()))
+ {
+ backupNode.add(connectorPair.b.getName());
+ latch.countDown();
+ }
+ }
+
+ public void nodeDown(String nodeID)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+ }
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java 2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java 2010-08-25 08:39:01 UTC (rev 9595)
@@ -17,7 +17,9 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.logging.Logger;
import org.hornetq.tests.util.UnitTestCase;
@@ -84,11 +86,11 @@
}
- protected ClientSessionFactoryInternal getSessionFactory() throws Exception
+ protected ServerLocatorInternal getServerLocator() throws Exception
{
- ClientSessionFactoryInternal sf = super.getSessionFactory();
- sf.getServerLocator().setMinLargeMessageSize(LARGE_MESSAGE_SIZE);
- return sf;
+ ServerLocator locator = super.getServerLocator();
+ locator.setMinLargeMessageSize(LARGE_MESSAGE_SIZE);
+ return (ServerLocatorInternal) locator;
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2010-08-25 08:39:01 UTC (rev 9595)
@@ -22,11 +22,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientConsumer;
-import org.hornetq.api.core.client.ClientMessage;
-import org.hornetq.api.core.client.ClientProducer;
-import org.hornetq.api.core.client.ClientSession;
-import org.hornetq.api.core.client.SessionFailureListener;
+import org.hornetq.api.core.client.*;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.config.Configuration;
@@ -79,11 +75,16 @@
public void internalTestPage(final boolean transacted, final boolean failBeforeConsume) throws Exception
{
- ClientSessionFactoryInternal factory = getSessionFactory();
- factory.getServerLocator().setBlockOnDurableSend(true);
- factory.getServerLocator().setBlockOnAcknowledge(true);
- ClientSession session = factory.createSession(!transacted, !transacted, 0);
+ ServerLocator locator = getServerLocator();
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+
+ //waitForTopology(locator, 1, 1);
+
+ ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
+ ClientSession session = sf.createSession(!transacted, !transacted, 0);
+
try
{
@@ -170,7 +171,7 @@
session.close();
- session = factory.createSession(true, true, 0);
+ session = sf.createSession(true, true, 0);
cons = session.createConsumer(PagingFailoverTest.ADDRESS);
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/FakeLockHornetQServer.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/FakeLockHornetQServer.java 2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/FakeLockHornetQServer.java 2010-08-25 08:39:01 UTC (rev 9595)
@@ -10,6 +10,7 @@
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
+
package org.hornetq.tests.util;
import org.hornetq.core.config.Configuration;
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/UnitTestCase.java 2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/UnitTestCase.java 2010-08-25 08:39:01 UTC (rev 9595)
@@ -65,6 +65,7 @@
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.cluster.impl.FakeLockFile;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.transaction.impl.XidImpl;
import org.hornetq.jms.client.HornetQTextMessage;
More information about the hornetq-commits
mailing list