[Jboss-cvs] JBossAS SVN: r55359 - branches/Branch_4_0/cluster/src/main/org/jboss/ha/framework/server

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sun Aug 6 08:48:00 EDT 2006


Author: bstansberry at jboss.com
Date: 2006-08-06 08:47:53 -0400 (Sun, 06 Aug 2006)
New Revision: 55359

Modified:
   branches/Branch_4_0/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java
   branches/Branch_4_0/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java
Log:
[JBAS-2950] HAPartition start should fail if state transfer does

Modified: branches/Branch_4_0/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java
===================================================================
--- branches/Branch_4_0/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java	2006-08-06 12:45:10 UTC (rev 55358)
+++ branches/Branch_4_0/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java	2006-08-06 12:47:53 UTC (rev 55359)
@@ -295,10 +295,9 @@
       channel.setOpt(Channel.GET_STATE_EVENTS, new Boolean(true));
       channel.setOpt(Channel.AUTO_RECONNECT, new Boolean(true));
       channel.setOpt(Channel.AUTO_GETSTATE, new Boolean(true));
+      
       log.debug("Creating HAPartition");
-      partition = new HAPartitionImpl(partitionName, channel, deadlock_detection, getServer());
-      partition.setStateTransferTimeout(this.state_transfer_timeout);
-      partition.setMethodCallTimeout(this.method_call_timeout);
+      partition = createPartition();
       
       // JBAS-2769 Init partition in create
       log.debug("Initing HAPartition: " + partition);
@@ -306,6 +305,18 @@
       log.debug("HAPartition initialized");
       
    }
+   
+   /**
+    * Extension point meant for test cases, which can override it to return their own HAPartitionImpl subclass.
+    * By default builds an HAPartitionImpl for this partition and applies the state-transfer and method-call timeouts.
+    */
+   protected HAPartitionImpl createPartition() throws Exception
+   {
+      HAPartitionImpl result = new HAPartitionImpl(partitionName, channel, deadlock_detection, getServer());
+      result.setStateTransferTimeout(this.state_transfer_timeout);
+      result.setMethodCallTimeout(this.method_call_timeout);
+      return result;
+   }
 
    protected void startService() 
       throws Exception

Modified: branches/Branch_4_0/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java
===================================================================
--- branches/Branch_4_0/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java	2006-08-06 12:45:10 UTC (rev 55358)
+++ branches/Branch_4_0/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java	2006-08-06 12:47:53 UTC (rev 55359)
@@ -130,6 +130,17 @@
    /** Number of ms to wait for state */
    protected long state_transfer_timeout=60000;
 
