[jboss-cvs] JBoss Messaging SVN: r1734 - in branches/Branch_Client_Failover_Experiment: src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/server/connectionfactory src/main/org/jboss/jms/server/remoting src/main/org/jboss/messaging/core/plugin/contract src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/jms/clustering
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sat Dec 9 09:40:44 EST 2006
Author: timfox
Date: 2006-12-09 09:40:30 -0500 (Sat, 09 Dec 2006)
New Revision: 1734
Modified:
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/remoting/MetaDataConstants.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/FailoverMapper.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/ReplicationListener.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/Replicator.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultFailoverMapper.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
Log:
Fixed HA issue with replicating changes
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java 2006-12-09 01:15:18 UTC (rev 1733)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/container/HAAspect.java 2006-12-09 14:40:30 UTC (rev 1734)
@@ -25,6 +25,7 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import javax.jms.JMSException;
@@ -75,9 +76,9 @@
//Cache this here
private ClientConnectionFactoryDelegate[] delegates;
- private int[] failoverIndexes;
+ private Map failoverMap;
- private int current;
+ private int currentRobinIndex;
public Object handleCreateConnectionDelegate(Invocation invocation) throws Throwable
{
@@ -115,11 +116,11 @@
//TODO this is currently hardcoded as round-robin, this should be made pluggable
private synchronized ClientConnectionFactoryDelegate getDelegateRoundRobin()
{
- ClientConnectionFactoryDelegate currentDelegate = delegates[current++];
+ ClientConnectionFactoryDelegate currentDelegate = delegates[currentRobinIndex++];
- if (current >= delegates.length)
+ if (currentRobinIndex >= delegates.length)
{
- current = 0;
+ currentRobinIndex = 0;
}
return currentDelegate;
}
@@ -150,11 +151,11 @@
{
//TODO: Fix this! metadata should contain CF_FAILOVER_INDEXES
//failoverIndexes = (int[])metaData.getMetaData(MetaDataConstants.JMS, MetaDataConstants.CF_FAILOVER_INDEXES);
- failoverIndexes = (int[])((ClusteredClientConnectionFactoryDelegate)target).getFailoverIndexes();
+ failoverMap = ((ClusteredClientConnectionFactoryDelegate)target).getFailoverMap();
- if (failoverIndexes == null)
+ if (failoverMap == null)
{
- throw new IllegalStateException("Cannot find failoverIndexes!");
+ throw new IllegalStateException("Cannot find failoverMap!");
}
}
}
@@ -181,7 +182,7 @@
//The connection has failed
private void handleFailure(ClientConnectionDelegate failedConnection) throws Exception
{
- ClientConnectionFactoryDelegate newCF = getAlternateDelegate(failedConnection);
+ ClientConnectionFactoryDelegate newCF = getFailoverDelegate(failedConnection);
//TODO implement client side valve to prevent invocations occurring whilst failover is occurring
@@ -202,74 +203,69 @@
}
}
- private ClientConnectionFactoryDelegate getAlternateDelegate(ClientConnectionDelegate currentDelegate) throws JMSException
+ private ClientConnectionFactoryDelegate getFailoverDelegate(ClientConnectionDelegate currentDelegate) throws JMSException
{
//We need to choose which delegate to fail over to
- //This is determined by the failoverIndexes array
- //The fail over delegate for delegates[i] is given by
- //delegates[failoverIndexes[i]]
-
ConnectionState currentState = (ConnectionState)((DelegateSupport)currentDelegate).getState();
- String currentLocator =
- currentState.getRemotingConnection().getInvokingClient().getInvoker().getLocator().getLocatorURI();
+ int currentServerID = currentState.getServerID();
- int local = -1;
+ //Lookup in the failover map to see which server to fail over onto
+ Integer failoverServerID = (Integer)failoverMap.get(new Integer(currentServerID));
+
+ if (failoverServerID == null)
+ {
+ throw new IllegalStateException("Cannot find failover node for node " + currentServerID);
+ }
+
+ //Now find the actual delegate
+
+ ClientConnectionFactoryDelegate del = null;
+
for (int i = 0; i < delegates.length; i++)
{
- ConnectionState state = (ConnectionState)((DelegateSupport)delegates[i]).getState();
-
- String locator =
- state.getRemotingConnection().getInvokingClient().getInvoker().getLocator().getLocatorURI();
-
- if (currentLocator.equals(locator))
+ if (delegates[i].getServerId() == failoverServerID.intValue())
{
- local = i;
+ del = delegates[i];
break;
}
}
- //Sanity
- if (local == -1)
+ if (del == null)
{
- throw new IllegalStateException("Cannot find local delegate!");
+ throw new IllegalStateException("Cannot find failover delegate for node " + failoverServerID.intValue());
}
-
- if (delegates.length == 1)
- {
- throw new IllegalStateException("Cannot failover connection since no servers to fail over onto");
- }
-
- ClientConnectionFactoryDelegate delegateFound = delegates[failoverIndexes[local]];
-
-
+
// Redirect connection routine.
// Verify the failureMap on the server and if out of sync find the correct delegate
- int failoverNode = delegateFound.getFailoverNode(currentState.getServerID());
- if (failoverNode!=delegateFound.getServerId())
- {
- delegateFound = null;
- for (int i = 0; i < delegates.length; i++)
- {
- if (delegates[i].getServerId() == failoverNode)
- {
- delegateFound = delegates[i];
- }
- }
+
+ //THIS IS WRONG - cannot use getFailoverNode method since failover node might change
+ //between getting the result and actually failing over
+
+// int failoverNode = delegateFound.getFailoverNode(currentState.getServerID());
+// if (failoverNode!=delegateFound.getServerId())
+// {
+// delegateFound = null;
+// for (int i = 0; i < delegates.length; i++)
+// {
+// if (delegates[i].getServerId() == failoverNode)
+// {
+// delegateFound = delegates[i];
+// }
+// }
+//
+// if (delegateFound==null)
+// {
+// throw new IllegalStateException("Cannot find failover node on current map for nodeId=" + failoverNode);
+// }
+//
+// }
- if (delegateFound==null)
- {
- throw new IllegalStateException("Cannot find failover node on current map for nodeId=" + failoverNode);
- }
-
- }
-
- return delegateFound;
-
+ return del;
}
private void failover(ClientConnectionDelegate failedConnection, ClientConnectionDelegate newConnection) throws Exception
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java 2006-12-09 01:15:18 UTC (rev 1733)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java 2006-12-09 14:40:30 UTC (rev 1734)
@@ -21,9 +21,9 @@
*/
package org.jboss.jms.client.delegate;
-import org.jboss.aop.util.PayloadKey;
+import java.util.Map;
+
import org.jboss.jms.server.Version;
-import org.jboss.jms.server.remoting.MetaDataConstants;
/**
* A ClusteredClientConnectionFactoryDelegate
@@ -49,24 +49,21 @@
// Attributes ----------------------------------------------------
- /**
- * If delegates[i] is the current delegate, then the failover delegate is given by
- * delegates[failoverIndexes[i]]
- */
private ClientConnectionFactoryDelegate[] delegates;
- private int[] failoverIndexes;
+ //Map <node Id, failover node id>
+ private Map failoverMap;
// Constructors --------------------------------------------------
public ClusteredClientConnectionFactoryDelegate(int objectID, int serverId, String serverLocatorURI,
Version serverVersion, boolean clientPing,
ClientConnectionFactoryDelegate[] delegates,
- int[] failoverIndexes)
+ Map failoverMap)
{
super(objectID, serverId, serverLocatorURI, serverVersion, clientPing);
this.delegates = delegates;
- this.failoverIndexes = failoverIndexes;
+ this.failoverMap = failoverMap;
}
// Some of the properties of ClientConnectionFactoryDelegate are not exposed..
@@ -74,10 +71,10 @@
// So, I created this Constructor so I could have access into protected members inside an extension class
public ClusteredClientConnectionFactoryDelegate(ClientConnectionFactoryDelegate mainDelegate,
ClientConnectionFactoryDelegate[] delegates,
- int[] failoverIndexes)
+ Map failoverMap)
{
this(mainDelegate.getID(), mainDelegate.serverId, mainDelegate.serverLocatorURI,
- mainDelegate.serverVersion, mainDelegate.clientPing, delegates, failoverIndexes);
+ mainDelegate.serverVersion, mainDelegate.clientPing, delegates, failoverMap);
}
// DelegateSupport overrides -------------------------------------
@@ -93,15 +90,17 @@
delegates[i].init();
}
}
+
+ //This doesn't seem to be used so I'm commenting it out
- // We add this to the meta data so the failOver aspect can get access to it
- getMetaData().addMetaData(MetaDataConstants.JMS,
- MetaDataConstants.CF_DELEGATES,
- delegates, PayloadKey.TRANSIENT);
-
- getMetaData().addMetaData(MetaDataConstants.JMS,
- MetaDataConstants.CF_FAILOVER_INDEXES,
- failoverIndexes, PayloadKey.TRANSIENT);
+// // We add this to the meta data so the failOver aspect can get access to it
+// getMetaData().addMetaData(MetaDataConstants.JMS,
+// MetaDataConstants.CF_DELEGATES,
+// delegates, PayloadKey.TRANSIENT);
+//
+// getMetaData().addMetaData(MetaDataConstants.JMS,
+// MetaDataConstants.FAILOVER_MAP,
+// failoverMap, PayloadKey.TRANSIENT);
}
// Public --------------------------------------------------------
@@ -113,11 +112,21 @@
return delegates;
}
- /** As metadata is not working, I'm exposing this temporarily */
- public int[] getFailoverIndexes()
+ /** TODO As metadata is not working, I'm exposing this temporarily */
+ public Map getFailoverMap()
{
- return failoverIndexes;
+ return failoverMap;
}
+
+ public void setFailoverMap(Map failoverMap)
+ {
+ this.failoverMap = failoverMap;
+ }
+
+ public void setDelegates(ClientConnectionFactoryDelegate[] dels)
+ {
+ this.delegates = dels;
+ }
public String toString()
{
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2006-12-09 01:15:18 UTC (rev 1733)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2006-12-09 14:40:30 UTC (rev 1734)
@@ -22,13 +22,16 @@
package org.jboss.jms.server.connectionfactory;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
+
import org.jboss.jms.client.JBossConnectionFactory;
import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
import org.jboss.jms.client.delegate.ClusteredClientConnectionFactoryDelegate;
@@ -40,8 +43,10 @@
import org.jboss.jms.server.remoting.JMSDispatcher;
import org.jboss.jms.util.JNDIUtil;
import org.jboss.logging.Logger;
+import org.jboss.messaging.core.plugin.contract.FailoverMapper;
import org.jboss.messaging.core.plugin.contract.ReplicationListener;
import org.jboss.messaging.core.plugin.contract.Replicator;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultClusteredPostOffice;
/**
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
@@ -75,6 +80,13 @@
protected Replicator replicator;
+ /*
+ We cache the map of node->failover node in here.
+ This is then updated when node joins or leaves the cluster via the replicationListener
+ When new cfs are deployed we use the cached map
+ */
+ protected Map failoverMap;
+
// Constructors --------------------------------------------------
public ConnectionFactoryJNDIMapper(ServerPeer serverPeer) throws Exception
@@ -107,6 +119,7 @@
}
int id = serverPeer.getNextObjectID();
+
Version version = serverPeer.getVersion();
ServerConnectionFactoryEndpoint endpoint =
@@ -128,55 +141,46 @@
log.info("ConnectionFactoryJNDIMapper is non clustered");
}
- /* if (clustered)
+ boolean creatingClustered = clustered && replicator != null;
+
+ ClientConnectionFactoryDelegate localDelegate =
+ new ClientConnectionFactoryDelegate(id, serverPeer.getServerPeerID(),
+ locatorURI, version, clientPing);
+
+ /*
+ * When registering a new clustered connection factory i should first create it with the available delegates
+ * then send the replication message.
+ * We then listen for connection factories added to global state using the replication listener
+ * and then update their connection factory list.
+ * This will happen locally too, so we will get the replication message locally - to avoid updating it again
+ * we can ignore any "add" replication updates that originate from the current node.
+ */
+
+ if (creatingClustered)
{
- setupReplicator();
+ //Replicate the change - we will ignore this locally
- if (replicator != null)
- {
- //Replicator might still be null since we might be deploying a clustered cf in a non clustered
- //post office (which is ok)
- delegate = new ClusteredClientConnectionFactoryDelegate(id, serverPeer.getServerPeerID(), locatorURI, version, clientPing);
- }
+ replicator.put(CF_PREFIX + uniqueName, localDelegate);
+
+ //Create a clustered delegate
+
+ Map localDelegates = replicator.get(CF_PREFIX + uniqueName);
+
+ delegate = createClusteredDelegate(localDelegates);
+
}
+ else
+ {
+ delegate = localDelegate;
+ }
- if (delegate == null)
- {
- //Local
- delegate = new ClientConnectionFactoryDelegate(id, serverPeer.getServerPeerID(),
- locatorURI, version, clientPing);
- } */
-
- delegate = new ClientConnectionFactoryDelegate(id, serverPeer.getServerPeerID(),
- locatorURI, version, clientPing);
-
-
- //delegate = new ClusteredClientConnectionFactoryDelegate(id, serverPeer.getServerPeerID(), locatorURI, version, clientPing);
-
-
log.trace("Adding delegates factory " + uniqueName + " pointing to " + delegate);
+
delegates.put(uniqueName, delegate);
-
- // We are clustered, we need to propagate the local delegate across the cluster.
-
- // ... and then update it (and the clustered delegate on every node, while we're at it) by
- // replicating the local delegate across the cluster. This way, the clustred delegates on each
- // node will contain an updated list of local delegates, and all ConnectionFactoryJNDIMapper
- // will rebind updated ConnectionFactories in JNDI.
- // This will update the local node too
-
- log.info("CF_PREFIX=" + CF_PREFIX + " uniqueName=" + uniqueName + " delegate = " + delegate + " replicator = " + replicator);
- if (replicator != null)
- {
- replicator.put(CF_PREFIX + uniqueName, delegate);
- }
- else
- {
- //Now bind it in JNDI
- rebindConnectionFactory(initialContext, jndiBindings, delegate);
- }
-
+ //Now bind it in JNDI
+ rebindConnectionFactory(initialContext, jndiBindings, delegate);
+
//Registering with the dispatcher should always be the last thing otherwise a client could use
//a partially initialised object
JMSDispatcher.instance.registerTarget(new Integer(id), new ConnectionFactoryAdvised(endpoint));
@@ -243,19 +247,6 @@
{
initialContext = new InitialContext();
- /*
-
- ConnectionFActoryJNDIMapper is started in a call of ServerPeer, while replicator is started later
- when the postoffices are started. So, I'm keeping the registration of replicator lazy on the first connection
- Allthough this is not a proper design, as we might loose messages when the PostOffice is connected
-
- replicator = serverPeer.getDataReplicator();
-
- if (replicator != null)
- {
- replicator.registerListener(this);
- } */
-
log.debug("started");
}
@@ -273,40 +264,99 @@
// ReplicationListener interface ----------------------------------
- public synchronized void onReplicationChange(Serializable key, Map updatedReplicantMap, boolean added)
+ public synchronized void onReplicationChange(Serializable key, Map updatedReplicantMap,
+ boolean added, int originatingNodeId)
{
log.info("Got replication call " + key + " node=" + serverPeer.getServerPeerID() + " replicants=" + updatedReplicantMap + " added=");
try
- {
- // The list of connection factories across the cluster has changed, so we need to update
- // the clustered connection factories in JNDI with new ones with the up to date list.
- // Replication can be used for other stuff (not just connection factories) so we check
- // the prefix.
+ {
+ if (!(key instanceof String))
+ {
+ return;
+ }
String sKey = (String)key;
-
- if (key instanceof String && sKey.startsWith(CF_PREFIX))
+
+ if (sKey.equals(DefaultClusteredPostOffice.ADDRESS_INFO_KEY))
{
- //We only need to rebind if the cf is being added
+ /*
+ We respond to changes in the node-address mapping
+ This will be replicated whan a node joins / leaves the group
+ When this happens we need to recalculate the failoverMap
+ and rebind all connection factories with the new mapping
+ We cannot just reference a single map since the objects are bound in JNDI
+ in serialized form
+ */
+ log.info("responding to node - adress info change. Recalculating mapping and rebinding cfs");
+ recalculateFailoverMap(updatedReplicantMap);
+
+ //rebind
+ Iterator iter = endpoints.entrySet().iterator();
+
+ while (iter.hasNext())
+ {
+ Map.Entry entry = (Map.Entry)iter.next();
+
+ String uniqueName = (String)entry.getKey();
+
+ ServerConnectionFactoryEndpoint endpoint =
+ (ServerConnectionFactoryEndpoint)entry.getValue();
+
+ ClusteredClientConnectionFactoryDelegate del = (ClusteredClientConnectionFactoryDelegate)delegates.get(uniqueName);
+
+ if (del == null)
+ {
+ throw new IllegalStateException("Cannot find cf with name " + uniqueName);
+ }
+
+ del.setFailoverMap(failoverMap);
+
+ rebindConnectionFactory(initialContext, endpoint.getJNDIBindings(), del);
+ }
+
+ }
+ else if (sKey.startsWith(CF_PREFIX) && originatingNodeId != serverPeer.getServerPeerID())
+ {
+ /*
+ A connection factory has been deployed / undeployed - we need to update the local delegate arrays inside the clustered
+ connection factories with the same name
+ We don't recalculate the failover map since the number of nodes in the group hasn't changed
+ We also ignore any local changes since the cf will already be bound locally with the new
+ local delegate in the array
+ */
+
String uniqueName = sKey.substring(CF_PREFIX.length());
- ClusteredClientConnectionFactoryDelegate clusteredDelegate = createClusteredDelegate(updatedReplicantMap);
+ log.info("Connection factory with unique name " + uniqueName + " has been added / removed");
- // Now rebind ...
+ ClusteredClientConnectionFactoryDelegate del = (ClusteredClientConnectionFactoryDelegate)delegates.get(uniqueName);
+
+ if (del == null)
+ {
+ throw new IllegalStateException("Cannot find cf with name " + uniqueName);
+ }
+
+ ClientConnectionFactoryDelegate[] delArr =
+ (ClientConnectionFactoryDelegate[])updatedReplicantMap.values().toArray(new ClientConnectionFactoryDelegate[updatedReplicantMap.size()]);
+ log.info("Updating delsArr with size " + delArr.length);
+
+ del.setDelegates(delArr);
+
ServerConnectionFactoryEndpoint endpoint =
(ServerConnectionFactoryEndpoint)endpoints.get(uniqueName);
-
+
if (endpoint == null)
{
- throw new IllegalStateException("Cannot find endpoint " + uniqueName );
+ throw new IllegalStateException("Cannot find endpoint with name " + uniqueName);
}
-
- rebindConnectionFactory(initialContext, endpoint.getJNDIBindings(), clusteredDelegate);
- }
+
+ rebindConnectionFactory(initialContext, endpoint.getJNDIBindings(), del);
+
+ }
}
- catch (NamingException e)
+ catch (Exception e)
{
log.error("Failed to rebind connection factory", e);
}
@@ -332,96 +382,63 @@
this.serverPeer.getQueuePostOfficeInstance();
}
-
+ private void recalculateFailoverMap(Map nodeAddressMap) throws Exception
+ {
+ List nodes = new ArrayList(nodeAddressMap.keySet());
+
+ FailoverMapper mapper = replicator.getFailoverMapper();
+
+ failoverMap = mapper.generateMapping(nodes);
+ }
+
/**
* @param localDelegates - Map<Integer(nodeId) - ClientConnectionFactoryDelegate>
*/
- private ClusteredClientConnectionFactoryDelegate createClusteredDelegate(Map localDelegates)
+ private ClusteredClientConnectionFactoryDelegate createClusteredDelegate(Map localDelegates) throws Exception
{
//TODO: make it trace after the code is stable
log.info("Updating FailoverDelegates " + localDelegates.size() + " on serverPeer:" + serverPeer.getServerPeerID());
- // Calculates the failoverMap based on the current list of localDelegates
- Map failoverMap = replicator.getFailoverMapper().generateMapping(localDelegates.keySet());
+ ClientConnectionFactoryDelegate[] delArr =
+ (ClientConnectionFactoryDelegate[])localDelegates.values().toArray(new ClientConnectionFactoryDelegate[localDelegates.size()]);
- int s = localDelegates.size();
-
- int[] nodesIDs = new int[s];
- int[] failoverNodeIDs = new int[s];
- ClientConnectionFactoryDelegate[] delegates = new ClientConnectionFactoryDelegate[s];
- // Ths created ClusteredClientFactoryDelegate will copy the properties from this delegate which is
- // the delegate on this current serverPeer
- ClientConnectionFactoryDelegate mainDelegate = null;
-
- int idx = 0;
-
- for(Iterator i = localDelegates.entrySet().iterator(); i.hasNext();)
+ //If the map is not cached - generate it now
+
+ if (failoverMap == null)
{
- Map.Entry entry = (Map.Entry)i.next();
-
- Integer nodeID = (Integer)entry.getKey();
- Integer failoverNodeID = (Integer)failoverMap.get(nodeID);
-
- log.trace("setFailoverDelegates inside forIterator::nodeID=" + nodeID + " failoverNodeID=" + failoverNodeID);
-
- nodesIDs[idx] = nodeID.intValue();
- failoverNodeIDs[idx] = failoverNodeID.intValue();
- delegates[idx] = (ClientConnectionFactoryDelegate)entry.getValue();
-
- if (delegates[idx].getServerId() == this.serverPeer.getServerPeerID())
+ Map nodeAddressMap = replicator.get(DefaultClusteredPostOffice.ADDRESS_INFO_KEY);
+
+ if (nodeAddressMap == null)
{
- // sanity check
- if (mainDelegate != null)
- {
- throw new IllegalStateException("There are two servers with serverID=" + this.serverPeer.getServerPeerID() + ", verify your clustering configuration");
- }
- mainDelegate = delegates[idx];
+ throw new IllegalStateException("Cannot find address node mapping!");
}
-
- idx++;
+
+ recalculateFailoverMap(nodeAddressMap);
}
- // Generate the failover indexes. This could probably be optimised if need be.
- //MainDelegate would be null in the case the cf is being undeployed locally
+ //Now we find the "mainDelegate"
+ //TODO - why do we need a "mainDelegate" ????
- ClusteredClientConnectionFactoryDelegate clusteredDelegate = null;
+ ClientConnectionFactoryDelegate mainDelegate = null;
- if (mainDelegate != null)
- {
- int[] failoverIndexes = new int[s];
-
- for (int i = 0; i < s; i++)
- {
- int failoverNode = failoverNodeIDs[i];
- int failoverIndex = -1;
+ for(Iterator i = localDelegates.values().iterator(); i.hasNext();)
+ {
+ ClientConnectionFactoryDelegate del = (ClientConnectionFactoryDelegate)i.next();
- for (int j = 0; j < s; j++)
- {
- if (nodesIDs[j] == failoverNode)
- {
- failoverIndex = j;
- break;
- }
- }
+ if (del.getServerId() == this.serverPeer.getServerPeerID())
+ {
+ // sanity check
+ if (mainDelegate != null)
+ {
+ throw new IllegalStateException("There are two servers with serverID=" + this.serverPeer.getServerPeerID() + ", verify your clustering configuration");
+ }
+ mainDelegate = del;
+ }
+ }
- if (failoverIndex == -1)
- {
- //throw new IllegalStateException("Failover node " + failoverNode + " is not in list of nodes at node " + serverPeer.getServerPeerID() + "!");
-
- //It is possible that the failover node is not found - this might happen
- //if this is executed when a cf is undeployed but the failover node still contains
- //the old node.
-
- //In this case the node leave event will shortly
- }
-
- failoverIndexes[i] = failoverIndex;
- }
-
- clusteredDelegate =
- new ClusteredClientConnectionFactoryDelegate(mainDelegate, delegates, failoverIndexes);
- }
-
+ ClusteredClientConnectionFactoryDelegate clusteredDelegate =
+ new ClusteredClientConnectionFactoryDelegate(mainDelegate, delArr, failoverMap);
+
return clusteredDelegate;
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/remoting/MetaDataConstants.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/remoting/MetaDataConstants.java 2006-12-09 01:15:18 UTC (rev 1733)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/remoting/MetaDataConstants.java 2006-12-09 14:40:30 UTC (rev 1734)
@@ -53,7 +53,7 @@
public static final String REMOTING_CONNECTION = "REMOTING_CONNECTION";
- public static final String CF_FAILOVER_INDEXES = "CF_FAIL_IND";
+ public static final String FAILOVER_MAP = "CF_FAIL_IND";
public static final String CONNECTION_VERSION = "CONNECTION_VERSION";
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/FailoverMapper.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/FailoverMapper.java 2006-12-09 01:15:18 UTC (rev 1733)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/FailoverMapper.java 2006-12-09 14:40:30 UTC (rev 1734)
@@ -21,7 +21,7 @@
*/
package org.jboss.messaging.core.plugin.contract;
-import java.util.Collection;
+import java.util.List;
import java.util.Map;
/**
@@ -37,12 +37,5 @@
*/
public interface FailoverMapper
{
- /** This receives a List<Integer> of nodes and returns a Map<Integer,Integer> of nodes to be used on failover logic.
- *
- * An implementation of this method should aways sort received nodes first before calculating as the parameter list
- * might be in a different order, and it should aways return the same result for the same set of nodes whatever is
- * the order of parameter nodes
- * */
- Map generateMapping(Collection nodes);
-
+ Map generateMapping(List nodes);
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/ReplicationListener.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/ReplicationListener.java 2006-12-09 01:15:18 UTC (rev 1733)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/ReplicationListener.java 2006-12-09 14:40:30 UTC (rev 1734)
@@ -39,5 +39,5 @@
* @param updatedReplicantMap - the updated replicant map. It contains ALL current replicants for
* the given key.
*/
- void onReplicationChange(Serializable key, Map updatedReplicantMap, boolean added);
+ void onReplicationChange(Serializable key, Map updatedReplicantMap, boolean added, int originatingNodeId);
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/Replicator.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/Replicator.java 2006-12-09 01:15:18 UTC (rev 1733)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/Replicator.java 2006-12-09 14:40:30 UTC (rev 1734)
@@ -67,22 +67,8 @@
void unregisterListener(ReplicationListener listener);
- /**
- * TODO - this method doesn't belong here, as no semantics should be associated with data at this
- * level
- *
- * Gets the current failover node ID for the specified node ID.
- *
- * @param nodeID - The node to get the failover node id for.
- *
- * @return the failover node ID. If there is no failover node (one-node cluster), the method
- * returns the original nodeID.
- */
- //int getFailoverNodeID(int nodeID);
-
- /** TODO - this method doesn't belong here... We should have POJOTized containers updating dependencies
+ /** TODO - this method doesn't belong here... We should have POJOized containers updating dependencies
* between ConnectionFActoryJNDIMapper and DefaultClusteredPostOffice */
FailoverMapper getFailoverMapper();
-
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-12-09 01:15:18 UTC (rev 1733)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-12-09 14:40:30 UTC (rev 1734)
@@ -93,7 +93,7 @@
private static final Logger log = Logger.getLogger(DefaultClusteredPostOffice.class);
// Key for looking up node id -> address info mapping from replicated data
- private static final String ADDRESS_INFO_KEY = "address_info";
+ public static final String ADDRESS_INFO_KEY = "address_info";
//Used for failure testing
private boolean failBeforeCommit;
@@ -722,7 +722,7 @@
m.put(new Integer(originatorNodeID), replicant);
- notifyListeners(key, m, true);
+ notifyListeners(key, m, true, originatorNodeID);
}
}
@@ -753,7 +753,7 @@
{
replicatedData.remove(key);
}
- notifyListeners(key, m, false);
+ notifyListeners(key, m, false, originatorNodeID);
return true;
}
@@ -1821,15 +1821,15 @@
/*
* Removes all non durable binding data, and any local replicant data for the specified node.
*/
- private void removeDataForNode(Integer nodeID) throws Exception
+ private void removeDataForNode(Integer nodeToRemove) throws Exception
{
- log.info("Node " + nodeID + " requested to leave cluster");
+ log.info("Node " + nodeToRemove + " requested to leave cluster");
lock.writeLock().acquire();
try
{
- Map nameMap = (Map)nameMaps.get(nodeID);
+ Map nameMap = (Map)nameMaps.get(nodeToRemove);
if (nameMap != null)
{
@@ -1855,7 +1855,7 @@
{
Binding binding = (Binding)iter.next();
- removeBinding(nodeID.intValue(), binding.getQueue().getName());
+ removeBinding(nodeToRemove.intValue(), binding.getQueue().getName());
}
}
}
@@ -1874,7 +1874,7 @@
String key = (String)entry.getKey();
Map replicants = (Map)entry.getValue();
- replicants.remove(nodeID);
+ replicants.remove(nodeToRemove);
if (replicants.isEmpty())
{
@@ -1882,7 +1882,7 @@
}
// Need to trigger listeners
- notifyListeners(key, replicants, false);
+ notifyListeners(key, replicants, false, nodeToRemove.intValue());
}
}
}
@@ -1891,7 +1891,8 @@
* @param updatedReplicantMap - the updated replicant map. It contains ALL current replicants for
* the given key.
*/
- private void notifyListeners(Serializable key, Map updatedReplicantMap, boolean added)
+ private void notifyListeners(Serializable key, Map updatedReplicantMap, boolean added,
+ int originatorNodeId)
{
synchronized (replicationListeners)
{
@@ -1899,7 +1900,7 @@
{
ReplicationListener listener = (ReplicationListener)i.next();
- listener.onReplicationChange(key, updatedReplicantMap, added);
+ listener.onReplicationChange(key, updatedReplicantMap, added, originatorNodeId);
}
}
}
@@ -2115,7 +2116,7 @@
Map replicants = (Map)entry.getValue();
- Map m = new HashMap();
+ Map m = new LinkedHashMap();
m.putAll(replicants);
@@ -2461,7 +2462,8 @@
private class NodeAddressMapListener implements ReplicationListener
{
- public void onReplicationChange(Serializable key, Map updatedReplicantMap, boolean added)
+ public void onReplicationChange(Serializable key, Map updatedReplicantMap, boolean added,
+ int originatorNodeId)
{
if (key instanceof String && ((String)key).equals(ADDRESS_INFO_KEY))
{
@@ -2484,8 +2486,7 @@
private void generateFailoverMap(Map nodeAddressMap)
{
-
- failoverMap = failoverMapper.generateMapping(nodeAddressMap.keySet());
+ failoverMap = failoverMapper.generateMapping(new ArrayList(nodeAddressMap.keySet()));
}
}
}
\ No newline at end of file
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultFailoverMapper.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultFailoverMapper.java 2006-12-09 01:15:18 UTC (rev 1733)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultFailoverMapper.java 2006-12-09 14:40:30 UTC (rev 1734)
@@ -21,10 +21,11 @@
*/
package org.jboss.messaging.core.plugin.postoffice.cluster;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.LinkedHashMap;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+
import org.jboss.logging.Logger;
import org.jboss.messaging.core.plugin.contract.FailoverMapper;
@@ -45,20 +46,20 @@
private static final Logger log = Logger.getLogger(DefaultFailoverMapper.class);
/** This receives a List<Integer> of nodes and returns a Map<Integer,Integer> of nodes to be used on failover logic */
- public Map generateMapping(Collection nodes)
+ public Map generateMapping(List nodes)
{
-
- Object[] arrayNodes = (Object[])nodes.toArray(new Integer[nodes.size()]);
- Arrays.sort(arrayNodes);
-
- int s = arrayNodes.length;
+ if (!(nodes instanceof ArrayList))
+ {
+ //Convert to array list for fast index access
+ nodes = new ArrayList(nodes);
+ }
- log.info("Genertaing failover mapping, node size="+ s);
+ int s = nodes.size();
+
+ log.info("Generating failover mapping, node size= "+ s);
-
-
-
- Map failoverNodes = new LinkedHashMap(s);
+ //There is no need for the map to be linked
+ Map failoverNodes = new HashMap(s);
for (int i = 0; i < s; i++)
{
@@ -69,7 +70,7 @@
j = 0;
}
- failoverNodes.put(arrayNodes[i], arrayNodes[j]);
+ failoverNodes.put(nodes.get(i), nodes.get(j));
}
return failoverNodes;
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java 2006-12-09 01:15:18 UTC (rev 1733)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java 2006-12-09 14:40:30 UTC (rev 1734)
@@ -22,6 +22,8 @@
package org.jboss.test.messaging.jms.clustering;
+import java.util.Map;
+
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.TextMessage;
@@ -187,6 +189,23 @@
}
+ public void testStartStopProblem() throws Exception
+ {
+ //Three nodes should be running now
+
+ //Stop one of them
+
+ ServerManagement.stop(0, true);
+
+ //Stop another
+
+ ServerManagement.stop(1, true);
+
+
+
+ }
+
+
/*
* Test that the failover mapping is created correctly
*/
@@ -207,67 +226,89 @@
ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
+ //The order here depends on the order the servers were started in
+
+ log.info("cf1 serverid=" + cf1.getServerId());
+
+ log.info("cf2 serverid=" + cf2.getServerId());
+
+ log.info("cf3 serverid=" + cf3.getServerId());
+
+
assertEquals(0, cf1.getServerId());
assertEquals(1, cf2.getServerId());
assertEquals(2, cf3.getServerId());
- int[] failoverIndexes = delegate.getFailoverIndexes();
+ Map failoverMap = delegate.getFailoverMap();
assertEquals(3, delegates.length);
- assertEquals(3, failoverIndexes.length);
+ assertEquals(3, failoverMap.size());
// Default failover policy just chooses the node to the right
- log.info("failoverindex[0]:" + failoverIndexes[0]);
- log.info("failoverindex[1]:" + failoverIndexes[1]);
- log.info("failoverindex[2]:" + failoverIndexes[2]);
+ assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
- assertEquals(1, failoverIndexes[0]);
+ assertEquals(cf3.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
- assertEquals(2, failoverIndexes[1]);
+ assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf3.getServerId()))).intValue());
- assertEquals(0, failoverIndexes[2]);
+ assertEquals(2, ((Integer)failoverMap.get(new Integer(1))).intValue());
//Now cleanly stop one of the servers
+
+ log.info("************** STOPPING SERVER 0");
ServerManagement.stop(0, true);
+ log.info("server stopped");
+
assertEquals(2, ServerManagement.getServer(1).getNumberOfNodesOnCluster());
//Lookup another connection factory
JBossConnectionFactory factory2 = (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
+ log.info("Got connection factory");
+
ClusteredClientConnectionFactoryDelegate delegate2 =
(ClusteredClientConnectionFactoryDelegate)factory2.getDelegate();
ClientConnectionFactoryDelegate[] delegates2 = delegate2.getDelegates();
- int[] failoverIndexes2 = delegate2.getFailoverIndexes();
+ Map failoverMap2 = delegate2.getFailoverMap();
+ log.info("Got failover map");
+
assertEquals(2, delegates2.length);
cf1 = delegate2.getDelegates()[0];
cf2 = delegate2.getDelegates()[1];
- assertEquals(0, cf1.getServerId());
+ //Order here depends on order servers were started in
- assertEquals(1, cf2.getServerId());
+ log.info("cf1 serverid=" + cf1.getServerId());
+ log.info("cf2 serverid=" + cf2.getServerId());
+ assertEquals(1, cf1.getServerId());
- assertEquals(2, failoverIndexes2.length);
+ assertEquals(2, cf2.getServerId());
- assertEquals(1, failoverIndexes2[0]);
- assertEquals(0, failoverIndexes2[1]);
+ assertEquals(2, failoverMap2.size());
+ assertEquals(cf2.getServerId(), ((Integer)failoverMap2.get(new Integer(cf1.getServerId()))).intValue());
+
+ assertEquals(cf1.getServerId(), ((Integer)failoverMap2.get(new Integer(cf2.getServerId()))).intValue());
+
//Cleanly stop another server
+ log.info("Server 1 is started: " + ServerManagement.getServer(1).isServerPeerStarted());
+
ServerManagement.stop(1, true);
assertEquals(1, ServerManagement.getServer(2).getNumberOfNodesOnCluster());
@@ -281,7 +322,7 @@
ClientConnectionFactoryDelegate[] delegates3 = delegate3.getDelegates();
- int[] failoverIndexes3 = delegate3.getFailoverIndexes();
+ Map failoverMap3 = delegate3.getFailoverMap();
assertEquals(1, delegates3.length);
@@ -290,11 +331,10 @@
assertEquals(0, cf1.getServerId());
- assertEquals(1, failoverIndexes3.length);
+ assertEquals(1, failoverMap3.size());
- assertEquals(0, failoverIndexes3[0]);
-
-
+ assertEquals(cf1.getServerId(), ((Integer)failoverMap3.get(new Integer(cf1.getServerId()))).intValue());
+
//TODO - Add nodes back into the cluster - test framework currently does not support this
More information about the jboss-cvs-commits mailing list