[jboss-cvs] JBoss Messaging SVN: r1773 - in trunk: src/main/org/jboss/jms/client/container src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests tests/src/org/jboss/test/messaging/jms/clustering
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Dec 12 15:04:50 EST 2006
Author: timfox
Date: 2006-12-12 15:04:43 -0500 (Tue, 12 Dec 2006)
New Revision: 1773
Modified:
trunk/src/main/org/jboss/jms/client/container/HAAspect.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/StatsSender.java
trunk/tests/build.xml
trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
Log:
Server hop code
Modified: trunk/src/main/org/jboss/jms/client/container/HAAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/HAAspect.java 2006-12-12 19:13:02 UTC (rev 1772)
+++ trunk/src/main/org/jboss/jms/client/container/HAAspect.java 2006-12-12 20:04:43 UTC (rev 1773)
@@ -199,47 +199,84 @@
log.info("calling createFailoverConnectionDelegate");
- //Create a connection using that connection factory
- CreateConnectionResult res =
- newCF.createConnectionDelegate(state.getUser(), state.getPassword(), state.getServerID());
+ int tries = 0;
- log.info("returned from createFailoverConnectionDelegate");
-
- if (res.getDelegate() != null)
- {
- log.info("Got connection");
+ //We try a maximum of 10 hops
+ final int MAX_TRIES = 10;
+
+ while (tries < MAX_TRIES)
+ {
+ //Create a connection using that connection factory
+ CreateConnectionResult res =
+ newCF.createConnectionDelegate(state.getUser(), state.getPassword(), state.getServerID());
- //We got the right server and created a new connection ok
-
- ClientConnectionDelegate newConnection = (ClientConnectionDelegate)res.getDelegate();
-
- log.info("newconnection is " + newConnection);
-
- failover(failedConnection, newConnection);
- }
- else
- {
- if (res.getActualFailoverNode() == -1)
+ log.info("returned from createFailoverConnectionDelegate");
+
+ if (res.getDelegate() != null)
{
- //No trace of failover was detected on the server side - this might happen if the client side
- //network fails temporarily so the client connection breaks but the server side network is still
- //up and running - in this case we want to retry back on the original server
+ log.info("Got connection");
- //TODO TODO TODO
+ //We got the right server and created a new connection ok
- log.info("No failover is occurring on server side");
-
+ ClientConnectionDelegate newConnection = (ClientConnectionDelegate)res.getDelegate();
+
+ log.info("newconnection is " + newConnection);
+
+ failover(failedConnection, newConnection);
+
+ break;
}
else
{
- //Server side failover has occurred / is occurring but we tried the wrong node
- //Now we must try the correct node
-
- //TODO TODO TODO
-
- log.info("*** Got wrong server!");
+ if (res.getActualFailoverNode() == -1)
+ {
+ //No trace of failover was detected on the server side - this might happen if the client side
+ //network fails temporarily so the client connection breaks but the server side network is still
+ //up and running - in this case we don't failover
+
+ //TODO Is this the right thing to do?
+
+ log.trace("Client attempted to failover, but no failover had occurred on the server side");
+
+ break;
+
+ }
+ else
+ {
+ //Server side failover has occurred / is occurring but we tried the wrong node
+ //Now we must try the correct node
+
+ newCF = null;
+
+ tries++;
+
+ for (int i = 0; i < delegates.length; i++)
+ {
+ ClientConnectionFactoryDelegate del = delegates[i];
+
+ if (del.getServerId() == res.getActualFailoverNode())
+ {
+ newCF = del;
+
+ break;
+ }
+ }
+
+ if (newCF == null)
+ {
+ //Houston, we have a problem
+
+ //TODO Could this ever happen? Should we send back the cf, or update it instead of just the id??
+ throw new JMSException("Cannot find server with id " + res.getActualFailoverNode());
+ }
+ }
}
}
+
+ if (tries == MAX_TRIES)
+ {
+ throw new JMSException("Cannot find correct server to failover onto");
+ }
}
private ClientConnectionFactoryDelegate getFailoverDelegate(ClientConnectionDelegate currentDelegate) throws JMSException
@@ -355,8 +392,7 @@
oldCallbackManager);
// We add the new consumer id to the list of old ids
- consumerIds.add(new Integer(((ConsumerState)sessionChild).getConsumerID()));
-
+ consumerIds.add(new Integer(((ConsumerState)sessionChild).getConsumerID()));
}
else if (sessionChild instanceof BrowserState)
{
@@ -411,17 +447,10 @@
}
}
-// problem - what if the consumer has closed - but there are still acks in the session or rm?
-//
-// we still need to replace them but with what?
-//
-// in this case we can't recreate a consumer on the server since it has closed
-//
-// solution here is to store by session id - major reworking!!!!!!!!
+ //TODO
+ //If the session had consumers which are now closed then there is no way to recreate them on the server
+ //we need to store with session id
-
- // todo need to replace consumer id
-
//We must not start the connection until the end
if (failedState.isStarted())
{
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/StatsSender.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/StatsSender.java 2006-12-12 19:13:02 UTC (rev 1772)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/StatsSender.java 2006-12-12 20:04:43 UTC (rev 1773)
@@ -97,16 +97,8 @@
class SendStatsTimerTask extends TimerTask
{
- private boolean stopping;
-
- private boolean stopped;
-
- private Object stopLock = new Object();
-
- public void run()
+ public synchronized void run()
{
- checkStop();
-
try
{
office.sendQueueStats();
@@ -115,45 +107,11 @@
{
log.error("Failed to send statistics", e);
}
-
- checkStop();
}
-
- private void checkStop()
+
+ synchronized void stop()
{
- synchronized (stopLock)
- {
- if (stopping)
- {
- cancel();
-
- stopped = true;
-
- stopLock.notify();
-
- return;
- }
- }
+ cancel();
}
-
- void stop()
- {
- synchronized (stopLock)
- {
- stopping = true;
-
- while (!stopped)
- {
- try
- {
- stopLock.wait();
- }
- catch (InterruptedException e)
- {
- //Ignore
- }
- }
- }
- }
}
}
Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml 2006-12-12 19:13:02 UTC (rev 1772)
+++ trunk/tests/build.xml 2006-12-12 20:04:43 UTC (rev 1773)
@@ -669,7 +669,7 @@
- <!-- Start 3 rmi servers -->
+ <!-- Start 4 rmi servers -->
<antcall target="start-rmi-server-clustering">
<param name="test.server.index" value="0"/>
@@ -745,7 +745,7 @@
haltonerror="${junit.batchtest.haltonerror}">
<formatter type="plain" usefile="${junit.formatter.usefile}"/>
<fileset dir="${build.tests.classes}">
- <include name="**/jms/clustering/HATest.class"/>
+ <include name="**/jms/clustering/*Test.class"/>
<!--
<include name="**/jms/clustering/SimpleClusteringTest.class"/>
-->
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java 2006-12-12 19:13:02 UTC (rev 1772)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java 2006-12-12 20:04:43 UTC (rev 1773)
@@ -75,484 +75,484 @@
* Test that connections created using a clustered connection factory are created round robin on
* different servers
*/
-// public void testRoundRobinConnectionCreation() throws Exception
-// {
-// JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
-//
-// ClusteredClientConnectionFactoryDelegate delegate =
-// (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-//
-// log.info ("number of delegates = " + delegate.getDelegates().length);
-// log.info ("number of servers = " + ServerManagement.getServer(0).getNodeIDView().size());
-//
-// assertEquals(3, delegate.getDelegates().length);
-//
-// ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-//
-// ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-//
-// ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
-//
-// assertEquals(0, cf1.getServerId());
-//
-// assertEquals(1, cf2.getServerId());
-//
-// assertEquals(2, cf3.getServerId());
-//
-// assertEquals(3, ServerManagement.getServer(0).getNodeIDView().size());
-//
-// Connection conn1 = null;
-//
-// Connection conn2 = null;
-//
-// Connection conn3 = null;
-//
-// Connection conn4 = null;
-//
-// Connection conn5 = null;
-//
-// try
-// {
-// conn1 = factory.createConnection();
-//
-// conn2 = factory.createConnection();
-//
-// conn3 = factory.createConnection();
-//
-// conn4 = factory.createConnection();
-//
-// conn5 = factory.createConnection();
-//
-// ConnectionState state1 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn1).getDelegate()).getState());
-//
-// ConnectionState state2 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn2).getDelegate()).getState());
-//
-// ConnectionState state3 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn3).getDelegate()).getState());
-//
-// ConnectionState state4 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn4).getDelegate()).getState());
-//
-// ConnectionState state5 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn5).getDelegate()).getState());
-//
-// int serverID1 = state1.getServerID();
-//
-// int serverID2 = state2.getServerID();
-//
-// int serverID3 = state3.getServerID();
-//
-// int serverID4 = state4.getServerID();
-//
-// int serverID5 = state5.getServerID();
-//
-// log.info("server id 1: " + serverID1);
-//
-// log.info("server id 2: " + serverID2);
-//
-// log.info("server id 3: " + serverID3);
-//
-// log.info("server id 4: " + serverID4);
-//
-// log.info("server id 5: " + serverID5);
-//
-// assertEquals(0, serverID1);
-//
-// assertEquals(1, serverID2);
-//
-// assertEquals(2, serverID3);
-//
-// assertEquals(0, serverID4);
-//
-// assertEquals(1, serverID5);
-// }
-// finally
-// {
-// if (conn1 != null)
-// {
-// conn1.close();
-// }
-//
-// if (conn2 != null)
-// {
-// conn2.close();
-// }
-//
-// if (conn3 != null)
-// {
-// conn3.close();
-// }
-//
-// if (conn4 != null)
-// {
-// conn4.close();
-// }
-//
-// if (conn5 != null)
-// {
-// conn5.close();
-// }
-// }
-//
-// }
-//
-// /*
-// * Test that the failover mapping is created correctly and updated properly when nodes leave
-// * or join
-// */
-// public void testDefaultFailoverMap() throws Exception
-// {
-// {
-// JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
-//
-// ClusteredClientConnectionFactoryDelegate delegate =
-// (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-//
-// assertEquals(3, ServerManagement.getServer(0).getNodeIDView().size());
-//
-// ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-//
-// ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-//
-// ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-//
-// ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
-//
-// //The order here depends on the order the servers were started in
-//
-// //If any servers get stopped and then started then the order will change
-//
-// log.info("cf1 serverid=" + cf1.getServerId());
-//
-// log.info("cf2 serverid=" + cf2.getServerId());
-//
-// log.info("cf3 serverid=" + cf3.getServerId());
-//
-//
-// assertEquals(0, cf1.getServerId());
-//
-// assertEquals(1, cf2.getServerId());
-//
-// assertEquals(2, cf3.getServerId());
-//
-// Map failoverMap = delegate.getFailoverMap();
-//
-// assertEquals(3, delegates.length);
-//
-// assertEquals(3, failoverMap.size());
-//
-// // Default failover policy just chooses the node to the right
-//
-// assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-//
-// assertEquals(cf3.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
-//
-// assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf3.getServerId()))).intValue());
-// }
-//
-// //Now cleanly stop one of the servers
-//
-// log.info("************** STOPPING SERVER 0");
-// ServerManagement.stop(0);
-//
-// log.info("server stopped");
-//
-// assertEquals(2, ServerManagement.getServer(1).getNodeIDView().size());
-//
-// {
-// //Lookup another connection factory
-//
-// JBossConnectionFactory factory = (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
-//
-// log.info("Got connection factory");
-//
-// ClusteredClientConnectionFactoryDelegate delegate =
-// (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-//
-// ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-//
-// Map failoverMap = delegate.getFailoverMap();
-//
-// log.info("Got failover map");
-//
-// assertEquals(2, delegates.length);
-//
-// ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-//
-// ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-//
-// //Order here depends on order servers were started in
-//
-// log.info("cf1 serverid=" + cf1.getServerId());
-//
-// log.info("cf2 serverid=" + cf2.getServerId());
-//
-// assertEquals(1, cf1.getServerId());
-//
-// assertEquals(2, cf2.getServerId());
-//
-//
-// assertEquals(2, failoverMap.size());
-//
-// assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-//
-// assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
-// }
-//
-// //Cleanly stop another server
-//
-// log.info("Server 1 is started: " + ServerManagement.getServer(1).isServerPeerStarted());
-//
-// ServerManagement.stop(1);
-//
-// assertEquals(1, ServerManagement.getServer(2).getNodeIDView().size());
-//
-// {
-// //Lookup another connection factory
-//
-// JBossConnectionFactory factory = (JBossConnectionFactory )ic2.lookup("/ConnectionFactory");
-//
-// ClusteredClientConnectionFactoryDelegate delegate =
-// (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-//
-// ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-//
-// Map failoverMap = delegate.getFailoverMap();
-//
-// assertEquals(1, delegates.length);
-//
-// ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-//
-// assertEquals(2, cf1.getServerId());
-//
-//
-// assertEquals(1, failoverMap.size());
-//
-// assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-// }
-//
-// //Restart server 0
-//
-// ServerManagement.start("all", 0);
-//
-// {
-// JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
-//
-// log.info("Got connection factory");
-//
-// ClusteredClientConnectionFactoryDelegate delegate =
-// (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-//
-// ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-//
-// Map failoverMap = delegate.getFailoverMap();
-//
-// log.info("Got failover map");
-//
-// assertEquals(2, delegates.length);
-//
-// ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-//
-// ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-//
-// log.info("cf1 serverid=" + cf1.getServerId());
-//
-// log.info("cf2 serverid=" + cf2.getServerId());
-//
-// assertEquals(2, cf1.getServerId());
-//
-// assertEquals(0, cf2.getServerId());
-//
-//
-// assertEquals(2, failoverMap.size());
-//
-// assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-//
-// assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
-// }
-//
-//
-// //Restart server 1
-//
-// ServerManagement.start("all", 1);
-//
-// {
-// JBossConnectionFactory factory = (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
-//
-// log.info("Got connection factory");
-//
-// ClusteredClientConnectionFactoryDelegate delegate =
-// (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-//
-// ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-//
-// Map failoverMap = delegate.getFailoverMap();
-//
-// log.info("Got failover map");
-//
-// assertEquals(3, delegates.length);
-//
-// ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-//
-// ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-//
-// ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
-//
-// log.info("cf1 serverid=" + cf1.getServerId());
-//
-// log.info("cf2 serverid=" + cf2.getServerId());
-//
-// log.info("cf3 serverid=" + cf3.getServerId());
-//
-// assertEquals(2, cf1.getServerId());
-//
-// assertEquals(0, cf2.getServerId());
-//
-// assertEquals(1, cf3.getServerId());
-//
-//
-// assertEquals(3, failoverMap.size());
-//
-// assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-//
-// assertEquals(cf3.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
-//
-// assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf3.getServerId()))).intValue());
-// }
-// }
-//
-// public void testSimpleFailover() throws Exception
-// {
-// JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
-//
-// ClusteredClientConnectionFactoryDelegate delegate =
-// (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-//
-// Set nodeIDView = ServerManagement.getServer(0).getNodeIDView();
-// assertEquals(3, nodeIDView.size());
-//
-// ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-//
-// ClientConnectionFactoryDelegate cf1 = delegates[0];
-//
-// ClientConnectionFactoryDelegate cf2 = delegates[1];
-//
-// ClientConnectionFactoryDelegate cf3 = delegates[2];
-//
-// int server0Id = cf1.getServerId();
-//
-// int server1Id = cf2.getServerId();
-//
-// int server2Id = cf3.getServerId();
-//
-// log.info("server 0 id: " + server0Id);
-//
-// log.info("server 1 id: " + server1Id);
-//
-// log.info("server 2 id: " + server2Id);
-//
-// Map failoverMap = delegate.getFailoverMap();
-//
-// log.info(failoverMap.get(new Integer(server0Id)));
-// log.info(failoverMap.get(new Integer(server1Id)));
-// log.info(failoverMap.get(new Integer(server2Id)));
-//
-// int server1FailoverId = ((Integer)failoverMap.get(new Integer(server1Id))).intValue();
-//
-// // server 1 should failover onto server 2
-//
-// assertEquals(server2Id, server1FailoverId);
-//
-// Connection conn = null;
-//
-// try
-// {
-//
-// //Get a connection on server 1
-// conn = factory.createConnection(); //connection on server 0
-//
-// conn.close();
-//
-// conn = factory.createConnection(); //connection on server 1
-//
-// JBossConnection jbc = (JBossConnection)conn;
-//
-// ClientConnectionDelegate del = (ClientConnectionDelegate)jbc.getDelegate();
-//
-// ConnectionState state = (ConnectionState)del.getState();
-//
-// int initialServerID = state.getServerID();
-//
-// assertEquals(1, initialServerID);
-//
-// Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//
-// MessageProducer prod = sess.createProducer(queue1);
-//
-// MessageConsumer cons = sess.createConsumer(queue1);
-//
-// final int NUM_MESSAGES = 100;
-//
-// for (int i = 0; i < NUM_MESSAGES; i++)
-// {
-// TextMessage tm = sess.createTextMessage("message:" + i);
-//
-// prod.send(tm);
-// }
-//
-// //So now, messages should be in queue1 on server 1
-// //So we now kill server 1
-// //Which should cause transparent failover of connection conn onto server 1
-//
-// log.info("************ KILLING (CRASHING) SERVER 1");
-//
-// ServerManagement.kill(1);
-//
-// log.info("killed server, now waiting");
-//
-// Thread.sleep(5000);
-//
-// log.info("done wait");
-//
-// state = (ConnectionState)del.getState();
-//
-// int finalServerID = state.getServerID();
-//
-// log.info("final server id= " + finalServerID);
-//
-// //server id should now be 2
-//
-// assertEquals(2, finalServerID);
-//
-// conn.start();
-//
-// for (int i = 0; i < NUM_MESSAGES; i++)
-// {
-// TextMessage tm = (TextMessage)cons.receive(1000);
-//
-// log.info("message is " + tm);
-//
-// assertNotNull(tm);
-//
-// assertEquals("message:" + i, tm.getText());
-// }
-// log.info("done");
-// }
-// finally
-// {
-// if (conn != null)
-// {
-// try
-// {
-// conn.close();
-// }
-// catch (Exception e)
-// {
-// e.printStackTrace();
-// }
-// }
-// }
-//
-// }
+ public void testRoundRobinConnectionCreation() throws Exception
+ {
+ JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+
+ ClusteredClientConnectionFactoryDelegate delegate =
+ (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+
+ log.info ("number of delegates = " + delegate.getDelegates().length);
+ log.info ("number of servers = " + ServerManagement.getServer(0).getNodeIDView().size());
+
+ assertEquals(3, delegate.getDelegates().length);
+
+ ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+
+ ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+
+ ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
+
+ assertEquals(0, cf1.getServerId());
+
+ assertEquals(1, cf2.getServerId());
+
+ assertEquals(2, cf3.getServerId());
+
+ assertEquals(3, ServerManagement.getServer(0).getNodeIDView().size());
+
+ Connection conn1 = null;
+
+ Connection conn2 = null;
+
+ Connection conn3 = null;
+
+ Connection conn4 = null;
+
+ Connection conn5 = null;
+
+ try
+ {
+ conn1 = factory.createConnection();
+
+ conn2 = factory.createConnection();
+
+ conn3 = factory.createConnection();
+
+ conn4 = factory.createConnection();
+
+ conn5 = factory.createConnection();
+
+ ConnectionState state1 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn1).getDelegate()).getState());
+
+ ConnectionState state2 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn2).getDelegate()).getState());
+
+ ConnectionState state3 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn3).getDelegate()).getState());
+
+ ConnectionState state4 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn4).getDelegate()).getState());
+
+ ConnectionState state5 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn5).getDelegate()).getState());
+
+ int serverID1 = state1.getServerID();
+
+ int serverID2 = state2.getServerID();
+
+ int serverID3 = state3.getServerID();
+
+ int serverID4 = state4.getServerID();
+
+ int serverID5 = state5.getServerID();
+
+ log.info("server id 1: " + serverID1);
+
+ log.info("server id 2: " + serverID2);
+
+ log.info("server id 3: " + serverID3);
+
+ log.info("server id 4: " + serverID4);
+
+ log.info("server id 5: " + serverID5);
+
+ assertEquals(0, serverID1);
+
+ assertEquals(1, serverID2);
+
+ assertEquals(2, serverID3);
+
+ assertEquals(0, serverID4);
+
+ assertEquals(1, serverID5);
+ }
+ finally
+ {
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+
+ if (conn3 != null)
+ {
+ conn3.close();
+ }
+
+ if (conn4 != null)
+ {
+ conn4.close();
+ }
+
+ if (conn5 != null)
+ {
+ conn5.close();
+ }
+ }
+
+ }
+
+ /*
+ * Test that the failover mapping is created correctly and updated properly when nodes leave
+ * or join
+ */
+ public void testDefaultFailoverMap() throws Exception
+ {
+ {
+ JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+
+ ClusteredClientConnectionFactoryDelegate delegate =
+ (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+
+ assertEquals(3, ServerManagement.getServer(0).getNodeIDView().size());
+
+ ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+
+ ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+
+ ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+
+ ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
+
+ //The order here depends on the order the servers were started in
+
+ //If any servers get stopped and then started then the order will change
+
+ log.info("cf1 serverid=" + cf1.getServerId());
+
+ log.info("cf2 serverid=" + cf2.getServerId());
+
+ log.info("cf3 serverid=" + cf3.getServerId());
+
+
+ assertEquals(0, cf1.getServerId());
+
+ assertEquals(1, cf2.getServerId());
+
+ assertEquals(2, cf3.getServerId());
+
+ Map failoverMap = delegate.getFailoverMap();
+
+ assertEquals(3, delegates.length);
+
+ assertEquals(3, failoverMap.size());
+
+ // Default failover policy just chooses the node to the right
+
+ assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+
+ assertEquals(cf3.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
+
+ assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf3.getServerId()))).intValue());
+ }
+
+ //Now cleanly stop one of the servers
+
+ log.info("************** STOPPING SERVER 0");
+ ServerManagement.stop(0);
+
+ log.info("server stopped");
+
+ assertEquals(2, ServerManagement.getServer(1).getNodeIDView().size());
+
+ {
+ //Lookup another connection factory
+
+ JBossConnectionFactory factory = (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
+
+ log.info("Got connection factory");
+
+ ClusteredClientConnectionFactoryDelegate delegate =
+ (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+
+ ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+
+ Map failoverMap = delegate.getFailoverMap();
+
+ log.info("Got failover map");
+
+ assertEquals(2, delegates.length);
+
+ ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+
+ ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+
+ //Order here depends on order servers were started in
+
+ log.info("cf1 serverid=" + cf1.getServerId());
+
+ log.info("cf2 serverid=" + cf2.getServerId());
+
+ assertEquals(1, cf1.getServerId());
+
+ assertEquals(2, cf2.getServerId());
+
+
+ assertEquals(2, failoverMap.size());
+
+ assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+
+ assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
+ }
+
+ //Cleanly stop another server
+
+ log.info("Server 1 is started: " + ServerManagement.getServer(1).isServerPeerStarted());
+
+ ServerManagement.stop(1);
+
+ assertEquals(1, ServerManagement.getServer(2).getNodeIDView().size());
+
+ {
+ //Lookup another connection factory
+
+ JBossConnectionFactory factory = (JBossConnectionFactory )ic2.lookup("/ConnectionFactory");
+
+ ClusteredClientConnectionFactoryDelegate delegate =
+ (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+
+ ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+
+ Map failoverMap = delegate.getFailoverMap();
+
+ assertEquals(1, delegates.length);
+
+ ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+
+ assertEquals(2, cf1.getServerId());
+
+
+ assertEquals(1, failoverMap.size());
+
+ assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+ }
+
+ //Restart server 0
+
+ ServerManagement.start("all", 0);
+
+ {
+ JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+
+ log.info("Got connection factory");
+
+ ClusteredClientConnectionFactoryDelegate delegate =
+ (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+
+ ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+
+ Map failoverMap = delegate.getFailoverMap();
+
+ log.info("Got failover map");
+
+ assertEquals(2, delegates.length);
+
+ ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+
+ ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+
+ log.info("cf1 serverid=" + cf1.getServerId());
+
+ log.info("cf2 serverid=" + cf2.getServerId());
+
+ assertEquals(2, cf1.getServerId());
+
+ assertEquals(0, cf2.getServerId());
+
+
+ assertEquals(2, failoverMap.size());
+
+ assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+
+ assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
+ }
+
+
+ //Restart server 1
+
+ ServerManagement.start("all", 1);
+
+ {
+ JBossConnectionFactory factory = (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
+
+ log.info("Got connection factory");
+
+ ClusteredClientConnectionFactoryDelegate delegate =
+ (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+
+ ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+
+ Map failoverMap = delegate.getFailoverMap();
+
+ log.info("Got failover map");
+
+ assertEquals(3, delegates.length);
+
+ ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+
+ ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+
+ ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
+
+ log.info("cf1 serverid=" + cf1.getServerId());
+
+ log.info("cf2 serverid=" + cf2.getServerId());
+
+ log.info("cf3 serverid=" + cf3.getServerId());
+
+ assertEquals(2, cf1.getServerId());
+
+ assertEquals(0, cf2.getServerId());
+
+ assertEquals(1, cf3.getServerId());
+
+
+ assertEquals(3, failoverMap.size());
+
+ assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+
+ assertEquals(cf3.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
+
+ assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf3.getServerId()))).intValue());
+ }
+ }
+ public void testSimpleFailover() throws Exception
+ {
+ JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+
+ ClusteredClientConnectionFactoryDelegate delegate =
+ (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+
+ Set nodeIDView = ServerManagement.getServer(0).getNodeIDView();
+ assertEquals(3, nodeIDView.size());
+
+ ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+
+ ClientConnectionFactoryDelegate cf1 = delegates[0];
+
+ ClientConnectionFactoryDelegate cf2 = delegates[1];
+
+ ClientConnectionFactoryDelegate cf3 = delegates[2];
+
+ int server0Id = cf1.getServerId();
+
+ int server1Id = cf2.getServerId();
+
+ int server2Id = cf3.getServerId();
+
+ log.info("server 0 id: " + server0Id);
+
+ log.info("server 1 id: " + server1Id);
+
+ log.info("server 2 id: " + server2Id);
+
+ Map failoverMap = delegate.getFailoverMap();
+
+ log.info(failoverMap.get(new Integer(server0Id)));
+ log.info(failoverMap.get(new Integer(server1Id)));
+ log.info(failoverMap.get(new Integer(server2Id)));
+
+ int server1FailoverId = ((Integer)failoverMap.get(new Integer(server1Id))).intValue();
+
+ // server 1 should failover onto server 2
+
+ assertEquals(server2Id, server1FailoverId);
+
+ Connection conn = null;
+
+ try
+ {
+
+ //Get a connection on server 1
+ conn = factory.createConnection(); //connection on server 0
+
+ conn.close();
+
+ conn = factory.createConnection(); //connection on server 1
+
+ JBossConnection jbc = (JBossConnection)conn;
+
+ ClientConnectionDelegate del = (ClientConnectionDelegate)jbc.getDelegate();
+
+ ConnectionState state = (ConnectionState)del.getState();
+
+ int initialServerID = state.getServerID();
+
+ assertEquals(1, initialServerID);
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sess.createProducer(queue1);
+
+ MessageConsumer cons = sess.createConsumer(queue1);
+
+ final int NUM_MESSAGES = 100;
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess.createTextMessage("message:" + i);
+
+ prod.send(tm);
+ }
+
+ //So now, messages should be in queue1 on server 1
+ //So we now kill server 1
+ //Which should cause transparent failover of connection conn onto server 1
+
+ log.info("************ KILLING (CRASHING) SERVER 1");
+
+ ServerManagement.kill(1);
+
+ log.info("killed server, now waiting");
+
+ Thread.sleep(5000);
+
+ log.info("done wait");
+
+ state = (ConnectionState)del.getState();
+
+ int finalServerID = state.getServerID();
+
+ log.info("final server id= " + finalServerID);
+
+ //server id should now be 2
+
+ assertEquals(2, finalServerID);
+
+ conn.start();
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons.receive(1000);
+
+ log.info("message is " + tm);
+
+ assertNotNull(tm);
+
+ assertEquals("message:" + i, tm.getText());
+ }
+ log.info("done");
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ try
+ {
+ conn.close();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ }
+
public void testFailoverWithUnackedMessagesClientAcknowledge() throws Exception
{
JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
@@ -723,183 +723,183 @@
}
}
-//
-// public void testFailoverWithUnackedMessagesTransactional() throws Exception
-// {
-// JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
-//
-// ClusteredClientConnectionFactoryDelegate delegate =
-// (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-//
-// Set nodeIDView = ServerManagement.getServer(0).getNodeIDView();
-// assertEquals(3, nodeIDView.size());
-//
-// ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-//
-// ClientConnectionFactoryDelegate cf1 = delegates[0];
-//
-// ClientConnectionFactoryDelegate cf2 = delegates[1];
-//
-// ClientConnectionFactoryDelegate cf3 = delegates[2];
-//
-// int server0Id = cf1.getServerId();
-//
-// int server1Id = cf2.getServerId();
-//
-// int server2Id = cf3.getServerId();
-//
-// log.info("server 0 id: " + server0Id);
-//
-// log.info("server 1 id: " + server1Id);
-//
-// log.info("server 2 id: " + server2Id);
-//
-// Map failoverMap = delegate.getFailoverMap();
-//
-// log.info(failoverMap.get(new Integer(server0Id)));
-// log.info(failoverMap.get(new Integer(server1Id)));
-// log.info(failoverMap.get(new Integer(server2Id)));
-//
-// int server1FailoverId = ((Integer)failoverMap.get(new Integer(server1Id))).intValue();
-//
-// // server 1 should failover onto server 2
-//
-// assertEquals(server2Id, server1FailoverId);
-//
-// Connection conn = null;
-//
-// try
-// {
-// //Get a connection on server 1
-// conn = factory.createConnection(); //connection on server 0
-//
-// conn.close();
-//
-// conn = factory.createConnection(); //connection on server 1
-//
-// JBossConnection jbc = (JBossConnection)conn;
-//
-// ClientConnectionDelegate del = (ClientConnectionDelegate)jbc.getDelegate();
-//
-// ConnectionState state = (ConnectionState)del.getState();
-//
-// int initialServerID = state.getServerID();
-//
-// assertEquals(1, initialServerID);
-//
-// Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
-//
-// MessageProducer prod = sess.createProducer(queue1);
-//
-// MessageConsumer cons = sess.createConsumer(queue1);
-//
-// final int NUM_MESSAGES = 100;
-//
-// for (int i = 0; i < NUM_MESSAGES; i++)
-// {
-// TextMessage tm = sess.createTextMessage("message:" + i);
-//
-// prod.send(tm);
-// }
-//
-// sess.commit();
-//
-// conn.start();
-//
-// //Now consume half of the messages but don't commit them these will end up in
-// //client side resource manager
-//
-// for (int i = 0; i < NUM_MESSAGES / 2; i++)
-// {
-// TextMessage tm = (TextMessage)cons.receive(500);
-//
-// assertNotNull(tm);
-//
-// assertEquals("message:" + i, tm.getText());
-// }
-//
-// //So now, messages should be in queue1 on server 1
-// //So we now kill server 1
-// //Which should cause transparent failover of connection conn onto server 1
-//
-// log.info("************ KILLING (CRASHING) SERVER 1");
-//
-// ServerManagement.kill(1);
-//
-// log.info("killed server, now waiting");
-//
-// Thread.sleep(5000);
-//
-// log.info("done wait");
-//
-// state = (ConnectionState)del.getState();
-//
-// int finalServerID = state.getServerID();
-//
-// log.info("final server id= " + finalServerID);
-//
-// //server id should now be 2
-//
-// assertEquals(2, finalServerID);
-//
-// conn.start();
-//
-// //Now should be able to consume the rest of the messages
-//
-// log.info("here1");
-//
-// TextMessage tm = null;
-//
-// for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
-// {
-// tm = (TextMessage)cons.receive(500);
-//
-// log.info("message is " + tm.getText());
-//
-// assertNotNull(tm);
-//
-// assertEquals("message:" + i, tm.getText());
-// }
-//
-// log.info("here2");
-//
-// //Now should be able to commit them
-//
-// sess.commit();
-//
-// //Now check there are no more messages there
-// sess.close();
-//
-// sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//
-// cons = sess.createConsumer(queue1);
-//
-// Message m = cons.receive(500);
-//
-// assertNull(m);
-//
-// log.info("got to end of test");
-// }
-// finally
-// {
-// if (conn != null)
-// {
-// try
-// {
-// conn.close();
-// }
-// catch (Exception e)
-// {
-// e.printStackTrace();
-// }
-// }
-// }
-//
-// }
-//
+ public void testFailoverWithUnackedMessagesTransactional() throws Exception
+ {
+ JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+
+ ClusteredClientConnectionFactoryDelegate delegate =
+ (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+
+ Set nodeIDView = ServerManagement.getServer(0).getNodeIDView();
+ assertEquals(3, nodeIDView.size());
+
+ ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+
+ ClientConnectionFactoryDelegate cf1 = delegates[0];
+
+ ClientConnectionFactoryDelegate cf2 = delegates[1];
+
+ ClientConnectionFactoryDelegate cf3 = delegates[2];
+
+ int server0Id = cf1.getServerId();
+
+ int server1Id = cf2.getServerId();
+
+ int server2Id = cf3.getServerId();
+
+ log.info("server 0 id: " + server0Id);
+
+ log.info("server 1 id: " + server1Id);
+
+ log.info("server 2 id: " + server2Id);
+
+ Map failoverMap = delegate.getFailoverMap();
+
+ log.info(failoverMap.get(new Integer(server0Id)));
+ log.info(failoverMap.get(new Integer(server1Id)));
+ log.info(failoverMap.get(new Integer(server2Id)));
+
+ int server1FailoverId = ((Integer)failoverMap.get(new Integer(server1Id))).intValue();
+
+ // server 1 should failover onto server 2
+
+ assertEquals(server2Id, server1FailoverId);
+
+ Connection conn = null;
+
+ try
+ {
+ //Get a connection on server 1
+ conn = factory.createConnection(); //connection on server 0
+
+ conn.close();
+
+ conn = factory.createConnection(); //connection on server 1
+
+ JBossConnection jbc = (JBossConnection)conn;
+
+ ClientConnectionDelegate del = (ClientConnectionDelegate)jbc.getDelegate();
+
+ ConnectionState state = (ConnectionState)del.getState();
+
+ int initialServerID = state.getServerID();
+
+ assertEquals(1, initialServerID);
+
+ Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+
+ MessageProducer prod = sess.createProducer(queue1);
+
+ MessageConsumer cons = sess.createConsumer(queue1);
+
+ final int NUM_MESSAGES = 100;
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess.createTextMessage("message:" + i);
+
+ prod.send(tm);
+ }
+
+ sess.commit();
+
+ conn.start();
+
+ //Now consume half of the messages but don't commit them these will end up in
+ //client side resource manager
+
+ for (int i = 0; i < NUM_MESSAGES / 2; i++)
+ {
+ TextMessage tm = (TextMessage)cons.receive(500);
+
+ assertNotNull(tm);
+
+ assertEquals("message:" + i, tm.getText());
+ }
+
+ //So now, messages should be in queue1 on server 1
+ //So we now kill server 1
+ //Which should cause transparent failover of connection conn onto server 1
+
+ log.info("************ KILLING (CRASHING) SERVER 1");
+
+ ServerManagement.kill(1);
+
+ log.info("killed server, now waiting");
+
+ Thread.sleep(5000);
+
+ log.info("done wait");
+
+ state = (ConnectionState)del.getState();
+
+ int finalServerID = state.getServerID();
+
+ log.info("final server id= " + finalServerID);
+
+ //server id should now be 2
+
+ assertEquals(2, finalServerID);
+
+ conn.start();
+
+ //Now should be able to consume the rest of the messages
+
+ log.info("here1");
+
+ TextMessage tm = null;
+
+ for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
+ {
+ tm = (TextMessage)cons.receive(500);
+
+ log.info("message is " + tm.getText());
+
+ assertNotNull(tm);
+
+ assertEquals("message:" + i, tm.getText());
+ }
+
+ log.info("here2");
+
+ //Now should be able to commit them
+
+ sess.commit();
+
+ //Now check there are no more messages there
+ sess.close();
+
+ sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ cons = sess.createConsumer(queue1);
+
+ Message m = cons.receive(500);
+
+ assertNull(m);
+
+ log.info("got to end of test");
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ try
+ {
+ conn.close();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ }
+
+
// public void testEvenSimplerFailover() throws Exception
// {
// JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
More information about the jboss-cvs-commits
mailing list