[jboss-cvs] JBoss Messaging SVN: r1710 - in branches/Branch_Client_Failover_Experiment: src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/server/connectionfactory src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/jms/clustering
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Dec 5 21:07:11 EST 2006
Author: clebert.suconic at jboss.com
Date: 2006-12-05 21:07:06 -0500 (Tue, 05 Dec 2006)
New Revision: 1710
Modified:
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringBase.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
Log:
Fixing ClusteredClientConnectionFactoryDelegate and other stuff
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java 2006-12-05 18:38:08 UTC (rev 1709)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java 2006-12-06 02:07:06 UTC (rev 1710)
@@ -73,7 +73,10 @@
for (int i = 0; i < delegates.length; i++)
{
- delegates[i].init();
+ if (this!=delegates[i])
+ {
+ delegates[i].init();
+ }
}
// We add this to the meta data so the failOver aspect can get access to it
@@ -102,6 +105,11 @@
return delegates;
}
+ public String toString()
+ {
+ return "ClusteredClientConnectionFactoryDelegate[" + id + "] with delegates.length = " + (delegates==null?"null":Integer.toString(delegates.length));
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2006-12-05 18:38:08 UTC (rev 1709)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2006-12-06 02:07:06 UTC (rev 1710)
@@ -119,8 +119,6 @@
ClientConnectionFactoryDelegate delegate = null;
- boolean replicateChanges = false;
-
if (clustered)
{
setupReplicator();
@@ -138,25 +136,23 @@
//Local
delegate = new ClientConnectionFactoryDelegate(id, locatorURI, version, clientPing);
}
-
+
+ log.trace("Adding delegates factory " + uniqueName + " pointing to " + delegate);
delegates.put(uniqueName, delegate);
-
+
//Now bind it in JNDI
rebindConnectionFactory(initialContext, jndiBindings, delegate);
- if (replicateChanges)
- {
- // We are clustered, we need to propagate the local delegate across the cluster.
+ // We are clustered, we need to propagate the local delegate across the cluster.
- // ... and then update it (and the clustered delegate on every node, while we're at it) by
- // replicating the local delegate across the cluster. This way, the clustred delegates on each
- // node will contain an updated list of local delegates, and all ConnectionFactoryJNDIMapper
- // will rebind updated ConnectionFactories in JNDI.
- // This will update the local node too
+ // ... and then update it (and the clustered delegate on every node, while we're at it) by
+ // replicating the local delegate across the cluster. This way, the clustred delegates on each
+ // node will contain an updated list of local delegates, and all ConnectionFactoryJNDIMapper
+ // will rebind updated ConnectionFactories in JNDI.
+ // This will update the local node too
- replicator.put(CF_PREFIX + uniqueName, delegate);
- }
-
+ replicator.put(CF_PREFIX + uniqueName, delegate);
+
//Registering with the dispatcher should always be the last thing otherwise a client could use
//a partially initialised object
JMSDispatcher.instance.registerTarget(new Integer(id), new ConnectionFactoryAdvised(endpoint));
@@ -164,6 +160,7 @@
public synchronized void unregisterConnectionFactory(String uniqueName, boolean clustered) throws Exception
{
+ log.trace("ConnectionFactory " + uniqueName + " being unregistered");
ServerConnectionFactoryEndpoint endpoint =
(ServerConnectionFactoryEndpoint)endpoints.remove(uniqueName);
@@ -184,7 +181,7 @@
log.debug(jndiName + " unregistered");
}
}
-
+
ClientConnectionFactoryDelegate delegate =
(ClientConnectionFactoryDelegate)delegates.remove(uniqueName);
@@ -248,6 +245,7 @@
public synchronized void onReplicationChange(Serializable key, Map updatedReplicantMap)
{
+ log.info("Got replication call " + key + " replicants=" + updatedReplicantMap);
try
{
// The list of connection factories across the cluster has changed, so we need to update
@@ -266,6 +264,12 @@
if (clusteredDelegate == null)
{
+ log.info("Delegates list: delegates.length=" + delegates.size());
+ for (Iterator iterDelegates = delegates.entrySet().iterator(); iterDelegates.hasNext();)
+ {
+ Map.Entry entry = (Map.Entry)iterDelegates.next();
+ log.info("key=" + entry.getKey() + ", value=" + entry.getValue());
+ }
throw new IllegalStateException("Cannot find connection delegate for " +
uniqueName + " to update");
}
@@ -315,9 +319,11 @@
/**
* @param localDelegates - Map<Integer(nodeId) - ClientConnectionFactoryDelegate>
*/
- private void setFailoverDelegates(ClusteredClientConnectionFactoryDelegate clusteredDelegate,
+ private void setFailoverDelegates(ClusteredClientConnectionFactoryDelegate clusteredDelegate,
Map localDelegates)
{
+ //TODO: make it trace after the code is stable
+ log.info("Updating FailoverDelegates " + localDelegates.size() + " on serverPeer:" + serverPeer.getServerPeerID());
int s = localDelegates.size();
int[] nodesIDs = new int[s];
@@ -360,7 +366,7 @@
if (failoverIndex == -1)
{
- throw new IllegalStateException("Failover node is not in list of nodes!");
+ throw new IllegalStateException("Failover node is not in list of nodes at node " + serverPeer.getServerPeerID() + "!");
}
failoverIndexes[i] = failoverIndex;
@@ -382,6 +388,7 @@
for(Iterator i = jndiNames.iterator(); i.hasNext(); )
{
String jndiName = (String)i.next();
+ log.info("Rebinding " + jndiName + " CF=" + cf );
JNDIUtil.rebind(ic, jndiName, cf);
}
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-12-05 18:38:08 UTC (rev 1709)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-12-06 02:07:06 UTC (rev 1710)
@@ -21,6 +21,7 @@
*/
package org.jboss.messaging.core.plugin.postoffice.cluster;
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
@@ -39,10 +40,8 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
-
import javax.sql.DataSource;
import javax.transaction.TransactionManager;
-
import org.jboss.jms.server.QueuedExecutorPool;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.Delivery;
@@ -75,8 +74,6 @@
import org.jgroups.blocks.RequestHandler;
import org.w3c.dom.Element;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-
/**
*
* A DefaultClusteredPostOffice
@@ -1306,7 +1303,7 @@
/**
- * Verifies changes on the View deciding if a node joined or lefted the cluster
+ * Verifies changes on the View deciding if a node joined or left the cluster
*
* */
private void verifyMembership(View oldView, View newView) throws Throwable
@@ -1858,7 +1855,7 @@
// Need to trigger listeners
notifyListeners(key, replicants);
- }
+ }
}
}
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringBase.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringBase.java 2006-12-05 18:38:08 UTC (rev 1709)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringBase.java 2006-12-06 02:07:06 UTC (rev 1710)
@@ -117,7 +117,7 @@
private void startServer(int peer)
throws Exception
{
- ServerManagement.start("all-cluster", peer);
+ ServerManagement.start("all", peer);
ServerManagement.deployClusteredQueue("testDistributedQueue", peer);
ServerManagement.deployClusteredTopic("testDistributedTopic", peer);
}
@@ -128,12 +128,15 @@
{
super.tearDown();
+ ServerManagement.log(ServerManagement.INFO,"Undeploying Server 0");
ServerManagement.undeployQueue("testDistributedQueue", 0);
ServerManagement.undeployTopic("testDistributedTopic", 0);
+ ServerManagement.log(ServerManagement.INFO,"Undeploying Server 1");
ServerManagement.undeployQueue("testDistributedQueue", 1);
ServerManagement.undeployTopic("testDistributedTopic", 1);
+ ServerManagement.log(ServerManagement.INFO,"Undeploying Server 2");
ServerManagement.undeployQueue("testDistributedQueue", 2);
ServerManagement.undeployTopic("testDistributedTopic", 2);
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java 2006-12-05 18:38:08 UTC (rev 1709)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java 2006-12-06 02:07:06 UTC (rev 1710)
@@ -24,6 +24,8 @@
import javax.jms.MessageConsumer;
import javax.jms.TextMessage;
+import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.client.delegate.ClusteredClientConnectionFactoryDelegate;
import org.jboss.jms.message.MessageProxy;
import org.jboss.test.messaging.tools.ServerManagement;
@@ -51,21 +53,35 @@
// Public --------------------------------------------------------
- public void testLeaveFailover() throws Exception
+ public void testConnectionFactoryConnect() throws Exception
{
try
{
- System.out.println("Number of nodes on Cluster = " + ServerManagement.getServer(0).getNumberOfNodesOnCluster());
- assertEquals(3,ServerManagement.getServer(0).getNumberOfNodesOnCluster());
- assertEquals(3,ServerManagement.getServer(1).getNumberOfNodesOnCluster());
- ServerManagement.stop(0,true);
- System.out.println("Number of nodes on Cluster = " + ServerManagement.getServer(1).getNumberOfNodesOnCluster());
- assertEquals(2,ServerManagement.getServer(1).getNumberOfNodesOnCluster());
+ JBossConnectionFactory factory = (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
+ ClusteredClientConnectionFactoryDelegate delegate =
+ (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+ log.info ("number of delegates = " + delegate.getDelegates().length);
+ log.info ("number of servers = " + ServerManagement.getServer(0).getNumberOfNodesOnCluster());
+
+ assertEquals(3, ServerManagement.getServer(0).getNumberOfNodesOnCluster());
+ assertEquals(3, delegate.getDelegates().length);
+
+
+ ServerManagement.log(ServerManagement.INFO,"Stopping server 2 as part of testConnectionFactoryConnect");
+ ServerManagement.stop(2,true);
+
+ factory = (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
+ delegate = (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+
+ assertEquals(2, ServerManagement.getServer(0).getNumberOfNodesOnCluster());
+ assertEquals(2, delegate.getDelegates().length);
}
finally
{
- ServerManagement.start("all", 0); // tear down needs the server up
+ ServerManagement.start("all", 2);
}
+
+
}
More information about the jboss-cvs-commits
mailing list