[Jboss-cvs] JBossAS SVN: r56088 - trunk/cluster/src/main/org/jboss/ha/framework/server
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Aug 18 19:04:37 EDT 2006
Author: bstansberry at jboss.com
Date: 2006-08-18 19:04:37 -0400 (Fri, 18 Aug 2006)
New Revision: 56088
Modified:
trunk/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java
Log:
[JBAS-3532] Inject DS into ClusterPartition
[JBAS-2950] Don't start if state transfer fails
Modified: trunk/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java 2006-08-18 23:03:05 UTC (rev 56087)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java 2006-08-18 23:04:37 UTC (rev 56088)
@@ -119,7 +119,7 @@
/** The cluster replicant manager */
protected DistributedReplicantManagerImpl replicantManager;
/** The cluster state manager */
- protected DistributedStateImpl dsManager;
+ protected DistributedState dsManager;
/** The cluster instance log category */
protected Logger log;
protected Logger clusterLifeCycleLog;
@@ -130,6 +130,17 @@
/** Number of ms to wait for state */
protected long state_transfer_timeout=60000;
+ /**
+ * True if state was initialized during start-up.
+ */
+ protected boolean isStateSet = false;
+
+ /**
+ * An exception occuring upon fetch state.
+ */
+ protected Exception setStateException;
+ private final Object stateLock = new Object();
+
// Static --------------------------------------------------------
/**
@@ -213,11 +224,11 @@
this.replicantManager.init();
- // Create the DS and link it to this HAPartition
- log.debug("create distributed state service");
- this.dsManager = new DistributedStateImpl(this, this.server);
- log.debug("init distributed state service");
- this.dsManager.init();
+// // Create the DS and link it to this HAPartition
+// log.debug("create distributed state service");
+// this.dsManager = new DistributedStateImpl(this, this.server);
+// log.debug("init distributed state service");
+// this.dsManager.init();
// Create the asynchronous handler for view changes
asynchHandler = new AsynchEventHandler(this, "AsynchViewChangeHandler");
@@ -258,16 +269,11 @@
this.currentViewId = view.getVid().getId();
// We must now synchronize new state transfer subscriber
- log.info("Fetching state (will wait for " + this.state_transfer_timeout + " milliseconds)");
- boolean rc = channel.getState(null, this.state_transfer_timeout);
- if (rc)
- log.debug("State was retrieved successfully");
- else
- log.debug("State could not be retrieved, (must be first member of group)");
+ //
+ fetchState();
// We are now able to start our DRM and DS
this.replicantManager.start();
- this.dsManager.start();
// Start the asynch listener handler thread
asynchHandler.start();
@@ -279,6 +285,68 @@
}
+
+ protected void fetchState() throws Exception
+ {
+ log.info("Fetching state (will wait for " + this.state_transfer_timeout + " milliseconds):");
+ long start, stop;
+ isStateSet = false;
+ start = System.currentTimeMillis();
+ boolean rc = channel.getState(null, this.state_transfer_timeout);
+ if (rc)
+ {
+ synchronized (stateLock)
+ {
+ while (!isStateSet)
+ {
+ if (setStateException != null)
+ throw setStateException;
+
+ try
+ {
+ stateLock.wait();
+ }
+ catch (InterruptedException iex)
+ {
+ }
+ }
+ }
+ stop = System.currentTimeMillis();
+ log.debug("state was retrieved successfully (in " + (stop - start) + " milliseconds)");
+ }
+ else
+ {
+ // No one provided us with state.
+ // We need to find out if we are the coordinator, so we must
+ // block until viewAccepted() is called at least once
+
+ synchronized (members)
+ {
+ while (members.size() == 0)
+ {
+ log.debug("waiting on viewAccepted()");
+ try
+ {
+ members.wait();
+ }
+ catch (InterruptedException iex)
+ {
+ }
+ }
+ }
+
+ if (isCurrentNodeCoordinator())
+ {
+ log.debug("State could not be retrieved (we are the first member in group)");
+ }
+ else
+ {
+ throw new IllegalStateException("Initial state transfer failed: " +
+ "Channel.getState() returned false");
+ }
+ }
+ }
+
public void closePartition() throws Exception
{
logHistory ("Closing partition");
@@ -304,14 +372,14 @@
log.error("operation failed", e);
}
- try
- {
- this.dsManager.stop();
- }
- catch (Exception e)
- {
- log.error("operation failed", e);
- }
+// try
+// {
+// this.dsManager.stop();
+// }
+// catch (Exception e)
+// {
+// log.error("operation failed", e);
+// }
// NR 200505 : [JBCLUSTER-38] replace channel.close() by a disconnect and
// add the destroyPartition() step
@@ -354,14 +422,14 @@
log.error("operation failed", e);
}
- try
- {
- this.dsManager.destroy();
- }
- catch (Exception e)
- {
- log.error("operation failed", e);
- }
+// try
+// {
+// this.dsManager.destroy();
+// }
+// catch (Exception e)
+// {
+// log.error("operation failed", e);
+// }
try
@@ -436,7 +504,28 @@
HAPartition.HAPartitionStateTransfer subscriber = (HAPartition.HAPartitionStateTransfer)stateHandlers.get(key);
if (subscriber != null)
{
- subscriber.setCurrentState((java.io.Serializable)someState);
+ try
+ {
+ subscriber.setCurrentState((java.io.Serializable)someState);
+ }
+ catch (Exception e)
+ {
+ // Don't let issues with one subscriber affect others
+ // unless it is DRM, which is really an internal function
+ // of the HAPartition
+ // FIXME remove this once DRM is JBC-based
+ if (DistributedReplicantManagerImpl.SERVICE_NAME.equals(key))
+ {
+ if (e instanceof RuntimeException)
+ throw (RuntimeException) e;
+ else
+ throw new RuntimeException(e);
+ }
+ else
+ {
+ log.error("Caught exception setting state to " + subscriber, e);
+ }
+ }
}
else
{
@@ -448,11 +537,25 @@
log.debug("received a state of " + state_size + " bytes; expanded memory by " +
(used_mem_after - used_mem_before) + " bytes (used memory before: " + used_mem_before +
", used memory after: " + used_mem_after + ")");
+
+ isStateSet = true;
}
- catch (Exception ex)
+ catch (Throwable t)
{
- log.error("setState failed", ex);
+ log.error("failed setting state", t);
+ if (t instanceof Exception)
+ setStateException = (Exception) t;
+ else
+ setStateException = new Exception(t);
}
+ finally
+ {
+ synchronized (stateLock)
+ {
+ // Notify wait that state has been set.
+ stateLock.notifyAll();
+ }
+ }
}
public void receive(org.jgroups.Message msg)
@@ -1173,6 +1276,11 @@
return result;
}
+
+ protected void setDistributedState(DistributedState distState)
+ {
+ this.dsManager = distState;
+ }
public void logHistory (String message)
{
More information about the jboss-cvs-commits
mailing list