[jboss-cvs] JBossAS SVN: r69539 - trunk/cluster/src/main/org/jboss/ha/framework/server.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Feb 1 10:20:29 EST 2008
Author: bstansberry at jboss.com
Date: 2008-02-01 10:20:29 -0500 (Fri, 01 Feb 2008)
New Revision: 69539
Modified:
trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java
trunk/cluster/src/main/org/jboss/ha/framework/server/JChannelFactory.java
Log:
[JBAS-4313] Clean up mechanism for excluding duplicate nodes
Clean up how ClusterPartition handles first view
Modified: trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java 2008-02-01 15:16:57 UTC (rev 69538)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java 2008-02-01 15:20:29 UTC (rev 69539)
@@ -29,8 +29,6 @@
import java.io.Serializable;
import java.lang.ref.WeakReference;
import java.net.InetAddress;
-import java.rmi.dgc.VMID;
-import java.rmi.server.UID;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
@@ -57,10 +55,8 @@
import org.jboss.logging.Logger;
import org.jboss.naming.NonSerializableFactory;
import org.jboss.system.ServiceMBeanSupport;
-import org.jboss.system.server.ServerConfigUtil;
import org.jgroups.Channel;
import org.jgroups.ChannelFactory;
-import org.jgroups.Event;
import org.jgroups.ExtendedMessageListener;
//import org.jgroups.JChannel;
import org.jgroups.ExtendedMembershipListener;
@@ -201,7 +197,7 @@
* An exception occuring upon fetch serviceState.
*/
protected Exception setStateException;
- private final Object stateLock = new Object();
+ private final Object channelLock = new Object();
private final MessageListenerAdapter messageListener = new MessageListenerAdapter();
// Static --------------------------------------------------------
@@ -297,45 +293,21 @@
dispatcher.setRequestMarshaller(new RequestMarshallerImpl());
dispatcher.setResponseMarshaller(new ResponseMarshallerImpl());
- // Store our uniqueId in the channel
- configureUniqueId();
-
channel.connect(getPartitionName());
try
{
- // get current JG group properties
-
+ // get current JG group properties
log.debug("get nodeName");
this.localJGAddress = (IpAddress)channel.getLocalAddress();
this.me = new ClusterNodeImpl(this.localJGAddress);
this.nodeName = this.me.getName();
- // FIXME -- just block waiting for viewAccepted!
log.debug("Get current members");
- View view = channel.getView();
- this.jgmembers = (Vector)view.getMembers().clone();
- this.members = translateAddresses(this.jgmembers); // TRANSLATE
- log.info("Number of cluster members: " + members.size());
- for(int m = 0; m > members.size(); m ++)
- {
- Object node = members.get(m);
- log.debug(node);
- }
- // Keep a list of other members only for "exclude-self" RPC calls
-
- this.jgotherMembers = (Vector)view.getMembers().clone();
- this.jgotherMembers.remove (channel.getLocalAddress());
- this.otherMembers = translateAddresses(this.jgotherMembers); // TRANSLATE
- log.info ("Other members: " + this.otherMembers.size ());
+ waitForView();
- verifyNodeIsUnique(view.getMembers());
+ verifyNodeIsUnique();
- // Update the initial view id
- this.currentViewId = view.getVid().getId();
-
- // We must now synchronize new serviceState transfer subscriber
- //
fetchState();
replicantManager.startService();
@@ -349,12 +321,12 @@
asynchHandler.start();
// Bind ourself in the public JNDI space if configured to do so
- if (!bindIntoJndi)
- return;
-
- Context ctx = new InitialContext();
- this.bind("/HAPartition/" + getPartitionName(), this, ClusterPartition.class, ctx);
- log.debug("Bound in JNDI under /HAPartition/" + getPartitionName());
+ if (bindIntoJndi)
+ {
+ Context ctx = new InitialContext();
+ this.bind("/HAPartition/" + getPartitionName(), this, ClusterPartition.class, ctx);
+ log.debug("Bound in JNDI under /HAPartition/" + getPartitionName());
+ }
}
catch (Throwable t)
{
@@ -390,7 +362,8 @@
// add the destroyPartition() step
try
{
- channel.disconnect();
+ if (channel.isConnected())
+ channel.disconnect();
}
catch (Exception e)
{
@@ -457,7 +430,7 @@
boolean rc = channel.getState(null, getStateTransferTimeout());
if (rc)
{
- synchronized (stateLock)
+ synchronized (channelLock)
{
while (!isStateSet)
{
@@ -466,7 +439,7 @@
try
{
- stateLock.wait();
+ channelLock.wait();
}
catch (InterruptedException iex)
{
@@ -635,12 +608,11 @@
setStateException = new Exception(t);
}
- private void notifyStateTransferCompleted()
+ private void notifyChannelLock()
{
- synchronized (stateLock)
- {
- // Notify wait that serviceState has been set.
- stateLock.notifyAll();
+ synchronized (channelLock)
+ {
+ channelLock.notifyAll();
}
}
@@ -681,11 +653,9 @@
try
{
// we update the view id
- //
this.currentViewId = newView.getVid().getId();
// Keep a list of other members only for "exclude-self" RPC calls
- //
this.jgotherMembers = (Vector)newView.getMembers().clone();
this.jgotherMembers.remove (channel.getLocalAddress());
this.otherMembers = translateAddresses (this.jgotherMembers); // TRANSLATE!
@@ -699,18 +669,28 @@
Vector newjgMembers = (Vector)newView.getMembers().clone();
Vector newMembers = translateAddresses(newjgMembers); // TRANSLATE
- if (this.members == null)
+ this.members = newMembers;
+ this.jgmembers = newjgMembers;
+
+ if (oldMembers == null)
{
// Initial viewAccepted
- this.members = newMembers;
- this.jgmembers = newjgMembers;
log.debug("ViewAccepted: initial members set for partition " + getPartitionName() + ": " +
this.currentViewId + " (" + this.members + ")");
+
+ log.info("Number of cluster members: " + members.size());
+ for(int m = 0; m > members.size(); m ++)
+ {
+ Object node = members.get(m);
+ log.debug(node);
+ }
+ log.info ("Other members: " + this.otherMembers.size ());
+
+ // Wake up the deployer thread blocking in waitForView
+ notifyChannelLock();
return;
- }
- this.members = newMembers;
- this.jgmembers = newjgMembers;
-
+ }
+
int difference = 0;
if (oldMembers == null)
difference = newMembers.size () - 1;
@@ -757,6 +737,26 @@
}
}
+ private void waitForView() throws Exception
+ {
+ synchronized (channelLock)
+ {
+ if (this.members == null)
+ {
+ try
+ {
+ channelLock.wait(getMethodCallTimeout());
+ }
+ catch (InterruptedException iex)
+ {
+ }
+
+ if (this.members == null)
+ throw new IllegalStateException("No view received from Channel");
+ }
+ }
+ }
+
// HAPartition implementation ----------------------------------------------
public String getNodeName()
@@ -796,9 +796,12 @@
public ClusterNode[] getClusterNodes ()
{
- ClusterNode[] nodes = new ClusterNode[this.members.size()];
- nodes = (ClusterNode[]) this.members.toArray(nodes);
- return nodes;
+ synchronized (members)
+ {
+ ClusterNode[] nodes = new ClusterNode[this.members.size()];
+ nodes = (ClusterNode[]) this.members.toArray(nodes);
+ return nodes;
+ }
}
public ClusterNode getClusterNode ()
@@ -1196,24 +1199,32 @@
// Protected -----------------------------------------------------
- protected void verifyNodeIsUnique (Vector javaGroupIpAddresses) throws Exception
+ protected void verifyNodeIsUnique () throws IllegalStateException
{
- byte[] localUniqueName = this.localJGAddress.getAdditionalData();
- if (localUniqueName == null)
+ ClusterNodeImpl matched = null;
+ for (ClusterNode member : getClusterNodes())
{
- log.error("No additional information has been found in the JGroups address; " +
- "make sure you are running with a correct version of JGroups and that the protocols " +
- "you are using support 'additionalData' behaviour.");
- throw new Exception ("Local node (" + this.localJGAddress + ") removed from cluster; local node name is missing.");
- }
-
- for (int i = 0; i < javaGroupIpAddresses.size(); i++)
- {
- IpAddress address = (IpAddress) javaGroupIpAddresses.elementAt(i);
- if (!address.equals(this.localJGAddress))
+ if (member.equals(me))
{
- if (localUniqueName.equals(address.getAdditionalData()))
- throw new Exception ("Local node (" + this.localJGAddress + ") removed from cluster; another node (" + address + ") publicizing the same name was already there.");
+ if (matched == null)
+ {
+ // We of course are in the view, so we expect one match
+ // Just track that we've had one
+ matched = (ClusterNodeImpl) member;
+ }
+ else
+ {
+ // Two nodes in view match us; try to figure out which one isn't us
+ ClusterNodeImpl other = matched;
+ if (other.getOriginalJGAddress().equals(((ClusterNodeImpl)me).getOriginalJGAddress()))
+ {
+ other = (ClusterNodeImpl) member;
+ }
+ throw new IllegalStateException("Found member " + other +
+ " in current view that duplicates us (" + me + "). This" +
+ " node cannot join partition until duplicate member has" +
+ " been removed");
+ }
}
}
}
@@ -1460,83 +1471,7 @@
}
// Protected --------------------------------------------------------------
-
- protected void configureUniqueId() throws Exception
- {
- // We push the independent name in the protocol stack
- // before connecting to the cluster
- boolean pushNodeName = true;
- String uniqueId = config.getNodeUniqueId();
- if (uniqueId == null || "".equals(uniqueId)) {
-
- // See if additional_data has already been set
- // (likely by the JChannelFactory); if yes,
- // just use that
- IpAddress ourAddr = (IpAddress) channel.getLocalAddress();
- if (ourAddr != null)
- {
- byte[] additional_data = ourAddr.getAdditionalData();
- if (additional_data != null)
- {
- uniqueId = new String(additional_data);
- pushNodeName = false;
- }
- }
- }
- if (uniqueId == null || "".equals(uniqueId)) {
- uniqueId = generateUniqueId();
- }
-
- if (pushNodeName)
- {
- java.util.HashMap staticNodeName = new java.util.HashMap();
- staticNodeName.put("additional_data", uniqueId.getBytes());
- this.channel.down(new Event(Event.CONFIG, staticNodeName));
- }
-
- config.setNodeUniqueId(uniqueId);
- }
-
- protected String generateUniqueId() throws Exception
- {
- // we first try to find a simple meaningful name:
- // 1st) "local-IP:JNDI_PORT" if JNDI is running on this machine
- // 2nd) "local-IP:JMV_GUID" otherwise
- // 3rd) return a fully GUID-based representation
- //
-
- // Before anything we determine the local host IP (and NOT name as this could be
- // resolved differently by other nodes...)
-
- // But use the specified node address for multi-homing
-
- String hostIP = null;
- InetAddress address = ServerConfigUtil.fixRemoteAddress(config.getNodeAddress());
- if (address == null)
- {
- log.debug ("unable to create a GUID for this cluster, check network configuration is correctly setup (getLocalHost has returned an exception)");
- log.debug ("using a full GUID strategy");
- return new VMID().toString();
- }
- else
- {
- hostIP = address.getHostAddress();
- }
-
- // 1st: is JNDI up and running?
- int namingPort = config.getNamingServicePort();
- if (namingPort > 0)
- {
- return hostIP + ":" + namingPort;
- }
-
- // 2nd: host-GUID strategy
- //
- String uid = new UID().toString();
- return hostIP + ":" + uid;
- }
-
/**
* Creates an object from a byte buffer
*/
@@ -1655,7 +1590,8 @@
}
finally
{
- notifyStateTransferCompleted();
+ // Notify waiting thread that serviceState has been set.
+ notifyChannelLock();
}
}
@@ -1714,7 +1650,8 @@
}
finally
{
- notifyStateTransferCompleted();
+ // Notify waiting thread that serviceState has been set.
+ notifyChannelLock();
}
}
Modified: trunk/cluster/src/main/org/jboss/ha/framework/server/JChannelFactory.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/JChannelFactory.java 2008-02-01 15:16:57 UTC (rev 69538)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/JChannelFactory.java 2008-02-01 15:20:29 UTC (rev 69539)
@@ -58,6 +58,7 @@
private String nodeName;
private int namingServicePort = -1;
private int state;
+ private boolean assignLogicalAddresses = true;
/**
* Overrides the superclass version by generating a unique node id
@@ -85,7 +86,8 @@
// Channel channel = super.createMultiplexerChannel(stack_name, id, register_for_state_transfer, substate_id);
Channel channel = super.createMultiplexerChannel(stack_name, id, false, null);
- setChannelUniqueId(channel);
+ if (assignLogicalAddresses)
+ setChannelUniqueId(channel);
return channel;
}
@@ -132,7 +134,39 @@
{
this.namingServicePort = jndiPort;
}
+
+ /**
+ * Gets whether this factory should create a "logical address" (or use
+ * one set via {@link #setNodeName(String)} and assign it to
+ * any newly created <code>Channel</code> as JGroups "additional_data".
+ *
+ * @see #setAssignLogicalAddresses(boolean)
+ */
+ public boolean getAssignLogicalAddresses()
+ {
+ return assignLogicalAddresses;
+ }
+ /**
+ * Sets whether this factory should create a "logical address" (or use
+ * one set via {@link #setNodeName(String)} and assign it to
+ * any newly created <code>Channel</code> as JGroups "additional_data".
+ * <p>
+ * Any such logical address will be used by <code>HAPartition</code>
+ * to assign a name to the <code>ClusterNode</code> object representing
+ * this node. If a logical address is not set, the <code>ClusterNode</code>
+ * will use the address and port JGroups is using to receive messages to
+ * create its name.
+ * </p>
+ * <p>
+ * Default is <code>true</code>.
+ * </p>
+ */
+ public void setAssignLogicalAddresses(boolean logicalAddresses)
+ {
+ this.assignLogicalAddresses = logicalAddresses;
+ }
+
@Override
public void create() throws Exception
{
More information about the jboss-cvs-commits
mailing list