[Jboss-cvs] JBossAS SVN: r55359 - branches/Branch_4_0/cluster/src/main/org/jboss/ha/framework/server
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sun Aug 6 08:48:00 EDT 2006
Author: bstansberry at jboss.com
Date: 2006-08-06 08:47:53 -0400 (Sun, 06 Aug 2006)
New Revision: 55359
Modified:
branches/Branch_4_0/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java
branches/Branch_4_0/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java
Log:
[JBAS-2950] HAPartition start should fail if state transfer does not succeed
Modified: branches/Branch_4_0/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java
===================================================================
--- branches/Branch_4_0/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java 2006-08-06 12:45:10 UTC (rev 55358)
+++ branches/Branch_4_0/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java 2006-08-06 12:47:53 UTC (rev 55359)
@@ -295,10 +295,9 @@
channel.setOpt(Channel.GET_STATE_EVENTS, new Boolean(true));
channel.setOpt(Channel.AUTO_RECONNECT, new Boolean(true));
channel.setOpt(Channel.AUTO_GETSTATE, new Boolean(true));
+
log.debug("Creating HAPartition");
- partition = new HAPartitionImpl(partitionName, channel, deadlock_detection, getServer());
- partition.setStateTransferTimeout(this.state_transfer_timeout);
- partition.setMethodCallTimeout(this.method_call_timeout);
+ partition = createPartition();
// JBAS-2769 Init partition in create
log.debug("Initing HAPartition: " + partition);
@@ -306,6 +305,18 @@
log.debug("HAPartition initialized");
}
+
+ /**
+ * Extension point meant for test cases; instantiates the HAPartitionImpl and applies the configured state-transfer and method-call timeouts.
+ * Test cases can override this method to return their own subclass of HAPartitionImpl.
+ */
+ protected HAPartitionImpl createPartition() throws Exception
+ {
+ HAPartitionImpl result = new HAPartitionImpl(partitionName, channel, deadlock_detection, getServer()); // same constructor arguments the inline code used before this change
+ result.setStateTransferTimeout(this.state_transfer_timeout); // timeouts are copied from this service's own fields
+ result.setMethodCallTimeout(this.method_call_timeout);
+ return result;
+ }
protected void startService()
throws Exception
Modified: branches/Branch_4_0/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java
===================================================================
--- branches/Branch_4_0/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java 2006-08-06 12:45:10 UTC (rev 55358)
+++ branches/Branch_4_0/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java 2006-08-06 12:47:53 UTC (rev 55359)
@@ -130,6 +130,17 @@
/** Number of ms to wait for state */
protected long state_transfer_timeout=60000;
+ /**
+ * True if state was initialized during start-up.
+ */
+ protected boolean isStateSet = false;
+
+ /**
+ * An exception occurring upon fetch state.
+ */
+ protected Exception setStateException;
+ private final Object stateLock = new Object(); // monitor: fetchState() waits on it; the state-setting code calls notifyAll() on it in its finally block
+
// Static --------------------------------------------------------
/**
@@ -265,12 +276,7 @@
// We must now synchronize new state transfer subscriber
//
- log.info("Fetching state (will wait for " + this.state_transfer_timeout + " milliseconds):");
- boolean rc = channel.getState(null, this.state_transfer_timeout);
- if (rc)
- log.debug("State was retrieved successfully");
- else
- log.debug("State could not be retrieved, (must be first member of group)");
+ fetchState();
// We start now able to start our DRM and DS
//
@@ -285,6 +291,68 @@
this.bind("/HAPartition/" + partitionName, this, HAPartitionImpl.class, ctx);
}
+
+ protected void fetchState() throws Exception // requests initial state from the channel; throws if the transfer failed and this node is not the coordinator
+ {
+ log.info("Fetching state (will wait for " + this.state_transfer_timeout + " milliseconds):");
+ long start, stop;
+ isStateSet = false; // reset before requesting state; NOTE(review): setStateException is not reset here, so a stale exception from an earlier attempt would be rethrown below - confirm intended
+ start = System.currentTimeMillis();
+ boolean rc = channel.getState(null, this.state_transfer_timeout);
+ if (rc)
+ {
+ synchronized (stateLock) // state is on its way: block until the state-setting code reports success or failure
+ {
+ while (!isStateSet) // loop guards against wake-ups that arrive before the flag is set
+ {
+ if (setStateException != null) // failure recorded while applying the state: propagate it to the caller
+ throw setStateException;
+
+ try
+ {
+ stateLock.wait(); // no timeout is passed; woken by the notifyAll() on stateLock in the state-setting code's finally block
+ }
+ catch (InterruptedException iex)
+ { // NOTE(review): the interrupt is swallowed and the thread's interrupt status is not restored; the loop simply re-checks its condition
+ }
+ }
+ }
+ stop = System.currentTimeMillis();
+ log.debug("state was retrieved successfully (in " + (stop - start) + " milliseconds)");
+ }
+ else
+ {
+ // No one provided us with state.
+ // We need to find out if we are the coordinator, so we must
+ // block until viewAccepted() is called at least once
+
+ synchronized (members) // NOTE(review): relies on another thread calling notify on members once a view is installed - not visible in this diff; confirm
+ {
+ while (members.size() == 0) // an empty member list is treated as "no view received yet"
+ {
+ log.debug("waiting on viewAccepted()");
+ try
+ {
+ members.wait(); // no timeout is passed
+ }
+ catch (InterruptedException iex)
+ { // NOTE(review): the interrupt is swallowed here as well; the loop re-checks members.size()
+ }
+ }
+ }
+
+ if (isCurrentNodeCoordinator()) // the coordinator has nobody to fetch state from, so a false return is expected
+ {
+ log.debug("State could not be retrieved (we are the first member in group)");
+ }
+ else
+ {
+ throw new IllegalStateException("Initial state transfer failed: " + // a non-coordinator that got no state must fail start-up (JBAS-2950)
+ "Channel.getState() returned false");
+ }
+ }
+ }
+
public void closePartition() throws Exception
{
logHistory ("Closing partition");
@@ -441,7 +509,28 @@
HAPartition.HAPartitionStateTransfer subscriber = (HAPartition.HAPartitionStateTransfer)stateHandlers.get(key);
if (subscriber != null)
{
- subscriber.setCurrentState((java.io.Serializable)someState);
+ try
+ {
+ subscriber.setCurrentState((java.io.Serializable)someState);
+ }
+ catch (Exception e)
+ {
+ // Don't let issues with one subscriber affect others
+ // unless it is DRM or DS, which are really internal
+ // functions of the HAPartition
+ if (DistributedReplicantManagerImpl.SERVICE_NAME.equals(key)
+ || DistributedStateImpl.SERVICE_NAME.equals(key))
+ {
+ if (e instanceof RuntimeException)
+ throw (RuntimeException) e;
+ else
+ throw new RuntimeException(e);
+ }
+ else
+ {
+ log.error("Caught exception setting state to " + subscriber, e);
+ }
+ }
}
else
{
@@ -453,11 +542,25 @@
log.debug("received a state of " + state_size + " bytes; expanded memory by " +
(used_mem_after - used_mem_before) + " bytes (used memory before: " + used_mem_before +
", used memory after: " + used_mem_after + ")");
+
+ isStateSet = true;
}
- catch (Exception ex)
+ catch (Throwable t)
{
- log.error("setState failed", ex);
+ log.error("failed setting state", t);
+ if (t instanceof Exception)
+ setStateException = (Exception) t;
+ else
+ setStateException = new Exception(t);
}
+ finally
+ {
+ synchronized (stateLock)
+ {
+ // Notify wait that state has been set.
+ stateLock.notifyAll();
+ }
+ }
}
public void receive(org.jgroups.Message msg)
More information about the jboss-cvs-commits
mailing list