Author: pferraro
Date: 2008-09-25 21:23:03 -0400 (Thu, 25 Sep 2008)
New Revision: 1907
Modified:
trunk/mod_cluster/src/main/java/org/jboss/modcluster/DefaultJBossWebEventHandler.java
trunk/mod_cluster/src/main/java/org/jboss/modcluster/JBossWebEventHandler.java
trunk/mod_cluster/src/main/java/org/jboss/modcluster/ModClusterService.java
Log:
Refactored ModClusterService to clean up the conflicting strategies of extending
DefaultJBossWebEventHandler and delegating to it.
I'll clean this implementation up later.
JBossWebEventHandler.config(Engine) and removeAll(Engine) are removed from the
interface; they are now protected methods of DefaultJBossWebEventHandler.
Modified:
trunk/mod_cluster/src/main/java/org/jboss/modcluster/DefaultJBossWebEventHandler.java
===================================================================
---
trunk/mod_cluster/src/main/java/org/jboss/modcluster/DefaultJBossWebEventHandler.java 2008-09-25
16:53:57 UTC (rev 1906)
+++
trunk/mod_cluster/src/main/java/org/jboss/modcluster/DefaultJBossWebEventHandler.java 2008-09-26
01:23:03 UTC (rev 1907)
@@ -191,7 +191,7 @@
}
}
- public void config(Engine engine)
+ protected void config(Engine engine)
{
log.debug(this.sm.getString("modcluster.engine.config",
engine.getName()));
@@ -264,7 +264,7 @@
}
}
- public void removeAll(Engine engine)
+ protected void removeAll(Engine engine)
{
this.checkInit();
Modified: trunk/mod_cluster/src/main/java/org/jboss/modcluster/JBossWebEventHandler.java
===================================================================
---
trunk/mod_cluster/src/main/java/org/jboss/modcluster/JBossWebEventHandler.java 2008-09-25
16:53:57 UTC (rev 1906)
+++
trunk/mod_cluster/src/main/java/org/jboss/modcluster/JBossWebEventHandler.java 2008-09-26
01:23:03 UTC (rev 1907)
@@ -43,12 +43,8 @@
void removeContext(Context context);
- void config(Engine engine);
-
- void removeAll(Engine engine);
-
void status(Engine engine);
-
+
void startServer(Server server);
void stopServer(Server server);
Modified: trunk/mod_cluster/src/main/java/org/jboss/modcluster/ModClusterService.java
===================================================================
--- trunk/mod_cluster/src/main/java/org/jboss/modcluster/ModClusterService.java 2008-09-25
16:53:57 UTC (rev 1906)
+++ trunk/mod_cluster/src/main/java/org/jboss/modcluster/ModClusterService.java 2008-09-26
01:23:03 UTC (rev 1907)
@@ -100,16 +100,16 @@
*/
final StringManager sm = StringManager.getManager(Constants.Package);
- private final LoadBalanceFactorProvider loadManager;
+ final LoadBalanceFactorProvider loadManager;
private final RpcHandler rpcHandler;
private final JBossWebEventHandler eventHandlerDelegate;
private final String domain;
private final boolean masterPerDomain;
volatile int latestLoad;
- private volatile int statusCount = 0;
- private volatile int processStatusFrequency = 1;
- private ModClusterServiceDRMEntry drmEntry;
+ volatile int statusCount = 0;
+ volatile int processStatusFrequency = 1;
+ ModClusterServiceDRMEntry drmEntry;
/**
* Create a new ClusterCoordinator.
@@ -149,10 +149,10 @@
this.resetRequestSource = new HASingletonAwareResetRequestSourceImpl(config,
config, this, this);
this.localHandler = new DefaultMCMPHandler(config, this.resetRequestSource);
-// this.localHandler.init();
this.clusteredHandler = new ClusteredMCMPHandlerImpl(this.localHandler, this,
this);
this.loadManager = loadFactorProvider;
- this.eventHandlerDelegate = new DefaultJBossWebEventHandler(config, config, config,
this.clusteredHandler, loadFactorProvider);
+ this.eventHandlerDelegate = new ClusteredJBossWebEventHandler(config,
loadFactorProvider);
+// this.eventHandlerDelegate = new DefaultJBossWebEventHandler(config, config,
config, this.clusteredHandler, loadFactorProvider);
this.domain = config.getDomain();
this.masterPerDomain = config.isMasterPerDomain();
@@ -259,100 +259,48 @@
public void init()
{
- // Use the standard logic
this.eventHandlerDelegate.init();
}
public void shutdown()
{
- // Use the standard logic
this.eventHandlerDelegate.shutdown();
}
-
public void startServer(Server server)
{
- // Pass on ref to our server
- this.resetRequestSource.setJbossWebServer(server);
-
- // Use the standard logic
this.eventHandlerDelegate.startServer(server);
}
public void stopServer(Server server)
{
- // Use the standard logic
this.eventHandlerDelegate.stopServer(server);
}
- public void config(Engine engine)
- {
- // If needed, create automagical JVM route (address + port + engineName)
- try
- {
- Utils.establishJvmRouteAndConnectorAddress(engine, this.clusteredHandler);
- }
- catch (Exception e) {
- this.clusteredHandler.markProxiesInError();
- this.log.info(this.sm.getString("modcluster.error.addressJvmRoute"),
e);
- return;
- }
-
- this.drmEntry.addJvmRoute(engine.getJvmRoute());
- this.updateLocalDRM(this.drmEntry);
-
- // Use the standard logic
- this.eventHandlerDelegate.config(engine);
- }
-
- public void removeAll(Engine engine)
- {
- // Use the standard logic
- this.eventHandlerDelegate.removeAll(engine);
-
- this.drmEntry.removeJvmRoute(engine.getJvmRoute());
- this.updateLocalDRM(this.drmEntry);
- }
-
public void addContext(Context context)
{
- // Use the standard logic
this.eventHandlerDelegate.addContext(context);
}
public void startContext(Context context)
{
- // Use the standard logic
this.eventHandlerDelegate.startContext(context);
}
public void stopContext(Context context)
{
- // Use the standard logic
this.eventHandlerDelegate.stopContext(context);
}
public void removeContext(Context context)
{
- // Use the standard logic
this.eventHandlerDelegate.removeContext(context);
}
public void status(Engine engine)
{
- this.latestLoad = this.loadManager.getLoadBalanceFactor();
-
- if (this.isMasterNode())
- {
- this.statusCount = (this.statusCount + 1) % this.processStatusFrequency;
-
- if (this.statusCount == 0)
- {
- this.updateClusterStatus();
- }
- }
+ this.eventHandlerDelegate.status(engine);
}
-
// ------------------------------------------------------------- Properties
@@ -370,7 +318,6 @@
{
this.processStatusFrequency = processStatusFrequency;
}
-
// ------------------------------------------------------- Public Overrides
@@ -485,115 +432,129 @@
}
@SuppressWarnings("unchecked")
- private void updateClusterStatus()
+ void updateClusterStatus()
{
- this.localHandler.status();
Set<MCMPServerState> masterList = null;
Map<ClusterNode, MCMPServerDiscoveryEvent> latestEvents = null;
- synchronized (this.proxyChangeDigest)
- {
- masterList = this.localHandler.getProxyStates();
- latestEvents = new HashMap<ClusterNode,
MCMPServerDiscoveryEvent>(this.proxyChangeDigest);
- }
- HAPartition partition = this.getHAPartition();
- List<ModClusterServiceDRMEntry> replicants =
partition.getDistributedReplicantManager().lookupReplicants(this.getHAServiceKey());
Map<ClusterNode, ModClusterServiceDRMEntry> nonresponsive = new
HashMap<ClusterNode, ModClusterServiceDRMEntry>();
- for (ModClusterServiceDRMEntry replicant: replicants)
- {
- nonresponsive.put(replicant.getPeer(), replicant);
- }
- nonresponsive.remove(partition.getClusterNode());
-
- // FIXME -- what about our own dropped discovery events if we just became master?
- List responses = this.getClusterCoordinatorState(masterList);
-
- // Gather up all the reset requests in one list
- // FIXME -- what about our own dropped requests if we just became master?
- List<MCMPRequest> resetRequests = new ArrayList<MCMPRequest>();
-
- // Gather all the load balance factors
Map<String, Integer> loadBalanceFactors = new HashMap<String,
Integer>();
-
- // Gather the info on who knows about what proxies
Map<ClusterNode, PeerMCMPDiscoveryStatus> statuses = new
HashMap<ClusterNode, PeerMCMPDiscoveryStatus>();
+ List<MCMPRequest> resetRequests = new ArrayList<MCMPRequest>();
+ HAPartition partition = this.getHAPartition();
+ DistributedReplicantManager drm = partition.getDistributedReplicantManager();
+ boolean resync = false;
- boolean resync = false;
- for (Object response: responses)
+ do
{
- if (response instanceof ModClusterServiceStateGroupRpcResponse)
+ this.localHandler.status();
+
+ synchronized (this.proxyChangeDigest)
{
- ModClusterServiceStateGroupRpcResponse mcssgrr =
(ModClusterServiceStateGroupRpcResponse) response;
- ClusterNode cn = mcssgrr.getSender();
-
- // Check for discovery events we haven't processed
- MCMPServerDiscoveryEvent latestEvent = latestEvents.get(cn);
- for (MCMPServerDiscoveryEvent toCheck: mcssgrr.getUnacknowledgedEvents())
+ masterList = this.localHandler.getProxyStates();
+ latestEvents = new HashMap<ClusterNode,
MCMPServerDiscoveryEvent>(this.proxyChangeDigest);
+ }
+
+ List<ModClusterServiceDRMEntry> replicants =
drm.lookupReplicants(this.getHAServiceKey());
+ nonresponsive.clear();
+
+ for (ModClusterServiceDRMEntry replicant: replicants)
+ {
+ nonresponsive.put(replicant.getPeer(), replicant);
+ }
+ nonresponsive.remove(partition.getClusterNode());
+
+ // FIXME -- what about our own dropped discovery events if we just became
master?
+ List responses = this.getClusterCoordinatorState(masterList);
+
+ // Gather up all the reset requests in one list
+ // FIXME -- what about our own dropped requests if we just became master?
+ resetRequests.clear();
+
+ // Gather all the load balance factors
+ loadBalanceFactors.clear();
+
+ // Add our own lbf - it is not returned via getClusterCoordinatorState(...)
+ for (String jvmRoute: this.drmEntry.getJvmRoutes())
+ {
+ loadBalanceFactors.put(jvmRoute, this.latestLoad);
+ }
+
+ // Gather the info on who knows about what proxies
+ statuses.clear();
+
+ for (Object response: responses)
+ {
+ if (response instanceof ModClusterServiceStateGroupRpcResponse)
{
- if (latestEvent != null && latestEvent.getEventIndex() <=
toCheck.getEventIndex())
+ ModClusterServiceStateGroupRpcResponse mcssgrr =
(ModClusterServiceStateGroupRpcResponse) response;
+ ClusterNode cn = mcssgrr.getSender();
+
+ // Check for discovery events we haven't processed
+ MCMPServerDiscoveryEvent latestEvent = latestEvents.get(cn);
+
+ for (MCMPServerDiscoveryEvent toCheck: mcssgrr.getUnacknowledgedEvents())
{
- continue; // already processed it
- }
-
- AddressPort ap = toCheck.getMCMPServer();
- if (toCheck.isAddition())
- {
- this.localHandler.addProxy(ap.getAddress(), ap.getPort());
- }
- else
- {
- this.localHandler.removeProxy(ap.getAddress(), ap.getPort());
- }
- resync = true;
- }
-
- if (!resync) // don't bother if we are going to start over
- {
- statuses.put(cn, new PeerMCMPDiscoveryStatus(cn, mcssgrr.getStates(),
latestEvent));
+ if (latestEvent != null && latestEvent.getEventIndex() <=
toCheck.getEventIndex())
+ {
+ continue; // already processed it
+ }
- List<MCMPRequest> toAdd = mcssgrr.getResetRequests();
- if (toAdd != null)
- {
- resetRequests.addAll(toAdd);
+ AddressPort ap = toCheck.getMCMPServer();
+ if (toCheck.isAddition())
+ {
+ this.localHandler.addProxy(ap.getAddress(), ap.getPort());
+ }
+ else
+ {
+ this.localHandler.removeProxy(ap.getAddress(), ap.getPort());
+ }
+ resync = true;
}
- ModClusterServiceDRMEntry removed = nonresponsive.remove(cn);
- if (removed != null)
+ if (!resync) // don't bother if we are going to start over
{
- Integer lbf = Integer.valueOf(mcssgrr.getLoadBalanceFactor());
- for (String jvmRoute: removed.getJvmRoutes())
+ statuses.put(cn, new PeerMCMPDiscoveryStatus(cn, mcssgrr.getStates(),
latestEvent));
+
+ List<MCMPRequest> toAdd = mcssgrr.getResetRequests();
+ if (toAdd != null)
{
- loadBalanceFactors.put(jvmRoute, lbf);
+ resetRequests.addAll(toAdd);
}
+
+ ModClusterServiceDRMEntry removed = nonresponsive.remove(cn);
+ if (removed != null)
+ {
+ Integer lbf = Integer.valueOf(mcssgrr.getLoadBalanceFactor());
+ for (String jvmRoute: removed.getJvmRoutes())
+ {
+ loadBalanceFactors.put(jvmRoute, lbf);
+ }
+ }
}
}
+ else if (response instanceof ThrowableGroupRpcResponse)
+ {
+ ThrowableGroupRpcResponse tgrr = (ThrowableGroupRpcResponse) response;
+ ClusterNode cn = tgrr.getSender();
+
+ this.log.warn(this.sm.getString("modcluster.error.rpc.known",
"getClusterCoordinatorState", cn), tgrr.getValue());
+
+ // Don't remove from nonresponsive list and we'll pass back an
error
+ // status (null server list) to this peer
+ }
+ else if (response instanceof Throwable)
+ {
+ this.log.warn(this.sm.getString("modcluster.error.rpc.unknown",
"getClusterCoordinatorState"), (Throwable) response);
+ }
+ else
+ {
+
this.log.error(this.sm.getString("modcluster.error.rpc.unexpected", response,
"getClusterCoordinatorState"));
+ }
}
- else if (response instanceof ThrowableGroupRpcResponse)
- {
- ThrowableGroupRpcResponse tgrr = (ThrowableGroupRpcResponse) response;
- ClusterNode cn = tgrr.getSender();
-
- this.log.warn(this.sm.getString("modcluster.error.rpc.known",
"getClusterCoordinatorState", cn), tgrr.getValue());
-
- // Don't remove from nonresponsive list and we'll pass back an error
- // status (null server list) to this peer
- }
- else if (response instanceof Throwable)
- {
- this.log.warn(this.sm.getString("modcluster.error.rpc.unknown",
"getClusterCoordinatorState"), (Throwable) response);
- }
- else
- {
- this.log.error(this.sm.getString("modcluster.error.rpc.unexpected",
response, "getClusterCoordinatorState"));
- }
}
+ // We picked up previously unknown discovery events; start over
+ while (resync);
- if (resync)
- {
- // We picked up previously unknown discovery events; start over
- this.updateClusterStatus();
- return;
- }
-
// Add error-state objects for non-responsive peers
Integer lbf = Integer.valueOf(0);
for (Map.Entry<ClusterNode, ModClusterServiceDRMEntry> entry:
nonresponsive.entrySet())
@@ -606,7 +567,7 @@
loadBalanceFactors.put(jvmRoute, lbf);
}
}
-
+ this.log.info(loadBalanceFactors);
// FIXME handle crashed members, gone from DRM
// Advise the proxies of any reset requests
@@ -689,7 +650,7 @@
}
}
- private void updateLocalDRM(ModClusterServiceDRMEntry ourNewStatus)
+ void updateLocalDRM(ModClusterServiceDRMEntry ourNewStatus)
{
try
{
@@ -902,4 +863,82 @@
return new ResetRequestGroupRpcResponse(node, requests);
}
}
+
+ private class ClusteredJBossWebEventHandler extends DefaultJBossWebEventHandler
+ {
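+ /**
+ * Create a new ClusteredJBossWebEventHandler.
+ *
+ * @param config passed to the superclass constructor as each of its first
+ * three arguments
+ * @param loadBalanceFactorProvider passed through to the superclass
+ * constructor; the MCMP handler argument is the enclosing service's
+ * clusteredHandler
+ */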
+ public ClusteredJBossWebEventHandler(ModClusterConfig config,
LoadBalanceFactorProvider loadBalanceFactorProvider)
+ {
+ // FIXME ClusteredJBossWebEventHandler constructor
+ super(config, config, config, ModClusterService.this.clusteredHandler,
loadBalanceFactorProvider);
+ }
+
+ @Override
+ public void startServer(Server server)
+ {
+ // Pass on ref to our server
+ ModClusterService.this.resetRequestSource.setJbossWebServer(server);
+
+ super.startServer(server);
+ }
+
+ @Override
+ protected void config(Engine engine)
+ {
+ // If needed, create automagical JVM route (address + port + engineName)
+ try
+ {
+ Utils.establishJvmRouteAndConnectorAddress(engine,
ModClusterService.this.clusteredHandler);
+ }
+ catch (Exception e) {
+ ModClusterService.this.clusteredHandler.markProxiesInError();
+
log.info(ModClusterService.this.sm.getString("modcluster.error.addressJvmRoute"),
e);
+ return;
+ }
+
+ ModClusterService.this.drmEntry.addJvmRoute(engine.getJvmRoute());
+ ModClusterService.this.updateLocalDRM(ModClusterService.this.drmEntry);
+
+ super.config(engine);
+ }
+
+ @Override
+ protected void removeAll(Engine engine)
+ {
+ super.removeAll(engine);
+
+ ModClusterService.this.drmEntry.removeJvmRoute(engine.getJvmRoute());
+ ModClusterService.this.updateLocalDRM(ModClusterService.this.drmEntry);
+ }
+
+ @Override
+ public void status(Engine engine)
+ {
+
log.info(ModClusterService.this.sm.getString("modcluster.engine.status",
engine.getName()));
+
+ ModClusterService.this.latestLoad =
ModClusterService.this.loadManager.getLoadBalanceFactor();
+
+ if (ModClusterService.this.isMasterNode())
+ {
+ ModClusterService.this.statusCount = (ModClusterService.this.statusCount + 1)
% ModClusterService.this.processStatusFrequency;
+
+ if (ModClusterService.this.statusCount == 0)
+ {
+ ModClusterService.this.updateClusterStatus();
+ }
+ }
+ else
+ {
+ log.info("We are not master");
+ }
+ }
+ }
}