Author: pferraro
Date: 2009-02-05 13:55:04 -0500 (Thu, 05 Feb 2009)
New Revision: 2267
Modified:
trunk/mod_cluster/src/main/java/org/jboss/modcluster/ha/ClusteredMCMPHandler.java
trunk/mod_cluster/src/main/java/org/jboss/modcluster/ha/ClusteredMCMPHandlerImpl.java
trunk/mod_cluster/src/main/java/org/jboss/modcluster/ha/HAModClusterService.java
trunk/mod_cluster/src/main/java/org/jboss/modcluster/ha/rpc/MCMPServerDiscoveryEvent.java
Log:
[MODCLUSTER-47] Master node enters tight loop calling getClusterCoordinatorState on remote node
Modified:
trunk/mod_cluster/src/main/java/org/jboss/modcluster/ha/ClusteredMCMPHandler.java
===================================================================
--- trunk/mod_cluster/src/main/java/org/jboss/modcluster/ha/ClusteredMCMPHandler.java 2009-02-05 18:12:08 UTC (rev 2266)
+++ trunk/mod_cluster/src/main/java/org/jboss/modcluster/ha/ClusteredMCMPHandler.java 2009-02-05 18:55:04 UTC (rev 2267)
@@ -25,10 +25,11 @@
import java.util.List;
import java.util.Set;
+import org.jboss.modcluster.ha.rpc.MCMPServerDiscoveryEvent;
+import org.jboss.modcluster.ha.rpc.PeerMCMPDiscoveryStatus;
import org.jboss.modcluster.mcmp.MCMPHandler;
import org.jboss.modcluster.mcmp.MCMPServer;
import org.jboss.modcluster.mcmp.MCMPServerState;
-import org.jboss.modcluster.ha.rpc.MCMPServerDiscoveryEvent;
/**
* @author Brian Stansberry
@@ -38,7 +39,7 @@
public static final String HA_SERVICE_NAME = "ModClusterService";
List<MCMPServerDiscoveryEvent> getPendingDiscoveryEvents();
- void discoveryEventsReceived(MCMPServerDiscoveryEvent lastReceived);
+ void discoveryEventsReceived(PeerMCMPDiscoveryStatus status);
Set<MCMPServerState> updateServersFromMasterNode(Set<MCMPServer>
masterList);
Modified:
trunk/mod_cluster/src/main/java/org/jboss/modcluster/ha/ClusteredMCMPHandlerImpl.java
===================================================================
--- trunk/mod_cluster/src/main/java/org/jboss/modcluster/ha/ClusteredMCMPHandlerImpl.java 2009-02-05 18:12:08 UTC (rev 2266)
+++ trunk/mod_cluster/src/main/java/org/jboss/modcluster/ha/ClusteredMCMPHandlerImpl.java 2009-02-05 18:55:04 UTC (rev 2267)
@@ -27,6 +27,7 @@
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
@@ -45,6 +46,7 @@
import org.jboss.modcluster.ha.rpc.GroupRpcResponse;
import org.jboss.modcluster.ha.rpc.GroupRpcResponseFilter;
import org.jboss.modcluster.ha.rpc.MCMPServerDiscoveryEvent;
+import org.jboss.modcluster.ha.rpc.PeerMCMPDiscoveryStatus;
import org.jboss.modcluster.ha.rpc.StringGroupRpcResponse;
import org.jboss.modcluster.ha.rpc.ThrowableGroupRpcResponse;
import org.jboss.modcluster.mcmp.AbstractMCMPHandler;
@@ -74,8 +76,8 @@
@GuardedBy("errorState")
private final List<Boolean> errorState = new ArrayList<Boolean>();
- @GuardedBy("this")
- private List<MCMPServerDiscoveryEvent> pendingDiscoveryEvents = new
ArrayList<MCMPServerDiscoveryEvent>();
+ @GuardedBy("pendingDiscoveryEvents")
+ private List<MCMPServerDiscoveryEvent> pendingDiscoveryEvents = new
LinkedList<MCMPServerDiscoveryEvent>();
private AtomicInteger discoveryEventIndex = new AtomicInteger();
@@ -124,30 +126,32 @@
* @{inheritDoc}
* @see org.jboss.modcluster.ha.ClusteredMCMPHandler#getPendingDiscoveryEvents()
*/
- public synchronized List<MCMPServerDiscoveryEvent> getPendingDiscoveryEvents()
+ public List<MCMPServerDiscoveryEvent> getPendingDiscoveryEvents()
{
- return new ArrayList<MCMPServerDiscoveryEvent>(this.pendingDiscoveryEvents);
+ synchronized (this.pendingDiscoveryEvents)
+ {
+ return new
ArrayList<MCMPServerDiscoveryEvent>(this.pendingDiscoveryEvents);
+ }
}
/**
* @{inheritDoc}
* @see
org.jboss.modcluster.ha.ClusteredMCMPHandler#discoveryEventsReceived(org.jboss.modcluster.ha.rpc.MCMPServerDiscoveryEvent)
*/
- public synchronized void discoveryEventsReceived(MCMPServerDiscoveryEvent
lastReceived)
+ public void discoveryEventsReceived(PeerMCMPDiscoveryStatus status)
{
- if (lastReceived != null)
+ MCMPServerDiscoveryEvent latestEvent = status.getLatestDiscoveryEvent();
+
+ if (latestEvent != null)
{
- for (Iterator<MCMPServerDiscoveryEvent> it =
this.pendingDiscoveryEvents.iterator(); it.hasNext();)
+ synchronized (this.pendingDiscoveryEvents)
{
- MCMPServerDiscoveryEvent event = it.next();
- if (event.getEventIndex() <= lastReceived.getEventIndex())
+ Iterator<MCMPServerDiscoveryEvent> events =
this.pendingDiscoveryEvents.iterator();
+
+ while (events.hasNext() && (latestEvent.compareTo(events.next())
>= 0))
{
- it.remove();
+ events.remove();
}
- else
- {
- return;
- }
}
}
}
@@ -156,7 +160,7 @@
* @{inheritDoc}
* @see
org.jboss.modcluster.ha.ClusteredMCMPHandler#updateServersFromMasterNode(java.util.Set)
*/
- public synchronized Set<MCMPServerState>
updateServersFromMasterNode(Set<MCMPServer> masterList)
+ public Set<MCMPServerState> updateServersFromMasterNode(Set<MCMPServer>
masterList)
{
for (MCMPServer server : masterList)
{
@@ -184,7 +188,7 @@
{
synchronized (this.errorState)
{
- return this.errorState.size() > 0 &&
(this.errorState.get(this.errorState.size() - 1).booleanValue() == false);
+ return !this.errorState.isEmpty() &&
!this.errorState.get(this.errorState.size() - 1).booleanValue();
}
}
@@ -196,7 +200,7 @@
{
synchronized (this.errorState)
{
- if (this.errorState.size() > 0)
+ if (!this.errorState.isEmpty())
{
this.errorState.set(0, Boolean.TRUE);
}
@@ -224,7 +228,7 @@
* @{inheritDoc}
* @see org.jboss.modcluster.mcmp.MCMPHandler#addProxy(java.net.InetAddress, int)
*/
- public synchronized void addProxy(InetAddress address, int port)
+ public void addProxy(InetAddress address, int port)
{
if (this.singleton.isMasterNode())
{
@@ -249,7 +253,7 @@
* @{inheritDoc}
* @see org.jboss.modcluster.mcmp.MCMPHandler#removeProxy(java.net.InetAddress, int)
*/
- public synchronized void removeProxy(InetAddress address, int port)
+ public void removeProxy(InetAddress address, int port)
{
if (this.singleton.isMasterNode())
{
@@ -424,19 +428,25 @@
}
}
- private synchronized void sendDiscoveryEventToPartition(InetAddress address, int port,
boolean addition)
+ private void sendDiscoveryEventToPartition(InetAddress address, int port, boolean
addition)
{
InetSocketAddress socketAddress = new InetSocketAddress(address, port);
- MCMPServerDiscoveryEvent event = new
MCMPServerDiscoveryEvent(this.serviceKeyProvider.getHAPartition().getClusterNode(),
socketAddress, addition, this.discoveryEventIndex.incrementAndGet());
- this.pendingDiscoveryEvents.add(event);
- GroupRpcResponse response = this.rpcStub.mcmpServerDiscoveryEvent(event);
-
- if (response instanceof ThrowableGroupRpcResponse)
+ synchronized (this.pendingDiscoveryEvents)
{
- // Just log it; we'll retry later
- String msg = addition ? "modcluster.error.discovery.add" :
"modcluster.error.discovery.remove";
- log.error(this.sm.getString(msg, address, Integer.valueOf(port)),
((ThrowableGroupRpcResponse) response).getValue());
+ // Ensure discovery event enters queue sequentially by index
+ MCMPServerDiscoveryEvent event = new
MCMPServerDiscoveryEvent(this.serviceKeyProvider.getHAPartition().getClusterNode(),
socketAddress, addition, this.discoveryEventIndex.incrementAndGet());
+
+ this.pendingDiscoveryEvents.add(event);
+
+ GroupRpcResponse response = this.rpcStub.mcmpServerDiscoveryEvent(event);
+
+ if (response instanceof ThrowableGroupRpcResponse)
+ {
+ // Just log it; we'll retry later
+ String msg = addition ? "modcluster.error.discovery.add" :
"modcluster.error.discovery.remove";
+ log.error(this.sm.getString(msg, address, Integer.valueOf(port)),
((ThrowableGroupRpcResponse) response).getValue());
+ }
}
}
Modified:
trunk/mod_cluster/src/main/java/org/jboss/modcluster/ha/HAModClusterService.java
===================================================================
--- trunk/mod_cluster/src/main/java/org/jboss/modcluster/ha/HAModClusterService.java 2009-02-05 18:12:08 UTC (rev 2266)
+++ trunk/mod_cluster/src/main/java/org/jboss/modcluster/ha/HAModClusterService.java 2009-02-05 18:55:04 UTC (rev 2267)
@@ -103,8 +103,7 @@
final MCMPRequestFactory requestFactory;
final ClusteredMCMPHandler clusteredHandler;
final HASingletonAwareResetRequestSource resetRequestSource;
- final Map<ClusterNode, MCMPServerDiscoveryEvent> proxyChangeDigest =
- new HashMap<ClusterNode, MCMPServerDiscoveryEvent>();
+ final Map<ClusterNode, MCMPServerDiscoveryEvent> proxyChangeDigest = new
HashMap<ClusterNode, MCMPServerDiscoveryEvent>();
final ModClusterServiceDRMEntry drmEntry;
/**
@@ -524,6 +523,7 @@
/**
* This is the object that gets invoked on via reflection by HAPartition.
*/
+ @SuppressWarnings("synthetic-access")
protected class RpcHandler extends HASingletonImpl<HAServiceEvent>.RpcHandler
implements ModClusterServiceRpcHandler<GroupRpcResponse, MCMPServer>,
ClusteredMCMPHandlerRpcHandler, ResetRequestSourceRpcHandler<GroupRpcResponse>
{
private final HAModClusterService coord = HAModClusterService.this;
@@ -597,23 +597,23 @@
{
HAPartition partition = this.coord.getHAPartition();
ClusterNode cn = partition.getClusterNode();
- PeerMCMPDiscoveryStatus newStatus = statuses.get(cn);
- if (newStatus != null)
+ PeerMCMPDiscoveryStatus status = statuses.get(cn);
+ if (status != null)
{
// Notify our handler that discovery events have been processed
-
this.coord.clusteredHandler.discoveryEventsReceived(newStatus.getLatestDiscoveryEvent());
+ this.coord.clusteredHandler.discoveryEventsReceived(status);
// Notify our handler that any reset requests have been processed
this.coord.clusteredHandler.recordResetSuccess();
DistributedReplicantManager drm =
partition.getDistributedReplicantManager();
String key = this.coord.getHAServiceKey();
- ModClusterServiceDRMEntry oldStatus = (ModClusterServiceDRMEntry)
drm.lookupLocalReplicant(key);
- if (!newStatus.equals(oldStatus))
+ ModClusterServiceDRMEntry previousStatus = (ModClusterServiceDRMEntry)
drm.lookupLocalReplicant(key);
+ if (!status.equals(previousStatus))
{
try
{
- drm.add(key, new ModClusterServiceDRMEntry(cn,
newStatus.getMCMPServerStates(), oldStatus.getJvmRoutes()));
+ drm.add(key, new ModClusterServiceDRMEntry(cn,
status.getMCMPServerStates(), previousStatus.getJvmRoutes()));
}
catch (Exception e)
{
@@ -722,6 +722,7 @@
}
}
+ @SuppressWarnings("synthetic-access")
private class ClusteredCatalinaEventHandler extends CatalinaEventHandler
{
private final HAModClusterService coord = HAModClusterService.this;
@@ -767,7 +768,7 @@
{
this.checkInit();
- log.debug(this.coord.sm.getString("modcluster.engine.status",
engine.getName()));
+
this.coord.log.debug(this.coord.sm.getString("modcluster.engine.status",
engine.getName()));
this.coord.latestLoad = this.getLoadBalanceFactor();
@@ -794,7 +795,7 @@
HAPartition partition = this.coord.getHAPartition();
DistributedReplicantManager drm = partition.getDistributedReplicantManager();
boolean resync = false;
-
+
do
{
resync = false;
@@ -817,7 +818,7 @@
nonresponsive.remove(partition.getClusterNode());
// FIXME -- what about our own dropped discovery events if we just became
master?
- List responses = this.coord.getClusterCoordinatorState(masterList);
+ List<?> responses = this.coord.getClusterCoordinatorState(masterList);
// Gather up all the reset requests in one list
// FIXME -- what about our own dropped requests if we just became master?
@@ -826,7 +827,7 @@
// Gather all the load balance factors
loadBalanceFactors.clear();
- // Add our own lbf - it is not returned via getclusterCoordinatorState(...)
+ // Add our own lbf - it is not returned via getClusterCoordinatorState(...)
for (String jvmRoute: this.coord.drmEntry.getJvmRoutes())
{
loadBalanceFactors.put(jvmRoute, Integer.valueOf(this.coord.latestLoad));
@@ -840,33 +841,31 @@
if (response instanceof ModClusterServiceStateGroupRpcResponse)
{
ModClusterServiceStateGroupRpcResponse mcssgrr =
(ModClusterServiceStateGroupRpcResponse) response;
- ClusterNode cn = mcssgrr.getSender();
+ ClusterNode node = mcssgrr.getSender();
// Check for discovery events we haven't processed
- MCMPServerDiscoveryEvent latestEvent = latestEvents.get(cn);
-
- for (MCMPServerDiscoveryEvent toCheck:
mcssgrr.getUnacknowledgedEvents())
+ MCMPServerDiscoveryEvent latestEvent = latestEvents.get(node);
+
+ for (MCMPServerDiscoveryEvent event:
mcssgrr.getUnacknowledgedEvents())
{
- if (latestEvent != null && latestEvent.getEventIndex() <=
toCheck.getEventIndex())
+ if ((latestEvent == null) || (latestEvent.compareTo(event) < 0))
{
- continue; // already processed it
+ InetSocketAddress socketAddress = event.getMCMPServer();
+ if (event.isAddition())
+ {
+ this.coord.localHandler.addProxy(socketAddress.getAddress(),
socketAddress.getPort());
+ }
+ else
+ {
+
this.coord.localHandler.removeProxy(socketAddress.getAddress(), socketAddress.getPort());
+ }
+ resync = true;
}
-
- InetSocketAddress socketAddress = toCheck.getMCMPServer();
- if (toCheck.isAddition())
- {
- this.coord.localHandler.addProxy(socketAddress.getAddress(),
socketAddress.getPort());
- }
- else
- {
- this.coord.localHandler.removeProxy(socketAddress.getAddress(),
socketAddress.getPort());
- }
- resync = true;
}
if (!resync) // don't bother if we are going to start over
{
- statuses.put(cn, new PeerMCMPDiscoveryStatus(cn,
mcssgrr.getStates(), latestEvent));
+ statuses.put(node, new PeerMCMPDiscoveryStatus(node,
mcssgrr.getStates(), latestEvent));
List<MCMPRequest> toAdd = mcssgrr.getResetRequests();
if (toAdd != null)
@@ -874,7 +873,7 @@
resetRequests.addAll(toAdd);
}
- ModClusterServiceDRMEntry removed = nonresponsive.remove(cn);
+ ModClusterServiceDRMEntry removed = nonresponsive.remove(node);
if (removed != null)
{
Integer lbf = Integer.valueOf(mcssgrr.getLoadBalanceFactor());
@@ -890,18 +889,18 @@
ThrowableGroupRpcResponse tgrr = (ThrowableGroupRpcResponse) response;
ClusterNode cn = tgrr.getSender();
-
log.warn(this.coord.sm.getString("modcluster.error.rpc.known",
"getClusterCoordinatorState", cn), tgrr.getValue());
+
this.coord.log.warn(this.coord.sm.getString("modcluster.error.rpc.known",
"getClusterCoordinatorState", cn), tgrr.getValue());
// Don't remove from nonresponsive list and we'll pass back an
error
// status (null server list) to this peer
}
else if (response instanceof Throwable)
{
-
log.warn(this.coord.sm.getString("modcluster.error.rpc.unknown",
"getClusterCoordinatorState"), (Throwable) response);
+
this.coord.log.warn(this.coord.sm.getString("modcluster.error.rpc.unknown",
"getClusterCoordinatorState"), (Throwable) response);
}
else
{
-
log.error(this.coord.sm.getString("modcluster.error.rpc.unexpected", response,
"getClusterCoordinatorState"));
+
this.coord.log.error(this.coord.sm.getString("modcluster.error.rpc.unexpected",
response, "getClusterCoordinatorState"));
}
}
}
@@ -931,6 +930,7 @@
{
statusRequests.add(this.coord.requestFactory.createStatusRequest(entry.getKey(),
entry.getValue().intValue()));
}
+
this.coord.localHandler.sendRequests(statusRequests);
// Advise the members the process is done and that they should update DRM
Modified:
trunk/mod_cluster/src/main/java/org/jboss/modcluster/ha/rpc/MCMPServerDiscoveryEvent.java
===================================================================
--- trunk/mod_cluster/src/main/java/org/jboss/modcluster/ha/rpc/MCMPServerDiscoveryEvent.java 2009-02-05 18:12:08 UTC (rev 2266)
+++ trunk/mod_cluster/src/main/java/org/jboss/modcluster/ha/rpc/MCMPServerDiscoveryEvent.java 2009-02-05 18:55:04 UTC (rev 2267)
@@ -37,7 +37,7 @@
* @author Brian Stansberry
*/
@Immutable
-public class MCMPServerDiscoveryEvent implements Serializable
+public class MCMPServerDiscoveryEvent implements Serializable,
Comparable<MCMPServerDiscoveryEvent>
{
/** The serialVersionUID */
private static final long serialVersionUID = -4615651826967237065L;
@@ -85,9 +85,30 @@
{
return this.addition;
}
-
+
public int getEventIndex()
{
return this.eventIndex;
}
+
+ @Override
+ public String toString()
+ {
+ StringBuilder builder = new StringBuilder();
+ builder.append("{sender=").append(this.sender);
+ builder.append(", mcmpServer=").append(this.mcmpServer);
+ builder.append(", addition=").append(this.addition);
+ builder.append(", eventIndex=").append(this.eventIndex);
+ builder.append("}");
+ return builder.toString();
+ }
+
+ /**
+ * @{inheritDoc}
+ * @see java.lang.Comparable#compareTo(java.lang.Object)
+ */
+ public int compareTo(MCMPServerDiscoveryEvent event)
+ {
+ return this.eventIndex - event.eventIndex;
+ }
}