+   /**
+    * True if state was initialized during start-up.
+    */
+   protected boolean isStateSet = false;
+
+   /**
+    * An exception occurring upon fetch state.
+    */
+   protected Exception setStateException;
+   private final Object stateLock = new Object();
+
    // Static --------------------------------------------------------
    
    /**
@@ -265,12 +276,7 @@
 
       // We must now synchronize new state transfer subscriber
       //
-      log.info("Fetching state (will wait for " + this.state_transfer_timeout + " milliseconds):");
-      boolean rc = channel.getState(null, this.state_transfer_timeout);
-      if (rc)
-         log.debug("State was retrieved successfully");
-      else
-         log.debug("State could not be retrieved, (must be first member of group)");
+      fetchState();
       
       // We start now able to start our DRM and DS
       //
@@ -285,6 +291,68 @@
       this.bind("/HAPartition/" + partitionName, this, HAPartitionImpl.class, ctx);
    }
 
+   /** Requests the initial state from the channel, then blocks until it has been applied; throws if applying it failed or if no state arrived and this node is not the coordinator. */
+   protected void fetchState() throws Exception
+   {
+      log.info("Fetching state (will wait for " + this.state_transfer_timeout + " milliseconds):");
+      long start, stop; // timestamps used only for the elapsed-time debug message below
+      isStateSet = false; // NOTE(review): setStateException is not reset here — confirm fetchState() is only called once per instance
+      start = System.currentTimeMillis();
+      boolean rc = channel.getState(null, this.state_transfer_timeout);
+      if (rc) // true: block until isStateSet is raised or setStateException is recorded
+      {
+         synchronized (stateLock)
+         {
+            while (!isStateSet) // re-check after every wake-up; only the flag ends the wait normally
+            {
+               if (setStateException != null)
+                  throw setStateException; // propagate the recorded failure so start-up fails
+
+               try
+               {
+                  stateLock.wait(); // no timeout: relies on a notifyAll() on stateLock to wake up
+               }
+               catch (InterruptedException iex)
+               { // interruption is ignored; the loop re-checks and waits again. NOTE(review): interrupt status is not restored
+               }
+            }
+         }
+         stop = System.currentTimeMillis();
+         log.debug("state was retrieved successfully (in " + (stop - start) + " milliseconds)");
+      }
+      else
+      {
+         // No one provided us with state.
+         // We need to find out if we are the coordinator, so we must
+         // block until viewAccepted() is called at least once
+
+         synchronized (members)
+         {
+            while (members.size() == 0) // an empty member list means no view has been recorded yet
+            {
+               log.debug("waiting on viewAccepted()");
+               try
+               {
+                  members.wait(); // presumably notified from viewAccepted() — confirm against that method
+               }
+               catch (InterruptedException iex)
+               { // interruption is ignored; the loop re-checks members.size(). NOTE(review): interrupt status is not restored
+               }
+            }
+         }
+
+         if (isCurrentNodeCoordinator())
+         {
+            log.debug("State could not be retrieved (we are the first member in group)");
+         }
+         else
+         {
+            throw new IllegalStateException("Initial state transfer failed: " + // a non-coordinator must have received state
+               "Channel.getState() returned false");
+         }
+      }
+   }
+
    public void closePartition() throws Exception
    {
       logHistory ("Closing partition");
@@ -441,7 +509,28 @@
             HAPartition.HAPartitionStateTransfer subscriber = (HAPartition.HAPartitionStateTransfer)stateHandlers.get(key);
             if (subscriber != null)
             {
-               subscriber.setCurrentState((java.io.Serializable)someState);
+               try
+               {
+                  subscriber.setCurrentState((java.io.Serializable)someState);
+               }
+               catch (Exception e)
+               {
+                  // Don't let issues with one subscriber affect others
+                  // unless it is DRM or DS, which are really internal 
+                  // functions of the HAPartition
+                  if (DistributedReplicantManagerImpl.SERVICE_NAME.equals(key) 
+                        || DistributedStateImpl.SERVICE_NAME.equals(key))
+                  {
+                     if (e instanceof RuntimeException)
+                        throw (RuntimeException) e;
+                     else
+                        throw new RuntimeException(e);
+                  }
+                  else
+                  {
+                     log.error("Caught exception setting state to " + subscriber, e);
+                  }
+               }
             }
             else
             {
@@ -453,11 +542,25 @@
          log.debug("received a state of " + state_size + " bytes; expanded memory by " +
                (used_mem_after - used_mem_before) + " bytes (used memory before: " + used_mem_before +
                ", used memory after: " + used_mem_after + ")");
+         
+         isStateSet = true;
       }
-      catch (Exception ex)
+      catch (Throwable t)
       {
-         log.error("setState failed", ex);
+         log.error("failed setting state", t);
+         if (t instanceof Exception)
+            setStateException = (Exception) t;
+         else
+            setStateException = new Exception(t);
       }
+      finally
+      {
+         synchronized (stateLock)
+         {
+            // Notify wait that state has been set.
+            stateLock.notifyAll();
+         }
+      }
    }
    
    public void receive(org.jgroups.Message msg)




More information about the jboss-cvs-commits mailing list