[jboss-cvs] JBoss Messaging SVN: r1694 - in branches/Branch_Client_Failover_Experiment/src/main/org/jboss: jms/client/delegate messaging/core/plugin messaging/core/plugin/contract messaging/core/plugin/postoffice messaging/core/plugin/postoffice/cluster
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sun Dec 3 17:41:34 EST 2006
Author: ovidiu.feodorov at jboss.com
Date: 2006-12-03 17:41:30 -0500 (Sun, 03 Dec 2006)
New Revision: 1694
Modified:
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/Replicator.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PutReplicantRequest.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoveReplicantRequest.java
Log:
minor renaming, formatting and logging changes
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java 2006-12-03 08:36:09 UTC (rev 1693)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java 2006-12-03 22:41:30 UTC (rev 1694)
@@ -38,56 +38,76 @@
* @version <tt>$Revision: 1.1 $</tt>
*
* $Id$
- *
*/
-public class ClusteredClientConnectionFactoryDelegate extends
- ClientConnectionFactoryDelegate
+public class ClusteredClientConnectionFactoryDelegate extends ClientConnectionFactoryDelegate
{
+ // Constants -----------------------------------------------------
+
private static final long serialVersionUID = 8286850860206289277L;
-
-
- /*
- * If delegates[i] is the current delegate, then the failover delegate
- * is given by delegates[failoverIndexes[i]]
- */
+
+ // Static --------------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ /**
+ * If delegates[i] is the current delegate, then the failover delegate is given by
+ * delegates[failoverIndexes[i]]
+ */
private ClientConnectionFactoryDelegate[] delegates;
-
+
private int[] failoverIndexes;
-
- public ClusteredClientConnectionFactoryDelegate(int objectID,
- String serverLocatorURI, Version serverVersion, boolean clientPing)
+
+ // Constructors --------------------------------------------------
+
+ public ClusteredClientConnectionFactoryDelegate(int objectID, String serverLocatorURI,
+ Version serverVersion, boolean clientPing)
{
super(objectID, serverLocatorURI, serverVersion, clientPing);
}
-
+
+ // DelegateSupport overrides -------------------------------------
+
public void init()
{
super.init();
-
- for (int i=0; i < delegates.length; i++)
+
+ for (int i = 0; i < delegates.length; i++)
{
delegates[i].init();
}
-
- //We add this to the meta data so the failOver aspect can get access to it
- getMetaData().addMetaData(MetaDataConstants.JMS, MetaDataConstants.CF_DELEGATES,
+
+ // We add this to the meta data so the failOver aspect can get access to it
+ getMetaData().addMetaData(MetaDataConstants.JMS,
+ MetaDataConstants.CF_DELEGATES,
delegates, PayloadKey.TRANSIENT);
-
- getMetaData().addMetaData(MetaDataConstants.JMS, MetaDataConstants.CF_FAILOVER_INDEXES,
+
+ getMetaData().addMetaData(MetaDataConstants.JMS,
+ MetaDataConstants.CF_FAILOVER_INDEXES,
failoverIndexes, PayloadKey.TRANSIENT);
}
-
- //Only be used in testing
+
+ // Public --------------------------------------------------------
+
+
+ public void setFailoverDelegates(ClientConnectionFactoryDelegate[] delegates,
+ int[] failoverIndexes)
+ {
+ this.delegates = delegates;
+ this.failoverIndexes = failoverIndexes;
+ }
+
+ // Only be used in testing
public ClientConnectionFactoryDelegate[] getDelegates()
{
return delegates;
}
-
- public void setFailoverDelegates(ClientConnectionFactoryDelegate[] delegates, int[] failoverIndexes)
- {
- this.delegates = delegates;
-
- this.failoverIndexes = failoverIndexes;
- }
-
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java 2006-12-03 08:36:09 UTC (rev 1693)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java 2006-12-03 22:41:30 UTC (rev 1694)
@@ -240,16 +240,16 @@
FilterFactory ff = new SelectorFactory();
FailoverMapper mapper = new DefaultFailoverMapper();
-
+
postOffice = new DefaultClusteredPostOffice(ds, tm, sqlProperties, createTablesOnStartup,
- nodeId, officeName, ms,
- pm, tr, ff, pool,
- groupName,
- syncChannelConfig, asyncChannelConfig,
- stateTimeout, castTimeout,
- pullPolicy, rf,
- mapper,
- statsSendPeriod);
+ nodeId, officeName, ms,
+ pm, tr, ff, pool,
+ groupName,
+ syncChannelConfig, asyncChannelConfig,
+ stateTimeout, castTimeout,
+ pullPolicy, rf,
+ mapper,
+ statsSendPeriod);
postOffice.start();
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/Replicator.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/Replicator.java 2006-12-03 08:36:09 UTC (rev 1693)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/Replicator.java 2006-12-03 22:41:30 UTC (rev 1694)
@@ -45,7 +45,10 @@
void putReplicant(Serializable key, Serializable replicant) throws Exception;
boolean removeReplicant(Serializable key) throws Exception;
-
+
+ /**
+ * @return an empty map if no replicants are found for 'key', but never null.
+ */
Map getReplicants(Serializable key) throws Exception;
void registerListener(ReplicationListener listener);
@@ -53,9 +56,10 @@
void unregisterListener(ReplicationListener listener);
/**
- * Gets the current failover node id for the specified node id
- * @param nodeId The node to get the failover node id for
- * @return the failover node id
+ * Gets the current failover node id for the specified node id.
+ * @param nodeId The node to get the failover node id for.
+ * @return the failover node id. If there is no failover node (one-node cluster), the method
+ * returns the original nodeID.
*/
int getFailoverNodeForNode(int nodeId);
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java 2006-12-03 08:36:09 UTC (rev 1693)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java 2006-12-03 22:41:30 UTC (rev 1694)
@@ -122,13 +122,13 @@
public void start() throws Exception
{
- if (trace) { log.trace(this + " starting"); }
+ log.debug(this + " starting");
super.start();
loadBindings();
- if (trace) { log.trace(this + " started"); }
+ log.debug(this + " started");
}
public void stop() throws Exception
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-12-03 08:36:09 UTC (rev 1693)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-12-03 22:41:30 UTC (rev 1694)
@@ -137,13 +137,13 @@
private Set leftSet;
- private Element syncChannelConfigE;
+ private Element syncChannelConfigElement;
- private Element asyncChannelConfigE;
+ private Element asyncChannelConfigElement;
- private String syncChannelConfigS;
+ private String syncChannelConfig;
- private String asyncChannelConfigS;
+ private String asyncChannelConfig;
private long stateTimeout;
@@ -186,76 +186,87 @@
/*
* Constructor using Element for configuration
*/
- public DefaultClusteredPostOffice(DataSource ds, TransactionManager tm, Properties sqlProperties,
- boolean createTablesOnStartup,
- int nodeId, String officeName, MessageStore ms,
- PersistenceManager pm,
- TransactionRepository tr,
- FilterFactory filterFactory,
- QueuedExecutorPool pool,
- String groupName,
- Element syncChannelConfig,
- Element asyncChannelConfig,
- long stateTimeout, long castTimeout,
- MessagePullPolicy redistributionPolicy,
- ClusterRouterFactory rf,
- FailoverMapper failoverMapper,
- long statsSendPeriod) throws Exception
- {
+ public DefaultClusteredPostOffice(DataSource ds,
+ TransactionManager tm,
+ Properties sqlProperties,
+ boolean createTablesOnStartup,
+ int nodeId,
+ String officeName,
+ MessageStore ms,
+ PersistenceManager pm,
+ TransactionRepository tr,
+ FilterFactory filterFactory,
+ QueuedExecutorPool pool,
+ String groupName,
+ Element syncChannelConfig,
+ Element asyncChannelConfig,
+ long stateTimeout, long castTimeout,
+ MessagePullPolicy redistributionPolicy,
+ ClusterRouterFactory rf,
+ FailoverMapper failoverMapper,
+ long statsSendPeriod)
+ throws Exception
+ {
this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms,
pm, tr, filterFactory, pool, groupName, stateTimeout, castTimeout, redistributionPolicy,
rf, failoverMapper, statsSendPeriod);
- this.syncChannelConfigE = syncChannelConfig;
- this.asyncChannelConfigE = asyncChannelConfig;
+ this.syncChannelConfigElement = syncChannelConfig;
+ this.asyncChannelConfigElement = asyncChannelConfig;
}
/*
* Constructor using String for configuration
*/
- public DefaultClusteredPostOffice(DataSource ds, TransactionManager tm, Properties sqlProperties,
- boolean createTablesOnStartup,
- int nodeId, String officeName, MessageStore ms,
- PersistenceManager pm,
- TransactionRepository tr,
- FilterFactory filterFactory,
- QueuedExecutorPool pool,
- String groupName,
- String syncChannelConfig,
- String asyncChannelConfig,
- long stateTimeout, long castTimeout,
- MessagePullPolicy redistributionPolicy,
- ClusterRouterFactory rf,
- FailoverMapper failoverMapper,
- long statsSendPeriod) throws Exception
+ public DefaultClusteredPostOffice(DataSource ds,
+ TransactionManager tm,
+ Properties sqlProperties,
+ boolean createTablesOnStartup,
+ int nodeId,
+ String officeName,
+ MessageStore ms,
+ PersistenceManager pm,
+ TransactionRepository tr,
+ FilterFactory filterFactory,
+ QueuedExecutorPool pool,
+ String groupName,
+ String syncChannelConfig,
+ String asyncChannelConfig,
+ long stateTimeout, long castTimeout,
+ MessagePullPolicy redistributionPolicy,
+ ClusterRouterFactory rf,
+ FailoverMapper failoverMapper,
+ long statsSendPeriod) throws Exception
{
this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms,
pm, tr, filterFactory, pool, groupName, stateTimeout, castTimeout, redistributionPolicy,
rf, failoverMapper, statsSendPeriod);
- this.syncChannelConfigS = syncChannelConfig;
- this.asyncChannelConfigS = asyncChannelConfig;
+ this.syncChannelConfig = syncChannelConfig;
+ this.asyncChannelConfig = asyncChannelConfig;
}
-
- private DefaultClusteredPostOffice(DataSource ds, TransactionManager tm, Properties sqlProperties,
- boolean createTablesOnStartup,
- int nodeId, String officeName, MessageStore ms,
- PersistenceManager pm,
- TransactionRepository tr,
- FilterFactory filterFactory,
- QueuedExecutorPool pool,
- String groupName,
- long stateTimeout, long castTimeout,
- MessagePullPolicy redistributionPolicy,
- ClusterRouterFactory rf,
- FailoverMapper failoverMapper,
- long statsSendPeriod)
+
+ private DefaultClusteredPostOffice(DataSource ds,
+ TransactionManager tm,
+ Properties sqlProperties,
+ boolean createTablesOnStartup,
+ int nodeId,
+ String officeName,
+ MessageStore ms,
+ PersistenceManager pm,
+ TransactionRepository tr,
+ FilterFactory filterFactory,
+ QueuedExecutorPool pool,
+ String groupName,
+ long stateTimeout, long castTimeout,
+ MessagePullPolicy redistributionPolicy,
+ ClusterRouterFactory rf,
+ FailoverMapper failoverMapper,
+ long statsSendPeriod)
{
- super (ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms, pm, tr, filterFactory,
- pool);
+ super (ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms, pm, tr,
+ filterFactory, pool);
- this.pm = pm;
-
this.groupName = groupName;
this.stateTimeout = stateTimeout;
@@ -287,18 +298,18 @@
log.warn("Attempt to start() but " + this + " is already started");
}
- if (syncChannelConfigE != null)
+ if (syncChannelConfigElement != null)
{
- this.syncChannel = new JChannel(syncChannelConfigE);
- this.asyncChannel = new JChannel(asyncChannelConfigE);
+ this.syncChannel = new JChannel(syncChannelConfigElement);
+ this.asyncChannel = new JChannel(asyncChannelConfigElement);
}
else
{
- this.syncChannel = new JChannel(syncChannelConfigS);
- this.asyncChannel = new JChannel(asyncChannelConfigS);
+ this.syncChannel = new JChannel(syncChannelConfig);
+ this.asyncChannel = new JChannel(asyncChannelConfig);
}
- //We don't want to receive local messages on any of the channels
+ // We don't want to receive local messages on any of the channels
syncChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
asyncChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
@@ -309,8 +320,10 @@
this.controlMembershipListener = new ControlMembershipListener();
- this.controlMessageDispatcher = new MessageDispatcher(syncChannel, controlMessageListener,
- controlMembershipListener, requestHandler, true);
+ this.controlMessageDispatcher =
+ new MessageDispatcher(syncChannel, controlMessageListener,
+ controlMembershipListener, requestHandler, true);
+
this.dataReceiver = new DataReceiver();
asyncChannel.setReceiver(dataReceiver);
@@ -596,7 +609,7 @@
if (failoverNode == null)
{
- throw new IllegalArgumentException("Cannot find failover node for node " + nodeId);
+ return nodeId;
}
return failoverNode.intValue();
@@ -612,7 +625,7 @@
{
Map m = (Map)replicatedData.get(key);
- return m == null ? null : Collections.unmodifiableMap(m);
+ return m == null ? Collections.EMPTY_MAP : Collections.unmodifiableMap(m);
}
finally
{
@@ -622,16 +635,16 @@
public void putReplicant(Serializable key, Serializable replicant) throws Exception
{
- putReplicantFromCluster(this.nodeId, key, replicant);
+ putReplicantLocally(nodeId, key, replicant);
- PutReplicantRequest request = new PutReplicantRequest(this.nodeId, key, replicant);
+ PutReplicantRequest request = new PutReplicantRequest(nodeId, key, replicant);
syncSendRequest(request);
}
public boolean removeReplicant(Serializable key) throws Exception
{
- if (removeReplicantFromCluster(this.nodeId, key))
+ if (removeReplicantLocally(this.nodeId, key))
{
RemoveReplicantRequest request = new RemoveReplicantRequest(this.nodeId, key);
@@ -679,8 +692,12 @@
lock.writeLock().release();
}
}
-
- public void putReplicantFromCluster(int nodeId, Serializable key, Serializable replicant) throws Exception
+
+ /**
+ * @param originatorNodeID - the ID of the node that initiated the modification.
+ */
+ public void putReplicantLocally(int originatorNodeID, Serializable key, Serializable replicant)
+ throws Exception
{
lock.writeLock().acquire();
@@ -697,15 +714,18 @@
m.put(new Integer(nodeId), replicant);
- notifyListeners(key, m);
+ notifyListeners(key, m);
}
finally
{
lock.writeLock().release();
}
}
-
- public boolean removeReplicantFromCluster(int nodeId, Serializable key) throws Exception
+
+ /**
+ * @param originatorNodeID - the ID of the node that initiated the modification.
+ */
+ public boolean removeReplicantLocally(int originatorNodeID, Serializable key) throws Exception
{
lock.writeLock().acquire();
@@ -730,7 +750,7 @@
replicatedData.remove(key);
}
- notifyListeners(key, m);
+ notifyListeners(key, m);
return true;
}
@@ -1658,7 +1678,7 @@
protected void loadBindings() throws Exception
{
- if (trace) { log.trace(this.nodeId + " loading bindings"); }
+ log.debug(this.nodeId + " loading bindings");
boolean isState = syncChannel.getState(null, stateTimeout);
@@ -1728,18 +1748,17 @@
}
/*
- * Removes all non durable binding data, and any local replicant data for the specified
- * node
+ * Removes all non durable binding data, and any local replicant data for the specified node.
*/
- private void removeDataForNode(Integer parameterNodeId) throws Exception
+ private void removeDataForNode(Integer nodeID) throws Exception
{
- log.info("Node " + parameterNodeId + " requested to leave cluster");
+ log.info("Node " + nodeID + " requested to leave cluster");
lock.writeLock().acquire();
try
{
- Map nameMap = (Map)nameMaps.get(parameterNodeId);
+ Map nameMap = (Map)nameMaps.get(nodeID);
if (nameMap != null)
{
@@ -1765,30 +1784,26 @@
{
Binding binding = (Binding)iter.next();
- removeBinding(parameterNodeId.intValue(), binding.getQueue().getName());
+ removeBinding(nodeID.intValue(), binding.getQueue().getName());
}
}
- //We need to remove any replicant data for the node
- //this includes the node-address info
- Iterator iter = replicatedData.entrySet().iterator();
-
- while (iter.hasNext())
+ // We need to remove any replicant data for the node. This includes the node-address info.
+ for(Iterator i = replicatedData.entrySet().iterator(); i.hasNext(); )
{
- Map.Entry entry = (Map.Entry)iter.next();
+ Map.Entry entry = (Map.Entry)i.next();
String key = (String)entry.getKey();
-
Map replicants = (Map)entry.getValue();
- replicants.remove(parameterNodeId);
+ replicants.remove(nodeID);
if (replicants.isEmpty())
{
- iter.remove();
+ i.remove();
}
- //Need to trigger listeners
+ // Need to trigger listeners
notifyListeners(key, replicants);
}
}
@@ -1800,12 +1815,9 @@
private void notifyListeners(Serializable key, Map replicants)
{
- Iterator iter = replicationListeners.iterator();
-
- while (iter.hasNext())
+ for (Iterator i = replicationListeners.iterator(); i.hasNext(); )
{
- ReplicationListener listener = (ReplicationListener)iter.next();
-
+ ReplicationListener listener = (ReplicationListener)i.next();
listener.onReplicationChange(key, replicants);
}
}
@@ -1815,17 +1827,15 @@
*/
private void syncSendRequest(ClusterRequest request) throws Exception
{
- if (trace) { log.info(this.nodeId + " sending synch request to group, request: " + request); }
+ if (trace) { log.trace(this.nodeId + " sending synch request to group, request: " + request); }
- System.out.println("***************Request Sent **************");
-
byte[] bytes = writeRequest(request);
Message message = new Message(null, null, bytes);
controlMessageDispatcher.castMessage(null, message, GroupRequest.GET_ALL, castTimeout);
- if (trace) { log.info(this.nodeId + " sent and executed ok"); }
+ if (trace) { log.trace(this.nodeId + " sent and executed ok"); }
}
@@ -1835,7 +1845,7 @@
lock.readLock().acquire();
try
{
- Map map = this.getReplicants(ADDRESS_INFO_KEY);
+ Map map = getReplicants(ADDRESS_INFO_KEY);
if (map == null)
{
@@ -2049,25 +2059,29 @@
}
/*
- * Given a JGroups view, generate a map of node to failover node
- * The mapping is determined by a pluggable policy
+ * Given a JGroups view, generate a map of node to failover node.
+ * The mapping is determined by a pluggable policy.
*/
private void generateFailoverMap(View view) throws Exception
{
List nodes = new ArrayList();
- Iterator iter = view.getMembers().iterator();
-
- while (iter.hasNext())
+ for (Iterator i = view.getMembers().iterator(); i.hasNext(); )
{
- Address address = (Address)iter.next();
+ Address address = (Address)i.next();
+
+ // Ignore own address, a node can't be its own failover node
+ if (syncChannel.getLocalAddress().equals(address))
+ {
+ continue;
+ }
+
+ // Convert to node id
+ // TODO this should be optimised - currently the implementation of the lookup
+ // is a bit tortuous
- //Convert to node id
- //TODO this should be optimised - currently the implementation of the lookup
- //is a bit tortuous
+ Integer n = getNodeIdForSyncAddress(address);
- Integer n = this.getNodeIdForSyncAddress(address);
-
if (n == null)
{
throw new IllegalStateException("Cannot find node id for address: " + address);
@@ -2078,23 +2092,75 @@
List failoverNodes = failoverMapper.generateMapping(nodes);
- //Now put this in the map of node -> failover node
+ // Now put this in the map of node -> failover node
failoverMap.clear();
-
- iter = nodes.iterator();
+
+ Iterator iter = nodes.iterator();
Iterator iter2 = failoverNodes.iterator();
-
+
while (iter.hasNext())
{
Integer node = (Integer)iter.next();
-
+
Integer failoverNode = (Integer)iter2.next();
-
+
failoverMap.put(node, failoverNode);
- }
+ }
}
-
+
+ /*
+ * A new node has joined the group
+ */
+ private void nodeJoined(Address address) throws Exception
+ {
+ // We need to regenerate the failover map
+
+ generateFailoverMap(currentView);
+ }
+
+ /*
+ * A node has left the group
+ */
+ private void nodeLeft(Address address) throws Throwable
+ {
+ Integer nodeId = getNodeIdForSyncAddress(address);
+
+ if (nodeId != null)
+ {
+ throw new IllegalStateException("Cannot find node id for address " + address);
+ }
+
+ boolean crashed = !this.leaveMessageReceived(nodeId);
+
+ if (trace) { log.trace("Node " + address + " id: " + nodeId +" has left the group, crashed = " + crashed); }
+
+ //Cleanup any hanging transactions - we do this irrespective of whether we crashed
+ check(nodeId);
+
+ //Need to evaluate this before we regenerate the failover map
+ boolean isFailover = isFailoverNodeForNode(nodeId.intValue());
+
+ //Now we recalculate the failover mapping - this needs to be done before removeDataForNode is called
+ //since that may cause connection factories to be rebound
+ generateFailoverMap(currentView);
+
+ //Remove any replicant data and non durable bindings for the node - again we need to do this
+ //irrespective of whether we crashed
+ //This will notify any listeners which will recalculate the connection factory delegates and failover delegates
+ removeDataForNode(nodeId);
+
+ if (crashed && isFailover)
+ {
+ //The node crashed and we are the failover node
+ //so let's perform failover
+
+ //TODO server side valve
+
+ failOver(nodeId.intValue());
+ }
+ }
+
// Inner classes -------------------------------------------------------------------
/*
@@ -2171,58 +2237,6 @@
}
/*
- * A new node has joined the group
- */
- private void nodeJoined(Address address) throws Exception
- {
- //We need to regenerate the failover map
-
- generateFailoverMap(currentView);
- }
-
- /*
- * A node has left the group
- */
- private void nodeLeft(Address address) throws Throwable
- {
- Integer nodeId = getNodeIdForSyncAddress(address);
-
- if (nodeId != null)
- {
- throw new IllegalStateException("Cannot find node id for address " + address);
- }
-
- boolean crashed = !this.leaveMessageReceived(nodeId);
-
- if (trace) { log.trace("Node " + address + " id: " + nodeId +" has left the group, crashed = " + crashed); }
-
- //Cleanup any hanging transactions - we do this irrespective of whether we crashed
- check(nodeId);
-
- //Need to evaluate this before we regenerate the failover map
- boolean isFailover = isFailoverNodeForNode(nodeId.intValue());
-
- //Now we recalculate the failover mapping - this needs to be done before removeDataForNode is called
- //since that may cause connection factories to be rebound
- generateFailoverMap(currentView);
-
- //Remove any replicant data and non durable bindings for the node - again we need to do this
- //irrespective of whether we crashed
- //This will notify any listeners which will recalculate the connection factory delegates and failover delegates
- removeDataForNode(nodeId);
-
- if (crashed && isFailover)
- {
- //The node crashed and we are the failover node
- //so let's perform failover
-
- //TODO server side valve
-
- failOver(nodeId.intValue());
- }
- }
-
- /*
* We use this class so we notice when members leave the group
*/
private class ControlMembershipListener implements MembershipListener
@@ -2241,7 +2255,7 @@
{
if (trace) { log.trace(nodeId + " Got new view, size=" + view.size()); }
- //JGroups will make sure this method is never called by more than one thread concurrently
+ // JGroups will make sure this method is never called by more than one thread concurrently
View oldView = currentView;
@@ -2359,9 +2373,7 @@
ClusterRequest request = readRequest(bytes);
- Object result = request.execute(DefaultClusteredPostOffice.this);
-
- return result;
+ return request.execute(DefaultClusteredPostOffice.this);
}
catch (Throwable e)
{
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java 2006-12-03 08:36:09 UTC (rev 1693)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java 2006-12-03 22:41:30 UTC (rev 1694)
@@ -52,9 +52,9 @@
void handleNodeLeft(int nodeId) throws Exception;
- void putReplicantFromCluster(int nodeId, Serializable key, Serializable replicant) throws Exception;
+ void putReplicantLocally(int nodeId, Serializable key, Serializable replicant) throws Exception;
- boolean removeReplicantFromCluster(int nodeId, Serializable key) throws Exception;
+ boolean removeReplicantLocally(int nodeId, Serializable key) throws Exception;
void routeFromCluster(Message message, String routingKey, Map queueNameNodeIdMap) throws Exception;
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PutReplicantRequest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PutReplicantRequest.java 2006-12-03 08:36:09 UTC (rev 1693)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PutReplicantRequest.java 2006-12-03 22:41:30 UTC (rev 1694)
@@ -40,54 +40,74 @@
*/
class PutReplicantRequest extends ClusterRequest
{
+ // Constants -----------------------------------------------------
+
static final int TYPE = 12;
+ // Static --------------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
private int nodeId;
-
private Serializable key;
-
private Serializable replicant;
-
+
+ // Constructors --------------------------------------------------
+
PutReplicantRequest()
- {
+ {
}
-
+
PutReplicantRequest(int nodeId, Serializable key, Serializable replicant)
{
- this.nodeId = nodeId;
-
+ this.nodeId = nodeId;
+
this.key = key;
-
+
this.replicant = replicant;
}
-
- Object execute(PostOfficeInternal office) throws Exception
- {
- office.putReplicantFromCluster(nodeId, key, replicant);
-
- return null;
- }
-
- byte getType()
- {
- return TYPE;
- }
+ // Streamable implementation -------------------------------------
+
public void read(DataInputStream in) throws Exception
{
nodeId = in.readInt();
-
+
key = (Serializable)StreamUtils.readObject(in, true);
-
+
replicant = (Serializable)StreamUtils.readObject(in, true);
}
public void write(DataOutputStream out) throws Exception
{
- out.writeInt(nodeId);
-
+ out.writeInt(nodeId);
+
StreamUtils.writeObject(out, key, true, true);
-
+
StreamUtils.writeObject(out, replicant, true, true);
}
+
+ // ClusterRequest overrides --------------------------------------
+
+ Object execute(PostOfficeInternal office) throws Exception
+ {
+ office.putReplicantLocally(nodeId, key, replicant);
+ return null;
+ }
+
+ byte getType()
+ {
+ return TYPE;
+ }
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoveReplicantRequest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoveReplicantRequest.java 2006-12-03 08:36:09 UTC (rev 1693)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoveReplicantRequest.java 2006-12-03 22:41:30 UTC (rev 1694)
@@ -48,7 +48,7 @@
Object execute(PostOfficeInternal office) throws Exception
{
- office.removeReplicantFromCluster(nodeId, key);
+ office.removeReplicantLocally(nodeId, key);
return null;
}
More information about the jboss-cvs-commits
mailing list