[jboss-cvs] JBoss Messaging SVN: r1664 - branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Nov 30 12:39:55 EST 2006
Author: timfox
Date: 2006-11-30 12:39:53 -0500 (Thu, 30 Nov 2006)
New Revision: 1664
Modified:
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LeaveClusterRequest.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
Log:
Second interim commit for failover refactoring
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-11-30 15:27:59 UTC (rev 1663)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-11-30 17:39:53 UTC (rev 1664)
@@ -135,6 +135,8 @@
//Map < node, failover node>
private Map failoverMap;
+ private Set leftSet;
+
private Element syncChannelConfigE;
private Element asyncChannelConfigE;
@@ -177,6 +179,8 @@
replicationListeners = new HashSet();
failoverMap = new LinkedHashMap();
+
+ leftSet = new HashSet();
}
/*
@@ -662,6 +666,20 @@
// PostOfficeInternal implementation ------------------------------------------------------------------
+ public void handleNodeLeft(int nodeId) throws Exception
+ {
+ //Records that the given node has announced it is leaving - the entry is consumed later
+ //by leaveMessageReceived()
+ Integer departed = new Integer(nodeId);
+
+ lock.writeLock().acquire();
+
+ try
+ {
+ leftSet.add(departed);
+ }
+ finally
+ {
+ lock.writeLock().release();
+ }
+ }
+
public void putReplicantFromCluster(int nodeId, Serializable key, Serializable replicant) throws Exception
{
lock.writeLock().acquire();
@@ -1227,78 +1245,9 @@
}
}
- /*
- * Removes all non durable binding data, and any local replicant data for the specified
- * node
- */
- public void removeDataForNode(int parameterNodeId) throws Exception
- {
- log.info("Node " + parameterNodeId + " requested to leave cluster");
-
- lock.writeLock().acquire();
-
- try
- {
- Map nameMap = (Map)nameMaps.get(new Integer(parameterNodeId));
-
- if (nameMap != null)
- {
- List toRemove = new ArrayList();
-
- Iterator iter = nameMap.values().iterator();
-
- while (iter.hasNext())
- {
- Binding binding = (Binding)iter.next();
-
- if (!binding.getQueue().isRecoverable())
- {
- //We only remove the non durable bindings - we still need to be able to handle
- //messages for a durable subscription "owned" by a node that is not active any more!
- toRemove.add(binding);
- }
- }
-
- iter = toRemove.iterator();
-
- while (iter.hasNext())
- {
- Binding binding = (Binding)iter.next();
-
- removeBinding(parameterNodeId, binding.getQueue().getName());
- }
- }
-
- //We need to remove any replicant data for the node
- //this includes the node-address info
- Iterator iter = replicatedData.entrySet().iterator();
-
- while (iter.hasNext())
- {
- Map.Entry entry = (Map.Entry)iter.next();
-
- String key = (String)entry.getKey();
-
- Map replicants = (Map)entry.getValue();
-
- replicants.remove(new Integer(parameterNodeId));
-
- if (replicants.isEmpty())
- {
- iter.remove();
- }
-
- //Need to trigger listeners
- notifyListeners(key, replicants);
- }
- }
- finally
- {
- lock.writeLock().release();
- }
- }
+
public int getNodeId()
{
return nodeId;
@@ -1706,27 +1655,27 @@
routerMap.remove(queueName);
}
}
-
+
protected void loadBindings() throws Exception
{
if (trace) { log.trace(this.nodeId + " loading bindings"); }
-
+
boolean isState = syncChannel.getState(null, stateTimeout);
-
+
if (!isState)
{
//Must be first member in group or non clustered- we load the state ourself from the database
-
+
if (trace) { log.trace(this.nodeId + " First member of group- so loading bindings from db"); }
-
+
super.loadBindings();
}
else
{
//The state will be set in due course via the MessageListener - we must wait until this happens
-
+
if (trace) { log.trace(this.nodeId + " Not first member of group- so waiting for state to arrive...."); }
-
+
synchronized (setStateLock)
{
//TODO we should implement a timeout on this
@@ -1735,45 +1684,132 @@
setStateLock.wait();
}
}
-
+
if (trace) { log.trace(this.nodeId + " Received state"); }
}
}
+
+ protected Binding createBinding(int nodeId, String condition, String queueName, long channelId, Filter filter, boolean durable, boolean failed)
+ {
+ //A binding for this node is backed by a LocalClusteredQueue, a binding for any other node
+ //is backed by a RemoteQueueStub
+ boolean local = (nodeId == this.nodeId);
+
+ Queue queue;
+
+ if (!local)
+ {
+ queue = new RemoteQueueStub(nodeId, queueName, channelId, durable, pm, filter);
+ }
+ else
+ {
+ //Only the local queue needs an executor from the pool
+ QueuedExecutor executor = (QueuedExecutor)pool.get();
+
+ queue = new LocalClusteredQueue(this, nodeId, queueName, channelId, ms, pm, true,
+ durable, executor, filter, tr);
+ }
+
+ return new DefaultBinding(nodeId, condition, queue, failed);
+ }
+
+ // Private ------------------------------------------------------------------------------------------
+
+ private boolean leaveMessageReceived(Integer nodeId) throws Exception
+ {
+ //Consumes the entry that handleNodeLeft() recorded for the node, if there is one -
+ //returns true if the node was in the set
+ lock.writeLock().acquire();
+
+ try
+ {
+ return leftSet.remove(nodeId);
+ }
+ finally
+ {
+ lock.writeLock().release();
+ }
+ }
+
+ /*
+ * Removes all non durable binding data, and any local replicant data for the specified
+ * node
+ */
+ private void removeDataForNode(Integer parameterNodeId) throws Exception
+ {
+ log.info("Node " + parameterNodeId + " requested to leave cluster");
+
+ lock.writeLock().acquire();
- protected Binding createBinding(int nodeId, String condition, String queueName, long channelId, Filter filter, boolean durable, boolean failed)
- {
- Queue queue;
- if (nodeId == this.nodeId)
- {
- QueuedExecutor executor = (QueuedExecutor)pool.get();
+ try
+ {
+ Map nameMap = (Map)nameMaps.get(parameterNodeId);
- queue = new LocalClusteredQueue(this, nodeId, queueName, channelId, ms, pm, true,
- durable, executor, filter, tr);
- }
- else
- {
- queue = new RemoteQueueStub(nodeId, queueName, channelId, durable, pm, filter);
- }
-
- Binding binding = new DefaultBinding(nodeId, condition, queue, failed);
-
- return binding;
- }
-
- // Private ------------------------------------------------------------------------------------------
-
- private void notifyListeners(Serializable key, Map replicants)
- {
- Iterator iter = replicationListeners.iterator();
-
- while (iter.hasNext())
- {
- ReplicationListener listener = (ReplicationListener)iter.next();
-
- listener.onReplicationChange(key, replicants);
- }
- }
-
+ if (nameMap != null)
+ {
+ List toRemove = new ArrayList();
+
+ Iterator iter = nameMap.values().iterator();
+
+ while (iter.hasNext())
+ {
+ Binding binding = (Binding)iter.next();
+
+ if (!binding.getQueue().isRecoverable())
+ {
+ //We only remove the non durable bindings - we still need to be able to handle
+ //messages for a durable subscription "owned" by a node that is not active any more!
+ toRemove.add(binding);
+ }
+ }
+
+ iter = toRemove.iterator();
+
+ while (iter.hasNext())
+ {
+ Binding binding = (Binding)iter.next();
+
+ removeBinding(parameterNodeId.intValue(), binding.getQueue().getName());
+ }
+ }
+
+ //We need to remove any replicant data for the node
+ //this includes the node-address info
+ Iterator iter = replicatedData.entrySet().iterator();
+
+ while (iter.hasNext())
+ {
+ Map.Entry entry = (Map.Entry)iter.next();
+
+ String key = (String)entry.getKey();
+
+ Map replicants = (Map)entry.getValue();
+
+ replicants.remove(parameterNodeId);
+
+ if (replicants.isEmpty())
+ {
+ iter.remove();
+ }
+
+ //Need to trigger listeners
+ notifyListeners(key, replicants);
+ }
+ }
+ finally
+ {
+ lock.writeLock().release();
+ }
+ }
+
+ private void notifyListeners(Serializable key, Map replicants)
+ {
+ //Pass the key and its replicant map to every registered replication listener
+ for (Iterator listeners = replicationListeners.iterator(); listeners.hasNext(); )
+ {
+ ((ReplicationListener)listeners.next()).onReplicationChange(key, replicants);
+ }
+ }
+
/*
* Multicast a sync request
*/
@@ -2135,6 +2171,58 @@
}
/*
+ * A new node has joined the group
+ */
+ private void nodeJoined(Address address) throws Exception
+ {
+ //We need to regenerate the failover map from the current view - the address of the new member itself is not used here
+
+ generateFailoverMap(currentView);
+ }
+
+ /*
+ * A node has left the group
+ */
+ private void nodeLeft(Address address) throws Throwable
+ {
+ //Handles one member that was in the previous view but is missing from the new one.
+ //Throws IllegalStateException if the address cannot be mapped to a node id.
+
+ Integer nodeId = getNodeIdForSyncAddress(address);
+
+ //Everything below is keyed on the node id, so an address with no node id is the error case
+ if (nodeId == null)
+ {
+ throw new IllegalStateException("Cannot find node id for address " + address);
+ }
+
+ //A node that leaves cleanly is recorded by handleNodeLeft() first - if there is no such record
+ //we treat the node as crashed
+ boolean crashed = !this.leaveMessageReceived(nodeId);
+
+ if (trace) { log.trace("Node " + address + " id: " + nodeId +" has left the group, crashed = " + crashed); }
+
+ //Cleanup any hanging transactions - we do this irrespective of whether the node crashed
+ check(nodeId);
+
+ //Need to evaluate this before we regenerate the failover map
+ boolean isFailover = isFailoverNodeForNode(nodeId.intValue());
+
+ //Now we recalculate the failover mapping - this needs to be done before removeDataForNode is called
+ //since that may cause connection factories to be rebound
+ generateFailoverMap(currentView);
+
+ //Remove any replicant data and non durable bindings for the node - again we need to do this
+ //irrespective of whether the node crashed
+ //This will notify any listeners which will recalculate the connection factory delegates and failover delegates
+ removeDataForNode(nodeId);
+
+ if (crashed && isFailover)
+ {
+ //The node crashed and we are the failover node
+ //so let's perform failover
+
+ //TODO server side valve
+
+ failOver(nodeId.intValue());
+ }
+ }
+
+ /*
* We use this class so we notice when members leave the group
*/
private class ControlMembershipListener implements MembershipListener
@@ -2148,83 +2236,55 @@
{
//NOOP
}
-
+
public void viewAccepted(View view)
{
if (trace) { log.trace(nodeId + " Got new view, size=" + view.size()); }
- log.info("JBoss Messaging DefaultClusteredPostOffice Accepted new view:" + view);
-
//JGroups will make sure this method is never called by more than one thread concurrently
- if (currentView != null)
- {
- Iterator iter = currentView.getMembers().iterator();
+ View oldView = currentView;
+
+ currentView = view;
+
+ try
+ {
+ if (oldView != null)
+ {
+ Iterator iter = oldView.getMembers().iterator();
+
+ while (iter.hasNext())
+ {
+ Address address = (Address)iter.next();
+
+ if (!view.containsMember(address))
+ {
+ nodeLeft(address);
+ }
+ }
+ }
+ Iterator iter = view.getMembers().iterator();
+
while (iter.hasNext())
{
Address address = (Address)iter.next();
- if (!view.containsMember(address))
+ if (oldView == null || !oldView.containsMember(address))
{
- //Member must have left
- if (trace) { log.trace(nodeId + " it seems that member " + address + " has left the group"); }
-
- Address currentAddress = syncChannel.getLocalAddress();
-
- //We don't remove bindings for ourself
- if (!address.equals(currentAddress))
- {
- try
- {
- Integer nodeId = getNodeIdForSyncAddress(address);
-
- //Perform a check - the member might have crashed and left uncommitted transactions
- //we need to resolve this - this should be performed irrespective of
- //whether the node crashed or left cleanly
- if (trace) { log.trace(DefaultClusteredPostOffice.this.nodeId + " Performing cleanup for node " + nodeId); }
-
- check(nodeId);
-
- //Remove any non durable binding and replicant data for the exited node
- //this should be performed irrespective of whether the node crashed or left cleanly
- removeDataForNode(nodeId.intValue());
-
- // nodeId==null means the server left the cluster without a problem. It sent a message before
- // leaving the cluster
- if (nodeId != null)
- {
- //This means the node has crashed
- if (trace) { log.trace("The node " + nodeId + " crashed"); }
-
- //If we are the failover node for the failed node then we should perform failover
- if (isFailoverNodeForNode(nodeId.intValue()))
- {
- failOver(nodeId.intValue());
- }
-
- if (trace) { log.trace(DefaultClusteredPostOffice.this.nodeId + " failover complete"); }
- }
- else
- {
- if (trace) { log.trace("The node " + nodeId + " left cleanly"); }
- }
- }
- catch (Throwable e)
- {
- log.error("Caught Exception in MembershipListener", e);
- IllegalStateException e2 = new IllegalStateException(e.getMessage());
- e2.setStackTrace(e.getStackTrace());
- throw e2;
- }
- }
+ nodeJoined(address);
}
}
}
-
- currentView = view;
+ catch (Throwable e)
+ {
+ log.error("Caught Exception in MembershipListener", e);
+ IllegalStateException e2 = new IllegalStateException(e.getMessage());
+ e2.setStackTrace(e.getStackTrace());
+ throw e2;
+ }
}
-
+
public byte[] getState()
{
//NOOP
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LeaveClusterRequest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LeaveClusterRequest.java 2006-11-30 15:27:59 UTC (rev 1663)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LeaveClusterRequest.java 2006-11-30 17:39:53 UTC (rev 1664)
@@ -4,10 +4,15 @@
import java.io.DataOutputStream;
/**
+ *
+ * A LeaveClusterRequest
+ *
* @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- * @version <tt>$Revision:$</tt>
- * <p/>
- * $Id:$
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
*/
public class LeaveClusterRequest extends ClusterRequest
{
@@ -30,7 +35,7 @@
Object execute(PostOfficeInternal office) throws Throwable
{
- office.removeDataForNode(nodeId);
+ office.handleNodeLeft(nodeId);
return null;
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java 2006-11-30 15:27:59 UTC (rev 1663)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java 2006-11-30 17:39:53 UTC (rev 1664)
@@ -49,8 +49,8 @@
void removeBindingFromCluster(int nodeId, String queueName)
throws Exception;
-
- void removeDataForNode(int nodeId) throws Exception;
+
+ void handleNodeLeft(int nodeId) throws Exception;
void putReplicantFromCluster(int nodeId, Serializable key, Serializable replicant) throws Exception;
More information about the jboss-cvs-commits
mailing list