[jboss-cvs] JBoss Messaging SVN: r1751 - in branches/Branch_Client_Failover_Experiment: src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/server/endpoint/advised src/main/org/jboss/jms/tx src/main/org/jboss/messaging/core src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/jms/clustering tests/src/org/jboss/test/messaging/tools/jmx/rmi
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sat Dec 9 18:29:11 EST 2006
Author: timfox
Date: 2006-12-09 18:28:59 -0500 (Sat, 09 Dec 2006)
New Revision: 1751
Modified:
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryEndpoint.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/ResourceManager.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/TxState.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/PagingChannelSupport.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/SimpleClusteringTest.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java
Log:
More progress on failover
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java 2006-12-09 20:58:24 UTC (rev 1750)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java 2006-12-09 23:28:59 UTC (rev 1751)
@@ -130,23 +130,17 @@
if (delegates == null)
{
log.info("Looking for delegates");
- SimpleMetaData metaData = invocation.getMetaData();
- //delegates = (ClientConnectionFactoryDelegate[])metaData.getMetaData(MetaDataConstants.JMS, MetaDataConstants.CF_DELEGATES);
-
MethodInvocation methodInvoke = (MethodInvocation)invocation;
- //delegates = (ClientConnectionFactoryDelegate[])metaData.getMetaData(MetaDataConstants.JMS, MetaDataConstants.CF_DELEGATES);
-
- // this is a hack, but I couldn't get this working through metaData
// TODO: FIX THIS! metaData should contain CF_DELEGATES
Object target = methodInvoke.getTargetObject();
+
if (target instanceof ClusteredClientConnectionFactoryDelegate)
{
delegates = ((ClusteredClientConnectionFactoryDelegate)target).getDelegates();
}
-
if (delegates != null)
{
//TODO: Fix this! metadata should contain CF_FAILOVER_INDEXES
@@ -191,6 +185,12 @@
//TODO implement client side valve to prevent invocations occurring whilst failover is occurring
ConnectionState state = (ConnectionState)((DelegateSupport)failedConnection).getState();
+
+ log.info("*** about to failover - waiting for server to finish");
+
+ //FIXME - this is only temporarily necessary since we need to give the server enough time to
+ //failover before we reconnect - this would be handled by the server side valve
+ Thread.sleep(10000);
log.info("Creating new connection");
ClientConnectionDelegate newConnection = createConnection(newCF, state.getUser(), state.getPassword());
@@ -357,6 +357,8 @@
int oldServerID)
throws JMSException
{
+ log.info("Failing over consumer");
+
ClientConsumerDelegate failedConsumerDelegate =
(ClientConsumerDelegate)failedConsumerState.getDelegate();
@@ -379,10 +381,17 @@
ConsumerState newState = (ConsumerState)newConsumerDelegate.getState();
failedConsumerState.copy(newState);
- failedConnectionState.getResourceManager().
- handleFailover(failedSessionState.getCurrentTxId(),
- oldConsumerID,
- failedConsumerState.getConsumerID());
+ if (failedSessionState.isTransacted())
+ {
+ //Replace the old consumer id with the new consumer id
+
+ //TODO what about XA?? - may have done work in many transactions - so need to replace all
+
+ failedConnectionState.getResourceManager().
+ handleFailover(failedSessionState.getCurrentTxId(),
+ oldConsumerID,
+ failedConsumerState.getConsumerID());
+ }
CallbackManager cm = failedConnectionState.getRemotingConnection().getCallbackManager();
@@ -392,7 +401,10 @@
cm.registerHandler(failedConnectionState.getServerID(),
failedConsumerState.getConsumerID(),
handler);
+
failedSessionState.addCallbackHandler(handler);
+
+ log.info("failed over consumer");
}
@@ -444,11 +456,6 @@
{
log.info("********* EXCEPTION DETECTED");
- log.info("handleConnectionException: ", throwable);
-
-
- log.info("Caught connection exception for connection: " + connection);
-
handleFailure(connection);
}
catch (Throwable e)
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2006-12-09 20:58:24 UTC (rev 1750)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2006-12-09 23:28:59 UTC (rev 1751)
@@ -149,11 +149,6 @@
throw new IllegalStateException("This invocation should not be handled here!");
}
- public int getFailoverNode(int node) throws JMSException
- {
- throw new IllegalStateException("This invocation should not be handled here!");
- }
-
// Public --------------------------------------------------------
public synchronized Object invoke(Invocation invocation) throws Throwable
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryEndpoint.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryEndpoint.java 2006-12-09 20:58:24 UTC (rev 1750)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryEndpoint.java 2006-12-09 23:28:59 UTC (rev 1751)
@@ -43,9 +43,5 @@
byte[] getClientAOPConfig() throws JMSException;
IdBlock getIdBlock(int size) throws JMSException;
-
- /** Return the node that should take over a given node */
- int getFailoverNode(int node) throws JMSException;
-
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java 2006-12-09 20:58:24 UTC (rev 1750)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java 2006-12-09 23:28:59 UTC (rev 1751)
@@ -137,24 +137,6 @@
}
}
-
- public int getFailoverNode(int node) throws JMSException
- {
- try
- {
- //ServerPeer peer = (ServerPeer )serverPeer.getInstance();
- //return peer.getReplicator().getFailoverNodeID(node);
-
- // not implemented yet
- return node;
- }
- catch (Throwable t)
- {
- throw ExceptionUtil.handleJMSInvocation(t, this + " getFailoverNode");
- }
- }
-
-
public byte[] getClientAOPConfig() throws JMSException
{
try
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java 2006-12-09 20:58:24 UTC (rev 1750)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java 2006-12-09 23:28:59 UTC (rev 1751)
@@ -73,11 +73,6 @@
return endpoint.getIdBlock(size);
}
- public int getFailoverNode(int node) throws JMSException
- {
- return endpoint.getFailoverNode(node);
- }
-
// AdvisedSupport override ---------------------------------------
public Object getEndpoint()
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/ResourceManager.java 2006-12-09 20:58:24 UTC (rev 1750)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/ResourceManager.java 2006-12-09 23:28:59 UTC (rev 1751)
@@ -121,14 +121,15 @@
}
/**
- * Navigate on ACK and change clientIDs on every ACK not sent yet.
+ * Navigate on ACK and change consumer ids on every ACK not sent yet.
*/
- public void handleFailover(Object xid, int oldClientId, int newClientId)
+ public void handleFailover(Object xid, int oldConsumerID, int newConsumerID)
{
- if (trace) { log.trace("handleFailover:: Transfering clientIds on ACKs from " + oldClientId + " to " + newClientId); }
+ if (trace) { log.trace("handleFailover:: Transfering consumer id on ACKs from " + oldConsumerID + " to " + newConsumerID); }
TxState tx = getTx(xid);
- tx.handleFailover(oldClientId, newClientId);
+
+ tx.handleFailover(oldConsumerID, newConsumerID);
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/TxState.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/TxState.java 2006-12-09 20:58:24 UTC (rev 1750)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/tx/TxState.java 2006-12-09 23:28:59 UTC (rev 1751)
@@ -97,15 +97,16 @@
}
- /** Navigate on ACK and change clientIDs on every ACK not sent yet */
- public void handleFailover(int oldClientId, int newClientId)
+ /** Navigate on ACK and change consumer ids on every ACK not sent yet */
+ public void handleFailover(int oldConsumerID, int newConsumerID)
{
- for (Iterator ackIterator = acks.iterator();ackIterator.hasNext();)
+ for (Iterator ackIterator = acks.iterator(); ackIterator.hasNext(); )
{
AckInfo ackInfo = (AckInfo)ackIterator.next();
- if (ackInfo.getConsumerID()==oldClientId)
+
+ if (ackInfo.getConsumerID() == oldConsumerID)
{
- ackInfo.setConsumerID(newClientId);
+ ackInfo.setConsumerID(newConsumerID);
}
}
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/PagingChannelSupport.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/PagingChannelSupport.java 2006-12-09 20:58:24 UTC (rev 1750)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/PagingChannelSupport.java 2006-12-09 23:28:59 UTC (rev 1751)
@@ -235,7 +235,7 @@
//Load the unpaged references
InitialLoadInfo ili = pm.getInitialReferenceInfos(channelID, fullSize);
-
+
if (ili.getMaxPageOrdering() != null)
{
firstPagingOrder = ili.getMinPageOrdering().longValue();
@@ -248,67 +248,25 @@
{
firstPagingOrder = nextPagingOrder = 0;
}
-
- pushReferences(ili);
-
- //Maybe we need to load some paged refs
+ log.info("Channel " + this.channelID + " loading " + ili.getRefInfos().size() + " references");
+
+ Map refMap = processReferences(ili.getRefInfos());
+
+ Iterator iter = ili.getRefInfos().iterator();
+ while (iter.hasNext())
+ {
+ ReferenceInfo info = (ReferenceInfo)iter.next();
+
+ addFromRefInfo(info, refMap);
+ }
+
+ //Maybe we need to load some paged refs
+
while (checkLoad()) {}
}
}
-
- /** We extracted this as a method fro mload, as transferChannel (for HA recovery) also needs the same routine. */
- private Map pushReferences(InitialLoadInfo ili) throws Exception {
- Map refMap = processReferences(ili.getRefInfos());
-
- Iterator iter = ili.getRefInfos().iterator();
- while (iter.hasNext())
- {
- ReferenceInfo info = (ReferenceInfo)iter.next();
-
- addFromRefInfo(info, refMap);
- }
- return refMap;
- }
-
- /** Transfer messages for an old channel to a new channel.
- * This is used during HA failoever when a connection fail and messages will need to be transfered to a new node */
- public void transferChannel(long oldchannelID) throws Exception
- {
- log.info("Transfering state from " + oldchannelID +" into " + this.getChannelID());
- synchronized (refLock)
- {
- while(true)
- {
- InitialLoadInfo ili =pm.getInitialReferenceInfos(oldchannelID,fullSize);
- if (ili.getRefInfos().size()==0)
- {
- break;
- }
-
- log.info("got " + ili.getRefInfos().size() + " references to move");
-
-
-
- Map refMap = pushReferences(ili);
- Iterator referencesIterator = ili.getRefInfos().iterator();
- while (referencesIterator.hasNext())
- {
- ReferenceInfo info = (ReferenceInfo)referencesIterator.next();
- log.info("transfering reference " + info.getMessageId() + " from " + oldchannelID + " into " + this.getChannelID());
- MessageReference messageReference = (MessageReference )refMap.get(new Long(info.getMessageId()));
-
- ///// BIG TODOS:
- ///// What to do with transaction here?
- ///// Do we need to remove from old channel? (Consider the case of the Old Server coming back... I guess we should.. bu we have to check this)
- pm.addReference(this.getChannelID(),messageReference,null);
- pm.removeReference(oldchannelID,messageReference, null);
- }
- }
- }
- log.info("transfer state done");
- }
-
+
public void unload() throws Exception
{
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-12-09 20:58:24 UTC (rev 1750)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-12-09 23:28:59 UTC (rev 1751)
@@ -1357,8 +1357,6 @@
}
}
}
-
-
/**
* This method fails over all the queues from node <nodeId> onto this node
@@ -1424,6 +1422,8 @@
namesToRemove.add(entry);
}
+ log.info("Deleting " + namesToRemove.size() + " bindings from old node");
+
for (Iterator iterNames = namesToRemove.iterator(); iterNames.hasNext();)
{
Map.Entry entry = (Map.Entry)iterNames.next();
@@ -1439,6 +1439,8 @@
//Then deleted from the database
this.deleteBinding(nodeId, queueName);
+
+ log.info("deleted binding for " + queueName);
//Then an unbind request is sent - this causes other nodes to also remove it from the in memory
//condition and name maps
@@ -1461,11 +1463,17 @@
{
log.info("The current node didn't have a queue " + queueName + " so it's assuming the queue as a regular queue");
}
+ else
+ {
+ log.info("There is already a queue with that name so adding to failed map");
+ }
//Create a new binding
Binding newBinding = this.createBinding(this.currentNodeId, binding.getCondition(),
stub.getName(), stub.getChannelID(),
stub.getFilter(), stub.isRecoverable(), failed);
+
+ log.info("Created new binding");
//Insert it into the database
insertBinding(newBinding);
@@ -1476,6 +1484,8 @@
clusteredQueue.load();
clusteredQueue.activate();
+ log.info("Loaded queue");
+
//Add the new binding in memory
addBinding(newBinding);
@@ -2223,11 +2233,11 @@
//Need to evaluate this before we regenerate the failover map
boolean isFailover = isFailoverNodeForNode(theNodeId.intValue());
+
+ log.info("Am I failover node for node " + theNodeId + "? " + isFailover);
+
+ log.info("Crashed: " + crashed);
- //Now we recalculate the failover mapping - this needs to be done before removeDataForNode is called
- //since that may cause connection factories to be rebound
- //generateFailoverMap(currentView);
-
//Remove any replicant data and non durable bindings for the node - again we need to do this
//irrespective of whether we crashed
//This will notify any listeners which will recalculate the connection factory delegates and failover delegates
@@ -2341,8 +2351,6 @@
public void viewAccepted(View newView)
{
- //if (trace) { log.trace(DefaultClusteredPostOffice.this + " got new view: " + newView
- // + DefaultClusteredPostOffice.this.getOfficeName()); }
//TODO: (by Clebert) Most JBoss Services use info on viewAccepted,
//TODO: can't we do the same since this is pretty useful?
log.info(currentNodeId + " got new view: " + newView + " postOffice:"
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java 2006-12-09 20:58:24 UTC (rev 1750)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java 2006-12-09 23:28:59 UTC (rev 1751)
@@ -74,349 +74,349 @@
* Test that connections created using a clustered connection factory are created round robin on
* different servers
*/
-// public void testRoundRobinConnectionCreation() throws Exception
-// {
-// JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
-//
-// ClusteredClientConnectionFactoryDelegate delegate =
-// (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-//
-// log.info ("number of delegates = " + delegate.getDelegates().length);
-// log.info ("number of servers = " + ServerManagement.getServer(0).getNumberOfNodesOnCluster());
-//
-// assertEquals(3, delegate.getDelegates().length);
-//
-// ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-//
-// ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-//
-// ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
-//
-// assertEquals(0, cf1.getServerId());
-//
-// assertEquals(1, cf2.getServerId());
-//
-// assertEquals(2, cf3.getServerId());
-//
-// assertEquals(3, ServerManagement.getServer(0).getNumberOfNodesOnCluster());
-//
-// Connection conn1 = null;
-//
-// Connection conn2 = null;
-//
-// Connection conn3 = null;
-//
-// Connection conn4 = null;
-//
-// Connection conn5 = null;
-//
-// try
-// {
-// conn1 = factory.createConnection();
-//
-// conn2 = factory.createConnection();
-//
-// conn3 = factory.createConnection();
-//
-// conn4 = factory.createConnection();
-//
-// conn5 = factory.createConnection();
-//
-// ConnectionState state1 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn1).getDelegate()).getState());
-//
-// ConnectionState state2 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn2).getDelegate()).getState());
-//
-// ConnectionState state3 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn3).getDelegate()).getState());
-//
-// ConnectionState state4 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn4).getDelegate()).getState());
-//
-// ConnectionState state5 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn5).getDelegate()).getState());
-//
-// int serverID1 = state1.getServerID();
-//
-// int serverID2 = state2.getServerID();
-//
-// int serverID3 = state3.getServerID();
-//
-// int serverID4 = state4.getServerID();
-//
-// int serverID5 = state5.getServerID();
-//
-// log.info("server id 1: " + serverID1);
-//
-// log.info("server id 2: " + serverID2);
-//
-// log.info("server id 3: " + serverID3);
-//
-// log.info("server id 4: " + serverID4);
-//
-// log.info("server id 5: " + serverID5);
-//
-// assertEquals(0, serverID1);
-//
-// assertEquals(1, serverID2);
-//
-// assertEquals(2, serverID3);
-//
-// assertEquals(0, serverID4);
-//
-// assertEquals(1, serverID5);
-// }
-// finally
-// {
-// if (conn1 != null)
-// {
-// conn1.close();
-// }
-//
-// if (conn2 != null)
-// {
-// conn2.close();
-// }
-//
-// if (conn3 != null)
-// {
-// conn3.close();
-// }
-//
-// if (conn4 != null)
-// {
-// conn4.close();
-// }
-//
-// if (conn5 != null)
-// {
-// conn5.close();
-// }
-// }
-//
-// }
-//
-// /*
-// * Test that the failover mapping is created correctly and updated properly when nodes leave
-// * or join
-// */
-// public void testDefaultFailoverMap() throws Exception
-// {
-// {
-// JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
-//
-// ClusteredClientConnectionFactoryDelegate delegate =
-// (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-//
-// assertEquals(3, ServerManagement.getServer(0).getNumberOfNodesOnCluster());
-//
-// ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-//
-// ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-//
-// ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-//
-// ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
-//
-// //The order here depends on the order the servers were started in
-//
-// //If any servers get stopped and then started then the order will change
-//
-// log.info("cf1 serverid=" + cf1.getServerId());
-//
-// log.info("cf2 serverid=" + cf2.getServerId());
-//
-// log.info("cf3 serverid=" + cf3.getServerId());
-//
-//
-// assertEquals(0, cf1.getServerId());
-//
-// assertEquals(1, cf2.getServerId());
-//
-// assertEquals(2, cf3.getServerId());
-//
-// Map failoverMap = delegate.getFailoverMap();
-//
-// assertEquals(3, delegates.length);
-//
-// assertEquals(3, failoverMap.size());
-//
-// // Default failover policy just chooses the node to the right
-//
-// assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-//
-// assertEquals(cf3.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
-//
-// assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf3.getServerId()))).intValue());
-// }
-//
-// //Now cleanly stop one of the servers
-//
-//
-// log.info("************** STOPPING SERVER 0");
-// ServerManagement.stop(0, true);
-//
-// log.info("server stopped");
-//
-// assertEquals(2, ServerManagement.getServer(1).getNumberOfNodesOnCluster());
-//
-// {
-// //Lookup another connection factory
-//
-// JBossConnectionFactory factory = (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
-//
-// log.info("Got connection factory");
-//
-// ClusteredClientConnectionFactoryDelegate delegate =
-// (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-//
-// ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-//
-// Map failoverMap = delegate.getFailoverMap();
-//
-// log.info("Got failover map");
-//
-// assertEquals(2, delegates.length);
-//
-// ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-//
-// ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-//
-// //Order here depends on order servers were started in
-//
-// log.info("cf1 serverid=" + cf1.getServerId());
-//
-// log.info("cf2 serverid=" + cf2.getServerId());
-//
-// assertEquals(1, cf1.getServerId());
-//
-// assertEquals(2, cf2.getServerId());
-//
-//
-// assertEquals(2, failoverMap.size());
-//
-// assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-//
-// assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
-// }
-//
-// //Cleanly stop another server
-//
-// log.info("Server 1 is started: " + ServerManagement.getServer(1).isServerPeerStarted());
-//
-// ServerManagement.stop(1, true);
-//
-// assertEquals(1, ServerManagement.getServer(2).getNumberOfNodesOnCluster());
-//
-// {
-// //Lookup another connection factory
-//
-// JBossConnectionFactory factory = (JBossConnectionFactory )ic2.lookup("/ConnectionFactory");
-//
-// ClusteredClientConnectionFactoryDelegate delegate =
-// (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-//
-// ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-//
-// Map failoverMap = delegate.getFailoverMap();
-//
-// assertEquals(1, delegates.length);
-//
-// ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-//
-// assertEquals(2, cf1.getServerId());
-//
-//
-// assertEquals(1, failoverMap.size());
-//
-// assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-// }
-//
-// //Restart server 0
-//
-// ServerManagement.start("all", 0);
-//
-// {
-// JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
-//
-// log.info("Got connection factory");
-//
-// ClusteredClientConnectionFactoryDelegate delegate =
-// (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-//
-// ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-//
-// Map failoverMap = delegate.getFailoverMap();
-//
-// log.info("Got failover map");
-//
-// assertEquals(2, delegates.length);
-//
-// ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-//
-// ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-//
-// log.info("cf1 serverid=" + cf1.getServerId());
-//
-// log.info("cf2 serverid=" + cf2.getServerId());
-//
-// assertEquals(2, cf1.getServerId());
-//
-// assertEquals(0, cf2.getServerId());
-//
-//
-// assertEquals(2, failoverMap.size());
-//
-// assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-//
-// assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
-// }
-//
-//
-// //Restart server 1
-//
-// ServerManagement.start("all", 1);
-//
-// {
-// JBossConnectionFactory factory = (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
-//
-// log.info("Got connection factory");
-//
-// ClusteredClientConnectionFactoryDelegate delegate =
-// (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-//
-// ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-//
-// Map failoverMap = delegate.getFailoverMap();
-//
-// log.info("Got failover map");
-//
-// assertEquals(3, delegates.length);
-//
-// ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-//
-// ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-//
-// ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
-//
-// log.info("cf1 serverid=" + cf1.getServerId());
-//
-// log.info("cf2 serverid=" + cf2.getServerId());
-//
-// log.info("cf3 serverid=" + cf3.getServerId());
-//
-// assertEquals(2, cf1.getServerId());
-//
-// assertEquals(0, cf2.getServerId());
-//
-// assertEquals(1, cf3.getServerId());
-//
-//
-// assertEquals(3, failoverMap.size());
-//
-// assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-//
-// assertEquals(cf3.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
-//
-// assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf3.getServerId()))).intValue());
-// }
-// }
+ public void testRoundRobinConnectionCreation() throws Exception
+ {
+ JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+
+ ClusteredClientConnectionFactoryDelegate delegate =
+ (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+
+ log.info ("number of delegates = " + delegate.getDelegates().length);
+ log.info ("number of servers = " + ServerManagement.getServer(0).getNumberOfNodesOnCluster());
+
+ assertEquals(3, delegate.getDelegates().length);
+
+ ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+
+ ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+
+ ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
+
+ assertEquals(0, cf1.getServerId());
+
+ assertEquals(1, cf2.getServerId());
+
+ assertEquals(2, cf3.getServerId());
+
+ assertEquals(3, ServerManagement.getServer(0).getNumberOfNodesOnCluster());
+
+ Connection conn1 = null;
+
+ Connection conn2 = null;
+
+ Connection conn3 = null;
+
+ Connection conn4 = null;
+
+ Connection conn5 = null;
+
+ try
+ {
+ conn1 = factory.createConnection();
+
+ conn2 = factory.createConnection();
+
+ conn3 = factory.createConnection();
+
+ conn4 = factory.createConnection();
+
+ conn5 = factory.createConnection();
+
+ ConnectionState state1 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn1).getDelegate()).getState());
+
+ ConnectionState state2 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn2).getDelegate()).getState());
+
+ ConnectionState state3 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn3).getDelegate()).getState());
+
+ ConnectionState state4 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn4).getDelegate()).getState());
+
+ ConnectionState state5 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn5).getDelegate()).getState());
+
+ int serverID1 = state1.getServerID();
+
+ int serverID2 = state2.getServerID();
+
+ int serverID3 = state3.getServerID();
+
+ int serverID4 = state4.getServerID();
+
+ int serverID5 = state5.getServerID();
+
+ log.info("server id 1: " + serverID1);
+
+ log.info("server id 2: " + serverID2);
+
+ log.info("server id 3: " + serverID3);
+
+ log.info("server id 4: " + serverID4);
+
+ log.info("server id 5: " + serverID5);
+
+ assertEquals(0, serverID1);
+
+ assertEquals(1, serverID2);
+
+ assertEquals(2, serverID3);
+
+ assertEquals(0, serverID4);
+
+ assertEquals(1, serverID5);
+ }
+ finally
+ {
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+
+ if (conn3 != null)
+ {
+ conn3.close();
+ }
+
+ if (conn4 != null)
+ {
+ conn4.close();
+ }
+
+ if (conn5 != null)
+ {
+ conn5.close();
+ }
+ }
+
+ }
+
+ /*
+ * Test that the failover mapping is created correctly and updated properly when nodes leave
+ * or join.
+ *
+ * Servers 0 and 1 are stopped cleanly in turn and then started again in the same order. After
+ * each change a connection factory is looked up afresh (through ic0, ic1 or ic2) and both its
+ * delegate array and its failover map are checked. The delegate order asserted at each step is
+ * [0,1,2], then [1,2], then [2], then [2,0], then [2,0,1]; every failover map entry is expected
+ * to point at the next delegate in the array, wrapping round from the last to the first.
+ */
+ public void testDefaultFailoverMap() throws Exception
+ {
+ {
+ JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+
+ ClusteredClientConnectionFactoryDelegate delegate =
+ (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+ // Starting point: all three nodes are expected to be on the cluster
+ assertEquals(3, ServerManagement.getServer(0).getNumberOfNodesOnCluster());
+
+ ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+
+ ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+
+ ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+
+ ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
+
+ //The order here depends on the order the servers were started in
+
+ //If any servers get stopped and then started then the order will change
+
+ log.info("cf1 serverid=" + cf1.getServerId());
+
+ log.info("cf2 serverid=" + cf2.getServerId());
+
+ log.info("cf3 serverid=" + cf3.getServerId());
+
+ // Delegates are expected in server id order 0, 1, 2
+ assertEquals(0, cf1.getServerId());
+
+ assertEquals(1, cf2.getServerId());
+
+ assertEquals(2, cf3.getServerId());
+
+ Map failoverMap = delegate.getFailoverMap();
+
+ assertEquals(3, delegates.length);
+
+ assertEquals(3, failoverMap.size());
+
+ // Default failover policy just chooses the node to the right
+ // i.e. 0 -> 1, 1 -> 2, and the last delegate wraps round: 2 -> 0
+ assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+
+ assertEquals(cf3.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
+
+ assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf3.getServerId()))).intValue());
+ }
+
+ //Now cleanly stop one of the servers
+
+
+ log.info("************** STOPPING SERVER 0");
+ ServerManagement.stop(0, true);
+
+ log.info("server stopped");
+ // The cluster size is now read from server 1, since server 0 has just been stopped
+ assertEquals(2, ServerManagement.getServer(1).getNumberOfNodesOnCluster());
+
+ {
+ //Lookup another connection factory
+ // (through ic1 this time rather than ic0)
+ JBossConnectionFactory factory = (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
+
+ log.info("Got connection factory");
+
+ ClusteredClientConnectionFactoryDelegate delegate =
+ (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+
+ ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+
+ Map failoverMap = delegate.getFailoverMap();
+
+ log.info("Got failover map");
+
+ assertEquals(2, delegates.length);
+
+ ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+
+ ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+
+ //Order here depends on order servers were started in
+
+ log.info("cf1 serverid=" + cf1.getServerId());
+
+ log.info("cf2 serverid=" + cf2.getServerId());
+
+ assertEquals(1, cf1.getServerId());
+
+ assertEquals(2, cf2.getServerId());
+
+ // Two delegates left: 1 fails over to 2 and 2 fails over to 1
+ assertEquals(2, failoverMap.size());
+
+ assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+
+ assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
+ }
+
+ //Cleanly stop another server
+
+ log.info("Server 1 is started: " + ServerManagement.getServer(1).isServerPeerStarted());
+
+ ServerManagement.stop(1, true);
+ // Only server 2 is still running, so the cluster size is read from it
+ assertEquals(1, ServerManagement.getServer(2).getNumberOfNodesOnCluster());
+
+ {
+ //Lookup another connection factory
+ // (through ic2, the only server that has not been stopped)
+ JBossConnectionFactory factory = (JBossConnectionFactory )ic2.lookup("/ConnectionFactory");
+
+ ClusteredClientConnectionFactoryDelegate delegate =
+ (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+
+ ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+
+ Map failoverMap = delegate.getFailoverMap();
+
+ assertEquals(1, delegates.length);
+
+ ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+
+ assertEquals(2, cf1.getServerId());
+
+ // With a single delegate the failover map is expected to map server 2 to itself
+ assertEquals(1, failoverMap.size());
+
+ assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+ }
+
+ //Restart server 0
+
+ ServerManagement.start("all", 0);
+ // NOTE(review): unlike the stop steps, the cluster size is not asserted after a restart
+ {
+ JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+
+ log.info("Got connection factory");
+
+ ClusteredClientConnectionFactoryDelegate delegate =
+ (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+
+ ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+
+ Map failoverMap = delegate.getFailoverMap();
+
+ log.info("Got failover map");
+
+ assertEquals(2, delegates.length);
+
+ ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+
+ ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+
+ log.info("cf1 serverid=" + cf1.getServerId());
+
+ log.info("cf2 serverid=" + cf2.getServerId());
+ // The restarted server 0 is expected after server 2 in the delegate array
+ assertEquals(2, cf1.getServerId());
+
+ assertEquals(0, cf2.getServerId());
+
+ // Failover map follows the new array order: 2 -> 0 and 0 -> 2
+ assertEquals(2, failoverMap.size());
+
+ assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+
+ assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
+ }
+
+
+ //Restart server 1
+
+ ServerManagement.start("all", 1);
+
+ {
+ JBossConnectionFactory factory = (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
+
+ log.info("Got connection factory");
+
+ ClusteredClientConnectionFactoryDelegate delegate =
+ (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+
+ ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+
+ Map failoverMap = delegate.getFailoverMap();
+
+ log.info("Got failover map");
+
+ assertEquals(3, delegates.length);
+
+ ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+
+ ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+
+ ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
+
+ log.info("cf1 serverid=" + cf1.getServerId());
+
+ log.info("cf2 serverid=" + cf2.getServerId());
+
+ log.info("cf3 serverid=" + cf3.getServerId());
+ // All three servers are back; the expected delegate order is now 2, 0, 1
+ assertEquals(2, cf1.getServerId());
+
+ assertEquals(0, cf2.getServerId());
+
+ assertEquals(1, cf3.getServerId());
+
+ // Failover map follows the array order again: 2 -> 0, 0 -> 1, 1 -> 2
+ assertEquals(3, failoverMap.size());
+
+ assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+
+ assertEquals(cf3.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
+
+ assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf3.getServerId()))).intValue());
+ }
+ }
public void testSimpleFailover() throws Exception
{
@@ -463,13 +463,13 @@
int initialServerID = state.getServerID();
assertEquals(0, initialServerID);
-
-
-
+
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer prod = sess.createProducer(queue0);
+ MessageConsumer cons = sess.createConsumer(queue0);
+
final int NUM_MESSAGES = 100;
for (int i = 0; i < NUM_MESSAGES; i++)
@@ -489,7 +489,7 @@
log.info("killed server, now waiting");
- Thread.sleep(30000);
+ Thread.sleep(25000);
log.info("done wait");
@@ -505,10 +505,6 @@
log.info("here 2");
- //I should now be able to create a consumer on the same connection and consume the messages
-
- MessageConsumer cons = sess.createConsumer(queue0);
-
conn.start();
log.info("here 3");
@@ -517,6 +513,8 @@
{
TextMessage tm = (TextMessage)cons.receive(1000);
+ log.info("message is " + tm);
+
assertNotNull(tm);
assertEquals("message:" + i, tm.getText());
@@ -533,14 +531,51 @@
}
catch (Exception e)
{
- //Ignore
+ e.printStackTrace();
}
}
}
}
+ public void testEvenSimplerFailover() throws Exception
+ {
+ JBossConnectionFactory factory = (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+ /*
+ * Minimal failover scenario: create a connection from the factory looked up through ic0,
+ * destroy server 0 while that connection is still open, sleep for 25 seconds and finally
+ * close the connection. Nothing is asserted; the test only fails if an exception escapes
+ * from the try block.
+ * NOTE(review): presumably the sleep is there to give client side failover time to run -
+ * confirm.
+ */
+ Connection conn = null;
+
+ try
+ {
+ conn = factory.createConnection();
+
+ log.info("************ KILLING (CRASHING) SERVER 0");
+ // destroy() rather than ServerManagement.stop(): the log messages describe this as a kill
+ ServerManagement.getServer(0).destroy();
+
+ log.info("killed server, now waiting");
+
+ Thread.sleep(25000);
+
+ log.info("done wait");
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ try
+ {
+ conn.close();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace(); // a failed close is only printed, it does not fail the test
+ }
+ }
+ }
+
+ }
+
// public void testConnectionFactoryConnect() throws Exception
// {
// try
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/SimpleClusteringTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/SimpleClusteringTest.java 2006-12-09 20:58:24 UTC (rev 1750)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/SimpleClusteringTest.java 2006-12-09 23:28:59 UTC (rev 1751)
@@ -106,8 +106,6 @@
ServerManagement.getServer(0).destroy();
}
-
-
public void testDistributedTopic() throws Exception
{
Connection conn = null;
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java 2006-12-09 20:58:24 UTC (rev 1750)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java 2006-12-09 23:28:59 UTC (rev 1751)
@@ -222,15 +222,10 @@
public synchronized void destroy() throws Exception
{
- server.destroy();
-
- registry.unbind(RMI_SERVER_PREFIX + index);
- registry.unbind(NAMING_SERVER_PREFIX + index);
+ //Kill the server without doing any graceful shutdown
- // Now shutdown the process. The registry will be taken out with the last RMI server standing
+ //For graceful shutdown use stop()
- //TODO - we should shutdown cleanly - let main() exit - not kill the process
-
new Thread(new VMKiller(), "VM Killer").start();
}
More information about the jboss-cvs-commits
mailing list