[jboss-cvs] JBoss Messaging SVN: r1733 - in branches/Branch_Client_Failover_Experiment: src/main/org/jboss/jms/server/connectionfactory src/main/org/jboss/jms/server/endpoint src/main/org/jboss/messaging/core/plugin/contract src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Dec 8 20:15:24 EST 2006
Author: clebert.suconic at jboss.com
Date: 2006-12-08 20:15:18 -0500 (Fri, 08 Dec 2006)
New Revision: 1733
Added:
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/FailoverMapperTest.java
Modified:
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/FailoverMapper.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/Replicator.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultFailoverMapper.java
Log:
Changing how the failoverMapping is being calculated
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2006-12-08 21:56:34 UTC (rev 1732)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2006-12-09 01:15:18 UTC (rev 1733)
@@ -275,7 +275,7 @@
public synchronized void onReplicationChange(Serializable key, Map updatedReplicantMap, boolean added)
{
- log.info("Got replication call " + key + " node=" + serverPeer.getServerPeerID() + " replicants=" + updatedReplicantMap);
+ log.info("Got replication call " + key + " node=" + serverPeer.getServerPeerID() + " replicants=" + updatedReplicantMap + " added=");
try
{
// The list of connection factories across the cluster has changed, so we need to update
@@ -285,7 +285,7 @@
String sKey = (String)key;
- if (added && key instanceof String && sKey.startsWith(CF_PREFIX))
+ if (key instanceof String && sKey.startsWith(CF_PREFIX))
{
//We only need to rebind if the cf is being added
@@ -293,21 +293,17 @@
ClusteredClientConnectionFactoryDelegate clusteredDelegate = createClusteredDelegate(updatedReplicantMap);
- //It could be null if the cf is being undeployed
- if (clusteredDelegate != null)
- {
- // Now rebind ...
-
- ServerConnectionFactoryEndpoint endpoint =
- (ServerConnectionFactoryEndpoint)endpoints.get(uniqueName);
-
- if (endpoint == null)
- {
- throw new IllegalStateException("Cannot find endpoint " + uniqueName );
- }
-
- rebindConnectionFactory(initialContext, endpoint.getJNDIBindings(), clusteredDelegate);
+ // Now rebind ...
+
+ ServerConnectionFactoryEndpoint endpoint =
+ (ServerConnectionFactoryEndpoint)endpoints.get(uniqueName);
+
+ if (endpoint == null)
+ {
+ throw new IllegalStateException("Cannot find endpoint " + uniqueName );
}
+
+ rebindConnectionFactory(initialContext, endpoint.getJNDIBindings(), clusteredDelegate);
}
}
catch (NamingException e)
@@ -345,6 +341,9 @@
//TODO: make it trace after the code is stable
log.info("Updating FailoverDelegates " + localDelegates.size() + " on serverPeer:" + serverPeer.getServerPeerID());
+ // Calculates the failoverMap based on the current list of localDelegates
+ Map failoverMap = replicator.getFailoverMapper().generateMapping(localDelegates.keySet());
+
int s = localDelegates.size();
int[] nodesIDs = new int[s];
@@ -360,13 +359,13 @@
{
Map.Entry entry = (Map.Entry)i.next();
- int nodeID = ((Integer)entry.getKey()).intValue();
- int failoverNodeID = replicator.getFailoverNodeID(nodeID);
+ Integer nodeID = (Integer)entry.getKey();
+ Integer failoverNodeID = (Integer)failoverMap.get(nodeID);
log.trace("setFailoverDelegates inside forIterator::nodeID=" + nodeID + " failoverNodeID=" + failoverNodeID);
- nodesIDs[idx] = nodeID;
- failoverNodeIDs[idx] = failoverNodeID;
+ nodesIDs[idx] = nodeID.intValue();
+ failoverNodeIDs[idx] = failoverNodeID.intValue();
delegates[idx] = (ClientConnectionFactoryDelegate)entry.getValue();
if (delegates[idx].getServerId() == this.serverPeer.getServerPeerID())
@@ -374,15 +373,13 @@
// sanity check
if (mainDelegate != null)
{
- throw new IllegalStateException("There are two server with serverID, verify your clustering configuration");
+ throw new IllegalStateException("There are two servers with serverID=" + this.serverPeer.getServerPeerID() + ", verify your clustering configuration");
}
mainDelegate = delegates[idx];
}
idx++;
}
- // sanity check
-
// Generate the failover indexes. This could probably be optimised if need be.
//MainDelegate would be null in the case the cf is being undeployed locally
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java 2006-12-08 21:56:34 UTC (rev 1732)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java 2006-12-09 01:15:18 UTC (rev 1733)
@@ -142,8 +142,11 @@
{
try
{
- ServerPeer peer = (ServerPeer )serverPeer.getInstance();
- return peer.getReplicator().getFailoverNodeID(node);
+ //ServerPeer peer = (ServerPeer )serverPeer.getInstance();
+ //return peer.getReplicator().getFailoverNodeID(node);
+
+ // not implemented yet
+ return node;
}
catch (Throwable t)
{
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/FailoverMapper.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/FailoverMapper.java 2006-12-08 21:56:34 UTC (rev 1732)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/FailoverMapper.java 2006-12-09 01:15:18 UTC (rev 1733)
@@ -21,7 +21,8 @@
*/
package org.jboss.messaging.core.plugin.contract;
-import java.util.List;
+import java.util.Collection;
+import java.util.Map;
/**
* A FailoverMapper
@@ -36,6 +37,12 @@
*/
public interface FailoverMapper
{
- List generateMapping(List nodes);
+ /** This receives a Collection<Integer> of node IDs and returns a Map<Integer,Integer> of nodes to be used in the failover logic.
+ *
+ * An implementation of this method should always sort the received nodes before calculating the mapping, because the
+ * parameter may arrive in a different order, and it should always return the same result for the same set of nodes,
+ * whatever the order of the nodes parameter.
+ * */
+ Map generateMapping(Collection nodes);
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/Replicator.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/Replicator.java 2006-12-08 21:56:34 UTC (rev 1732)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/Replicator.java 2006-12-09 01:15:18 UTC (rev 1733)
@@ -78,7 +78,11 @@
* @return the failover node ID. If there is no failover node (one-node cluster), the method
* returns the original nodeID.
*/
- int getFailoverNodeID(int nodeID);
+ //int getFailoverNodeID(int nodeID);
+ /** TODO - this method doesn't belong here... We should have POJO-ized containers updating dependencies
+ * between ConnectionFactoryJNDIMapper and DefaultClusteredPostOffice */
+ FailoverMapper getFailoverMapper();
+
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-12-08 21:56:34 UTC (rev 1732)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-12-09 01:15:18 UTC (rev 1733)
@@ -626,6 +626,11 @@
}
}
+ public FailoverMapper getFailoverMapper()
+ {
+ return failoverMapper;
+ }
+
// Replicator implementation --------------------------------------------------------------------------
public Map get(Serializable key) throws Exception
@@ -1858,24 +1863,24 @@
{
lock.writeLock().release();
}
-
+
synchronized (replicatedData)
- {
+ {
// We need to remove any replicant data for the node. This includes the node-address info.
for(Iterator i = replicatedData.entrySet().iterator(); i.hasNext(); )
{
Map.Entry entry = (Map.Entry)i.next();
-
+
String key = (String)entry.getKey();
Map replicants = (Map)entry.getValue();
-
+
replicants.remove(nodeID);
-
+
if (replicants.isEmpty())
{
i.remove();
- }
-
+ }
+
// Need to trigger listeners
notifyListeners(key, replicants, false);
}
@@ -2479,34 +2484,8 @@
private void generateFailoverMap(Map nodeAddressMap)
{
- List nodes = new ArrayList(nodeAddressMap.keySet());
-
- log.info("generating failover map");
-
- log.info("I have " + nodes.size() + " nodes");
-
- List failoverNodes = failoverMapper.generateMapping(nodes);
-
- log.info("I generated " + failoverNodes.size() +" failover nodes");
-
- // Now put this in the map of node -> failover node
-
- synchronized (failoverMap)
- {
- failoverMap.clear();
-
- Iterator iter = nodes.iterator();
- Iterator iter2 = failoverNodes.iterator();
-
- while (iter.hasNext())
- {
- Integer node = (Integer)iter.next();
-
- Integer failoverNode = (Integer)iter2.next();
-
- failoverMap.put(node, failoverNode);
- }
- }
+
+ failoverMap = failoverMapper.generateMapping(nodeAddressMap.keySet());
}
}
}
\ No newline at end of file
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultFailoverMapper.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultFailoverMapper.java 2006-12-08 21:56:34 UTC (rev 1732)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultFailoverMapper.java 2006-12-09 01:15:18 UTC (rev 1733)
@@ -21,9 +21,10 @@
*/
package org.jboss.messaging.core.plugin.postoffice.cluster;
-import java.util.ArrayList;
-import java.util.List;
-
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.Map;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.plugin.contract.FailoverMapper;
@@ -42,21 +43,23 @@
public class DefaultFailoverMapper implements FailoverMapper
{
private static final Logger log = Logger.getLogger(DefaultFailoverMapper.class);
-
- public List generateMapping(List nodes)
+
+ /** This receives a Collection<Integer> of nodes and returns a Map<Integer,Integer> of nodes to be used in the failover logic */
+ public Map generateMapping(Collection nodes)
{
- int s = nodes.size();
+
+ Object[] arrayNodes = (Object[])nodes.toArray(new Integer[nodes.size()]);
+ Arrays.sort(arrayNodes);
+
+ int s = arrayNodes.length;
log.info("Genertaing failover mapping, node size="+ s);
-
- List failoverNodes = new ArrayList(s);
+
+
+
+
+ Map failoverNodes = new LinkedHashMap(s);
- if (!(nodes instanceof ArrayList))
- {
- //So we can ensure fast index based access
- nodes = new ArrayList(nodes);
- }
-
for (int i = 0; i < s; i++)
{
int j = i + 1;
@@ -66,13 +69,9 @@
j = 0;
}
- Object failoverNode = nodes.get(j);
-
- failoverNodes.add(failoverNode);
+ failoverNodes.put(arrayNodes[i], arrayNodes[j]);
}
- log.info("Returning " + failoverNodes.size() + " nodes");
-
return failoverNodes;
}
Added: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/FailoverMapperTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/FailoverMapperTest.java 2006-12-08 21:56:34 UTC (rev 1732)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/FailoverMapperTest.java 2006-12-09 01:15:18 UTC (rev 1733)
@@ -0,0 +1,57 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.test.messaging.core.plugin.postoffice.cluster;
+
+import java.util.ArrayList;
+import java.util.Map;
+import junit.framework.TestCase;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultFailoverMapper;
+
+/**
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision:$</tt>
+ * <p/>
+ * $Id:$
+ */
+public class FailoverMapperTest extends TestCase
+{
+
+ public void testMapper()
+ {
+ ArrayList list = new ArrayList();
+
+ list.add(new Integer(50));
+ list.add(new Integer(25));
+ list.add(new Integer(15));
+
+ DefaultFailoverMapper mapper = new DefaultFailoverMapper();
+ Map map = mapper.generateMapping(list);
+
+ assertEquals(new Integer(15),map.get(new Integer(50)));
+ assertEquals(new Integer(50),map.get(new Integer(25)));
+ assertEquals(new Integer(25),map.get(new Integer(15)));
+
+ }
+
+
+}
More information about the jboss-cvs-commits
mailing list