[Jboss-cvs] JBossAS SVN: r56088 - trunk/cluster/src/main/org/jboss/ha/framework/server

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Aug 18 19:04:37 EDT 2006


Author: bstansberry at jboss.com
Date: 2006-08-18 19:04:37 -0400 (Fri, 18 Aug 2006)
New Revision: 56088

Modified:
   trunk/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java
Log:
[JBAS-3532] Inject DS into ClusterPartition
[JBAS-2950] Don't start if state transfer fails

Modified: trunk/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java	2006-08-18 23:03:05 UTC (rev 56087)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java	2006-08-18 23:04:37 UTC (rev 56088)
@@ -119,7 +119,7 @@
    /** The cluster replicant manager */
    protected DistributedReplicantManagerImpl replicantManager;
    /** The cluster state manager */
-   protected DistributedStateImpl dsManager;
+   protected DistributedState dsManager;
    /** The cluster instance log category */
    protected Logger log;
    protected Logger clusterLifeCycleLog;   
@@ -130,6 +130,17 @@
    /** Number of ms to wait for state */
    protected long state_transfer_timeout=60000;
 
+   /**
+    * True if state was initialized during start-up.
+    */
+   protected boolean isStateSet = false;
+
+   /**
+    * An exception occuring upon fetch state.
+    */
+   protected Exception setStateException;
+   private final Object stateLock = new Object();
+
    // Static --------------------------------------------------------
    
    /**
@@ -213,11 +224,11 @@
       this.replicantManager.init();
 
       
-      // Create the DS and link it to this HAPartition
-      log.debug("create distributed state service");
-      this.dsManager = new DistributedStateImpl(this, this.server);
-      log.debug("init distributed state service");
-      this.dsManager.init();
+//      // Create the DS and link it to this HAPartition
+//      log.debug("create distributed state service");
+//      this.dsManager = new DistributedStateImpl(this, this.server);
+//      log.debug("init distributed state service");
+//      this.dsManager.init();
 
       // Create the asynchronous handler for view changes
       asynchHandler = new AsynchEventHandler(this, "AsynchViewChangeHandler");
@@ -258,16 +269,11 @@
       this.currentViewId = view.getVid().getId();
 
       // We must now synchronize new state transfer subscriber
-      log.info("Fetching state (will wait for " + this.state_transfer_timeout + " milliseconds)");
-      boolean rc = channel.getState(null, this.state_transfer_timeout);
-      if (rc)
-         log.debug("State was retrieved successfully");
-      else
-         log.debug("State could not be retrieved, (must be first member of group)");
+      //
+      fetchState();
       
       // We are now able to start our DRM and DS
       this.replicantManager.start();
-      this.dsManager.start();
 
       // Start the asynch listener handler thread
       asynchHandler.start();
@@ -279,6 +285,68 @@
       
    }
 
+
+   protected void fetchState() throws Exception
+   {
+      log.info("Fetching state (will wait for " + this.state_transfer_timeout + " milliseconds):");
+      long start, stop;
+      isStateSet = false;
+      start = System.currentTimeMillis();
+      boolean rc = channel.getState(null, this.state_transfer_timeout);
+      if (rc)
+      {
+         synchronized (stateLock)
+         {
+            while (!isStateSet)
+            {
+               if (setStateException != null)
+                  throw setStateException;
+
+               try
+               {
+                  stateLock.wait();
+               }
+               catch (InterruptedException iex)
+               {
+               }
+            }
+         }
+         stop = System.currentTimeMillis();
+         log.debug("state was retrieved successfully (in " + (stop - start) + " milliseconds)");
+      }
+      else
+      {
+         // No one provided us with state.
+         // We need to find out if we are the coordinator, so we must
+         // block until viewAccepted() is called at least once
+
+         synchronized (members)
+         {
+            while (members.size() == 0)
+            {
+               log.debug("waiting on viewAccepted()");
+               try
+               {
+                  members.wait();
+               }
+               catch (InterruptedException iex)
+               {
+               }
+            }
+         }
+
+         if (isCurrentNodeCoordinator())
+         {
+            log.debug("State could not be retrieved (we are the first member in group)");
+         }
+         else
+         {
+            throw new IllegalStateException("Initial state transfer failed: " +
+               "Channel.getState() returned false");
+         }
+      }
+   }
+
    public void closePartition() throws Exception
    {
       logHistory ("Closing partition");
@@ -304,14 +372,14 @@
          log.error("operation failed", e);
       }
 
-      try
-      {
-         this.dsManager.stop();
-      }
-      catch (Exception e)
-      {
-         log.error("operation failed", e);
-      }
+//      try
+//      {
+//         this.dsManager.stop();
+//      }
+//      catch (Exception e)
+//      {
+//         log.error("operation failed", e);
+//      }
 
 //    NR 200505 : [JBCLUSTER-38] replace channel.close() by a disconnect and
 //    add the destroyPartition() step
@@ -354,14 +422,14 @@
          log.error("operation failed", e);
       }
 
-      try
-      {
-         this.dsManager.destroy();
-      }
-      catch (Exception e)
-      {
-         log.error("operation failed", e);
-      }
+//      try
+//      {
+//         this.dsManager.destroy();
+//      }
+//      catch (Exception e)
+//      {
+//         log.error("operation failed", e);
+//      }
       
       
       try
@@ -436,7 +504,28 @@
             HAPartition.HAPartitionStateTransfer subscriber = (HAPartition.HAPartitionStateTransfer)stateHandlers.get(key);
             if (subscriber != null)
             {
-               subscriber.setCurrentState((java.io.Serializable)someState);
+               try
+               {
+                  subscriber.setCurrentState((java.io.Serializable)someState);
+               }
+               catch (Exception e)
+               {
+                  // Don't let issues with one subscriber affect others
+                  // unless it is DRM, which is really an internal function
+                  // of the HAPartition
+                  // FIXME remove this once DRM is JBC-based
+                  if (DistributedReplicantManagerImpl.SERVICE_NAME.equals(key))
+                  {
+                     if (e instanceof RuntimeException)
+                        throw (RuntimeException) e;
+                     else
+                        throw new RuntimeException(e);
+                  }
+                  else
+                  {
+                     log.error("Caught exception setting state to " + subscriber, e);
+                  }
+               }
             }
             else
             {
@@ -448,11 +537,25 @@
          log.debug("received a state of " + state_size + " bytes; expanded memory by " +
                (used_mem_after - used_mem_before) + " bytes (used memory before: " + used_mem_before +
                ", used memory after: " + used_mem_after + ")");
+         
+         isStateSet = true;
       }
-      catch (Exception ex)
+      catch (Throwable t)
       {
-         log.error("setState failed", ex);
+         log.error("failed setting state", t);
+         if (t instanceof Exception)
+            setStateException = (Exception) t;
+         else
+            setStateException = new Exception(t);
       }
+      finally
+      {
+         synchronized (stateLock)
+         {
+            // Notify wait that state has been set.
+            stateLock.notifyAll();
+         }
+      }
    }
    
    public void receive(org.jgroups.Message msg)
@@ -1173,6 +1276,11 @@
 
       return result;
    }
+   
+   protected void setDistributedState(DistributedState distState)
+   {
+      this.dsManager = distState;
+   }
 
    public void logHistory (String message)
    {




More information about the jboss-cvs-commits mailing list